You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@spark.apache.org by yh...@apache.org on 2016/11/15 18:32:47 UTC
spark git commit: [SPARK-18379][SQL] Make the parallelism of parallelPartitionDiscovery configurable.
Repository: spark
Updated Branches:
refs/heads/master f14ae4900 -> 745ab8bc5
[SPARK-18379][SQL] Make the parallelism of parallelPartitionDiscovery configurable.
## What changes were proposed in this pull request?
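The maximum parallelism used by `PartitioningAwareFileIndex#listLeafFilesInParallel()` is hard-coded to 10000. This number should be configurable, so this change adds the internal SQL config `spark.sql.sources.parallelPartitionDiscovery.parallelism`. Note: the PR text says the limit is reduced to 100, but the committed default (see the `SQLConf` diff below) remains 10000.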
## How was this patch tested?
Existing unit tests.
Author: genmao.ygm <ge...@genmaoygmdeMacBook-Air.local>
Author: dylon <hu...@gmail.com>
Closes #15829 from uncleGen/SPARK-18379.
Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/745ab8bc
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/745ab8bc
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/745ab8bc
Branch: refs/heads/master
Commit: 745ab8bc50da89c42b297de9dcb833e5f2074481
Parents: f14ae49
Author: genmao.ygm <ge...@genmaoygmdeMacBook-Air.local>
Authored: Tue Nov 15 10:32:43 2016 -0800
Committer: Yin Huai <yh...@databricks.com>
Committed: Tue Nov 15 10:32:43 2016 -0800
----------------------------------------------------------------------
.../datasources/PartitioningAwareFileIndex.scala | 4 +++-
.../scala/org/apache/spark/sql/internal/SQLConf.scala | 11 +++++++++++
2 files changed, 14 insertions(+), 1 deletion(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/spark/blob/745ab8bc/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/PartitioningAwareFileIndex.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/PartitioningAwareFileIndex.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/PartitioningAwareFileIndex.scala
index 3740caa..705a1e3 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/PartitioningAwareFileIndex.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/PartitioningAwareFileIndex.scala
@@ -315,10 +315,12 @@ object PartitioningAwareFileIndex extends Logging {
val sparkContext = sparkSession.sparkContext
val serializableConfiguration = new SerializableConfiguration(hadoopConf)
val serializedPaths = paths.map(_.toString)
+ val parallelPartitionDiscoveryParallelism =
+ sparkSession.sessionState.conf.parallelPartitionDiscoveryParallelism
// Set the number of parallelism to prevent following file listing from generating many tasks
// in case of large #defaultParallelism.
- val numParallelism = Math.min(paths.size, 10000)
+ val numParallelism = Math.min(paths.size, parallelPartitionDiscoveryParallelism)
val statusMap = sparkContext
.parallelize(serializedPaths, numParallelism)
http://git-wip-us.apache.org/repos/asf/spark/blob/745ab8bc/sql/core/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala b/sql/core/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala
index 71f3a67..6372936 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala
@@ -396,6 +396,14 @@ object SQLConf {
.intConf
.createWithDefault(32)
+ val PARALLEL_PARTITION_DISCOVERY_PARALLELISM =
+ SQLConfigBuilder("spark.sql.sources.parallelPartitionDiscovery.parallelism")
+ .doc("The number of parallelism to list a collection of path recursively, Set the " +
+ "number to prevent file listing from generating too many tasks.")
+ .internal()
+ .intConf
+ .createWithDefault(10000)
+
// Whether to automatically resolve ambiguity in join conditions for self-joins.
// See SPARK-6231.
val DATAFRAME_SELF_JOIN_AUTO_RESOLVE_AMBIGUITY =
@@ -774,6 +782,9 @@ private[sql] class SQLConf extends Serializable with CatalystConf with Logging {
def parallelPartitionDiscoveryThreshold: Int =
getConf(SQLConf.PARALLEL_PARTITION_DISCOVERY_THRESHOLD)
+ def parallelPartitionDiscoveryParallelism: Int =
+ getConf(SQLConf.PARALLEL_PARTITION_DISCOVERY_PARALLELISM)
+
def bucketingEnabled: Boolean = getConf(SQLConf.BUCKETING_ENABLED)
def dataFrameSelfJoinAutoResolveAmbiguity: Boolean =
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@spark.apache.org
For additional commands, e-mail: commits-help@spark.apache.org