You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@spark.apache.org by we...@apache.org on 2018/01/30 03:40:49 UTC
spark git commit: [SPARK-23207][SQL][FOLLOW-UP] Don't perform local
sort for DataFrame.repartition(1)
Repository: spark
Updated Branches:
refs/heads/master 31bd1dab1 -> b375397b1
[SPARK-23207][SQL][FOLLOW-UP] Don't perform local sort for DataFrame.repartition(1)
## What changes were proposed in this pull request?
In `ShuffleExchangeExec`, we don't need to insert an extra local sort before round-robin partitioning if the new partitioning has only 1 partition, because in that case all output rows go to the same partition.
## How was this patch tested?
The existing test cases.
Author: Xingbo Jiang <xi...@databricks.com>
Closes #20426 from jiangxb1987/repartition1.
Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/b375397b
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/b375397b
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/b375397b
Branch: refs/heads/master
Commit: b375397b1678b7fe20a0b7f87a7e8b37ae5646ef
Parents: 31bd1da
Author: Xingbo Jiang <xi...@databricks.com>
Authored: Tue Jan 30 11:40:42 2018 +0800
Committer: Wenchen Fan <we...@databricks.com>
Committed: Tue Jan 30 11:40:42 2018 +0800
----------------------------------------------------------------------
.../spark/sql/execution/exchange/ShuffleExchangeExec.scala | 4 ++++
.../apache/spark/sql/execution/streaming/ForeachSinkSuite.scala | 2 +-
2 files changed, 5 insertions(+), 1 deletion(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/spark/blob/b375397b/sql/core/src/main/scala/org/apache/spark/sql/execution/exchange/ShuffleExchangeExec.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/exchange/ShuffleExchangeExec.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/exchange/ShuffleExchangeExec.scala
index 76c1fa6..4d95ee3 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/exchange/ShuffleExchangeExec.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/exchange/ShuffleExchangeExec.scala
@@ -257,7 +257,11 @@ object ShuffleExchangeExec {
//
// Currently we following the most straight-forward way that perform a local sort before
// partitioning.
+ //
+ // Note that we don't perform a local sort if the new partitioning has only 1 partition; in
+ // that case all output rows go to the same partition.
val newRdd = if (SQLConf.get.sortBeforeRepartition &&
+ newPartitioning.numPartitions > 1 &&
newPartitioning.isInstanceOf[RoundRobinPartitioning]) {
rdd.mapPartitionsInternal { iter =>
val recordComparatorSupplier = new Supplier[RecordComparator] {
http://git-wip-us.apache.org/repos/asf/spark/blob/b375397b/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/ForeachSinkSuite.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/ForeachSinkSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/ForeachSinkSuite.scala
index 1248c67..41434e6 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/ForeachSinkSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/ForeachSinkSuite.scala
@@ -162,7 +162,7 @@ class ForeachSinkSuite extends StreamTest with SharedSQLContext with BeforeAndAf
val allEvents = ForeachSinkSuite.allEvents()
assert(allEvents.size === 1)
assert(allEvents(0)(0) === ForeachSinkSuite.Open(partition = 0, version = 0))
- assert(allEvents(0)(1) === ForeachSinkSuite.Process(value = 2))
+ assert(allEvents(0)(1) === ForeachSinkSuite.Process(value = 1))
// `close` should be called with the error
val errorEvent = allEvents(0)(2).asInstanceOf[ForeachSinkSuite.Close]
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@spark.apache.org
For additional commands, e-mail: commits-help@spark.apache.org