You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@spark.apache.org by rx...@apache.org on 2016/02/12 03:00:05 UTC
spark git commit: [SPARK-12915][SQL] add SQL metrics of numOutputRows
for whole stage codegen
Repository: spark
Updated Branches:
refs/heads/master a5257048d -> b10af5e23
[SPARK-12915][SQL] add SQL metrics of numOutputRows for whole stage codegen
This PR adds SQL metrics (numOutputRows) for generated operators (the same as for non-generated operators); the cost is about 0.2 nanoseconds per row.
<img width="806" alt="gen metrics" src="https://cloud.githubusercontent.com/assets/40902/12994694/47f5881e-d0d7-11e5-9d47-78229f559ab0.png">
Author: Davies Liu <da...@databricks.com>
Closes #11170 from davies/gen_metric.
Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/b10af5e2
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/b10af5e2
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/b10af5e2
Branch: refs/heads/master
Commit: b10af5e238ce2051be2bf4d7ddda181d34cbb69a
Parents: a525704
Author: Davies Liu <da...@databricks.com>
Authored: Thu Feb 11 18:00:03 2016 -0800
Committer: Reynold Xin <rx...@databricks.com>
Committed: Thu Feb 11 18:00:03 2016 -0800
----------------------------------------------------------------------
.../scala/org/apache/spark/sql/DataFrame.scala | 2 +-
.../apache/spark/sql/execution/SparkPlan.scala | 7 ++++
.../spark/sql/execution/WholeStageCodegen.scala | 19 ++++++++++-
.../execution/aggregate/TungstenAggregate.scala | 5 +++
.../spark/sql/execution/basicOperators.scala | 8 ++++-
.../sql/execution/joins/BroadcastHashJoin.scala | 7 +++-
.../execution/BenchmarkWholeStageCodegen.scala | 2 +-
.../sql/execution/metric/SQLMetricsSuite.scala | 34 +++++++++-----------
.../spark/sql/util/DataFrameCallbackSuite.scala | 18 ++++++-----
9 files changed, 71 insertions(+), 31 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/spark/blob/b10af5e2/sql/core/src/main/scala/org/apache/spark/sql/DataFrame.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/DataFrame.scala b/sql/core/src/main/scala/org/apache/spark/sql/DataFrame.scala
index 7aa08fb..c5b2b7d 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/DataFrame.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/DataFrame.scala
@@ -1775,7 +1775,7 @@ class DataFrame private[sql](
private def withCallback[T](name: String, df: DataFrame)(action: DataFrame => T) = {
try {
df.queryExecution.executedPlan.foreach { plan =>
- plan.metrics.valuesIterator.foreach(_.reset())
+ plan.resetMetrics()
}
val start = System.nanoTime()
val result = action(df)
http://git-wip-us.apache.org/repos/asf/spark/blob/b10af5e2/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkPlan.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkPlan.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkPlan.scala
index 3cc99d3..c72b8dc 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkPlan.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkPlan.scala
@@ -78,6 +78,13 @@ abstract class SparkPlan extends QueryPlan[SparkPlan] with Logging with Serializ
private[sql] def metrics: Map[String, SQLMetric[_, _]] = Map.empty
/**
+ * Reset all the metrics.
+ */
+ private[sql] def resetMetrics(): Unit = {
+ metrics.valuesIterator.foreach(_.reset())
+ }
+
+ /**
* Return a LongSQLMetric according to the name.
*/
private[sql] def longMetric(name: String): LongSQLMetric =
http://git-wip-us.apache.org/repos/asf/spark/blob/b10af5e2/sql/core/src/main/scala/org/apache/spark/sql/execution/WholeStageCodegen.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/WholeStageCodegen.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/WholeStageCodegen.scala
index 30f74fc..f35efb5 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/WholeStageCodegen.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/WholeStageCodegen.scala
@@ -28,7 +28,7 @@ import org.apache.spark.sql.catalyst.plans.physical.Partitioning
import org.apache.spark.sql.catalyst.rules.Rule
import org.apache.spark.sql.execution.aggregate.TungstenAggregate
import org.apache.spark.sql.execution.joins.{BroadcastHashJoin, BuildLeft, BuildRight}
-import org.apache.spark.util.Utils
+import org.apache.spark.sql.execution.metric.{LongSQLMetric, LongSQLMetricValue, SQLMetric}
/**
* An interface for those physical operators that support codegen.
@@ -43,6 +43,19 @@ trait CodegenSupport extends SparkPlan {
}
/**
+ * Creates a metric using the specified name.
+ *
+ * @return name of the variable representing the metric
+ */
+ def metricTerm(ctx: CodegenContext, name: String): String = {
+ val metric = ctx.addReferenceObj(name, longMetric(name))
+ val value = ctx.freshName("metricValue")
+ val cls = classOf[LongSQLMetricValue].getName
+ ctx.addMutableState(cls, value, s"$value = ($cls) $metric.localValue();")
+ value
+ }
+
+ /**
* Whether this SparkPlan support whole stage codegen or not.
*/
def supportCodegen: Boolean = true
@@ -316,6 +329,10 @@ case class WholeStageCodegen(plan: CodegenSupport, children: Seq[SparkPlan])
}
}
+ private[sql] override def resetMetrics(): Unit = {
+ plan.foreach(_.resetMetrics())
+ }
+
override def generateTreeString(
depth: Int,
lastChildren: Seq[Boolean],
http://git-wip-us.apache.org/repos/asf/spark/blob/b10af5e2/sql/core/src/main/scala/org/apache/spark/sql/execution/aggregate/TungstenAggregate.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/aggregate/TungstenAggregate.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/aggregate/TungstenAggregate.scala
index a6950f8..852203f 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/aggregate/TungstenAggregate.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/aggregate/TungstenAggregate.scala
@@ -202,6 +202,7 @@ case class TungstenAggregate(
| }
""".stripMargin)
+ val numOutput = metricTerm(ctx, "numOutputRows")
s"""
| if (!$initAgg) {
| $initAgg = true;
@@ -210,6 +211,7 @@ case class TungstenAggregate(
| // output the result
| ${genResult.trim}
|
+ | $numOutput.add(1);
| ${consume(ctx, resultVars).trim}
| }
""".stripMargin
@@ -297,6 +299,7 @@ case class TungstenAggregate(
val peakMemory = Math.max(mapMemory, sorterMemory)
val metrics = TaskContext.get().taskMetrics()
metrics.incPeakExecutionMemory(peakMemory)
+ // TODO: update data size and spill size
if (sorter == null) {
// not spilled
@@ -456,6 +459,7 @@ case class TungstenAggregate(
val keyTerm = ctx.freshName("aggKey")
val bufferTerm = ctx.freshName("aggBuffer")
val outputCode = generateResultCode(ctx, keyTerm, bufferTerm, thisPlan)
+ val numOutput = metricTerm(ctx, "numOutputRows")
s"""
if (!$initAgg) {
@@ -465,6 +469,7 @@ case class TungstenAggregate(
// output the result
while ($iterTerm.next()) {
+ $numOutput.add(1);
UnsafeRow $keyTerm = (UnsafeRow) $iterTerm.getKey();
UnsafeRow $bufferTerm = (UnsafeRow) $iterTerm.getValue();
$outputCode
http://git-wip-us.apache.org/repos/asf/spark/blob/b10af5e2/sql/core/src/main/scala/org/apache/spark/sql/execution/basicOperators.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/basicOperators.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/basicOperators.scala
index 949acb9..4b82d55 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/basicOperators.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/basicOperators.scala
@@ -22,7 +22,7 @@ import org.apache.spark.sql.catalyst.InternalRow
import org.apache.spark.sql.catalyst.expressions._
import org.apache.spark.sql.catalyst.expressions.codegen.{CodegenContext, ExprCode, ExpressionCanonicalizer}
import org.apache.spark.sql.catalyst.plans.physical._
-import org.apache.spark.sql.execution.metric.SQLMetrics
+import org.apache.spark.sql.execution.metric.{LongSQLMetricValue, SQLMetrics}
import org.apache.spark.sql.types.LongType
import org.apache.spark.util.random.PoissonSampler
@@ -78,6 +78,7 @@ case class Filter(condition: Expression, child: SparkPlan) extends UnaryNode wit
}
override def doConsume(ctx: CodegenContext, input: Seq[ExprCode]): String = {
+ val numOutput = metricTerm(ctx, "numOutputRows")
val expr = ExpressionCanonicalizer.execute(
BindReferences.bindReference(condition, child.output))
ctx.currentVars = input
@@ -90,6 +91,7 @@ case class Filter(condition: Expression, child: SparkPlan) extends UnaryNode wit
s"""
| ${eval.code}
| if ($nullCheck ${eval.value}) {
+ | $numOutput.add(1);
| ${consume(ctx, ctx.currentVars)}
| }
""".stripMargin
@@ -159,6 +161,8 @@ case class Range(
}
protected override def doProduce(ctx: CodegenContext): String = {
+ val numOutput = metricTerm(ctx, "numOutputRows")
+
val initTerm = ctx.freshName("initRange")
ctx.addMutableState("boolean", initTerm, s"$initTerm = false;")
val partitionEnd = ctx.freshName("partitionEnd")
@@ -204,6 +208,8 @@ case class Range(
| } else {
| $partitionEnd = end.longValue();
| }
+ |
+ | $numOutput.add(($partitionEnd - $number) / ${step}L);
| }
""".stripMargin)
http://git-wip-us.apache.org/repos/asf/spark/blob/b10af5e2/sql/core/src/main/scala/org/apache/spark/sql/execution/joins/BroadcastHashJoin.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/joins/BroadcastHashJoin.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/joins/BroadcastHashJoin.scala
index 35c7963..985e740 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/joins/BroadcastHashJoin.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/joins/BroadcastHashJoin.scala
@@ -163,6 +163,7 @@ case class BroadcastHashJoin(
case BuildRight => input ++ buildColumns
}
+ val numOutput = metricTerm(ctx, "numOutputRows")
val outputCode = if (condition.isDefined) {
// filter the output via condition
ctx.currentVars = resultVars
@@ -170,11 +171,15 @@ case class BroadcastHashJoin(
s"""
| ${ev.code}
| if (!${ev.isNull} && ${ev.value}) {
+ | $numOutput.add(1);
| ${consume(ctx, resultVars)}
| }
""".stripMargin
} else {
- consume(ctx, resultVars)
+ s"""
+ |$numOutput.add(1);
+ |${consume(ctx, resultVars)}
+ """.stripMargin
}
if (broadcastRelation.value.isInstanceOf[UniqueHashedRelation]) {
http://git-wip-us.apache.org/repos/asf/spark/blob/b10af5e2/sql/core/src/test/scala/org/apache/spark/sql/execution/BenchmarkWholeStageCodegen.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/BenchmarkWholeStageCodegen.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/BenchmarkWholeStageCodegen.scala
index dc6c647..1c7e69f 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/BenchmarkWholeStageCodegen.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/BenchmarkWholeStageCodegen.scala
@@ -63,7 +63,7 @@ class BenchmarkWholeStageCodegen extends SparkFunSuite {
rang/filter/sum: Best/Avg Time(ms) Rate(M/s) Per Row(ns) Relative
-------------------------------------------------------------------------------------------
rang/filter/sum codegen=false 14332 / 16646 36.0 27.8 1.0X
- rang/filter/sum codegen=true 845 / 940 620.0 1.6 17.0X
+ rang/filter/sum codegen=true 897 / 1022 584.6 1.7 16.4X
*/
}
http://git-wip-us.apache.org/repos/asf/spark/blob/b10af5e2/sql/core/src/test/scala/org/apache/spark/sql/execution/metric/SQLMetricsSuite.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/metric/SQLMetricsSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/metric/SQLMetricsSuite.scala
index d24625a..f4bc9e5 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/metric/SQLMetricsSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/metric/SQLMetricsSuite.scala
@@ -298,24 +298,22 @@ class SQLMetricsSuite extends SparkFunSuite with SharedSQLContext {
test("save metrics") {
withTempPath { file =>
- withSQLConf("spark.sql.codegen.wholeStage" -> "false") {
- val previousExecutionIds = sqlContext.listener.executionIdToData.keySet
- // Assume the execution plan is
- // PhysicalRDD(nodeId = 0)
- person.select('name).write.format("json").save(file.getAbsolutePath)
- sparkContext.listenerBus.waitUntilEmpty(10000)
- val executionIds = sqlContext.listener.executionIdToData.keySet.diff(previousExecutionIds)
- assert(executionIds.size === 1)
- val executionId = executionIds.head
- val jobs = sqlContext.listener.getExecution(executionId).get.jobs
- // Use "<=" because there is a race condition that we may miss some jobs
- // TODO Change "<=" to "=" once we fix the race condition that missing the JobStarted event.
- assert(jobs.size <= 1)
- val metricValues = sqlContext.listener.getExecutionMetrics(executionId)
- // Because "save" will create a new DataFrame internally, we cannot get the real metric id.
- // However, we still can check the value.
- assert(metricValues.values.toSeq === Seq("2"))
- }
+ val previousExecutionIds = sqlContext.listener.executionIdToData.keySet
+ // Assume the execution plan is
+ // PhysicalRDD(nodeId = 0)
+ person.select('name).write.format("json").save(file.getAbsolutePath)
+ sparkContext.listenerBus.waitUntilEmpty(10000)
+ val executionIds = sqlContext.listener.executionIdToData.keySet.diff(previousExecutionIds)
+ assert(executionIds.size === 1)
+ val executionId = executionIds.head
+ val jobs = sqlContext.listener.getExecution(executionId).get.jobs
+ // Use "<=" because there is a race condition that we may miss some jobs
+ // TODO Change "<=" to "=" once we fix the race condition that missing the JobStarted event.
+ assert(jobs.size <= 1)
+ val metricValues = sqlContext.listener.getExecutionMetrics(executionId)
+ // Because "save" will create a new DataFrame internally, we cannot get the real metric id.
+ // However, we still can check the value.
+ assert(metricValues.values.toSeq === Seq("2"))
}
}
http://git-wip-us.apache.org/repos/asf/spark/blob/b10af5e2/sql/core/src/test/scala/org/apache/spark/sql/util/DataFrameCallbackSuite.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/util/DataFrameCallbackSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/util/DataFrameCallbackSuite.scala
index d3191d3..15a9562 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/util/DataFrameCallbackSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/util/DataFrameCallbackSuite.scala
@@ -22,7 +22,7 @@ import scala.collection.mutable.ArrayBuffer
import org.apache.spark._
import org.apache.spark.sql.{functions, QueryTest}
import org.apache.spark.sql.catalyst.plans.logical.{Aggregate, Project}
-import org.apache.spark.sql.execution.QueryExecution
+import org.apache.spark.sql.execution.{QueryExecution, WholeStageCodegen}
import org.apache.spark.sql.test.SharedSQLContext
class DataFrameCallbackSuite extends QueryTest with SharedSQLContext {
@@ -92,17 +92,19 @@ class DataFrameCallbackSuite extends QueryTest with SharedSQLContext {
override def onFailure(funcName: String, qe: QueryExecution, exception: Exception): Unit = {}
override def onSuccess(funcName: String, qe: QueryExecution, duration: Long): Unit = {
- metrics += qe.executedPlan.longMetric("numOutputRows").value.value
+ val metric = qe.executedPlan match {
+ case w: WholeStageCodegen => w.plan.longMetric("numOutputRows")
+ case other => other.longMetric("numOutputRows")
+ }
+ metrics += metric.value.value
}
}
sqlContext.listenerManager.register(listener)
- withSQLConf("spark.sql.codegen.wholeStage" -> "false") {
- val df = Seq(1 -> "a").toDF("i", "j").groupBy("i").count()
- df.collect()
- df.collect()
- Seq(1 -> "a", 2 -> "a").toDF("i", "j").groupBy("i").count().collect()
- }
+ val df = Seq(1 -> "a").toDF("i", "j").groupBy("i").count()
+ df.collect()
+ df.collect()
+ Seq(1 -> "a", 2 -> "a").toDF("i", "j").groupBy("i").count().collect()
assert(metrics.length == 3)
assert(metrics(0) === 1)
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@spark.apache.org
For additional commands, e-mail: commits-help@spark.apache.org