You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@spark.apache.org by da...@apache.org on 2016/04/22 21:59:36 UTC

spark git commit: [SPARK-14669] [SQL] Fix some SQL metrics in codegen and added more

Repository: spark
Updated Branches:
  refs/heads/master 0419d6316 -> 0dcf9dbeb


[SPARK-14669] [SQL] Fix some SQL metrics in codegen and added more

## What changes were proposed in this pull request?

1. Fix the "spill size" metric of TungstenAggregate and Sort
2. Rename "data size" to "peak memory" to match its actual meaning (this is also consistent with task metrics)
3. Add a "data size" metric for ShuffleExchange and BroadcastExchange
4. Add timing metrics for Sort, Aggregate and BroadcastExchange (these require another patch to work)

## How was this patch tested?

Existing tests.
![metrics](https://cloud.githubusercontent.com/assets/40902/14573908/21ad2f00-030d-11e6-9e2c-c544f30039ea.png)

Author: Davies Liu <da...@databricks.com>

Closes #12425 from davies/fix_metrics.


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/0dcf9dbe
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/0dcf9dbe
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/0dcf9dbe

Branch: refs/heads/master
Commit: 0dcf9dbebbd53aaebe17c85ede7ab7847ce83137
Parents: 0419d63
Author: Davies Liu <da...@databricks.com>
Authored: Fri Apr 22 12:59:32 2016 -0700
Committer: Davies Liu <da...@gmail.com>
Committed: Fri Apr 22 12:59:32 2016 -0700

----------------------------------------------------------------------
 .../unsafe/sort/UnsafeExternalSorter.java       | 12 ++++++-
 .../sql/execution/UnsafeKVExternalSorter.java   |  7 ++++
 .../org/apache/spark/sql/execution/Sort.scala   | 19 ++++++-----
 .../sql/execution/UnsafeRowSerializer.scala     | 19 ++++++++---
 .../execution/aggregate/TungstenAggregate.scala | 34 ++++++++++++++------
 .../aggregate/TungstenAggregationIterator.scala |  8 ++---
 .../execution/exchange/BroadcastExchange.scala  | 20 +++++++++++-
 .../execution/exchange/ShuffleExchange.scala    |  9 ++++--
 .../sql/execution/joins/ShuffledHashJoin.scala  |  9 +++++-
 .../spark/sql/execution/metric/SQLMetrics.scala |  5 ++-
 10 files changed, 110 insertions(+), 32 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/0dcf9dbe/core/src/main/java/org/apache/spark/util/collection/unsafe/sort/UnsafeExternalSorter.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/spark/util/collection/unsafe/sort/UnsafeExternalSorter.java b/core/src/main/java/org/apache/spark/util/collection/unsafe/sort/UnsafeExternalSorter.java
index 66a7798..3c1cd39 100644
--- a/core/src/main/java/org/apache/spark/util/collection/unsafe/sort/UnsafeExternalSorter.java
+++ b/core/src/main/java/org/apache/spark/util/collection/unsafe/sort/UnsafeExternalSorter.java
@@ -75,6 +75,7 @@ public final class UnsafeExternalSorter extends MemoryConsumer {
   private MemoryBlock currentPage = null;
   private long pageCursor = -1;
   private long peakMemoryUsedBytes = 0;
+  private long totalSpillBytes = 0L;
   private volatile SpillableIterator readingIterator = null;
 
   public static UnsafeExternalSorter createWithExistingInMemorySorter(
@@ -215,7 +216,7 @@ public final class UnsafeExternalSorter extends MemoryConsumer {
     // we might not be able to get memory for the pointer array.
 
     taskContext.taskMetrics().incMemoryBytesSpilled(spillSize);
-
+    totalSpillBytes += spillSize;
     return spillSize;
   }
 
@@ -246,6 +247,13 @@ public final class UnsafeExternalSorter extends MemoryConsumer {
     return peakMemoryUsedBytes;
   }
 
+  /**
+   * Return the total number of bytes that has been spilled into disk so far.
+   */
+  public long getSpillSize() {
+    return totalSpillBytes;
+  }
+
   @VisibleForTesting
   public int getNumberOfAllocatedPages() {
     return allocatedPages.size();
@@ -499,6 +507,8 @@ public final class UnsafeExternalSorter extends MemoryConsumer {
         released += inMemSorter.getMemoryUsage();
         inMemSorter.free();
         inMemSorter = null;
+        taskContext.taskMetrics().incMemoryBytesSpilled(released);
+        totalSpillBytes += released;
         return released;
       }
     }

http://git-wip-us.apache.org/repos/asf/spark/blob/0dcf9dbe/sql/core/src/main/java/org/apache/spark/sql/execution/UnsafeKVExternalSorter.java
----------------------------------------------------------------------
diff --git a/sql/core/src/main/java/org/apache/spark/sql/execution/UnsafeKVExternalSorter.java b/sql/core/src/main/java/org/apache/spark/sql/execution/UnsafeKVExternalSorter.java
index b6499e3..38dbfef 100644
--- a/sql/core/src/main/java/org/apache/spark/sql/execution/UnsafeKVExternalSorter.java
+++ b/sql/core/src/main/java/org/apache/spark/sql/execution/UnsafeKVExternalSorter.java
@@ -177,6 +177,13 @@ public final class UnsafeKVExternalSorter {
   }
 
   /**
+   * Return the total number of bytes that has been spilled into disk so far.
+   */
+  public long getSpillSize() {
+    return sorter.getSpillSize();
+  }
+
+  /**
    * Return the peak memory used so far, in bytes.
    */
   public long getPeakMemoryUsedBytes() {

http://git-wip-us.apache.org/repos/asf/spark/blob/0dcf9dbe/sql/core/src/main/scala/org/apache/spark/sql/execution/Sort.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/Sort.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/Sort.scala
index 6f9baa2..04a39a1 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/Sort.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/Sort.scala
@@ -53,8 +53,8 @@ case class Sort(
   private val enableRadixSort = sqlContext.conf.enableRadixSort
 
   override private[sql] lazy val metrics = Map(
-    "sortTime" -> SQLMetrics.createLongMetric(sparkContext, "sort time"),
-    "dataSize" -> SQLMetrics.createSizeMetric(sparkContext, "data size"),
+    "sortTime" -> SQLMetrics.createTimingMetric(sparkContext, "sort time"),
+    "peakMemory" -> SQLMetrics.createSizeMetric(sparkContext, "peak memory"),
     "spillSize" -> SQLMetrics.createSizeMetric(sparkContext, "spill size"))
 
   def createSorter(): UnsafeExternalRowSorter = {
@@ -86,8 +86,9 @@ case class Sort(
   }
 
   protected override def doExecute(): RDD[InternalRow] = {
-    val dataSize = longMetric("dataSize")
+    val peakMemory = longMetric("peakMemory")
     val spillSize = longMetric("spillSize")
+    val sortTime = longMetric("sortTime")
 
     child.execute().mapPartitionsInternal { iter =>
       val sorter = createSorter()
@@ -96,10 +97,12 @@ case class Sort(
       // Remember spill data size of this task before execute this operator so that we can
       // figure out how many bytes we spilled for this operator.
       val spillSizeBefore = metrics.memoryBytesSpilled
+      val beforeSort = System.nanoTime()
 
       val sortedIterator = sorter.sort(iter.asInstanceOf[Iterator[UnsafeRow]])
 
-      dataSize += sorter.getPeakMemoryUsage
+      sortTime += (System.nanoTime() - beforeSort) / 1000000
+      peakMemory += sorter.getPeakMemoryUsage
       spillSize += metrics.memoryBytesSpilled - spillSizeBefore
       metrics.incPeakExecutionMemory(sorter.getPeakMemoryUsage)
 
@@ -145,19 +148,19 @@ case class Sort(
     ctx.copyResult = false
 
     val outputRow = ctx.freshName("outputRow")
-    val dataSize = metricTerm(ctx, "dataSize")
+    val peakMemory = metricTerm(ctx, "peakMemory")
     val spillSize = metricTerm(ctx, "spillSize")
     val spillSizeBefore = ctx.freshName("spillSizeBefore")
     val startTime = ctx.freshName("startTime")
     val sortTime = metricTerm(ctx, "sortTime")
     s"""
        | if ($needToSort) {
-       |   $addToSorter();
        |   long $spillSizeBefore = $metrics.memoryBytesSpilled();
        |   long $startTime = System.nanoTime();
+       |   $addToSorter();
        |   $sortedIterator = $sorterVariable.sort();
-       |   $sortTime.add(System.nanoTime() - $startTime);
-       |   $dataSize.add($sorterVariable.getPeakMemoryUsage());
+       |   $sortTime.add((System.nanoTime() - $startTime) / 1000000);
+       |   $peakMemory.add($sorterVariable.getPeakMemoryUsage());
        |   $spillSize.add($metrics.memoryBytesSpilled() - $spillSizeBefore);
        |   $metrics.incPeakExecutionMemory($sorterVariable.getPeakMemoryUsage());
        |   $needToSort = false;

http://git-wip-us.apache.org/repos/asf/spark/blob/0dcf9dbe/sql/core/src/main/scala/org/apache/spark/sql/execution/UnsafeRowSerializer.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/UnsafeRowSerializer.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/UnsafeRowSerializer.scala
index a23ebec..362d0d7 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/UnsafeRowSerializer.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/UnsafeRowSerializer.scala
@@ -26,6 +26,7 @@ import com.google.common.io.ByteStreams
 
 import org.apache.spark.serializer.{DeserializationStream, SerializationStream, Serializer, SerializerInstance}
 import org.apache.spark.sql.catalyst.expressions.UnsafeRow
+import org.apache.spark.sql.execution.metric.LongSQLMetric
 import org.apache.spark.unsafe.Platform
 
 /**
@@ -39,12 +40,17 @@ import org.apache.spark.unsafe.Platform
  *
  * @param numFields the number of fields in the row being serialized.
  */
-private[sql] class UnsafeRowSerializer(numFields: Int) extends Serializer with Serializable {
-  override def newInstance(): SerializerInstance = new UnsafeRowSerializerInstance(numFields)
+private[sql] class UnsafeRowSerializer(
+    numFields: Int,
+    dataSize: LongSQLMetric = null) extends Serializer with Serializable {
+  override def newInstance(): SerializerInstance =
+    new UnsafeRowSerializerInstance(numFields, dataSize)
   override private[spark] def supportsRelocationOfSerializedObjects: Boolean = true
 }
 
-private class UnsafeRowSerializerInstance(numFields: Int) extends SerializerInstance {
+private class UnsafeRowSerializerInstance(
+    numFields: Int,
+    dataSize: LongSQLMetric) extends SerializerInstance {
   /**
    * Serializes a stream of UnsafeRows. Within the stream, each record consists of a record
    * length (stored as a 4-byte integer, written high byte first), followed by the record's bytes.
@@ -54,9 +60,14 @@ private class UnsafeRowSerializerInstance(numFields: Int) extends SerializerInst
     private[this] val dOut: DataOutputStream =
       new DataOutputStream(new BufferedOutputStream(out))
 
+    // LongSQLMetricParam.add() is faster than LongSQLMetric.+=
+    val localDataSize = if (dataSize != null) dataSize.localValue else null
+
     override def writeValue[T: ClassTag](value: T): SerializationStream = {
       val row = value.asInstanceOf[UnsafeRow]
-
+      if (localDataSize != null) {
+        localDataSize.add(row.getSizeInBytes)
+      }
       dOut.writeInt(row.getSizeInBytes)
       row.writeToStream(dOut, writeBuffer)
       this

http://git-wip-us.apache.org/repos/asf/spark/blob/0dcf9dbe/sql/core/src/main/scala/org/apache/spark/sql/execution/aggregate/TungstenAggregate.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/aggregate/TungstenAggregate.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/aggregate/TungstenAggregate.scala
index 5c0fc02..49b682a 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/aggregate/TungstenAggregate.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/aggregate/TungstenAggregate.scala
@@ -26,7 +26,7 @@ import org.apache.spark.sql.catalyst.expressions.aggregate._
 import org.apache.spark.sql.catalyst.expressions.codegen._
 import org.apache.spark.sql.catalyst.plans.physical._
 import org.apache.spark.sql.execution._
-import org.apache.spark.sql.execution.metric.SQLMetrics
+import org.apache.spark.sql.execution.metric.{LongSQLMetricValue, SQLMetrics}
 import org.apache.spark.sql.types.{DecimalType, StringType, StructType}
 import org.apache.spark.unsafe.KVIterator
 
@@ -52,8 +52,9 @@ case class TungstenAggregate(
 
   override private[sql] lazy val metrics = Map(
     "numOutputRows" -> SQLMetrics.createLongMetric(sparkContext, "number of output rows"),
-    "dataSize" -> SQLMetrics.createSizeMetric(sparkContext, "data size"),
-    "spillSize" -> SQLMetrics.createSizeMetric(sparkContext, "spill size"))
+    "peakMemory" -> SQLMetrics.createSizeMetric(sparkContext, "peak memory"),
+    "spillSize" -> SQLMetrics.createSizeMetric(sparkContext, "spill size"),
+    "aggTime" -> SQLMetrics.createTimingMetric(sparkContext, "aggregate time"))
 
   override def output: Seq[Attribute] = resultExpressions.map(_.toAttribute)
 
@@ -83,7 +84,7 @@ case class TungstenAggregate(
 
   protected override def doExecute(): RDD[InternalRow] = attachTree(this, "execute") {
     val numOutputRows = longMetric("numOutputRows")
-    val dataSize = longMetric("dataSize")
+    val peakMemory = longMetric("peakMemory")
     val spillSize = longMetric("spillSize")
 
     child.execute().mapPartitions { iter =>
@@ -107,7 +108,7 @@ case class TungstenAggregate(
             iter,
             testFallbackStartsAt,
             numOutputRows,
-            dataSize,
+            peakMemory,
             spillSize)
         if (!hasInput && groupingExpressions.isEmpty) {
           numOutputRows += 1
@@ -212,10 +213,14 @@ case class TungstenAggregate(
        """.stripMargin)
 
     val numOutput = metricTerm(ctx, "numOutputRows")
+    val aggTime = metricTerm(ctx, "aggTime")
+    val beforeAgg = ctx.freshName("beforeAgg")
     s"""
        | while (!$initAgg) {
        |   $initAgg = true;
+       |   long $beforeAgg = System.nanoTime();
        |   $doAgg();
+       |   $aggTime.add((System.nanoTime() - $beforeAgg) / 1000000);
        |
        |   // output the result
        |   ${genResult.trim}
@@ -303,15 +308,17 @@ case class TungstenAggregate(
    */
   def finishAggregate(
       hashMap: UnsafeFixedWidthAggregationMap,
-      sorter: UnsafeKVExternalSorter): KVIterator[UnsafeRow, UnsafeRow] = {
+      sorter: UnsafeKVExternalSorter,
+      peakMemory: LongSQLMetricValue,
+      spillSize: LongSQLMetricValue): KVIterator[UnsafeRow, UnsafeRow] = {
 
     // update peak execution memory
     val mapMemory = hashMap.getPeakMemoryUsedBytes
     val sorterMemory = Option(sorter).map(_.getPeakMemoryUsedBytes).getOrElse(0L)
-    val peakMemory = Math.max(mapMemory, sorterMemory)
+    val maxMemory = Math.max(mapMemory, sorterMemory)
     val metrics = TaskContext.get().taskMetrics()
-    metrics.incPeakExecutionMemory(peakMemory)
-    // TODO: update data size and spill size
+    peakMemory.add(maxMemory)
+    metrics.incPeakExecutionMemory(maxMemory)
 
     if (sorter == null) {
       // not spilled
@@ -365,6 +372,7 @@ case class TungstenAggregate(
 
           true
         } else {
+          spillSize.add(sorter.getSpillSize)
           false
         }
       }
@@ -476,6 +484,8 @@ case class TungstenAggregate(
     ctx.addMutableState(classOf[KVIterator[UnsafeRow, UnsafeRow]].getName, iterTerm, "")
 
     val doAgg = ctx.freshName("doAggregateWithKeys")
+    val peakMemory = metricTerm(ctx, "peakMemory")
+    val spillSize = metricTerm(ctx, "spillSize")
     ctx.addNewFunction(doAgg,
       s"""
         ${if (isVectorizedHashMapEnabled) vectorizedHashMapGenerator.generate() else ""}
@@ -486,7 +496,7 @@ case class TungstenAggregate(
           ${if (isVectorizedHashMapEnabled) {
               s"$iterTermForVectorizedHashMap = $vectorizedHashMapTerm.rowIterator();"} else ""}
 
-          $iterTerm = $thisPlan.finishAggregate($hashMapTerm, $sorterTerm);
+          $iterTerm = $thisPlan.finishAggregate($hashMapTerm, $sorterTerm, $peakMemory, $spillSize);
         }
        """)
 
@@ -528,10 +538,14 @@ case class TungstenAggregate(
       } else None
     }
 
+    val aggTime = metricTerm(ctx, "aggTime")
+    val beforeAgg = ctx.freshName("beforeAgg")
     s"""
      if (!$initAgg) {
        $initAgg = true;
+       long $beforeAgg = System.nanoTime();
        $doAgg();
+       $aggTime.add((System.nanoTime() - $beforeAgg) / 1000000);
      }
 
      // output the result

http://git-wip-us.apache.org/repos/asf/spark/blob/0dcf9dbe/sql/core/src/main/scala/org/apache/spark/sql/execution/aggregate/TungstenAggregationIterator.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/aggregate/TungstenAggregationIterator.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/aggregate/TungstenAggregationIterator.scala
index c368726..9db5087 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/aggregate/TungstenAggregationIterator.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/aggregate/TungstenAggregationIterator.scala
@@ -87,7 +87,7 @@ class TungstenAggregationIterator(
     inputIter: Iterator[InternalRow],
     testFallbackStartsAt: Option[(Int, Int)],
     numOutputRows: LongSQLMetric,
-    dataSize: LongSQLMetric,
+    peakMemory: LongSQLMetric,
     spillSize: LongSQLMetric)
   extends AggregationIterator(
     groupingExpressions,
@@ -415,11 +415,11 @@ class TungstenAggregationIterator(
       if (!hasNext) {
         val mapMemory = hashMap.getPeakMemoryUsedBytes
         val sorterMemory = Option(externalSorter).map(_.getPeakMemoryUsedBytes).getOrElse(0L)
-        val peakMemory = Math.max(mapMemory, sorterMemory)
+        val maxMemory = Math.max(mapMemory, sorterMemory)
         val metrics = TaskContext.get().taskMetrics()
-        dataSize += peakMemory
+        peakMemory += maxMemory
         spillSize += metrics.memoryBytesSpilled - spillSizeBefore
-        metrics.incPeakExecutionMemory(peakMemory)
+        metrics.incPeakExecutionMemory(maxMemory)
       }
       numOutputRows += 1
       res

http://git-wip-us.apache.org/repos/asf/spark/blob/0dcf9dbe/sql/core/src/main/scala/org/apache/spark/sql/execution/exchange/BroadcastExchange.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/exchange/BroadcastExchange.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/exchange/BroadcastExchange.scala
index a4f4213..87a113e 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/exchange/BroadcastExchange.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/exchange/BroadcastExchange.scala
@@ -23,8 +23,10 @@ import scala.concurrent.duration._
 import org.apache.spark.broadcast
 import org.apache.spark.rdd.RDD
 import org.apache.spark.sql.catalyst.InternalRow
+import org.apache.spark.sql.catalyst.expressions.UnsafeRow
 import org.apache.spark.sql.catalyst.plans.physical.{BroadcastMode, BroadcastPartitioning, Partitioning}
 import org.apache.spark.sql.execution.{SparkPlan, SQLExecution}
+import org.apache.spark.sql.execution.metric.SQLMetrics
 import org.apache.spark.util.ThreadUtils
 
 /**
@@ -35,6 +37,12 @@ case class BroadcastExchange(
     mode: BroadcastMode,
     child: SparkPlan) extends Exchange {
 
+  override private[sql] lazy val metrics = Map(
+    "dataSize" -> SQLMetrics.createLongMetric(sparkContext, "data size (bytes)"),
+    "collectTime" -> SQLMetrics.createLongMetric(sparkContext, "time to collect (ms)"),
+    "buildTime" -> SQLMetrics.createLongMetric(sparkContext, "time to build (ms)"),
+    "broadcastTime" -> SQLMetrics.createLongMetric(sparkContext, "time to broadcast (ms)"))
+
   override def outputPartitioning: Partitioning = BroadcastPartitioning(mode)
 
   override def sameResult(plan: SparkPlan): Boolean = plan match {
@@ -61,11 +69,21 @@ case class BroadcastExchange(
       // This will run in another thread. Set the execution id so that we can connect these jobs
       // with the correct execution.
       SQLExecution.withExecutionId(sparkContext, executionId) {
+        val beforeCollect = System.nanoTime()
         // Note that we use .executeCollect() because we don't want to convert data to Scala types
         val input: Array[InternalRow] = child.executeCollect()
+        val beforeBuild = System.nanoTime()
+        longMetric("collectTime") += (beforeBuild - beforeCollect) / 1000000
+        longMetric("dataSize") += input.map(_.asInstanceOf[UnsafeRow].getSizeInBytes.toLong).sum
 
         // Construct and broadcast the relation.
-        sparkContext.broadcast(mode.transform(input))
+        val relation = mode.transform(input)
+        val beforeBroadcast = System.nanoTime()
+        longMetric("buildTime") += (beforeBroadcast - beforeBuild) / 1000000
+
+        val broadcasted = sparkContext.broadcast(relation)
+        longMetric("broadcastTime") += (System.nanoTime() - beforeBroadcast) / 1000000
+        broadcasted
       }
     }(BroadcastExchange.executionContext)
   }

http://git-wip-us.apache.org/repos/asf/spark/blob/0dcf9dbe/sql/core/src/main/scala/org/apache/spark/sql/execution/exchange/ShuffleExchange.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/exchange/ShuffleExchange.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/exchange/ShuffleExchange.scala
index d7deac9..e18b59f 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/exchange/ShuffleExchange.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/exchange/ShuffleExchange.scala
@@ -25,10 +25,11 @@ import org.apache.spark.serializer.Serializer
 import org.apache.spark.shuffle.sort.SortShuffleManager
 import org.apache.spark.sql.catalyst.InternalRow
 import org.apache.spark.sql.catalyst.errors._
-import org.apache.spark.sql.catalyst.expressions.{Attribute, UnsafeProjection}
+import org.apache.spark.sql.catalyst.expressions.{Attribute, UnsafeProjection, UnsafeRow}
 import org.apache.spark.sql.catalyst.expressions.codegen.LazilyGeneratedOrdering
 import org.apache.spark.sql.catalyst.plans.physical._
 import org.apache.spark.sql.execution._
+import org.apache.spark.sql.execution.metric.SQLMetrics
 import org.apache.spark.util.MutablePair
 
 /**
@@ -39,6 +40,9 @@ case class ShuffleExchange(
     child: SparkPlan,
     @transient coordinator: Option[ExchangeCoordinator]) extends Exchange {
 
+  override private[sql] lazy val metrics = Map(
+    "dataSize" -> SQLMetrics.createSizeMetric(sparkContext, "data size"))
+
   override def nodeName: String = {
     val extraInfo = coordinator match {
       case Some(exchangeCoordinator) if exchangeCoordinator.isEstimated =>
@@ -54,7 +58,8 @@ case class ShuffleExchange(
 
   override def outputPartitioning: Partitioning = newPartitioning
 
-  private val serializer: Serializer = new UnsafeRowSerializer(child.output.size)
+  private val serializer: Serializer =
+    new UnsafeRowSerializer(child.output.size, longMetric("dataSize"))
 
   override protected def doPrepare(): Unit = {
     // If an ExchangeCoordinator is needed, we register this Exchange operator

http://git-wip-us.apache.org/repos/asf/spark/blob/0dcf9dbe/sql/core/src/main/scala/org/apache/spark/sql/execution/joins/ShuffledHashJoin.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/joins/ShuffledHashJoin.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/joins/ShuffledHashJoin.scala
index f021f37..785373b 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/joins/ShuffledHashJoin.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/joins/ShuffledHashJoin.scala
@@ -40,7 +40,9 @@ case class ShuffledHashJoin(
   extends BinaryNode with HashJoin {
 
   override private[sql] lazy val metrics = Map(
-    "numOutputRows" -> SQLMetrics.createLongMetric(sparkContext, "number of output rows"))
+    "numOutputRows" -> SQLMetrics.createLongMetric(sparkContext, "number of output rows"),
+    "buildDataSize" -> SQLMetrics.createSizeMetric(sparkContext, "data size of build side"),
+    "buildTime" -> SQLMetrics.createTimingMetric(sparkContext, "time to build hash map"))
 
   override def outputPartitioning: Partitioning = joinType match {
     case Inner => PartitioningCollection(Seq(left.outputPartitioning, right.outputPartitioning))
@@ -57,8 +59,13 @@ case class ShuffledHashJoin(
     ClusteredDistribution(leftKeys) :: ClusteredDistribution(rightKeys) :: Nil
 
   private def buildHashedRelation(iter: Iterator[InternalRow]): HashedRelation = {
+    val buildDataSize = longMetric("buildDataSize")
+    val buildTime = longMetric("buildTime")
+    val start = System.nanoTime()
     val context = TaskContext.get()
     val relation = HashedRelation(iter, buildKeys, taskMemoryManager = context.taskMemoryManager())
+    buildTime += (System.nanoTime() - start) / 1000000
+    buildDataSize += relation.estimatedSize
     // This relation is usually used until the end of task.
     context.addTaskCompletionListener(_ => relation.close())
     relation

http://git-wip-us.apache.org/repos/asf/spark/blob/0dcf9dbe/sql/core/src/main/scala/org/apache/spark/sql/execution/metric/SQLMetrics.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/metric/SQLMetrics.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/metric/SQLMetrics.scala
index 930adab..5755c00 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/metric/SQLMetrics.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/metric/SQLMetrics.scala
@@ -17,6 +17,8 @@
 
 package org.apache.spark.sql.execution.metric
 
+import java.text.NumberFormat
+
 import org.apache.spark.{Accumulable, AccumulableParam, Accumulators, SparkContext}
 import org.apache.spark.scheduler.AccumulableInfo
 import org.apache.spark.util.Utils
@@ -119,7 +121,8 @@ private class LongSQLMetricParam(val stringValue: Seq[Long] => String, initialVa
   override def zero: LongSQLMetricValue = new LongSQLMetricValue(initialValue)
 }
 
-private object LongSQLMetricParam extends LongSQLMetricParam(_.sum.toString, 0L)
+private object LongSQLMetricParam
+  extends LongSQLMetricParam(x => NumberFormat.getInstance().format(x.sum), 0L)
 
 private object StatisticsBytesSQLMetricParam extends LongSQLMetricParam(
   (values: Seq[Long]) => {


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@spark.apache.org
For additional commands, e-mail: commits-help@spark.apache.org