You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@spark.apache.org by jo...@apache.org on 2016/09/02 00:14:11 UTC

spark git commit: [SPARK-16302][SQL] Set the right number of partitions for reading data from a local collection.

Repository: spark
Updated Branches:
  refs/heads/master 5bea8757c -> 06e33985c


[SPARK-16302][SQL] Set the right number of partitions for reading data from a local collection.

follow #13137 This pr sets the right number of partitions when reading data from a local collection.
Query 'val df = Seq((1, 2)).toDF("key", "value").count' always use defaultParallelism tasks. So it causes run many empty or small tasks.

Manually tested and checked.

Author: Lianhui Wang <li...@gmail.com>

Closes #13979 from lianhuiwang/localTable-Parallel.


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/06e33985
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/06e33985
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/06e33985

Branch: refs/heads/master
Commit: 06e33985c631fe91e1c4cef6039b8752548cc435
Parents: 5bea875
Author: Lianhui Wang <li...@gmail.com>
Authored: Thu Sep 1 17:08:33 2016 -0700
Committer: Josh Rosen <jo...@databricks.com>
Committed: Thu Sep 1 17:09:23 2016 -0700

----------------------------------------------------------------------
 .../org/apache/spark/sql/execution/LocalTableScanExec.scala     | 5 ++++-
 1 file changed, 4 insertions(+), 1 deletion(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/06e33985/sql/core/src/main/scala/org/apache/spark/sql/execution/LocalTableScanExec.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/LocalTableScanExec.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/LocalTableScanExec.scala
index 556f482..6598fa3 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/LocalTableScanExec.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/LocalTableScanExec.scala
@@ -42,7 +42,10 @@ case class LocalTableScanExec(
     }
   }
 
-  private lazy val rdd = sqlContext.sparkContext.parallelize(unsafeRows)
+  private lazy val numParallelism: Int = math.min(math.max(unsafeRows.length, 1),
+    sqlContext.sparkContext.defaultParallelism)
+
+  private lazy val rdd = sqlContext.sparkContext.parallelize(unsafeRows, numParallelism)
 
   protected override def doExecute(): RDD[InternalRow] = {
     val numOutputRows = longMetric("numOutputRows")


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@spark.apache.org
For additional commands, e-mail: commits-help@spark.apache.org