Posted to commits@spark.apache.org by li...@apache.org on 2019/07/16 13:57:53 UTC

[spark] branch branch-2.4 updated: Revert "[SPARK-27485] EnsureRequirements.reorder should handle duplicate expressions gracefully"

This is an automated email from the ASF dual-hosted git repository.

lixiao pushed a commit to branch branch-2.4
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/branch-2.4 by this push:
     new 63898cb  Revert "[SPARK-27485] EnsureRequirements.reorder should handle duplicate expressions gracefully"
63898cb is described below

commit 63898cbc1db46be8bcbb46d21fe01340bc883520
Author: gatorsmile <ga...@gmail.com>
AuthorDate: Tue Jul 16 06:57:14 2019 -0700

    Revert "[SPARK-27485] EnsureRequirements.reorder should handle duplicate expressions gracefully"
    
    This reverts commit 72f547d4a960ba0ba9cace53a0a5553eca1b4dd6.
---
 .../execution/exchange/EnsureRequirements.scala    | 72 ++++++++++------------
 .../scala/org/apache/spark/sql/JoinSuite.scala     | 20 ------
 .../apache/spark/sql/execution/PlannerSuite.scala  | 26 --------
 3 files changed, 32 insertions(+), 86 deletions(-)
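
For context, the change being reverted (SPARK-27485) dealt with joins where the
same expression appears more than once in a child's hash partitioning. The
JoinSuite test removed further down in this diff builds exactly that case; a
minimal standalone version of it is sketched below, assuming a SparkSession
named spark with broadcast joins disabled (spark.sql.autoBroadcastJoinThreshold
set to -1) so the join is planned as a sort-merge join. Variable names are
illustrative only:

    import spark.implicits._

    // Left side repartitioned on (x, y, x): "x" appears twice in the
    // hash partitioning expressions, the duplicate-expression case.
    val tblA = spark.range(40)
      .select($"id" as "x", $"id" % 10 as "y")
      .repartition(2, $"x", $"y", $"x")
      .as("tbl_a")

    val tblB = spark.range(20)
      .select($"id" as "x", $"id" % 2 as "y1", $"id" % 20 as "y2")
      .as("tbl_b")

    // Duplicate join key: tbl_a.y is compared against two right-side columns.
    val res = tblA
      .join(tblB, $"tbl_a.x" === $"tbl_b.x" &&
        $"tbl_a.y" === $"tbl_b.y1" && $"tbl_a.y" === $"tbl_b.y2")
      .select($"tbl_a.x")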

diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/exchange/EnsureRequirements.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/exchange/EnsureRequirements.scala
index bdb9a31..d2d5011 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/exchange/EnsureRequirements.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/exchange/EnsureRequirements.scala
@@ -24,7 +24,8 @@ import org.apache.spark.sql.catalyst.expressions._
 import org.apache.spark.sql.catalyst.plans.physical._
 import org.apache.spark.sql.catalyst.rules.Rule
 import org.apache.spark.sql.execution._
-import org.apache.spark.sql.execution.joins.{ShuffledHashJoinExec, SortMergeJoinExec}
+import org.apache.spark.sql.execution.joins.{BroadcastHashJoinExec, ShuffledHashJoinExec,
+  SortMergeJoinExec}
 import org.apache.spark.sql.internal.SQLConf
 
 /**
@@ -220,41 +221,25 @@ case class EnsureRequirements(conf: SQLConf) extends Rule[SparkPlan] {
   }
 
   private def reorder(
-      leftKeys: IndexedSeq[Expression],
-      rightKeys: IndexedSeq[Expression],
+      leftKeys: Seq[Expression],
+      rightKeys: Seq[Expression],
       expectedOrderOfKeys: Seq[Expression],
       currentOrderOfKeys: Seq[Expression]): (Seq[Expression], Seq[Expression]) = {
-    if (expectedOrderOfKeys.size != currentOrderOfKeys.size) {
-      return (leftKeys, rightKeys)
-    }
-
-    // Build a lookup between an expression and the positions it holds in the current key seq.
-    val keyToIndexMap = mutable.Map.empty[Expression, mutable.BitSet]
-    currentOrderOfKeys.zipWithIndex.foreach {
-      case (key, index) =>
-        keyToIndexMap.getOrElseUpdate(key.canonicalized, mutable.BitSet.empty).add(index)
-    }
-
-    // Reorder the keys.
-    val leftKeysBuffer = new ArrayBuffer[Expression](leftKeys.size)
-    val rightKeysBuffer = new ArrayBuffer[Expression](rightKeys.size)
-    val iterator = expectedOrderOfKeys.iterator
-    while (iterator.hasNext) {
-      // Lookup the current index of this key.
-      keyToIndexMap.get(iterator.next().canonicalized) match {
-        case Some(indices) if indices.nonEmpty =>
-          // Take the first available index from the map.
-          val index = indices.firstKey
-          indices.remove(index)
+    val leftKeysBuffer = ArrayBuffer[Expression]()
+    val rightKeysBuffer = ArrayBuffer[Expression]()
+    val pickedIndexes = mutable.Set[Int]()
+    val keysAndIndexes = currentOrderOfKeys.zipWithIndex
 
-          // Add the keys for that index to the reordered keys.
-          leftKeysBuffer += leftKeys(index)
-          rightKeysBuffer += rightKeys(index)
-        case _ =>
-          // The expression cannot be found, or we have exhausted all indices for that expression.
-          return (leftKeys, rightKeys)
-      }
-    }
+    expectedOrderOfKeys.foreach(expression => {
+      val index = keysAndIndexes.find { case (e, idx) =>
+        // The same key may be used multiple times, so skip occurrences
+        // that have already been picked.
+        e.semanticEquals(expression) && !pickedIndexes.contains(idx)
+      }.map(_._2).get
+      pickedIndexes += index
+      leftKeysBuffer.append(leftKeys(index))
+      rightKeysBuffer.append(rightKeys(index))
+    })
     (leftKeysBuffer, rightKeysBuffer)
   }
 
@@ -264,13 +249,20 @@ case class EnsureRequirements(conf: SQLConf) extends Rule[SparkPlan] {
       leftPartitioning: Partitioning,
       rightPartitioning: Partitioning): (Seq[Expression], Seq[Expression]) = {
     if (leftKeys.forall(_.deterministic) && rightKeys.forall(_.deterministic)) {
-      (leftPartitioning, rightPartitioning) match {
-        case (HashPartitioning(leftExpressions, _), _) =>
-          reorder(leftKeys.toIndexedSeq, rightKeys.toIndexedSeq, leftExpressions, leftKeys)
-        case (_, HashPartitioning(rightExpressions, _)) =>
-          reorder(leftKeys.toIndexedSeq, rightKeys.toIndexedSeq, rightExpressions, rightKeys)
-        case _ =>
-          (leftKeys, rightKeys)
+      leftPartitioning match {
+        case HashPartitioning(leftExpressions, _)
+          if leftExpressions.length == leftKeys.length &&
+            leftKeys.forall(x => leftExpressions.exists(_.semanticEquals(x))) =>
+          reorder(leftKeys, rightKeys, leftExpressions, leftKeys)
+
+        case _ => rightPartitioning match {
+          case HashPartitioning(rightExpressions, _)
+            if rightExpressions.length == rightKeys.length &&
+              rightKeys.forall(x => rightExpressions.exists(_.semanticEquals(x))) =>
+            reorder(leftKeys, rightKeys, rightExpressions, rightKeys)
+
+          case _ => (leftKeys, rightKeys)
+        }
       }
     } else {
       (leftKeys, rightKeys)
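
Note on the guard restored above: it only checks that the key list and the
partitioning-expression list have the same length and that every key occurs
somewhere among the expressions; it does not compare how many times each one
occurs. A standalone sketch of how mismatched multiplicities slip through and
make the restored reorder throw (plain strings stand in for catalyst
Expressions, == for semanticEquals; all names are illustrative):

    val keys        = Seq("a", "b", "b")  // join keys (currentOrderOfKeys)
    val expressions = Seq("a", "b", "a")  // partitioning exprs (expectedOrderOfKeys)

    // The guard passes: lengths match and every key occurs somewhere
    // among the expressions, even though the multiplicities differ.
    val guardHolds = expressions.length == keys.length &&
      keys.forall(k => expressions.exists(_ == k))  // true

    // reorder picks, for each expected expression, the first unused key
    // index. The second "a" finds no unused match, so find returns None
    // and the trailing .get throws NoSuchElementException.
    val picked = scala.collection.mutable.Set.empty[Int]
    expressions.foreach { e =>
      val index = keys.zipWithIndex.find { case (k, i) =>
        k == e && !picked.contains(i)
      }.map(_._2).get
      picked += index
    }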
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/JoinSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/JoinSuite.scala
index a44deaf..52fa22c 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/JoinSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/JoinSuite.scala
@@ -894,26 +894,6 @@ class JoinSuite extends QueryTest with SharedSQLContext {
     }
   }
 
-  test("SPARK-27485: EnsureRequirements should not fail join with duplicate keys") {
-    withSQLConf(SQLConf.SHUFFLE_PARTITIONS.key -> "2",
-      SQLConf.AUTO_BROADCASTJOIN_THRESHOLD.key -> "-1") {
-      val tbl_a = spark.range(40)
-        .select($"id" as "x", $"id" % 10 as "y")
-        .repartition(2, $"x", $"y", $"x")
-        .as("tbl_a")
-
-      val tbl_b = spark.range(20)
-        .select($"id" as "x", $"id" % 2 as "y1", $"id" % 20 as "y2")
-        .as("tbl_b")
-
-      val res = tbl_a
-        .join(tbl_b,
-          $"tbl_a.x" === $"tbl_b.x" && $"tbl_a.y" === $"tbl_b.y1" && $"tbl_a.y" === $"tbl_b.y2")
-        .select($"tbl_a.x")
-      checkAnswer(res, Row(0L) :: Row(1L) :: Nil)
-    }
-  }
-
   test("SPARK-26352: join reordering should not change the order of columns") {
     withTable("tab1", "tab2", "tab3") {
       spark.sql("select 1 as x, 100 as y").write.saveAsTable("tab1")
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/PlannerSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/PlannerSuite.scala
index 3c3af80..d9fb172 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/PlannerSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/PlannerSuite.scala
@@ -697,32 +697,6 @@ class PlannerSuite extends SharedSQLContext {
     }
   }
 
-  test("SPARK-27485: EnsureRequirements.reorder should handle duplicate expressions") {
-    val plan1 = DummySparkPlan(
-      outputPartitioning = HashPartitioning(exprA :: exprB :: exprA :: Nil, 5))
-    val plan2 = DummySparkPlan()
-    val smjExec = SortMergeJoinExec(
-      leftKeys = exprA :: exprB :: exprB :: Nil,
-      rightKeys = exprA :: exprC :: exprC :: Nil,
-      joinType = Inner,
-      condition = None,
-      left = plan1,
-      right = plan2)
-    val outputPlan = EnsureRequirements(spark.sessionState.conf).apply(smjExec)
-    outputPlan match {
-      case SortMergeJoinExec(leftKeys, rightKeys, _, _,
-             SortExec(_, _,
-               ShuffleExchangeExec(HashPartitioning(leftPartitioningExpressions, _), _), _),
-             SortExec(_, _,
-               ShuffleExchangeExec(HashPartitioning(rightPartitioningExpressions, _), _), _)) =>
-        assert(leftKeys === smjExec.leftKeys)
-        assert(rightKeys === smjExec.rightKeys)
-        assert(leftKeys === leftPartitioningExpressions)
-        assert(rightKeys === rightPartitioningExpressions)
-      case _ => fail(outputPlan.toString)
-    }
-  }
-
   test("SPARK-24500: create union with stream of children") {
     val df = Union(Stream(
       Range(1, 1, 1, 1),
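
For contrast, the behavior removed by this revert can be sketched standalone as
well: the pre-revert reorder tracked the remaining positions per key in a
BitSet and fell back to the original key order when a lookup failed, instead of
throwing. Again plain strings stand in for Expressions, and the function name
is illustrative:

    import scala.collection.mutable

    def reorderGraceful(keys: Seq[String], expressions: Seq[String]): Seq[String] = {
      if (keys.size != expressions.size) return keys

      // Map each key to the set of positions it holds in the current order.
      val keyToIndexes = mutable.Map.empty[String, mutable.BitSet]
      keys.zipWithIndex.foreach { case (k, i) =>
        keyToIndexes.getOrElseUpdate(k, mutable.BitSet.empty).add(i)
      }

      val reordered = mutable.ArrayBuffer.empty[String]
      val it = expressions.iterator
      while (it.hasNext) {
        keyToIndexes.get(it.next()) match {
          case Some(indexes) if indexes.nonEmpty =>
            // Consume the first remaining position for this key.
            val i = indexes.firstKey
            indexes.remove(i)
            reordered += keys(i)
          case _ =>
            // No position left for this key: keep the original order
            // instead of failing the query.
            return keys
        }
      }
      reordered
    }

    reorderGraceful(Seq("a", "b", "b"), Seq("a", "b", "a"))  // Seq("a", "b", "b")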


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@spark.apache.org
For additional commands, e-mail: commits-help@spark.apache.org