You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@spark.apache.org by we...@apache.org on 2018/12/11 13:24:33 UTC
[spark] branch master updated: [SPARK-26316][SPARK-21052] Revert
hash join metrics in that causes performance degradation
This is an automated email from the ASF dual-hosted git repository.
wenchen pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git
The following commit(s) were added to refs/heads/master by this push:
new 5c67a9a [SPARK-26316][SPARK-21052] Revert hash join metrics in that causes performance degradation
5c67a9a is described below
commit 5c67a9a7fa29836fc825504bbcc3c3fc820009c6
Author: jiake <ke...@intel.com>
AuthorDate: Tue Dec 11 21:23:27 2018 +0800
[SPARK-26316][SPARK-21052] Revert hash join metrics in that causes performance degradation
## What changes were proposed in this pull request?
The incorrect implementation of the hash join metrics in [SPARK-21052](https://issues.apache.org/jira/browse/SPARK-21052) caused a significant performance degradation in TPC-DS; the results at the TPC-DS 1TB scale are [here](https://docs.google.com/spreadsheets/d/18a5BdOlmm8euTaRodyeWum9yu92mbWWu6JbhGXtr7yE/edit#gid=0). Therefore, for now this change partially reverts SPARK-21052 by removing the `avg hash probe` metric and the per-lookup probe counters it relied on.
**Cluster info:**
| Master Node | Worker Nodes
-- | -- | --
Node | 1x | 4x
Processor | Intel(R) Xeon(R) Platinum 8170 CPU 2.10GHz | Intel(R) Xeon(R) Platinum 8180 CPU 2.50GHz
Memory | 192 GB | 384 GB
Storage Main | 8 x 960G SSD | 8 x 960G SSD
Network | 10GbE |
Role | CM Management, NameNode, Secondary NameNode, Resource Manager, Hive Metastore Server | DataNode, NodeManager
OS Version | CentOS 7.2 | CentOS 7.2
Hadoop | Apache Hadoop 2.7.5 | Apache Hadoop 2.7.5
Hive | Apache Hive 2.2.0 |
Spark | Apache Spark 2.1.0 & Apache Spark 2.3.0 |
JDK version | 1.8.0_112 | 1.8.0_112
**Related parameters setting:**
Component | Parameter | Value
-- | -- | --
Yarn Resource Manager | yarn.scheduler.maximum-allocation-mb | 120GB
| yarn.scheduler.minimum-allocation-mb | 1GB
| yarn.scheduler.maximum-allocation-vcores | 121
| yarn.resourcemanager.scheduler.class | Fair Scheduler
Yarn Node Manager | yarn.nodemanager.resource.memory-mb | 120GB
| yarn.nodemanager.resource.cpu-vcores | 121
Spark | spark.executor.memory | 110GB
| spark.executor.cores | 50
## How was this patch tested?
N/A
Closes #23269 from JkSelf/partial-revert-21052.
Authored-by: jiake <ke...@intel.com>
Signed-off-by: Wenchen Fan <we...@databricks.com>
---
.../execution/joins/BroadcastHashJoinExec.scala | 28 +------
.../spark/sql/execution/joins/HashJoin.scala | 8 +-
.../spark/sql/execution/joins/HashedRelation.scala | 35 --------
.../sql/execution/joins/ShuffledHashJoinExec.scala | 6 +-
.../sql/execution/metric/SQLMetricsSuite.scala | 94 +---------------------
5 files changed, 6 insertions(+), 165 deletions(-)
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/joins/BroadcastHashJoinExec.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/joins/BroadcastHashJoinExec.scala
index a6f3ea4..fd4a789 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/joins/BroadcastHashJoinExec.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/joins/BroadcastHashJoinExec.scala
@@ -29,7 +29,6 @@ import org.apache.spark.sql.catalyst.plans.physical.{BroadcastDistribution, Dist
import org.apache.spark.sql.execution.{BinaryExecNode, CodegenSupport, SparkPlan}
import org.apache.spark.sql.execution.metric.SQLMetrics
import org.apache.spark.sql.types.{BooleanType, LongType}
-import org.apache.spark.util.TaskCompletionListener
/**
* Performs an inner hash join of two child relations. When the output RDD of this operator is
@@ -48,8 +47,7 @@ case class BroadcastHashJoinExec(
extends BinaryExecNode with HashJoin with CodegenSupport {
override lazy val metrics = Map(
- "numOutputRows" -> SQLMetrics.createMetric(sparkContext, "number of output rows"),
- "avgHashProbe" -> SQLMetrics.createAverageMetric(sparkContext, "avg hash probe"))
+ "numOutputRows" -> SQLMetrics.createMetric(sparkContext, "number of output rows"))
override def requiredChildDistribution: Seq[Distribution] = {
val mode = HashedRelationBroadcastMode(buildKeys)
@@ -63,13 +61,12 @@ case class BroadcastHashJoinExec(
protected override def doExecute(): RDD[InternalRow] = {
val numOutputRows = longMetric("numOutputRows")
- val avgHashProbe = longMetric("avgHashProbe")
val broadcastRelation = buildPlan.executeBroadcast[HashedRelation]()
streamedPlan.execute().mapPartitions { streamedIter =>
val hashed = broadcastRelation.value.asReadOnlyCopy()
TaskContext.get().taskMetrics().incPeakExecutionMemory(hashed.estimatedSize)
- join(streamedIter, hashed, numOutputRows, avgHashProbe)
+ join(streamedIter, hashed, numOutputRows)
}
}
@@ -112,23 +109,6 @@ case class BroadcastHashJoinExec(
}
/**
- * Returns the codes used to add a task completion listener to update avg hash probe
- * at the end of the task.
- */
- private def genTaskListener(avgHashProbe: String, relationTerm: String): String = {
- val listenerClass = classOf[TaskCompletionListener].getName
- val taskContextClass = classOf[TaskContext].getName
- s"""
- | $taskContextClass$$.MODULE$$.get().addTaskCompletionListener(new $listenerClass() {
- | @Override
- | public void onTaskCompletion($taskContextClass context) {
- | $avgHashProbe.set($relationTerm.getAverageProbesPerLookup());
- | }
- | });
- """.stripMargin
- }
-
- /**
* Returns a tuple of Broadcast of HashedRelation and the variable name for it.
*/
private def prepareBroadcast(ctx: CodegenContext): (Broadcast[HashedRelation], String) = {
@@ -137,15 +117,11 @@ case class BroadcastHashJoinExec(
val broadcast = ctx.addReferenceObj("broadcast", broadcastRelation)
val clsName = broadcastRelation.value.getClass.getName
- // At the end of the task, we update the avg hash probe.
- val avgHashProbe = metricTerm(ctx, "avgHashProbe")
-
// Inline mutable state since not many join operations in a task
val relationTerm = ctx.addMutableState(clsName, "relation",
v => s"""
| $v = (($clsName) $broadcast.value()).asReadOnlyCopy();
| incPeakExecutionMemory($v.estimatedSize());
- | ${genTaskListener(avgHashProbe, v)}
""".stripMargin, forceInline = true)
(broadcastRelation, relationTerm)
}
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/joins/HashJoin.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/joins/HashJoin.scala
index dab873b..1aef5f6 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/joins/HashJoin.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/joins/HashJoin.scala
@@ -17,7 +17,6 @@
package org.apache.spark.sql.execution.joins
-import org.apache.spark.TaskContext
import org.apache.spark.sql.catalyst.InternalRow
import org.apache.spark.sql.catalyst.expressions._
import org.apache.spark.sql.catalyst.plans._
@@ -194,8 +193,7 @@ trait HashJoin {
protected def join(
streamedIter: Iterator[InternalRow],
hashed: HashedRelation,
- numOutputRows: SQLMetric,
- avgHashProbe: SQLMetric): Iterator[InternalRow] = {
+ numOutputRows: SQLMetric): Iterator[InternalRow] = {
val joinedIter = joinType match {
case _: InnerLike =>
@@ -213,10 +211,6 @@ trait HashJoin {
s"BroadcastHashJoin should not take $x as the JoinType")
}
- // At the end of the task, we update the avg hash probe.
- TaskContext.get().addTaskCompletionListener[Unit](_ =>
- avgHashProbe.set(hashed.getAverageProbesPerLookup))
-
val resultProj = createResultProjection
joinedIter.map { r =>
numOutputRows += 1
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/joins/HashedRelation.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/joins/HashedRelation.scala
index b1ff6e8..7c21062 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/joins/HashedRelation.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/joins/HashedRelation.scala
@@ -80,11 +80,6 @@ private[execution] sealed trait HashedRelation extends KnownSizeEstimation {
* Release any used resources.
*/
def close(): Unit
-
- /**
- * Returns the average number of probes per key lookup.
- */
- def getAverageProbesPerLookup: Double
}
private[execution] object HashedRelation {
@@ -279,8 +274,6 @@ private[joins] class UnsafeHashedRelation(
override def read(kryo: Kryo, in: Input): Unit = Utils.tryOrIOException {
read(() => in.readInt(), () => in.readLong(), in.readBytes)
}
-
- override def getAverageProbesPerLookup: Double = binaryMap.getAverageProbesPerLookup
}
private[joins] object UnsafeHashedRelation {
@@ -395,10 +388,6 @@ private[execution] final class LongToUnsafeRowMap(val mm: TaskMemoryManager, cap
// The number of unique keys.
private var numKeys = 0L
- // Tracking average number of probes per key lookup.
- private var numKeyLookups = 0L
- private var numProbes = 0L
-
// needed by serializer
def this() = {
this(
@@ -483,8 +472,6 @@ private[execution] final class LongToUnsafeRowMap(val mm: TaskMemoryManager, cap
*/
def getValue(key: Long, resultRow: UnsafeRow): UnsafeRow = {
if (isDense) {
- numKeyLookups += 1
- numProbes += 1
if (key >= minKey && key <= maxKey) {
val value = array((key - minKey).toInt)
if (value > 0) {
@@ -493,14 +480,11 @@ private[execution] final class LongToUnsafeRowMap(val mm: TaskMemoryManager, cap
}
} else {
var pos = firstSlot(key)
- numKeyLookups += 1
- numProbes += 1
while (array(pos + 1) != 0) {
if (array(pos) == key) {
return getRow(array(pos + 1), resultRow)
}
pos = nextSlot(pos)
- numProbes += 1
}
}
null
@@ -528,8 +512,6 @@ private[execution] final class LongToUnsafeRowMap(val mm: TaskMemoryManager, cap
*/
def get(key: Long, resultRow: UnsafeRow): Iterator[UnsafeRow] = {
if (isDense) {
- numKeyLookups += 1
- numProbes += 1
if (key >= minKey && key <= maxKey) {
val value = array((key - minKey).toInt)
if (value > 0) {
@@ -538,14 +520,11 @@ private[execution] final class LongToUnsafeRowMap(val mm: TaskMemoryManager, cap
}
} else {
var pos = firstSlot(key)
- numKeyLookups += 1
- numProbes += 1
while (array(pos + 1) != 0) {
if (array(pos) == key) {
return valueIter(array(pos + 1), resultRow)
}
pos = nextSlot(pos)
- numProbes += 1
}
}
null
@@ -585,11 +564,8 @@ private[execution] final class LongToUnsafeRowMap(val mm: TaskMemoryManager, cap
private def updateIndex(key: Long, address: Long): Unit = {
var pos = firstSlot(key)
assert(numKeys < array.length / 2)
- numKeyLookups += 1
- numProbes += 1
while (array(pos) != key && array(pos + 1) != 0) {
pos = nextSlot(pos)
- numProbes += 1
}
if (array(pos + 1) == 0) {
// this is the first value for this key, put the address in array.
@@ -721,8 +697,6 @@ private[execution] final class LongToUnsafeRowMap(val mm: TaskMemoryManager, cap
writeLong(maxKey)
writeLong(numKeys)
writeLong(numValues)
- writeLong(numKeyLookups)
- writeLong(numProbes)
writeLong(array.length)
writeLongArray(writeBuffer, array, array.length)
@@ -764,8 +738,6 @@ private[execution] final class LongToUnsafeRowMap(val mm: TaskMemoryManager, cap
maxKey = readLong()
numKeys = readLong()
numValues = readLong()
- numKeyLookups = readLong()
- numProbes = readLong()
val length = readLong().toInt
mask = length - 2
@@ -783,11 +755,6 @@ private[execution] final class LongToUnsafeRowMap(val mm: TaskMemoryManager, cap
override def read(kryo: Kryo, in: Input): Unit = {
read(() => in.readBoolean(), () => in.readLong(), in.readBytes)
}
-
- /**
- * Returns the average number of probes per key lookup.
- */
- def getAverageProbesPerLookup: Double = numProbes.toDouble / numKeyLookups
}
private[joins] class LongHashedRelation(
@@ -839,8 +806,6 @@ private[joins] class LongHashedRelation(
resultRow = new UnsafeRow(nFields)
map = in.readObject().asInstanceOf[LongToUnsafeRowMap]
}
-
- override def getAverageProbesPerLookup: Double = map.getAverageProbesPerLookup
}
/**
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/joins/ShuffledHashJoinExec.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/joins/ShuffledHashJoinExec.scala
index 2b59ed6..524804d 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/joins/ShuffledHashJoinExec.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/joins/ShuffledHashJoinExec.scala
@@ -42,8 +42,7 @@ case class ShuffledHashJoinExec(
override lazy val metrics = Map(
"numOutputRows" -> SQLMetrics.createMetric(sparkContext, "number of output rows"),
"buildDataSize" -> SQLMetrics.createSizeMetric(sparkContext, "data size of build side"),
- "buildTime" -> SQLMetrics.createTimingMetric(sparkContext, "time to build hash map"),
- "avgHashProbe" -> SQLMetrics.createAverageMetric(sparkContext, "avg hash probe"))
+ "buildTime" -> SQLMetrics.createTimingMetric(sparkContext, "time to build hash map"))
override def requiredChildDistribution: Seq[Distribution] =
HashClusteredDistribution(leftKeys) :: HashClusteredDistribution(rightKeys) :: Nil
@@ -63,10 +62,9 @@ case class ShuffledHashJoinExec(
protected override def doExecute(): RDD[InternalRow] = {
val numOutputRows = longMetric("numOutputRows")
- val avgHashProbe = longMetric("avgHashProbe")
streamedPlan.execute().zipPartitions(buildPlan.execute()) { (streamIter, buildIter) =>
val hashed = buildHashedRelation(buildIter)
- join(streamIter, hashed, numOutputRows, avgHashProbe)
+ join(streamIter, hashed, numOutputRows)
}
}
}
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/metric/SQLMetricsSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/metric/SQLMetricsSuite.scala
index 4a80638..f649549 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/metric/SQLMetricsSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/metric/SQLMetricsSuite.scala
@@ -261,50 +261,6 @@ class SQLMetricsSuite extends SparkFunSuite with SQLMetricsTestUtils with Shared
)
}
- test("BroadcastHashJoin metrics: track avg probe") {
- // The executed plan looks like:
- // Project [a#210, b#211, b#221]
- // +- BroadcastHashJoin [a#210], [a#220], Inner, BuildRight
- // :- Project [_1#207 AS a#210, _2#208 AS b#211]
- // : +- Filter isnotnull(_1#207)
- // : +- LocalTableScan [_1#207, _2#208]
- // +- BroadcastExchange HashedRelationBroadcastMode(List(input[0, binary, true]))
- // +- Project [_1#217 AS a#220, _2#218 AS b#221]
- // +- Filter isnotnull(_1#217)
- // +- LocalTableScan [_1#217, _2#218]
- //
- // Assume the execution plan with node id is
- // WholeStageCodegen disabled:
- // Project(nodeId = 0)
- // BroadcastHashJoin(nodeId = 1)
- // ...(ignored)
- //
- // WholeStageCodegen enabled:
- // WholeStageCodegen(nodeId = 0)
- // Project(nodeId = 1)
- // BroadcastHashJoin(nodeId = 2)
- // Project(nodeId = 3)
- // Filter(nodeId = 4)
- // ...(ignored)
- Seq(true, false).foreach { enableWholeStage =>
- val df1 = generateRandomBytesDF()
- val df2 = generateRandomBytesDF()
- val df = df1.join(broadcast(df2), "a")
- val nodeIds = if (enableWholeStage) {
- Set(2L)
- } else {
- Set(1L)
- }
- val metrics = getSparkPlanMetrics(df, 2, nodeIds, enableWholeStage).get
- nodeIds.foreach { nodeId =>
- val probes = metrics(nodeId)._2("avg hash probe (min, med, max)")
- probes.toString.stripPrefix("\n(").stripSuffix(")").split(", ").foreach { probe =>
- assert(probe.toDouble > 1.0)
- }
- }
- }
- }
-
test("ShuffledHashJoin metrics") {
withSQLConf("spark.sql.autoBroadcastJoinThreshold" -> "40",
"spark.sql.shuffle.partitions" -> "2",
@@ -323,8 +279,7 @@ class SQLMetricsSuite extends SparkFunSuite with SQLMetricsTestUtils with Shared
val df = df1.join(df2, "key")
testSparkPlanMetrics(df, 1, Map(
1L -> (("ShuffledHashJoin", Map(
- "number of output rows" -> 2L,
- "avg hash probe (min, med, max)" -> "\n(1, 1, 1)"))),
+ "number of output rows" -> 2L))),
2L -> (("Exchange", Map(
"shuffle records written" -> 2L,
"records read" -> 2L))),
@@ -335,53 +290,6 @@ class SQLMetricsSuite extends SparkFunSuite with SQLMetricsTestUtils with Shared
}
}
- test("ShuffledHashJoin metrics: track avg probe") {
- // The executed plan looks like:
- // Project [a#308, b#309, b#319]
- // +- ShuffledHashJoin [a#308], [a#318], Inner, BuildRight
- // :- Exchange hashpartitioning(a#308, 2)
- // : +- Project [_1#305 AS a#308, _2#306 AS b#309]
- // : +- Filter isnotnull(_1#305)
- // : +- LocalTableScan [_1#305, _2#306]
- // +- Exchange hashpartitioning(a#318, 2)
- // +- Project [_1#315 AS a#318, _2#316 AS b#319]
- // +- Filter isnotnull(_1#315)
- // +- LocalTableScan [_1#315, _2#316]
- //
- // Assume the execution plan with node id is
- // WholeStageCodegen disabled:
- // Project(nodeId = 0)
- // ShuffledHashJoin(nodeId = 1)
- // ...(ignored)
- //
- // WholeStageCodegen enabled:
- // WholeStageCodegen(nodeId = 0)
- // Project(nodeId = 1)
- // ShuffledHashJoin(nodeId = 2)
- // ...(ignored)
- withSQLConf("spark.sql.autoBroadcastJoinThreshold" -> "5000000",
- "spark.sql.shuffle.partitions" -> "2",
- "spark.sql.join.preferSortMergeJoin" -> "false") {
- Seq(true, false).foreach { enableWholeStage =>
- val df1 = generateRandomBytesDF(65535 * 5)
- val df2 = generateRandomBytesDF(65535)
- val df = df1.join(df2, "a")
- val nodeIds = if (enableWholeStage) {
- Set(2L)
- } else {
- Set(1L)
- }
- val metrics = getSparkPlanMetrics(df, 1, nodeIds, enableWholeStage).get
- nodeIds.foreach { nodeId =>
- val probes = metrics(nodeId)._2("avg hash probe (min, med, max)")
- probes.toString.stripPrefix("\n(").stripSuffix(")").split(", ").foreach { probe =>
- assert(probe.toDouble > 1.0)
- }
- }
- }
- }
- }
-
test("BroadcastHashJoin(outer) metrics") {
val df1 = Seq((1, "a"), (1, "b"), (4, "c")).toDF("key", "value")
val df2 = Seq((1, "a"), (1, "b"), (2, "c"), (3, "d")).toDF("key2", "value")
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@spark.apache.org
For additional commands, e-mail: commits-help@spark.apache.org