You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@spark.apache.org by sr...@apache.org on 2017/01/02 15:26:13 UTC
spark git commit: [SPARK-18379][SQL] Make the parallelism of
parallelPartitionDiscovery configurable.
Repository: spark
Updated Branches:
refs/heads/branch-2.1 63857c8d3 -> 517f39833
[SPARK-18379][SQL] Make the parallelism of parallelPartitionDiscovery configurable.
## What changes were proposed in this pull request?
The maximum parallelism in PartitioningAwareFileIndex#listLeafFilesInParallel() is hard-coded to 10000. We may need to make this number configurable. In this PR, I also reduce it to 100. (Note: the diff below keeps the default at 10000.)
## How was this patch tested?
Existing unit tests.
Author: genmao.ygm <ge...@genmaoygmdeMacBook-Air.local>
Author: dylon <hu...@gmail.com>
Closes #15829 from uncleGen/SPARK-18379.
(cherry picked from commit 745ab8bc50da89c42b297de9dcb833e5f2074481)
Signed-off-by: Sean Owen <so...@cloudera.com>
Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/517f3983
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/517f3983
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/517f3983
Branch: refs/heads/branch-2.1
Commit: 517f39833cf789b536defe5ba4b010828d24831f
Parents: 63857c8
Author: genmao.ygm <ge...@genmaoygmdeMacBook-Air.local>
Authored: Tue Nov 15 10:32:43 2016 -0800
Committer: Sean Owen <so...@cloudera.com>
Committed: Mon Jan 2 15:26:09 2017 +0000
----------------------------------------------------------------------
.../datasources/PartitioningAwareFileIndex.scala | 4 +++-
.../scala/org/apache/spark/sql/internal/SQLConf.scala | 11 +++++++++++
2 files changed, 14 insertions(+), 1 deletion(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/spark/blob/517f3983/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/PartitioningAwareFileIndex.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/PartitioningAwareFileIndex.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/PartitioningAwareFileIndex.scala
index f22b55b..825a0f7 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/PartitioningAwareFileIndex.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/PartitioningAwareFileIndex.scala
@@ -309,10 +309,12 @@ object PartitioningAwareFileIndex extends Logging {
val sparkContext = sparkSession.sparkContext
val serializableConfiguration = new SerializableConfiguration(hadoopConf)
val serializedPaths = paths.map(_.toString)
+ val parallelPartitionDiscoveryParallelism =
+ sparkSession.sessionState.conf.parallelPartitionDiscoveryParallelism
// Set the number of parallelism to prevent following file listing from generating many tasks
// in case of large #defaultParallelism.
- val numParallelism = Math.min(paths.size, 10000)
+ val numParallelism = Math.min(paths.size, parallelPartitionDiscoveryParallelism)
val statusMap = sparkContext
.parallelize(serializedPaths, numParallelism)
http://git-wip-us.apache.org/repos/asf/spark/blob/517f3983/sql/core/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala b/sql/core/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala
index 5454be4..8fbad60 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala
@@ -391,6 +391,14 @@ object SQLConf {
.intConf
.createWithDefault(32)
+ val PARALLEL_PARTITION_DISCOVERY_PARALLELISM =
+ SQLConfigBuilder("spark.sql.sources.parallelPartitionDiscovery.parallelism")
+ .doc("The number of parallelism to list a collection of path recursively, Set the " +
+ "number to prevent file listing from generating too many tasks.")
+ .internal()
+ .intConf
+ .createWithDefault(10000)
+
// Whether to automatically resolve ambiguity in join conditions for self-joins.
// See SPARK-6231.
val DATAFRAME_SELF_JOIN_AUTO_RESOLVE_AMBIGUITY =
@@ -769,6 +777,9 @@ private[sql] class SQLConf extends Serializable with CatalystConf with Logging {
def parallelPartitionDiscoveryThreshold: Int =
getConf(SQLConf.PARALLEL_PARTITION_DISCOVERY_THRESHOLD)
+ def parallelPartitionDiscoveryParallelism: Int =
+ getConf(SQLConf.PARALLEL_PARTITION_DISCOVERY_PARALLELISM)
+
def bucketingEnabled: Boolean = getConf(SQLConf.BUCKETING_ENABLED)
def dataFrameSelfJoinAutoResolveAmbiguity: Boolean =
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@spark.apache.org
For additional commands, e-mail: commits-help@spark.apache.org