Posted to commits@spark.apache.org by do...@apache.org on 2018/12/17 01:21:10 UTC

[spark] branch branch-2.4 updated: [SPARK-26316][SPARK-21052][BRANCH-2.4] Revert hash join metrics that cause performance degradation

This is an automated email from the ASF dual-hosted git repository.

dongjoon pushed a commit to branch branch-2.4
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/branch-2.4 by this push:
     new d650075  [SPARK-26316][SPARK-21052][BRANCH-2.4] Revert hash join metrics that cause performance degradation
d650075 is described below

commit d650075b39e8dc6012a71f8d21577c5c5f19ba90
Author: jiake <ke...@intel.com>
AuthorDate: Sun Dec 16 17:20:58 2018 -0800

    [SPARK-26316][SPARK-21052][BRANCH-2.4] Revert hash join metrics that cause performance degradation
    
    ## What changes were proposed in this pull request?
    Revert SPARK-21052 (the avg hash probe metrics for hash joins) in Spark 2.4 because of the performance regression discussed in [PR 23269](https://github.com/apache/spark/pull/23269); a sketch of the removed probe-tracking pattern follows the diffstat below.
    
    ## How was this patch tested?
    N/A
    
    Closes #23318 from JkSelf/branch-2.4-revert21052.
    
    Authored-by: jiake <ke...@intel.com>
    Signed-off-by: Dongjoon Hyun <do...@apache.org>
---
 .../execution/joins/BroadcastHashJoinExec.scala    | 27 +------
 .../spark/sql/execution/joins/HashJoin.scala       |  7 +-
 .../spark/sql/execution/joins/HashedRelation.scala | 31 -------
 .../sql/execution/joins/ShuffledHashJoinExec.scala |  6 +-
 .../sql/execution/metric/SQLMetricsSuite.scala     | 94 +---------------------
 5 files changed, 6 insertions(+), 159 deletions(-)
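
For context, SPARK-21052 (reverted here) made every hash-join lookup update two per-task counters, numKeyLookups and numProbes, and exposed their ratio as the "avg hash probe" SQL metric; the removed lines in HashedRelation.scala below show the real bookkeeping. The following is only a minimal, self-contained Scala sketch of that pattern, not the actual LongToUnsafeRowMap code, to make clear where the overhead sits:

// Toy open-addressing map that mirrors the probe bookkeeping removed by this
// commit (numKeyLookups / numProbes in LongToUnsafeRowMap). Illustration only.
final class ProbeCountingMap(capacity: Int) {
  require(capacity > 0)
  private val keys   = new Array[Long](capacity)
  private val values = new Array[Long](capacity)
  private val used   = new Array[Boolean](capacity)

  // The counters that sat on the lookup hot path.
  private var numKeyLookups = 0L
  private var numProbes     = 0L

  private def firstSlot(key: Long): Int = (((key % capacity) + capacity) % capacity).toInt
  private def nextSlot(pos: Int): Int   = (pos + 1) % capacity

  // Assumes the map is never filled to capacity (no resize in this sketch).
  def put(key: Long, value: Long): Unit = {
    var pos = firstSlot(key)
    while (used(pos) && keys(pos) != key) pos = nextSlot(pos)
    keys(pos) = key; values(pos) = value; used(pos) = true
  }

  def get(key: Long): Option[Long] = {
    numKeyLookups += 1          // once per lookup
    numProbes += 1              // first slot inspected
    var pos = firstSlot(key)
    while (used(pos)) {
      if (keys(pos) == key) return Some(values(pos))
      pos = nextSlot(pos)
      numProbes += 1            // every extra slot inspected
    }
    None
  }

  // What the removed "avg hash probe" metric reported.
  def getAverageProbesPerLookup: Double = numProbes.toDouble / numKeyLookups
}

Because the counters are bumped for every slot inspected on every lookup, the cost lands on the innermost loop of the join; that is the regression tracked by SPARK-26316 and the reason the avgHashProbe SQLMetric and its task-completion listeners are dropped from branch-2.4.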

diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/joins/BroadcastHashJoinExec.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/joins/BroadcastHashJoinExec.scala
index a6f3ea4..b25a34c 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/joins/BroadcastHashJoinExec.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/joins/BroadcastHashJoinExec.scala
@@ -48,8 +48,7 @@ case class BroadcastHashJoinExec(
   extends BinaryExecNode with HashJoin with CodegenSupport {
 
   override lazy val metrics = Map(
-    "numOutputRows" -> SQLMetrics.createMetric(sparkContext, "number of output rows"),
-    "avgHashProbe" -> SQLMetrics.createAverageMetric(sparkContext, "avg hash probe"))
+    "numOutputRows" -> SQLMetrics.createMetric(sparkContext, "number of output rows"))
 
   override def requiredChildDistribution: Seq[Distribution] = {
     val mode = HashedRelationBroadcastMode(buildKeys)
@@ -63,13 +62,12 @@ case class BroadcastHashJoinExec(
 
   protected override def doExecute(): RDD[InternalRow] = {
     val numOutputRows = longMetric("numOutputRows")
-    val avgHashProbe = longMetric("avgHashProbe")
 
     val broadcastRelation = buildPlan.executeBroadcast[HashedRelation]()
     streamedPlan.execute().mapPartitions { streamedIter =>
       val hashed = broadcastRelation.value.asReadOnlyCopy()
       TaskContext.get().taskMetrics().incPeakExecutionMemory(hashed.estimatedSize)
-      join(streamedIter, hashed, numOutputRows, avgHashProbe)
+      join(streamedIter, hashed, numOutputRows)
     }
   }
 
@@ -112,23 +110,6 @@ case class BroadcastHashJoinExec(
   }
 
   /**
-   * Returns the codes used to add a task completion listener to update avg hash probe
-   * at the end of the task.
-   */
-  private def genTaskListener(avgHashProbe: String, relationTerm: String): String = {
-    val listenerClass = classOf[TaskCompletionListener].getName
-    val taskContextClass = classOf[TaskContext].getName
-    s"""
-       | $taskContextClass$$.MODULE$$.get().addTaskCompletionListener(new $listenerClass() {
-       |   @Override
-       |   public void onTaskCompletion($taskContextClass context) {
-       |     $avgHashProbe.set($relationTerm.getAverageProbesPerLookup());
-       |   }
-       | });
-     """.stripMargin
-  }
-
-  /**
    * Returns a tuple of Broadcast of HashedRelation and the variable name for it.
    */
   private def prepareBroadcast(ctx: CodegenContext): (Broadcast[HashedRelation], String) = {
@@ -137,15 +118,11 @@ case class BroadcastHashJoinExec(
     val broadcast = ctx.addReferenceObj("broadcast", broadcastRelation)
     val clsName = broadcastRelation.value.getClass.getName
 
-    // At the end of the task, we update the avg hash probe.
-    val avgHashProbe = metricTerm(ctx, "avgHashProbe")
-
     // Inline mutable state since not many join operations in a task
     val relationTerm = ctx.addMutableState(clsName, "relation",
       v => s"""
          | $v = (($clsName) $broadcast.value()).asReadOnlyCopy();
          | incPeakExecutionMemory($v.estimatedSize());
-         | ${genTaskListener(avgHashProbe, v)}
        """.stripMargin, forceInline = true)
     (broadcastRelation, relationTerm)
   }
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/joins/HashJoin.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/joins/HashJoin.scala
index dab873b..b197bf6 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/joins/HashJoin.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/joins/HashJoin.scala
@@ -194,8 +194,7 @@ trait HashJoin {
   protected def join(
       streamedIter: Iterator[InternalRow],
       hashed: HashedRelation,
-      numOutputRows: SQLMetric,
-      avgHashProbe: SQLMetric): Iterator[InternalRow] = {
+      numOutputRows: SQLMetric): Iterator[InternalRow] = {
 
     val joinedIter = joinType match {
       case _: InnerLike =>
@@ -213,10 +212,6 @@ trait HashJoin {
           s"BroadcastHashJoin should not take $x as the JoinType")
     }
 
-    // At the end of the task, we update the avg hash probe.
-    TaskContext.get().addTaskCompletionListener[Unit](_ =>
-      avgHashProbe.set(hashed.getAverageProbesPerLookup))
-
     val resultProj = createResultProjection
     joinedIter.map { r =>
       numOutputRows += 1
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/joins/HashedRelation.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/joins/HashedRelation.scala
index 86eb47a..95643040 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/joins/HashedRelation.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/joins/HashedRelation.scala
@@ -81,10 +81,6 @@ private[execution] sealed trait HashedRelation extends KnownSizeEstimation {
    */
   def close(): Unit
 
-  /**
-   * Returns the average number of probes per key lookup.
-   */
-  def getAverageProbesPerLookup: Double
 }
 
 private[execution] object HashedRelation {
@@ -281,7 +277,6 @@ private[joins] class UnsafeHashedRelation(
     read(() => in.readInt(), () => in.readLong(), in.readBytes)
   }
 
-  override def getAverageProbesPerLookup: Double = binaryMap.getAverageProbesPerLookup
 }
 
 private[joins] object UnsafeHashedRelation {
@@ -395,10 +390,6 @@ private[execution] final class LongToUnsafeRowMap(val mm: TaskMemoryManager, cap
   // The number of unique keys.
   private var numKeys = 0L
 
-  // Tracking average number of probes per key lookup.
-  private var numKeyLookups = 0L
-  private var numProbes = 0L
-
   // needed by serializer
   def this() = {
     this(
@@ -483,8 +474,6 @@ private[execution] final class LongToUnsafeRowMap(val mm: TaskMemoryManager, cap
    */
   def getValue(key: Long, resultRow: UnsafeRow): UnsafeRow = {
     if (isDense) {
-      numKeyLookups += 1
-      numProbes += 1
       if (key >= minKey && key <= maxKey) {
         val value = array((key - minKey).toInt)
         if (value > 0) {
@@ -493,14 +482,11 @@ private[execution] final class LongToUnsafeRowMap(val mm: TaskMemoryManager, cap
       }
     } else {
       var pos = firstSlot(key)
-      numKeyLookups += 1
-      numProbes += 1
       while (array(pos + 1) != 0) {
         if (array(pos) == key) {
           return getRow(array(pos + 1), resultRow)
         }
         pos = nextSlot(pos)
-        numProbes += 1
       }
     }
     null
@@ -528,8 +514,6 @@ private[execution] final class LongToUnsafeRowMap(val mm: TaskMemoryManager, cap
    */
   def get(key: Long, resultRow: UnsafeRow): Iterator[UnsafeRow] = {
     if (isDense) {
-      numKeyLookups += 1
-      numProbes += 1
       if (key >= minKey && key <= maxKey) {
         val value = array((key - minKey).toInt)
         if (value > 0) {
@@ -538,14 +522,11 @@ private[execution] final class LongToUnsafeRowMap(val mm: TaskMemoryManager, cap
       }
     } else {
       var pos = firstSlot(key)
-      numKeyLookups += 1
-      numProbes += 1
       while (array(pos + 1) != 0) {
         if (array(pos) == key) {
           return valueIter(array(pos + 1), resultRow)
         }
         pos = nextSlot(pos)
-        numProbes += 1
       }
     }
     null
@@ -585,11 +566,8 @@ private[execution] final class LongToUnsafeRowMap(val mm: TaskMemoryManager, cap
   private def updateIndex(key: Long, address: Long): Unit = {
     var pos = firstSlot(key)
     assert(numKeys < array.length / 2)
-    numKeyLookups += 1
-    numProbes += 1
     while (array(pos) != key && array(pos + 1) != 0) {
       pos = nextSlot(pos)
-      numProbes += 1
     }
     if (array(pos + 1) == 0) {
       // this is the first value for this key, put the address in array.
@@ -721,8 +699,6 @@ private[execution] final class LongToUnsafeRowMap(val mm: TaskMemoryManager, cap
     writeLong(maxKey)
     writeLong(numKeys)
     writeLong(numValues)
-    writeLong(numKeyLookups)
-    writeLong(numProbes)
 
     writeLong(array.length)
     writeLongArray(writeBuffer, array, array.length)
@@ -764,8 +740,6 @@ private[execution] final class LongToUnsafeRowMap(val mm: TaskMemoryManager, cap
     maxKey = readLong()
     numKeys = readLong()
     numValues = readLong()
-    numKeyLookups = readLong()
-    numProbes = readLong()
 
     val length = readLong().toInt
     mask = length - 2
@@ -784,10 +758,6 @@ private[execution] final class LongToUnsafeRowMap(val mm: TaskMemoryManager, cap
     read(() => in.readBoolean(), () => in.readLong(), in.readBytes)
   }
 
-  /**
-   * Returns the average number of probes per key lookup.
-   */
-  def getAverageProbesPerLookup: Double = numProbes.toDouble / numKeyLookups
 }
 
 private[joins] class LongHashedRelation(
@@ -840,7 +810,6 @@ private[joins] class LongHashedRelation(
     map = in.readObject().asInstanceOf[LongToUnsafeRowMap]
   }
 
-  override def getAverageProbesPerLookup: Double = map.getAverageProbesPerLookup
 }
 
 /**
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/joins/ShuffledHashJoinExec.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/joins/ShuffledHashJoinExec.scala
index 2b59ed6..524804d 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/joins/ShuffledHashJoinExec.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/joins/ShuffledHashJoinExec.scala
@@ -42,8 +42,7 @@ case class ShuffledHashJoinExec(
   override lazy val metrics = Map(
     "numOutputRows" -> SQLMetrics.createMetric(sparkContext, "number of output rows"),
     "buildDataSize" -> SQLMetrics.createSizeMetric(sparkContext, "data size of build side"),
-    "buildTime" -> SQLMetrics.createTimingMetric(sparkContext, "time to build hash map"),
-    "avgHashProbe" -> SQLMetrics.createAverageMetric(sparkContext, "avg hash probe"))
+    "buildTime" -> SQLMetrics.createTimingMetric(sparkContext, "time to build hash map"))
 
   override def requiredChildDistribution: Seq[Distribution] =
     HashClusteredDistribution(leftKeys) :: HashClusteredDistribution(rightKeys) :: Nil
@@ -63,10 +62,9 @@ case class ShuffledHashJoinExec(
 
   protected override def doExecute(): RDD[InternalRow] = {
     val numOutputRows = longMetric("numOutputRows")
-    val avgHashProbe = longMetric("avgHashProbe")
     streamedPlan.execute().zipPartitions(buildPlan.execute()) { (streamIter, buildIter) =>
       val hashed = buildHashedRelation(buildIter)
-      join(streamIter, hashed, numOutputRows, avgHashProbe)
+      join(streamIter, hashed, numOutputRows)
     }
   }
 }
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/metric/SQLMetricsSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/metric/SQLMetricsSuite.scala
index c550bf2..9fd2868 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/metric/SQLMetricsSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/metric/SQLMetricsSuite.scala
@@ -231,50 +231,6 @@ class SQLMetricsSuite extends SparkFunSuite with SQLMetricsTestUtils with Shared
     )
   }
 
-  test("BroadcastHashJoin metrics: track avg probe") {
-    // The executed plan looks like:
-    // Project [a#210, b#211, b#221]
-    // +- BroadcastHashJoin [a#210], [a#220], Inner, BuildRight
-    //    :- Project [_1#207 AS a#210, _2#208 AS b#211]
-    //    :  +- Filter isnotnull(_1#207)
-    //    :     +- LocalTableScan [_1#207, _2#208]
-    //    +- BroadcastExchange HashedRelationBroadcastMode(List(input[0, binary, true]))
-    //       +- Project [_1#217 AS a#220, _2#218 AS b#221]
-    //          +- Filter isnotnull(_1#217)
-    //             +- LocalTableScan [_1#217, _2#218]
-    //
-    // Assume the execution plan with node id is
-    // WholeStageCodegen disabled:
-    // Project(nodeId = 0)
-    //   BroadcastHashJoin(nodeId = 1)
-    //     ...(ignored)
-    //
-    // WholeStageCodegen enabled:
-    // WholeStageCodegen(nodeId = 0)
-    //   Project(nodeId = 1)
-    //     BroadcastHashJoin(nodeId = 2)
-    //       Project(nodeId = 3)
-    //         Filter(nodeId = 4)
-    //           ...(ignored)
-    Seq(true, false).foreach { enableWholeStage =>
-      val df1 = generateRandomBytesDF()
-      val df2 = generateRandomBytesDF()
-      val df = df1.join(broadcast(df2), "a")
-      val nodeIds = if (enableWholeStage) {
-        Set(2L)
-      } else {
-        Set(1L)
-      }
-      val metrics = getSparkPlanMetrics(df, 2, nodeIds, enableWholeStage).get
-      nodeIds.foreach { nodeId =>
-        val probes = metrics(nodeId)._2("avg hash probe (min, med, max)")
-        probes.toString.stripPrefix("\n(").stripSuffix(")").split(", ").foreach { probe =>
-          assert(probe.toDouble > 1.0)
-        }
-      }
-    }
-  }
-
   test("ShuffledHashJoin metrics") {
     withSQLConf("spark.sql.autoBroadcastJoinThreshold" -> "40",
         "spark.sql.shuffle.partitions" -> "2",
@@ -287,59 +243,11 @@ class SQLMetricsSuite extends SparkFunSuite with SQLMetricsTestUtils with Shared
       val metrics = getSparkPlanMetrics(df, 1, Set(1L))
       testSparkPlanMetrics(df, 1, Map(
         1L -> (("ShuffledHashJoin", Map(
-          "number of output rows" -> 2L,
-          "avg hash probe (min, med, max)" -> "\n(1, 1, 1)"))))
+          "number of output rows" -> 2L))))
       )
     }
   }
 
-  test("ShuffledHashJoin metrics: track avg probe") {
-    // The executed plan looks like:
-    // Project [a#308, b#309, b#319]
-    // +- ShuffledHashJoin [a#308], [a#318], Inner, BuildRight
-    //    :- Exchange hashpartitioning(a#308, 2)
-    //    :  +- Project [_1#305 AS a#308, _2#306 AS b#309]
-    //    :     +- Filter isnotnull(_1#305)
-    //    :        +- LocalTableScan [_1#305, _2#306]
-    //    +- Exchange hashpartitioning(a#318, 2)
-    //       +- Project [_1#315 AS a#318, _2#316 AS b#319]
-    //          +- Filter isnotnull(_1#315)
-    //             +- LocalTableScan [_1#315, _2#316]
-    //
-    // Assume the execution plan with node id is
-    // WholeStageCodegen disabled:
-    // Project(nodeId = 0)
-    //   ShuffledHashJoin(nodeId = 1)
-    //     ...(ignored)
-    //
-    // WholeStageCodegen enabled:
-    // WholeStageCodegen(nodeId = 0)
-    //   Project(nodeId = 1)
-    //     ShuffledHashJoin(nodeId = 2)
-    //       ...(ignored)
-    withSQLConf("spark.sql.autoBroadcastJoinThreshold" -> "5000000",
-        "spark.sql.shuffle.partitions" -> "2",
-        "spark.sql.join.preferSortMergeJoin" -> "false") {
-      Seq(true, false).foreach { enableWholeStage =>
-        val df1 = generateRandomBytesDF(65535 * 5)
-        val df2 = generateRandomBytesDF(65535)
-        val df = df1.join(df2, "a")
-        val nodeIds = if (enableWholeStage) {
-          Set(2L)
-        } else {
-          Set(1L)
-        }
-        val metrics = getSparkPlanMetrics(df, 1, nodeIds, enableWholeStage).get
-        nodeIds.foreach { nodeId =>
-          val probes = metrics(nodeId)._2("avg hash probe (min, med, max)")
-          probes.toString.stripPrefix("\n(").stripSuffix(")").split(", ").foreach { probe =>
-            assert(probe.toDouble > 1.0)
-          }
-        }
-      }
-    }
-  }
-
   test("BroadcastHashJoin(outer) metrics") {
     val df1 = Seq((1, "a"), (1, "b"), (4, "c")).toDF("key", "value")
     val df2 = Seq((1, "a"), (1, "b"), (2, "c"), (3, "d")).toDF("key2", "value")
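
After this revert the hash-join operators still report "number of output rows" (plus build size and build time for ShuffledHashJoinExec), as the diff above shows. A hedged usage sketch for spot-checking that surviving metric on Spark 2.4, assuming a local SparkSession (names such as `left`, `right`, and the master URL are illustrative, not taken from the commit):

import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions.broadcast

val spark = SparkSession.builder().master("local[2]").appName("hash-join-metrics").getOrCreate()
import spark.implicits._

val left   = Seq((1, "a"), (2, "b")).toDF("key", "v1")
val right  = Seq((1, "x"), (3, "y")).toDF("key", "v2")
val joined = left.join(broadcast(right), "key")
joined.collect()   // run the query so the metric accumulators are populated

// Walk the executed physical plan; every node that defines numOutputRows
// (including the BroadcastHashJoin) is printed.
joined.queryExecution.executedPlan.foreach { node =>
  node.metrics.get("numOutputRows").foreach { m =>
    println(s"${node.nodeName}: number of output rows = ${m.value}")
  }
}

spark.stop()

With the revert applied, the join node's metrics map no longer contains an avgHashProbe entry, so only numOutputRows (and, for shuffled hash joins, buildDataSize and buildTime) should appear.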


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@spark.apache.org
For additional commands, e-mail: commits-help@spark.apache.org