You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@spark.apache.org by yh...@apache.org on 2016/11/15 18:32:47 UTC

spark git commit: [SPARK-18379][SQL] Make the parallelism of parallelPartitionDiscovery configurable.

Repository: spark
Updated Branches:
  refs/heads/master f14ae4900 -> 745ab8bc5


[SPARK-18379][SQL] Make the parallelism of parallelPartitionDiscovery configurable.

## What changes were proposed in this pull request?

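The maximum parallelism used by PartitioningAwareFileIndex#listLeafFilesInParallel() was hard-coded to 10000. This patch makes it configurable through a new internal setting, spark.sql.sources.parallelPartitionDiscovery.parallelism. (Note: the original PR description said the value was reduced to 100, but the default in the committed diff below is 10000.)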

## How was this patch tested?

Existing unit tests.

Author: genmao.ygm <ge...@genmaoygmdeMacBook-Air.local>
Author: dylon <hu...@gmail.com>

Closes #15829 from uncleGen/SPARK-18379.


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/745ab8bc
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/745ab8bc
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/745ab8bc

Branch: refs/heads/master
Commit: 745ab8bc50da89c42b297de9dcb833e5f2074481
Parents: f14ae49
Author: genmao.ygm <ge...@genmaoygmdeMacBook-Air.local>
Authored: Tue Nov 15 10:32:43 2016 -0800
Committer: Yin Huai <yh...@databricks.com>
Committed: Tue Nov 15 10:32:43 2016 -0800

----------------------------------------------------------------------
 .../datasources/PartitioningAwareFileIndex.scala         |  4 +++-
 .../scala/org/apache/spark/sql/internal/SQLConf.scala    | 11 +++++++++++
 2 files changed, 14 insertions(+), 1 deletion(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/745ab8bc/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/PartitioningAwareFileIndex.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/PartitioningAwareFileIndex.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/PartitioningAwareFileIndex.scala
index 3740caa..705a1e3 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/PartitioningAwareFileIndex.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/PartitioningAwareFileIndex.scala
@@ -315,10 +315,12 @@ object PartitioningAwareFileIndex extends Logging {
     val sparkContext = sparkSession.sparkContext
     val serializableConfiguration = new SerializableConfiguration(hadoopConf)
     val serializedPaths = paths.map(_.toString)
+    val parallelPartitionDiscoveryParallelism =
+      sparkSession.sessionState.conf.parallelPartitionDiscoveryParallelism
 
     // Set the number of parallelism to prevent following file listing from generating many tasks
     // in case of large #defaultParallelism.
-    val numParallelism = Math.min(paths.size, 10000)
+    val numParallelism = Math.min(paths.size, parallelPartitionDiscoveryParallelism)
 
     val statusMap = sparkContext
       .parallelize(serializedPaths, numParallelism)

http://git-wip-us.apache.org/repos/asf/spark/blob/745ab8bc/sql/core/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala b/sql/core/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala
index 71f3a67..6372936 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala
@@ -396,6 +396,14 @@ object SQLConf {
       .intConf
       .createWithDefault(32)
 
+  val PARALLEL_PARTITION_DISCOVERY_PARALLELISM =
+    SQLConfigBuilder("spark.sql.sources.parallelPartitionDiscovery.parallelism")
+      .doc("The number of parallelism to list a collection of path recursively, Set the " +
+        "number to prevent file listing from generating too many tasks.")
+      .internal()
+      .intConf
+      .createWithDefault(10000)
+
   // Whether to automatically resolve ambiguity in join conditions for self-joins.
   // See SPARK-6231.
   val DATAFRAME_SELF_JOIN_AUTO_RESOLVE_AMBIGUITY =
@@ -774,6 +782,9 @@ private[sql] class SQLConf extends Serializable with CatalystConf with Logging {
   def parallelPartitionDiscoveryThreshold: Int =
     getConf(SQLConf.PARALLEL_PARTITION_DISCOVERY_THRESHOLD)
 
+  def parallelPartitionDiscoveryParallelism: Int =
+    getConf(SQLConf.PARALLEL_PARTITION_DISCOVERY_PARALLELISM)
+
   def bucketingEnabled: Boolean = getConf(SQLConf.BUCKETING_ENABLED)
 
   def dataFrameSelfJoinAutoResolveAmbiguity: Boolean =


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@spark.apache.org
For additional commands, e-mail: commits-help@spark.apache.org