Posted to commits@spark.apache.org by we...@apache.org on 2018/01/30 03:40:49 UTC

spark git commit: [SPARK-23207][SQL][FOLLOW-UP] Don't perform local sort for DataFrame.repartition(1)

Repository: spark
Updated Branches:
  refs/heads/master 31bd1dab1 -> b375397b1


[SPARK-23207][SQL][FOLLOW-UP] Don't perform local sort for DataFrame.repartition(1)

## What changes were proposed in this pull request?

In `ShuffleExchangeExec`, we don't need to insert an extra local sort before round-robin partitioning if the new partitioning has only 1 partition, because in that case all output rows go to the same partition regardless of input order.
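
As a rough illustration of the reasoning above, here is a simplified model of round-robin assignment. It is not Spark's implementation: `RoundRobinSketch`, `assign`, `needsLocalSort`, and the `start` parameter are hypothetical names used only for this sketch. It shows that with a single partition every row lands in partition 0 whatever the input order, while with more partitions the input order changes where rows end up.

```scala
// Simplified model of round-robin partitioning; NOT Spark's actual implementation.
// All names here are hypothetical and exist only for illustration.
object RoundRobinSketch {
  // Assign each row to a partition by cycling through partition ids.
  // `start` stands in for a non-negative per-task starting offset (an assumption).
  def assign[T](rows: Seq[T], numPartitions: Int, start: Int = 0): Map[Int, Seq[T]] = {
    require(numPartitions > 0, "numPartitions must be positive")
    require(start >= 0, "start must be non-negative")
    rows.zipWithIndex
      .groupBy { case (_, i) => (start + i) % numPartitions }
      .map { case (p, indexed) => p -> indexed.map(_._1) }
  }

  // Same shape as the guard in the patch: sort only when row order can
  // influence which partition a row is sent to.
  def needsLocalSort(
      sortBeforeRepartition: Boolean,
      numPartitions: Int,
      isRoundRobin: Boolean): Boolean =
    sortBeforeRepartition && numPartitions > 1 && isRoundRobin
}
```

In this model, `assign(Seq(3, 1, 2), 1)` and `assign(Seq(1, 2, 3), 1)` both place the same set of rows in partition 0, so a sort beforehand cannot change which partition receives which row.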

## How was this patch tested?

The existing test cases.

Author: Xingbo Jiang <xi...@databricks.com>

Closes #20426 from jiangxb1987/repartition1.


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/b375397b
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/b375397b
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/b375397b

Branch: refs/heads/master
Commit: b375397b1678b7fe20a0b7f87a7e8b37ae5646ef
Parents: 31bd1da
Author: Xingbo Jiang <xi...@databricks.com>
Authored: Tue Jan 30 11:40:42 2018 +0800
Committer: Wenchen Fan <we...@databricks.com>
Committed: Tue Jan 30 11:40:42 2018 +0800

----------------------------------------------------------------------
 .../spark/sql/execution/exchange/ShuffleExchangeExec.scala       | 4 ++++
 .../apache/spark/sql/execution/streaming/ForeachSinkSuite.scala  | 2 +-
 2 files changed, 5 insertions(+), 1 deletion(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/b375397b/sql/core/src/main/scala/org/apache/spark/sql/execution/exchange/ShuffleExchangeExec.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/exchange/ShuffleExchangeExec.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/exchange/ShuffleExchangeExec.scala
index 76c1fa6..4d95ee3 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/exchange/ShuffleExchangeExec.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/exchange/ShuffleExchangeExec.scala
@@ -257,7 +257,11 @@ object ShuffleExchangeExec {
       //
       // Currently we following the most straight-forward way that perform a local sort before
       // partitioning.
+      //
+      // Note that we don't perform local sort if the new partitioning has only 1 partition, under
+      // that case all output rows go to the same partition.
       val newRdd = if (SQLConf.get.sortBeforeRepartition &&
+          newPartitioning.numPartitions > 1 &&
           newPartitioning.isInstanceOf[RoundRobinPartitioning]) {
         rdd.mapPartitionsInternal { iter =>
           val recordComparatorSupplier = new Supplier[RecordComparator] {

http://git-wip-us.apache.org/repos/asf/spark/blob/b375397b/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/ForeachSinkSuite.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/ForeachSinkSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/ForeachSinkSuite.scala
index 1248c67..41434e6 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/ForeachSinkSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/ForeachSinkSuite.scala
@@ -162,7 +162,7 @@ class ForeachSinkSuite extends StreamTest with SharedSQLContext with BeforeAndAf
       val allEvents = ForeachSinkSuite.allEvents()
       assert(allEvents.size === 1)
       assert(allEvents(0)(0) === ForeachSinkSuite.Open(partition = 0, version = 0))
-      assert(allEvents(0)(1) === ForeachSinkSuite.Process(value = 2))
+      assert(allEvents(0)(1) === ForeachSinkSuite.Process(value = 1))
 
       // `close` should be called with the error
       val errorEvent = allEvents(0)(2).asInstanceOf[ForeachSinkSuite.Close]

