You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@spark.apache.org by sr...@apache.org on 2017/01/02 15:26:13 UTC

spark git commit: [SPARK-18379][SQL] Make the parallelism of parallelPartitionDiscovery configurable.

Repository: spark
Updated Branches:
  refs/heads/branch-2.1 63857c8d3 -> 517f39833


[SPARK-18379][SQL] Make the parallelism of parallelPartitionDiscovery configurable.

## What changes were proposed in this pull request?

The largest parallelism in PartitioningAwareFileIndex #listLeafFilesInParallel() is 10000 in hard code. We may need to make this number configurable. And in PR, I reduce it to 100.

## How was this patch tested?

Existing ut.

Author: genmao.ygm <ge...@genmaoygmdeMacBook-Air.local>
Author: dylon <hu...@gmail.com>

Closes #15829 from uncleGen/SPARK-18379.

(cherry picked from commit 745ab8bc50da89c42b297de9dcb833e5f2074481)
Signed-off-by: Sean Owen <so...@cloudera.com>


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/517f3983
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/517f3983
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/517f3983

Branch: refs/heads/branch-2.1
Commit: 517f39833cf789b536defe5ba4b010828d24831f
Parents: 63857c8
Author: genmao.ygm <ge...@genmaoygmdeMacBook-Air.local>
Authored: Tue Nov 15 10:32:43 2016 -0800
Committer: Sean Owen <so...@cloudera.com>
Committed: Mon Jan 2 15:26:09 2017 +0000

----------------------------------------------------------------------
 .../datasources/PartitioningAwareFileIndex.scala         |  4 +++-
 .../scala/org/apache/spark/sql/internal/SQLConf.scala    | 11 +++++++++++
 2 files changed, 14 insertions(+), 1 deletion(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/517f3983/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/PartitioningAwareFileIndex.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/PartitioningAwareFileIndex.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/PartitioningAwareFileIndex.scala
index f22b55b..825a0f7 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/PartitioningAwareFileIndex.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/PartitioningAwareFileIndex.scala
@@ -309,10 +309,12 @@ object PartitioningAwareFileIndex extends Logging {
     val sparkContext = sparkSession.sparkContext
     val serializableConfiguration = new SerializableConfiguration(hadoopConf)
     val serializedPaths = paths.map(_.toString)
+    val parallelPartitionDiscoveryParallelism =
+      sparkSession.sessionState.conf.parallelPartitionDiscoveryParallelism
 
     // Set the number of parallelism to prevent following file listing from generating many tasks
     // in case of large #defaultParallelism.
-    val numParallelism = Math.min(paths.size, 10000)
+    val numParallelism = Math.min(paths.size, parallelPartitionDiscoveryParallelism)
 
     val statusMap = sparkContext
       .parallelize(serializedPaths, numParallelism)

http://git-wip-us.apache.org/repos/asf/spark/blob/517f3983/sql/core/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala b/sql/core/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala
index 5454be4..8fbad60 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala
@@ -391,6 +391,14 @@ object SQLConf {
       .intConf
       .createWithDefault(32)
 
+  val PARALLEL_PARTITION_DISCOVERY_PARALLELISM =
+    SQLConfigBuilder("spark.sql.sources.parallelPartitionDiscovery.parallelism")
+      .doc("The number of parallelism to list a collection of path recursively, Set the " +
+        "number to prevent file listing from generating too many tasks.")
+      .internal()
+      .intConf
+      .createWithDefault(10000)
+
   // Whether to automatically resolve ambiguity in join conditions for self-joins.
   // See SPARK-6231.
   val DATAFRAME_SELF_JOIN_AUTO_RESOLVE_AMBIGUITY =
@@ -769,6 +777,9 @@ private[sql] class SQLConf extends Serializable with CatalystConf with Logging {
   def parallelPartitionDiscoveryThreshold: Int =
     getConf(SQLConf.PARALLEL_PARTITION_DISCOVERY_THRESHOLD)
 
+  def parallelPartitionDiscoveryParallelism: Int =
+    getConf(SQLConf.PARALLEL_PARTITION_DISCOVERY_PARALLELISM)
+
   def bucketingEnabled: Boolean = getConf(SQLConf.BUCKETING_ENABLED)
 
   def dataFrameSelfJoinAutoResolveAmbiguity: Boolean =


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@spark.apache.org
For additional commands, e-mail: commits-help@spark.apache.org