Posted to commits@spark.apache.org by li...@apache.org on 2019/07/16 13:57:53 UTC
[spark] branch branch-2.4 updated: Revert "[SPARK-27485] EnsureRequirements.reorder should handle duplicate expressions gracefully"
This is an automated email from the ASF dual-hosted git repository.
lixiao pushed a commit to branch branch-2.4
in repository https://gitbox.apache.org/repos/asf/spark.git
The following commit(s) were added to refs/heads/branch-2.4 by this push:
new 63898cb Revert "[SPARK-27485] EnsureRequirements.reorder should handle duplicate expressions gracefully"
63898cb is described below
commit 63898cbc1db46be8bcbb46d21fe01340bc883520
Author: gatorsmile <ga...@gmail.com>
AuthorDate: Tue Jul 16 06:57:14 2019 -0700
Revert "[SPARK-27485] EnsureRequirements.reorder should handle duplicate expressions gracefully"
This reverts commit 72f547d4a960ba0ba9cace53a0a5553eca1b4dd6.
---
.../execution/exchange/EnsureRequirements.scala | 72 ++++++++++------------
.../scala/org/apache/spark/sql/JoinSuite.scala | 20 ------
.../apache/spark/sql/execution/PlannerSuite.scala | 26 --------
3 files changed, 32 insertions(+), 86 deletions(-)
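For context, SPARK-27485 hardened EnsureRequirements.reorder against join keys that repeat the same expression. A minimal sketch of that scenario, adapted from the JoinSuite test removed below (assuming a SparkSession named spark with its implicits imported):

    // "x" appears twice in the repartitioning keys, and tbl_a.y is reused
    // in two of the three join predicates.
    val tblA = spark.range(40)
      .select($"id" as "x", $"id" % 10 as "y")
      .repartition(2, $"x", $"y", $"x")
      .as("tbl_a")

    val tblB = spark.range(20)
      .select($"id" as "x", $"id" % 2 as "y1", $"id" % 20 as "y2")
      .as("tbl_b")

    val res = tblA.join(tblB,
      $"tbl_a.x" === $"tbl_b.x" && $"tbl_a.y" === $"tbl_b.y1" && $"tbl_a.y" === $"tbl_b.y2")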
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/exchange/EnsureRequirements.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/exchange/EnsureRequirements.scala
index bdb9a31..d2d5011 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/exchange/EnsureRequirements.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/exchange/EnsureRequirements.scala
@@ -24,7 +24,8 @@ import org.apache.spark.sql.catalyst.expressions._
import org.apache.spark.sql.catalyst.plans.physical._
import org.apache.spark.sql.catalyst.rules.Rule
import org.apache.spark.sql.execution._
-import org.apache.spark.sql.execution.joins.{ShuffledHashJoinExec, SortMergeJoinExec}
+import org.apache.spark.sql.execution.joins.{BroadcastHashJoinExec, ShuffledHashJoinExec,
+ SortMergeJoinExec}
import org.apache.spark.sql.internal.SQLConf
/**
@@ -220,41 +221,25 @@ case class EnsureRequirements(conf: SQLConf) extends Rule[SparkPlan] {
}
private def reorder(
- leftKeys: IndexedSeq[Expression],
- rightKeys: IndexedSeq[Expression],
+ leftKeys: Seq[Expression],
+ rightKeys: Seq[Expression],
expectedOrderOfKeys: Seq[Expression],
currentOrderOfKeys: Seq[Expression]): (Seq[Expression], Seq[Expression]) = {
- if (expectedOrderOfKeys.size != currentOrderOfKeys.size) {
- return (leftKeys, rightKeys)
- }
-
- // Build a lookup between an expression and the positions its holds in the current key seq.
- val keyToIndexMap = mutable.Map.empty[Expression, mutable.BitSet]
- currentOrderOfKeys.zipWithIndex.foreach {
- case (key, index) =>
- keyToIndexMap.getOrElseUpdate(key.canonicalized, mutable.BitSet.empty).add(index)
- }
-
- // Reorder the keys.
- val leftKeysBuffer = new ArrayBuffer[Expression](leftKeys.size)
- val rightKeysBuffer = new ArrayBuffer[Expression](rightKeys.size)
- val iterator = expectedOrderOfKeys.iterator
- while (iterator.hasNext) {
- // Lookup the current index of this key.
- keyToIndexMap.get(iterator.next().canonicalized) match {
- case Some(indices) if indices.nonEmpty =>
- // Take the first available index from the map.
- val index = indices.firstKey
- indices.remove(index)
+ val leftKeysBuffer = ArrayBuffer[Expression]()
+ val rightKeysBuffer = ArrayBuffer[Expression]()
+ val pickedIndexes = mutable.Set[Int]()
+ val keysAndIndexes = currentOrderOfKeys.zipWithIndex
- // Add the keys for that index to the reordered keys.
- leftKeysBuffer += leftKeys(index)
- rightKeysBuffer += rightKeys(index)
- case _ =>
- // The expression cannot be found, or we have exhausted all indices for that expression.
- return (leftKeys, rightKeys)
- }
- }
+ expectedOrderOfKeys.foreach(expression => {
+ val index = keysAndIndexes.find { case (e, idx) =>
+ // As we may have the same key used many times, we need to filter out its occurrence we
+ // have already used.
+ e.semanticEquals(expression) && !pickedIndexes.contains(idx)
+ }.map(_._2).get
+ pickedIndexes += index
+ leftKeysBuffer.append(leftKeys(index))
+ rightKeysBuffer.append(rightKeys(index))
+ })
(leftKeysBuffer, rightKeysBuffer)
}
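Read on its own, the restored reorder body behaves like the following standalone sketch (a simplification: String stands in for Catalyst's Expression type, and plain equality for semanticEquals):

    import scala.collection.mutable
    import scala.collection.mutable.ArrayBuffer

    def reorder(
        leftKeys: Seq[String],
        rightKeys: Seq[String],
        expectedOrderOfKeys: Seq[String],
        currentOrderOfKeys: Seq[String]): (Seq[String], Seq[String]) = {
      val leftKeysBuffer = ArrayBuffer[String]()
      val rightKeysBuffer = ArrayBuffer[String]()
      val pickedIndexes = mutable.Set[Int]()
      val keysAndIndexes = currentOrderOfKeys.zipWithIndex

      expectedOrderOfKeys.foreach { expression =>
        // Indexes already consumed are skipped, so a key that occurs
        // several times is matched once per occurrence.
        val index = keysAndIndexes.find { case (e, idx) =>
          e == expression && !pickedIndexes.contains(idx)
        }.map(_._2).get // None.get throws if no unused match remains
        pickedIndexes += index
        leftKeysBuffer.append(leftKeys(index))
        rightKeysBuffer.append(rightKeys(index))
      }
      (leftKeysBuffer, rightKeysBuffer)
    }

The bare .get is the notable difference from the reverted version, which instead fell back to returning (leftKeys, rightKeys) unchanged when an expected key could not be found or its indices were exhausted.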
@@ -264,13 +249,20 @@ case class EnsureRequirements(conf: SQLConf) extends Rule[SparkPlan] {
leftPartitioning: Partitioning,
rightPartitioning: Partitioning): (Seq[Expression], Seq[Expression]) = {
if (leftKeys.forall(_.deterministic) && rightKeys.forall(_.deterministic)) {
- (leftPartitioning, rightPartitioning) match {
- case (HashPartitioning(leftExpressions, _), _) =>
- reorder(leftKeys.toIndexedSeq, rightKeys.toIndexedSeq, leftExpressions, leftKeys)
- case (_, HashPartitioning(rightExpressions, _)) =>
- reorder(leftKeys.toIndexedSeq, rightKeys.toIndexedSeq, rightExpressions, rightKeys)
- case _ =>
- (leftKeys, rightKeys)
+ leftPartitioning match {
+ case HashPartitioning(leftExpressions, _)
+ if leftExpressions.length == leftKeys.length &&
+ leftKeys.forall(x => leftExpressions.exists(_.semanticEquals(x))) =>
+ reorder(leftKeys, rightKeys, leftExpressions, leftKeys)
+
+ case _ => rightPartitioning match {
+ case HashPartitioning(rightExpressions, _)
+ if rightExpressions.length == rightKeys.length &&
+ rightKeys.forall(x => rightExpressions.exists(_.semanticEquals(x))) =>
+ reorder(leftKeys, rightKeys, rightExpressions, rightKeys)
+
+ case _ => (leftKeys, rightKeys)
+ }
}
} else {
(leftKeys, rightKeys)
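The restored reorderJoinKeys only hands keys to reorder when the partitioning expressions line up with the join keys. A sketch of that guard, under the same simplification as above (== standing in for semanticEquals):

    // True when the counts match and every join key has a semantically
    // equal partitioning expression.
    def canReorder(partitioningExprs: Seq[String], joinKeys: Seq[String]): Boolean =
      partitioningExprs.length == joinKeys.length &&
        joinKeys.forall(k => partitioningExprs.exists(_ == k))

Because exists ignores multiplicity, duplicated keys can pass this guard and still leave reorder's find with no unused match, which is the situation the PlannerSuite test removed below exercised.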
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/JoinSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/JoinSuite.scala
index a44deaf..52fa22c 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/JoinSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/JoinSuite.scala
@@ -894,26 +894,6 @@ class JoinSuite extends QueryTest with SharedSQLContext {
}
}
- test("SPARK-27485: EnsureRequirements should not fail join with duplicate keys") {
- withSQLConf(SQLConf.SHUFFLE_PARTITIONS.key -> "2",
- SQLConf.AUTO_BROADCASTJOIN_THRESHOLD.key -> "-1") {
- val tbl_a = spark.range(40)
- .select($"id" as "x", $"id" % 10 as "y")
- .repartition(2, $"x", $"y", $"x")
- .as("tbl_a")
-
- val tbl_b = spark.range(20)
- .select($"id" as "x", $"id" % 2 as "y1", $"id" % 20 as "y2")
- .as("tbl_b")
-
- val res = tbl_a
- .join(tbl_b,
- $"tbl_a.x" === $"tbl_b.x" && $"tbl_a.y" === $"tbl_b.y1" && $"tbl_a.y" === $"tbl_b.y2")
- .select($"tbl_a.x")
- checkAnswer(res, Row(0L) :: Row(1L) :: Nil)
- }
- }
-
test("SPARK-26352: join reordering should not change the order of columns") {
withTable("tab1", "tab2", "tab3") {
spark.sql("select 1 as x, 100 as y").write.saveAsTable("tab1")
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/PlannerSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/PlannerSuite.scala
index 3c3af80..d9fb172 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/PlannerSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/PlannerSuite.scala
@@ -697,32 +697,6 @@ class PlannerSuite extends SharedSQLContext {
}
}
- test("SPARK-27485: EnsureRequirements.reorder should handle duplicate expressions") {
- val plan1 = DummySparkPlan(
- outputPartitioning = HashPartitioning(exprA :: exprB :: exprA :: Nil, 5))
- val plan2 = DummySparkPlan()
- val smjExec = SortMergeJoinExec(
- leftKeys = exprA :: exprB :: exprB :: Nil,
- rightKeys = exprA :: exprC :: exprC :: Nil,
- joinType = Inner,
- condition = None,
- left = plan1,
- right = plan2)
- val outputPlan = EnsureRequirements(spark.sessionState.conf).apply(smjExec)
- outputPlan match {
- case SortMergeJoinExec(leftKeys, rightKeys, _, _,
- SortExec(_, _,
- ShuffleExchangeExec(HashPartitioning(leftPartitioningExpressions, _), _), _),
- SortExec(_, _,
- ShuffleExchangeExec(HashPartitioning(rightPartitioningExpressions, _), _), _)) =>
- assert(leftKeys === smjExec.leftKeys)
- assert(rightKeys === smjExec.rightKeys)
- assert(leftKeys === leftPartitioningExpressions)
- assert(rightKeys === rightPartitioningExpressions)
- case _ => fail(outputPlan.toString)
- }
- }
-
test("SPARK-24500: create union with stream of children") {
val df = Union(Stream(
Range(1, 1, 1, 1),