You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@spark.apache.org by hv...@apache.org on 2017/10/12 15:00:26 UTC
spark git commit: [SPARK-22251][SQL] Metric 'aggregate time' is incorrect when codegen is off
Repository: spark
Updated Branches:
refs/heads/master 73d80ec49 -> 02218c4c7
[SPARK-22251][SQL] Metric 'aggregate time' is incorrect when codegen is off
## What changes were proposed in this pull request?
Adds code that sets the 'aggregate time' metric in the non-codegen path of HashAggregateExec, and adds the 'aggregate time' metric to ObjectHashAggregateExec.
## How was this patch tested?
Tested manually.
Author: Ala Luszczak <al...@databricks.com>
Closes #19473 from ala/fix-agg-time.
Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/02218c4c
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/02218c4c
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/02218c4c
Branch: refs/heads/master
Commit: 02218c4c73c32741390d9906b6190ef2124ce518
Parents: 73d80ec
Author: Ala Luszczak <al...@databricks.com>
Authored: Thu Oct 12 17:00:22 2017 +0200
Committer: Herman van Hovell <hv...@databricks.com>
Committed: Thu Oct 12 17:00:22 2017 +0200
----------------------------------------------------------------------
.../spark/sql/execution/aggregate/HashAggregateExec.scala | 6 +++++-
.../sql/execution/aggregate/ObjectHashAggregateExec.scala | 9 +++++++--
2 files changed, 12 insertions(+), 3 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/spark/blob/02218c4c/sql/core/src/main/scala/org/apache/spark/sql/execution/aggregate/HashAggregateExec.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/aggregate/HashAggregateExec.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/aggregate/HashAggregateExec.scala
index 8b573fd..43e5ff8 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/aggregate/HashAggregateExec.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/aggregate/HashAggregateExec.scala
@@ -95,11 +95,13 @@ case class HashAggregateExec(
val peakMemory = longMetric("peakMemory")
val spillSize = longMetric("spillSize")
val avgHashProbe = longMetric("avgHashProbe")
+ val aggTime = longMetric("aggTime")
child.execute().mapPartitionsWithIndex { (partIndex, iter) =>
+ val beforeAgg = System.nanoTime()
val hasInput = iter.hasNext
- if (!hasInput && groupingExpressions.nonEmpty) {
+ val res = if (!hasInput && groupingExpressions.nonEmpty) {
// This is a grouped aggregate and the input iterator is empty,
// so return an empty iterator.
Iterator.empty
@@ -128,6 +130,8 @@ case class HashAggregateExec(
aggregationIterator
}
}
+ aggTime += (System.nanoTime() - beforeAgg) / 1000000
+ res
}
}
http://git-wip-us.apache.org/repos/asf/spark/blob/02218c4c/sql/core/src/main/scala/org/apache/spark/sql/execution/aggregate/ObjectHashAggregateExec.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/aggregate/ObjectHashAggregateExec.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/aggregate/ObjectHashAggregateExec.scala
index 6316e06..ec3f9a0 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/aggregate/ObjectHashAggregateExec.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/aggregate/ObjectHashAggregateExec.scala
@@ -76,7 +76,8 @@ case class ObjectHashAggregateExec(
aggregateExpressions.flatMap(_.aggregateFunction.inputAggBufferAttributes)
override lazy val metrics = Map(
- "numOutputRows" -> SQLMetrics.createMetric(sparkContext, "number of output rows")
+ "numOutputRows" -> SQLMetrics.createMetric(sparkContext, "number of output rows"),
+ "aggTime" -> SQLMetrics.createTimingMetric(sparkContext, "aggregate time")
)
override def output: Seq[Attribute] = resultExpressions.map(_.toAttribute)
@@ -96,11 +97,13 @@ case class ObjectHashAggregateExec(
protected override def doExecute(): RDD[InternalRow] = attachTree(this, "execute") {
val numOutputRows = longMetric("numOutputRows")
+ val aggTime = longMetric("aggTime")
val fallbackCountThreshold = sqlContext.conf.objectAggSortBasedFallbackThreshold
child.execute().mapPartitionsWithIndexInternal { (partIndex, iter) =>
+ val beforeAgg = System.nanoTime()
val hasInput = iter.hasNext
- if (!hasInput && groupingExpressions.nonEmpty) {
+ val res = if (!hasInput && groupingExpressions.nonEmpty) {
// This is a grouped aggregate and the input kvIterator is empty,
// so return an empty kvIterator.
Iterator.empty
@@ -127,6 +130,8 @@ case class ObjectHashAggregateExec(
aggregationIterator
}
}
+ aggTime += (System.nanoTime() - beforeAgg) / 1000000
+ res
}
}
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@spark.apache.org
For additional commands, e-mail: commits-help@spark.apache.org