Posted to commits@spark.apache.org by rx...@apache.org on 2016/02/12 03:00:05 UTC

spark git commit: [SPARK-12915][SQL] add SQL metrics of numOutputRows for whole stage codegen

Repository: spark
Updated Branches:
  refs/heads/master a5257048d -> b10af5e23


[SPARK-12915][SQL] add SQL metrics of numOutputRows for whole stage codegen

This PR adds SQL metrics (numOutputRows) for generated operators (matching the non-generated ones); the cost is about 0.2 nanoseconds per row.

Screenshot (gen metrics): https://cloud.githubusercontent.com/assets/40902/12994694/47f5881e-d0d7-11e5-9d47-78229f559ab0.png
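
For orientation, a hedged sketch of the pattern this change introduces (it mirrors the Filter.doConsume hunk further down and is not additional code from this commit): a codegen operator asks metricTerm for the name of a generated-class field backed by its numOutputRows metric, then bumps that field in the Java source it emits.

  // Illustrative sketch only, mirroring Filter.doConsume below; the simple
  // pass-through operator shown here is hypothetical.
  override def doConsume(ctx: CodegenContext, input: Seq[ExprCode]): String = {
    // metricTerm registers this operator's LongSQLMetric with the generated class
    // and returns the name of the mutable field holding its LongSQLMetricValue.
    val numOutput = metricTerm(ctx, "numOutputRows")
    s"""
       | $numOutput.add(1);      // count the row as it is produced
       | ${consume(ctx, input)}  // hand the row to the parent operator
     """.stripMargin
  }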

Author: Davies Liu <da...@databricks.com>

Closes #11170 from davies/gen_metric.


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/b10af5e2
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/b10af5e2
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/b10af5e2

Branch: refs/heads/master
Commit: b10af5e238ce2051be2bf4d7ddda181d34cbb69a
Parents: a525704
Author: Davies Liu <da...@databricks.com>
Authored: Thu Feb 11 18:00:03 2016 -0800
Committer: Reynold Xin <rx...@databricks.com>
Committed: Thu Feb 11 18:00:03 2016 -0800

----------------------------------------------------------------------
 .../scala/org/apache/spark/sql/DataFrame.scala  |  2 +-
 .../apache/spark/sql/execution/SparkPlan.scala  |  7 ++++
 .../spark/sql/execution/WholeStageCodegen.scala | 19 ++++++++++-
 .../execution/aggregate/TungstenAggregate.scala |  5 +++
 .../spark/sql/execution/basicOperators.scala    |  8 ++++-
 .../sql/execution/joins/BroadcastHashJoin.scala |  7 +++-
 .../execution/BenchmarkWholeStageCodegen.scala  |  2 +-
 .../sql/execution/metric/SQLMetricsSuite.scala  | 34 +++++++++-----------
 .../spark/sql/util/DataFrameCallbackSuite.scala | 18 ++++++-----
 9 files changed, 71 insertions(+), 31 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/b10af5e2/sql/core/src/main/scala/org/apache/spark/sql/DataFrame.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/DataFrame.scala b/sql/core/src/main/scala/org/apache/spark/sql/DataFrame.scala
index 7aa08fb..c5b2b7d 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/DataFrame.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/DataFrame.scala
@@ -1775,7 +1775,7 @@ class DataFrame private[sql](
   private def withCallback[T](name: String, df: DataFrame)(action: DataFrame => T) = {
     try {
       df.queryExecution.executedPlan.foreach { plan =>
-        plan.metrics.valuesIterator.foreach(_.reset())
+        plan.resetMetrics()
       }
       val start = System.nanoTime()
       val result = action(df)

http://git-wip-us.apache.org/repos/asf/spark/blob/b10af5e2/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkPlan.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkPlan.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkPlan.scala
index 3cc99d3..c72b8dc 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkPlan.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkPlan.scala
@@ -78,6 +78,13 @@ abstract class SparkPlan extends QueryPlan[SparkPlan] with Logging with Serializ
   private[sql] def metrics: Map[String, SQLMetric[_, _]] = Map.empty
 
   /**
+    * Reset all the metrics.
+    */
+  private[sql] def resetMetrics(): Unit = {
+    metrics.valuesIterator.foreach(_.reset())
+  }
+
+  /**
    * Return a LongSQLMetric according to the name.
    */
   private[sql] def longMetric(name: String): LongSQLMetric =
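
A hedged aside, not part of the patch: DataFrame.withCallback (first hunk above) calls this new resetMetrics() on every node of the executed plan before running an action, so the callback observes values accumulated by that action alone. A minimal sketch of the same idea, where `action` stands for whatever DataFrame operation is being wrapped:

  // Illustrative only; mirrors the DataFrame.withCallback change above.
  df.queryExecution.executedPlan.foreach(_.resetMetrics())
  val result = action(df)  // metrics now reflect just this action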

http://git-wip-us.apache.org/repos/asf/spark/blob/b10af5e2/sql/core/src/main/scala/org/apache/spark/sql/execution/WholeStageCodegen.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/WholeStageCodegen.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/WholeStageCodegen.scala
index 30f74fc..f35efb5 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/WholeStageCodegen.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/WholeStageCodegen.scala
@@ -28,7 +28,7 @@ import org.apache.spark.sql.catalyst.plans.physical.Partitioning
 import org.apache.spark.sql.catalyst.rules.Rule
 import org.apache.spark.sql.execution.aggregate.TungstenAggregate
 import org.apache.spark.sql.execution.joins.{BroadcastHashJoin, BuildLeft, BuildRight}
-import org.apache.spark.util.Utils
+import org.apache.spark.sql.execution.metric.{LongSQLMetric, LongSQLMetricValue, SQLMetric}
 
 /**
   * An interface for those physical operators that support codegen.
@@ -43,6 +43,19 @@ trait CodegenSupport extends SparkPlan {
   }
 
   /**
+    * Creates a metric using the specified name.
+    *
+    * @return name of the variable representing the metric
+    */
+  def metricTerm(ctx: CodegenContext, name: String): String = {
+    val metric = ctx.addReferenceObj(name, longMetric(name))
+    val value = ctx.freshName("metricValue")
+    val cls = classOf[LongSQLMetricValue].getName
+    ctx.addMutableState(cls, value, s"$value = ($cls) $metric.localValue();")
+    value
+  }
+
+  /**
     * Whether this SparkPlan support whole stage codegen or not.
     */
   def supportCodegen: Boolean = true
@@ -316,6 +329,10 @@ case class WholeStageCodegen(plan: CodegenSupport, children: Seq[SparkPlan])
     }
   }
 
+  private[sql] override def resetMetrics(): Unit = {
+    plan.foreach(_.resetMetrics())
+  }
+
   override def generateTreeString(
       depth: Int,
       lastChildren: Seq[Boolean],

http://git-wip-us.apache.org/repos/asf/spark/blob/b10af5e2/sql/core/src/main/scala/org/apache/spark/sql/execution/aggregate/TungstenAggregate.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/aggregate/TungstenAggregate.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/aggregate/TungstenAggregate.scala
index a6950f8..852203f 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/aggregate/TungstenAggregate.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/aggregate/TungstenAggregate.scala
@@ -202,6 +202,7 @@ case class TungstenAggregate(
          | }
        """.stripMargin)
 
+    val numOutput = metricTerm(ctx, "numOutputRows")
     s"""
        | if (!$initAgg) {
        |   $initAgg = true;
@@ -210,6 +211,7 @@ case class TungstenAggregate(
        |   // output the result
        |   ${genResult.trim}
        |
+       |   $numOutput.add(1);
        |   ${consume(ctx, resultVars).trim}
        | }
      """.stripMargin
@@ -297,6 +299,7 @@ case class TungstenAggregate(
     val peakMemory = Math.max(mapMemory, sorterMemory)
     val metrics = TaskContext.get().taskMetrics()
     metrics.incPeakExecutionMemory(peakMemory)
+    // TODO: update data size and spill size
 
     if (sorter == null) {
       // not spilled
@@ -456,6 +459,7 @@ case class TungstenAggregate(
     val keyTerm = ctx.freshName("aggKey")
     val bufferTerm = ctx.freshName("aggBuffer")
     val outputCode = generateResultCode(ctx, keyTerm, bufferTerm, thisPlan)
+    val numOutput = metricTerm(ctx, "numOutputRows")
 
     s"""
      if (!$initAgg) {
@@ -465,6 +469,7 @@ case class TungstenAggregate(
 
      // output the result
      while ($iterTerm.next()) {
+       $numOutput.add(1);
        UnsafeRow $keyTerm = (UnsafeRow) $iterTerm.getKey();
        UnsafeRow $bufferTerm = (UnsafeRow) $iterTerm.getValue();
        $outputCode

http://git-wip-us.apache.org/repos/asf/spark/blob/b10af5e2/sql/core/src/main/scala/org/apache/spark/sql/execution/basicOperators.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/basicOperators.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/basicOperators.scala
index 949acb9..4b82d55 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/basicOperators.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/basicOperators.scala
@@ -22,7 +22,7 @@ import org.apache.spark.sql.catalyst.InternalRow
 import org.apache.spark.sql.catalyst.expressions._
 import org.apache.spark.sql.catalyst.expressions.codegen.{CodegenContext, ExprCode, ExpressionCanonicalizer}
 import org.apache.spark.sql.catalyst.plans.physical._
-import org.apache.spark.sql.execution.metric.SQLMetrics
+import org.apache.spark.sql.execution.metric.{LongSQLMetricValue, SQLMetrics}
 import org.apache.spark.sql.types.LongType
 import org.apache.spark.util.random.PoissonSampler
 
@@ -78,6 +78,7 @@ case class Filter(condition: Expression, child: SparkPlan) extends UnaryNode wit
   }
 
   override def doConsume(ctx: CodegenContext, input: Seq[ExprCode]): String = {
+    val numOutput = metricTerm(ctx, "numOutputRows")
     val expr = ExpressionCanonicalizer.execute(
       BindReferences.bindReference(condition, child.output))
     ctx.currentVars = input
@@ -90,6 +91,7 @@ case class Filter(condition: Expression, child: SparkPlan) extends UnaryNode wit
     s"""
        | ${eval.code}
        | if ($nullCheck ${eval.value}) {
+       |   $numOutput.add(1);
        |   ${consume(ctx, ctx.currentVars)}
        | }
      """.stripMargin
@@ -159,6 +161,8 @@ case class Range(
   }
 
   protected override def doProduce(ctx: CodegenContext): String = {
+    val numOutput = metricTerm(ctx, "numOutputRows")
+
     val initTerm = ctx.freshName("initRange")
     ctx.addMutableState("boolean", initTerm, s"$initTerm = false;")
     val partitionEnd = ctx.freshName("partitionEnd")
@@ -204,6 +208,8 @@ case class Range(
         |   } else {
         |     $partitionEnd = end.longValue();
         |   }
+        |
+        |   $numOutput.add(($partitionEnd - $number) / ${step}L);
         | }
        """.stripMargin)
 

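A hedged aside, not part of the patch: unlike Filter, Range bumps numOutputRows once per partition in bulk, so its contribution to the per-row cost is negligible. A rough sketch of the arithmetic, with made-up partition bounds:

  // Illustrative values only; number and partitionEnd come from the generated code above.
  val number = 0L           // hypothetical first value produced by this partition
  val partitionEnd = 1000L  // hypothetical exclusive end of the partition
  val step = 2L
  val rowsInPartition = (partitionEnd - number) / step  // 500
  // the generated code then does the equivalent of: numOutputRows.add(rowsInPartition)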
http://git-wip-us.apache.org/repos/asf/spark/blob/b10af5e2/sql/core/src/main/scala/org/apache/spark/sql/execution/joins/BroadcastHashJoin.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/joins/BroadcastHashJoin.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/joins/BroadcastHashJoin.scala
index 35c7963..985e740 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/joins/BroadcastHashJoin.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/joins/BroadcastHashJoin.scala
@@ -163,6 +163,7 @@ case class BroadcastHashJoin(
       case BuildRight => input ++ buildColumns
     }
 
+    val numOutput = metricTerm(ctx, "numOutputRows")
     val outputCode = if (condition.isDefined) {
       // filter the output via condition
       ctx.currentVars = resultVars
@@ -170,11 +171,15 @@ case class BroadcastHashJoin(
       s"""
          | ${ev.code}
          | if (!${ev.isNull} && ${ev.value}) {
+         |   $numOutput.add(1);
          |   ${consume(ctx, resultVars)}
          | }
        """.stripMargin
     } else {
-      consume(ctx, resultVars)
+      s"""
+         |$numOutput.add(1);
+         |${consume(ctx, resultVars)}
+       """.stripMargin
     }
 
     if (broadcastRelation.value.isInstanceOf[UniqueHashedRelation]) {

http://git-wip-us.apache.org/repos/asf/spark/blob/b10af5e2/sql/core/src/test/scala/org/apache/spark/sql/execution/BenchmarkWholeStageCodegen.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/BenchmarkWholeStageCodegen.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/BenchmarkWholeStageCodegen.scala
index dc6c647..1c7e69f 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/BenchmarkWholeStageCodegen.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/BenchmarkWholeStageCodegen.scala
@@ -63,7 +63,7 @@ class BenchmarkWholeStageCodegen extends SparkFunSuite {
     rang/filter/sum:                    Best/Avg Time(ms)    Rate(M/s)   Per Row(ns)   Relative
     -------------------------------------------------------------------------------------------
     rang/filter/sum codegen=false          14332 / 16646         36.0          27.8       1.0X
-    rang/filter/sum codegen=true              845 /  940        620.0           1.6      17.0X
+    rang/filter/sum codegen=true              897 / 1022        584.6           1.7      16.4X
     */
   }
 

http://git-wip-us.apache.org/repos/asf/spark/blob/b10af5e2/sql/core/src/test/scala/org/apache/spark/sql/execution/metric/SQLMetricsSuite.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/metric/SQLMetricsSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/metric/SQLMetricsSuite.scala
index d24625a..f4bc9e5 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/metric/SQLMetricsSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/metric/SQLMetricsSuite.scala
@@ -298,24 +298,22 @@ class SQLMetricsSuite extends SparkFunSuite with SharedSQLContext {
 
   test("save metrics") {
     withTempPath { file =>
-      withSQLConf("spark.sql.codegen.wholeStage" -> "false") {
-        val previousExecutionIds = sqlContext.listener.executionIdToData.keySet
-        // Assume the execution plan is
-        // PhysicalRDD(nodeId = 0)
-        person.select('name).write.format("json").save(file.getAbsolutePath)
-        sparkContext.listenerBus.waitUntilEmpty(10000)
-        val executionIds = sqlContext.listener.executionIdToData.keySet.diff(previousExecutionIds)
-        assert(executionIds.size === 1)
-        val executionId = executionIds.head
-        val jobs = sqlContext.listener.getExecution(executionId).get.jobs
-        // Use "<=" because there is a race condition that we may miss some jobs
-        // TODO Change "<=" to "=" once we fix the race condition that missing the JobStarted event.
-        assert(jobs.size <= 1)
-        val metricValues = sqlContext.listener.getExecutionMetrics(executionId)
-        // Because "save" will create a new DataFrame internally, we cannot get the real metric id.
-        // However, we still can check the value.
-        assert(metricValues.values.toSeq === Seq("2"))
-      }
+      val previousExecutionIds = sqlContext.listener.executionIdToData.keySet
+      // Assume the execution plan is
+      // PhysicalRDD(nodeId = 0)
+      person.select('name).write.format("json").save(file.getAbsolutePath)
+      sparkContext.listenerBus.waitUntilEmpty(10000)
+      val executionIds = sqlContext.listener.executionIdToData.keySet.diff(previousExecutionIds)
+      assert(executionIds.size === 1)
+      val executionId = executionIds.head
+      val jobs = sqlContext.listener.getExecution(executionId).get.jobs
+      // Use "<=" because there is a race condition that we may miss some jobs
+      // TODO Change "<=" to "=" once we fix the race condition that missing the JobStarted event.
+      assert(jobs.size <= 1)
+      val metricValues = sqlContext.listener.getExecutionMetrics(executionId)
+      // Because "save" will create a new DataFrame internally, we cannot get the real metric id.
+      // However, we still can check the value.
+      assert(metricValues.values.toSeq === Seq("2"))
     }
   }
 

http://git-wip-us.apache.org/repos/asf/spark/blob/b10af5e2/sql/core/src/test/scala/org/apache/spark/sql/util/DataFrameCallbackSuite.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/util/DataFrameCallbackSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/util/DataFrameCallbackSuite.scala
index d3191d3..15a9562 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/util/DataFrameCallbackSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/util/DataFrameCallbackSuite.scala
@@ -22,7 +22,7 @@ import scala.collection.mutable.ArrayBuffer
 import org.apache.spark._
 import org.apache.spark.sql.{functions, QueryTest}
 import org.apache.spark.sql.catalyst.plans.logical.{Aggregate, Project}
-import org.apache.spark.sql.execution.QueryExecution
+import org.apache.spark.sql.execution.{QueryExecution, WholeStageCodegen}
 import org.apache.spark.sql.test.SharedSQLContext
 
 class DataFrameCallbackSuite extends QueryTest with SharedSQLContext {
@@ -92,17 +92,19 @@ class DataFrameCallbackSuite extends QueryTest with SharedSQLContext {
       override def onFailure(funcName: String, qe: QueryExecution, exception: Exception): Unit = {}
 
       override def onSuccess(funcName: String, qe: QueryExecution, duration: Long): Unit = {
-        metrics += qe.executedPlan.longMetric("numOutputRows").value.value
+        val metric = qe.executedPlan match {
+          case w: WholeStageCodegen => w.plan.longMetric("numOutputRows")
+          case other => other.longMetric("numOutputRows")
+        }
+        metrics += metric.value.value
       }
     }
     sqlContext.listenerManager.register(listener)
 
-    withSQLConf("spark.sql.codegen.wholeStage" -> "false") {
-      val df = Seq(1 -> "a").toDF("i", "j").groupBy("i").count()
-      df.collect()
-      df.collect()
-      Seq(1 -> "a", 2 -> "a").toDF("i", "j").groupBy("i").count().collect()
-    }
+    val df = Seq(1 -> "a").toDF("i", "j").groupBy("i").count()
+    df.collect()
+    df.collect()
+    Seq(1 -> "a", 2 -> "a").toDF("i", "j").groupBy("i").count().collect()
 
     assert(metrics.length == 3)
     assert(metrics(0) === 1)
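
A hedged usage sketch, not part of the patch: with whole-stage codegen enabled, the executed plan is wrapped in a WholeStageCodegen node that carries no metrics of its own, so callers unwrap it before reading numOutputRows, exactly as the listener above now does. The helper name below is made up; longMetric is a private[sql] API, as used elsewhere in this diff.

  def numOutputRows(qe: QueryExecution): Long = {
    val metric = qe.executedPlan match {
      case w: WholeStageCodegen => w.plan.longMetric("numOutputRows")
      case other => other.longMetric("numOutputRows")
    }
    metric.value.value  // LongSQLMetric -> LongSQLMetricValue -> Long
  }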


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@spark.apache.org
For additional commands, e-mail: commits-help@spark.apache.org