Posted to commits@spark.apache.org by hv...@apache.org on 2017/10/12 15:00:26 UTC

spark git commit: [SPARK-22251][SQL] Metric 'aggregate time' is incorrect when codegen is off

Repository: spark
Updated Branches:
  refs/heads/master 73d80ec49 -> 02218c4c7


[SPARK-22251][SQL] Metric 'aggregate time' is incorrect when codegen is off

## What changes were proposed in this pull request?

Adds code that sets the 'aggregate time' metric on the non-codegen path in HashAggregateExec, and adds the metric to ObjectHashAggregateExec.
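
The change follows the same pattern in both operators: capture System.nanoTime() before the aggregation iterator is built, and add the elapsed milliseconds to the 'aggTime' metric once it is ready. This works because the real aggregation iterator consumes its input while being constructed, so timing the construction captures the aggregation work. A minimal, self-contained sketch of that pattern (SimpleTimingMetric is a hypothetical stand-in for Spark's SQLMetric, which needs a live SparkContext; the patch itself uses longMetric("aggTime")):

// Sketch only: SimpleTimingMetric stands in for Spark's SQLMetric accumulator.
object AggTimeSketch {
  final class SimpleTimingMetric {
    private var totalMs: Long = 0L
    def +=(ms: Long): Unit = totalMs += ms
    def value: Long = totalMs
  }

  def main(args: Array[String]): Unit = {
    val aggTime = new SimpleTimingMetric
    val input = (1L to 1000000L).iterator

    // Same shape as the patched doExecute(): time the construction of the
    // aggregation iterator and record the elapsed nanos as millis.
    val beforeAgg = System.nanoTime()
    val res = if (!input.hasNext) Iterator.empty else {
      // Stand-in for building the aggregation iterator, which eagerly
      // consumes its input here (as TungstenAggregationIterator does).
      val sums = input.foldLeft(Map.empty[Long, Long]) { (acc, v) =>
        acc.updated(v % 10, acc.getOrElse(v % 10, 0L) + v)
      }
      sums.iterator
    }
    aggTime += (System.nanoTime() - beforeAgg) / 1000000
    println(s"aggregate time: ${aggTime.value} ms; groups: ${res.size}")
  }
}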

## How was this patch tested?

Tested manually.
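
One way to reproduce such a manual check (a sketch, assuming a local SparkSession; AggTimeManualCheck and the pause at the end are illustrative, not part of the patch). With whole-stage codegen disabled, the interpreted doExecute() path runs and the 'aggregate time' metric should appear on the aggregate node in the SQL tab of the Spark UI:

import org.apache.spark.sql.SparkSession

object AggTimeManualCheck {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder()
      .master("local[*]")
      .appName("agg-time-check")
      .config("spark.sql.codegen.wholeStage", "false") // force the non-codegen path
      .getOrCreate()

    // A grouped hash aggregate; with codegen off this exercises the
    // patched HashAggregateExec.doExecute().
    spark.range(0L, 1000000L)
      .selectExpr("id % 10 AS k", "id AS v")
      .groupBy("k").sum("v")
      .collect()

    // Check the 'aggregate time' metric on the aggregate node in the
    // SQL tab of the Spark UI while the application is still running.
    scala.io.StdIn.readLine("Check http://localhost:4040, then press enter ")
    spark.stop()
  }
}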

Author: Ala Luszczak <al...@databricks.com>

Closes #19473 from ala/fix-agg-time.


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/02218c4c
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/02218c4c
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/02218c4c

Branch: refs/heads/master
Commit: 02218c4c73c32741390d9906b6190ef2124ce518
Parents: 73d80ec
Author: Ala Luszczak <al...@databricks.com>
Authored: Thu Oct 12 17:00:22 2017 +0200
Committer: Herman van Hovell <hv...@databricks.com>
Committed: Thu Oct 12 17:00:22 2017 +0200

----------------------------------------------------------------------
 .../spark/sql/execution/aggregate/HashAggregateExec.scala   | 6 +++++-
 .../sql/execution/aggregate/ObjectHashAggregateExec.scala   | 9 +++++++--
 2 files changed, 12 insertions(+), 3 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/02218c4c/sql/core/src/main/scala/org/apache/spark/sql/execution/aggregate/HashAggregateExec.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/aggregate/HashAggregateExec.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/aggregate/HashAggregateExec.scala
index 8b573fd..43e5ff8 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/aggregate/HashAggregateExec.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/aggregate/HashAggregateExec.scala
@@ -95,11 +95,13 @@ case class HashAggregateExec(
     val peakMemory = longMetric("peakMemory")
     val spillSize = longMetric("spillSize")
     val avgHashProbe = longMetric("avgHashProbe")
+    val aggTime = longMetric("aggTime")
 
     child.execute().mapPartitionsWithIndex { (partIndex, iter) =>
 
+      val beforeAgg = System.nanoTime()
       val hasInput = iter.hasNext
-      if (!hasInput && groupingExpressions.nonEmpty) {
+      val res = if (!hasInput && groupingExpressions.nonEmpty) {
         // This is a grouped aggregate and the input iterator is empty,
         // so return an empty iterator.
         Iterator.empty
@@ -128,6 +130,8 @@ case class HashAggregateExec(
           aggregationIterator
         }
       }
+      aggTime += (System.nanoTime() - beforeAgg) / 1000000
+      res
     }
   }
 

http://git-wip-us.apache.org/repos/asf/spark/blob/02218c4c/sql/core/src/main/scala/org/apache/spark/sql/execution/aggregate/ObjectHashAggregateExec.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/aggregate/ObjectHashAggregateExec.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/aggregate/ObjectHashAggregateExec.scala
index 6316e06..ec3f9a0 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/aggregate/ObjectHashAggregateExec.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/aggregate/ObjectHashAggregateExec.scala
@@ -76,7 +76,8 @@ case class ObjectHashAggregateExec(
       aggregateExpressions.flatMap(_.aggregateFunction.inputAggBufferAttributes)
 
   override lazy val metrics = Map(
-    "numOutputRows" -> SQLMetrics.createMetric(sparkContext, "number of output rows")
+    "numOutputRows" -> SQLMetrics.createMetric(sparkContext, "number of output rows"),
+    "aggTime" -> SQLMetrics.createTimingMetric(sparkContext, "aggregate time")
   )
 
   override def output: Seq[Attribute] = resultExpressions.map(_.toAttribute)
@@ -96,11 +97,13 @@ case class ObjectHashAggregateExec(
 
   protected override def doExecute(): RDD[InternalRow] = attachTree(this, "execute") {
     val numOutputRows = longMetric("numOutputRows")
+    val aggTime = longMetric("aggTime")
     val fallbackCountThreshold = sqlContext.conf.objectAggSortBasedFallbackThreshold
 
     child.execute().mapPartitionsWithIndexInternal { (partIndex, iter) =>
+      val beforeAgg = System.nanoTime()
       val hasInput = iter.hasNext
-      if (!hasInput && groupingExpressions.nonEmpty) {
+      val res = if (!hasInput && groupingExpressions.nonEmpty) {
         // This is a grouped aggregate and the input kvIterator is empty,
         // so return an empty kvIterator.
         Iterator.empty
@@ -127,6 +130,8 @@ case class ObjectHashAggregateExec(
           aggregationIterator
         }
       }
+      aggTime += (System.nanoTime() - beforeAgg) / 1000000
+      res
     }
   }
 

