You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@spark.apache.org by da...@apache.org on 2016/02/11 08:23:06 UTC

spark git commit: [SPARK-13234] [SQL] remove duplicated SQL metrics

Repository: spark
Updated Branches:
  refs/heads/master b5761d150 -> 8f744fe3d


[SPARK-13234] [SQL] remove duplicated SQL metrics

For many SQL operators, we have metrics for both input and output rows. Since the number of input rows is exactly the number of output rows of the child, we only need metrics for output rows.

Now that we have improved performance using whole-stage codegen, the overhead of SQL metrics is no longer trivial, so we should avoid it when it is not necessary.

This PR removes all the SQL metrics for the number of input rows and adds a SQL metric for the number of output rows to every LeafNode. It also removes the SQL metrics from operators that have the same number of input and output rows (for example, Projection, where we may not need them).

The new SQL UI will look like:

![metrics](https://cloud.githubusercontent.com/assets/40902/12965227/63614e5e-d009-11e5-88b3-84fea04f9c20.png)

Author: Davies Liu <da...@databricks.com>

Closes #11163 from davies/remove_metrics.


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/8f744fe3
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/8f744fe3
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/8f744fe3

Branch: refs/heads/master
Commit: 8f744fe3d931c2380613b8e5bafa1bb1fd292839
Parents: b5761d1
Author: Davies Liu <da...@databricks.com>
Authored: Wed Feb 10 23:23:01 2016 -0800
Committer: Davies Liu <da...@gmail.com>
Committed: Wed Feb 10 23:23:01 2016 -0800

----------------------------------------------------------------------
 .../spark/sql/execution/ExistingRDD.scala       | 12 ++++++-
 .../spark/sql/execution/LocalTableScan.scala    | 12 ++++++-
 .../aggregate/SortBasedAggregate.scala          |  3 --
 .../SortBasedAggregationIterator.scala          |  3 --
 .../execution/aggregate/TungstenAggregate.scala |  3 --
 .../aggregate/TungstenAggregationIterator.scala |  3 --
 .../spark/sql/execution/basicOperators.scala    | 21 ++++-------
 .../columnar/InMemoryColumnarTableScan.scala    | 14 +++++++-
 .../sql/execution/joins/BroadcastHashJoin.scala | 18 ++--------
 .../joins/BroadcastHashOuterJoin.scala          | 26 +-------------
 .../joins/BroadcastLeftSemiJoinHash.scala       | 13 +++----
 .../joins/BroadcastNestedLoopJoin.scala         |  8 -----
 .../sql/execution/joins/CartesianProduct.scala  | 14 ++------
 .../spark/sql/execution/joins/HashJoin.scala    |  2 --
 .../sql/execution/joins/HashSemiJoin.scala      |  7 +---
 .../sql/execution/joins/HashedRelation.scala    | 10 ++----
 .../sql/execution/joins/LeftSemiJoinBNL.scala   |  6 ----
 .../sql/execution/joins/LeftSemiJoinHash.scala  | 12 +++----
 .../sql/execution/joins/SortMergeJoin.scala     | 14 ++------
 .../execution/joins/SortMergeOuterJoin.scala    | 18 ++--------
 .../execution/joins/HashedRelationSuite.scala   | 14 +++-----
 .../sql/execution/metric/SQLMetricsSuite.scala  | 37 --------------------
 .../spark/sql/util/DataFrameCallbackSuite.scala |  8 ++---
 .../sql/hive/execution/HiveTableScan.scala      | 10 +++++-
 24 files changed, 80 insertions(+), 208 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/8f744fe3/sql/core/src/main/scala/org/apache/spark/sql/execution/ExistingRDD.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/ExistingRDD.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/ExistingRDD.scala
index 92cfd5f..cad7e25 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/ExistingRDD.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/ExistingRDD.scala
@@ -24,6 +24,7 @@ import org.apache.spark.sql.catalyst.analysis.MultiInstanceRelation
 import org.apache.spark.sql.catalyst.expressions._
 import org.apache.spark.sql.catalyst.plans.logical.{LogicalPlan, Statistics}
 import org.apache.spark.sql.catalyst.plans.physical.{HashPartitioning, Partitioning, UnknownPartitioning}
+import org.apache.spark.sql.execution.metric.SQLMetrics
 import org.apache.spark.sql.sources.{BaseRelation, HadoopFsRelation}
 import org.apache.spark.sql.types.DataType
 
@@ -103,8 +104,11 @@ private[sql] case class PhysicalRDD(
     override val outputPartitioning: Partitioning = UnknownPartitioning(0))
   extends LeafNode {
 
+  private[sql] override lazy val metrics = Map(
+    "numOutputRows" -> SQLMetrics.createLongMetric(sparkContext, "number of output rows"))
+
   protected override def doExecute(): RDD[InternalRow] = {
-    if (isUnsafeRow) {
+    val unsafeRow = if (isUnsafeRow) {
       rdd
     } else {
       rdd.mapPartitionsInternal { iter =>
@@ -112,6 +116,12 @@ private[sql] case class PhysicalRDD(
         iter.map(proj)
       }
     }
+
+    val numOutputRows = longMetric("numOutputRows")
+    unsafeRow.map { r =>
+      numOutputRows += 1
+      r
+    }
   }
 
   override def simpleString: String = {

http://git-wip-us.apache.org/repos/asf/spark/blob/8f744fe3/sql/core/src/main/scala/org/apache/spark/sql/execution/LocalTableScan.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/LocalTableScan.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/LocalTableScan.scala
index 59057bf..f8aec9e 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/LocalTableScan.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/LocalTableScan.scala
@@ -20,6 +20,7 @@ package org.apache.spark.sql.execution
 import org.apache.spark.rdd.RDD
 import org.apache.spark.sql.catalyst.InternalRow
 import org.apache.spark.sql.catalyst.expressions.{Attribute, UnsafeProjection}
+import org.apache.spark.sql.execution.metric.SQLMetrics
 
 
 /**
@@ -29,6 +30,9 @@ private[sql] case class LocalTableScan(
     output: Seq[Attribute],
     rows: Seq[InternalRow]) extends LeafNode {
 
+  private[sql] override lazy val metrics = Map(
+    "numOutputRows" -> SQLMetrics.createLongMetric(sparkContext, "number of output rows"))
+
   private val unsafeRows: Array[InternalRow] = {
     val proj = UnsafeProjection.create(output, output)
     rows.map(r => proj(r).copy()).toArray
@@ -36,7 +40,13 @@ private[sql] case class LocalTableScan(
 
   private lazy val rdd = sqlContext.sparkContext.parallelize(unsafeRows)
 
-  protected override def doExecute(): RDD[InternalRow] = rdd
+  protected override def doExecute(): RDD[InternalRow] = {
+    val numOutputRows = longMetric("numOutputRows")
+    rdd.map { r =>
+      numOutputRows += 1
+      r
+    }
+  }
 
   override def executeCollect(): Array[InternalRow] = {
     unsafeRows

http://git-wip-us.apache.org/repos/asf/spark/blob/8f744fe3/sql/core/src/main/scala/org/apache/spark/sql/execution/aggregate/SortBasedAggregate.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/aggregate/SortBasedAggregate.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/aggregate/SortBasedAggregate.scala
index 06a3991..9fcfea8 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/aggregate/SortBasedAggregate.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/aggregate/SortBasedAggregate.scala
@@ -46,7 +46,6 @@ case class SortBasedAggregate(
       AttributeSet(aggregateBufferAttributes)
 
   override private[sql] lazy val metrics = Map(
-    "numInputRows" -> SQLMetrics.createLongMetric(sparkContext, "number of input rows"),
     "numOutputRows" -> SQLMetrics.createLongMetric(sparkContext, "number of output rows"))
 
   override def output: Seq[Attribute] = resultExpressions.map(_.toAttribute)
@@ -68,7 +67,6 @@ case class SortBasedAggregate(
   }
 
   protected override def doExecute(): RDD[InternalRow] = attachTree(this, "execute") {
-    val numInputRows = longMetric("numInputRows")
     val numOutputRows = longMetric("numOutputRows")
     child.execute().mapPartitionsInternal { iter =>
       // Because the constructor of an aggregation iterator will read at least the first row,
@@ -89,7 +87,6 @@ case class SortBasedAggregate(
           resultExpressions,
           (expressions, inputSchema) =>
             newMutableProjection(expressions, inputSchema, subexpressionEliminationEnabled),
-          numInputRows,
           numOutputRows)
         if (!hasInput && groupingExpressions.isEmpty) {
           // There is no input and there is no grouping expressions.

http://git-wip-us.apache.org/repos/asf/spark/blob/8f744fe3/sql/core/src/main/scala/org/apache/spark/sql/execution/aggregate/SortBasedAggregationIterator.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/aggregate/SortBasedAggregationIterator.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/aggregate/SortBasedAggregationIterator.scala
index 6501634..8f97498 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/aggregate/SortBasedAggregationIterator.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/aggregate/SortBasedAggregationIterator.scala
@@ -35,7 +35,6 @@ class SortBasedAggregationIterator(
     initialInputBufferOffset: Int,
     resultExpressions: Seq[NamedExpression],
     newMutableProjection: (Seq[Expression], Seq[Attribute]) => (() => MutableProjection),
-    numInputRows: LongSQLMetric,
     numOutputRows: LongSQLMetric)
   extends AggregationIterator(
     groupingExpressions,
@@ -97,7 +96,6 @@ class SortBasedAggregationIterator(
       val inputRow = inputIterator.next()
       nextGroupingKey = groupingProjection(inputRow).copy()
       firstRowInNextGroup = inputRow.copy()
-      numInputRows += 1
       sortedInputHasNewGroup = true
     } else {
       // This inputIter is empty.
@@ -122,7 +120,6 @@ class SortBasedAggregationIterator(
       // Get the grouping key.
       val currentRow = inputIterator.next()
       val groupingKey = groupingProjection(currentRow)
-      numInputRows += 1
 
       // Check if the current row belongs the current input row.
       if (currentGroupingKey == groupingKey) {

http://git-wip-us.apache.org/repos/asf/spark/blob/8f744fe3/sql/core/src/main/scala/org/apache/spark/sql/execution/aggregate/TungstenAggregate.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/aggregate/TungstenAggregate.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/aggregate/TungstenAggregate.scala
index 340b8f7..a6950f8 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/aggregate/TungstenAggregate.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/aggregate/TungstenAggregate.scala
@@ -47,7 +47,6 @@ case class TungstenAggregate(
   require(TungstenAggregate.supportsAggregate(aggregateBufferAttributes))
 
   override private[sql] lazy val metrics = Map(
-    "numInputRows" -> SQLMetrics.createLongMetric(sparkContext, "number of input rows"),
     "numOutputRows" -> SQLMetrics.createLongMetric(sparkContext, "number of output rows"),
     "dataSize" -> SQLMetrics.createSizeMetric(sparkContext, "data size"),
     "spillSize" -> SQLMetrics.createSizeMetric(sparkContext, "spill size"))
@@ -77,7 +76,6 @@ case class TungstenAggregate(
   }
 
   protected override def doExecute(): RDD[InternalRow] = attachTree(this, "execute") {
-    val numInputRows = longMetric("numInputRows")
     val numOutputRows = longMetric("numOutputRows")
     val dataSize = longMetric("dataSize")
     val spillSize = longMetric("spillSize")
@@ -102,7 +100,6 @@ case class TungstenAggregate(
             child.output,
             iter,
             testFallbackStartsAt,
-            numInputRows,
             numOutputRows,
             dataSize,
             spillSize)

http://git-wip-us.apache.org/repos/asf/spark/blob/8f744fe3/sql/core/src/main/scala/org/apache/spark/sql/execution/aggregate/TungstenAggregationIterator.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/aggregate/TungstenAggregationIterator.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/aggregate/TungstenAggregationIterator.scala
index 001e9c3..c4f6594 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/aggregate/TungstenAggregationIterator.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/aggregate/TungstenAggregationIterator.scala
@@ -85,7 +85,6 @@ class TungstenAggregationIterator(
     originalInputAttributes: Seq[Attribute],
     inputIter: Iterator[InternalRow],
     testFallbackStartsAt: Option[Int],
-    numInputRows: LongSQLMetric,
     numOutputRows: LongSQLMetric,
     dataSize: LongSQLMetric,
     spillSize: LongSQLMetric)
@@ -179,14 +178,12 @@ class TungstenAggregationIterator(
       val buffer: UnsafeRow = hashMap.getAggregationBufferFromUnsafeRow(groupingKey)
       while (inputIter.hasNext) {
         val newInput = inputIter.next()
-        numInputRows += 1
         processRow(buffer, newInput)
       }
     } else {
       var i = 0
       while (inputIter.hasNext) {
         val newInput = inputIter.next()
-        numInputRows += 1
         val groupingKey = groupingProjection.apply(newInput)
         var buffer: UnsafeRow = null
         if (i < fallbackStartsAt) {

http://git-wip-us.apache.org/repos/asf/spark/blob/8f744fe3/sql/core/src/main/scala/org/apache/spark/sql/execution/basicOperators.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/basicOperators.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/basicOperators.scala
index f63e8a9..949acb9 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/basicOperators.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/basicOperators.scala
@@ -29,9 +29,6 @@ import org.apache.spark.util.random.PoissonSampler
 case class Project(projectList: Seq[NamedExpression], child: SparkPlan)
   extends UnaryNode with CodegenSupport {
 
-  override private[sql] lazy val metrics = Map(
-    "numRows" -> SQLMetrics.createLongMetric(sparkContext, "number of rows"))
-
   override def output: Seq[Attribute] = projectList.map(_.toAttribute)
 
   override def upstream(): RDD[InternalRow] = {
@@ -55,14 +52,10 @@ case class Project(projectList: Seq[NamedExpression], child: SparkPlan)
   }
 
   protected override def doExecute(): RDD[InternalRow] = {
-    val numRows = longMetric("numRows")
     child.execute().mapPartitionsInternal { iter =>
       val project = UnsafeProjection.create(projectList, child.output,
         subexpressionEliminationEnabled)
-      iter.map { row =>
-        numRows += 1
-        project(row)
-      }
+      iter.map(project)
     }
   }
 
@@ -74,7 +67,6 @@ case class Filter(condition: Expression, child: SparkPlan) extends UnaryNode wit
   override def output: Seq[Attribute] = child.output
 
   private[sql] override lazy val metrics = Map(
-    "numInputRows" -> SQLMetrics.createLongMetric(sparkContext, "number of input rows"),
     "numOutputRows" -> SQLMetrics.createLongMetric(sparkContext, "number of output rows"))
 
   override def upstream(): RDD[InternalRow] = {
@@ -104,12 +96,10 @@ case class Filter(condition: Expression, child: SparkPlan) extends UnaryNode wit
   }
 
   protected override def doExecute(): RDD[InternalRow] = {
-    val numInputRows = longMetric("numInputRows")
     val numOutputRows = longMetric("numOutputRows")
     child.execute().mapPartitionsInternal { iter =>
       val predicate = newPredicate(condition, child.output)
       iter.filter { row =>
-        numInputRows += 1
         val r = predicate(row)
         if (r) numOutputRows += 1
         r
@@ -135,9 +125,7 @@ case class Sample(
     upperBound: Double,
     withReplacement: Boolean,
     seed: Long,
-    child: SparkPlan)
-  extends UnaryNode
-{
+    child: SparkPlan) extends UnaryNode {
   override def output: Seq[Attribute] = child.output
 
   protected override def doExecute(): RDD[InternalRow] = {
@@ -163,6 +151,9 @@ case class Range(
     output: Seq[Attribute])
   extends LeafNode with CodegenSupport {
 
+  private[sql] override lazy val metrics = Map(
+    "numOutputRows" -> SQLMetrics.createLongMetric(sparkContext, "number of output rows"))
+
   override def upstream(): RDD[InternalRow] = {
     sqlContext.sparkContext.parallelize(0 until numSlices, numSlices).map(i => InternalRow(i))
   }
@@ -241,6 +232,7 @@ case class Range(
   }
 
   protected override def doExecute(): RDD[InternalRow] = {
+    val numOutputRows = longMetric("numOutputRows")
     sqlContext
       .sparkContext
       .parallelize(0 until numSlices, numSlices)
@@ -283,6 +275,7 @@ case class Range(
               overflow = true
             }
 
+            numOutputRows += 1
             unsafeRow.setLong(0, ret)
             unsafeRow
           }

http://git-wip-us.apache.org/repos/asf/spark/blob/8f744fe3/sql/core/src/main/scala/org/apache/spark/sql/execution/columnar/InMemoryColumnarTableScan.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/columnar/InMemoryColumnarTableScan.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/columnar/InMemoryColumnarTableScan.scala
index 9084b74..4858140 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/columnar/InMemoryColumnarTableScan.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/columnar/InMemoryColumnarTableScan.scala
@@ -29,6 +29,7 @@ import org.apache.spark.sql.catalyst.expressions._
 import org.apache.spark.sql.catalyst.plans.logical.{LogicalPlan, Statistics}
 import org.apache.spark.sql.catalyst.plans.physical.Partitioning
 import org.apache.spark.sql.execution.{LeafNode, SparkPlan}
+import org.apache.spark.sql.execution.metric.SQLMetrics
 import org.apache.spark.sql.types.UserDefinedType
 import org.apache.spark.storage.StorageLevel
 
@@ -216,6 +217,9 @@ private[sql] case class InMemoryColumnarTableScan(
     @transient relation: InMemoryRelation)
   extends LeafNode {
 
+  private[sql] override lazy val metrics = Map(
+    "numOutputRows" -> SQLMetrics.createLongMetric(sparkContext, "number of output rows"))
+
   override def output: Seq[Attribute] = attributes
 
   // The cached version does not change the outputPartitioning of the original SparkPlan.
@@ -286,6 +290,8 @@ private[sql] case class InMemoryColumnarTableScan(
   private val inMemoryPartitionPruningEnabled = sqlContext.conf.inMemoryPartitionPruning
 
   protected override def doExecute(): RDD[InternalRow] = {
+    val numOutputRows = longMetric("numOutputRows")
+
     if (enableAccumulators) {
       readPartitions.setValue(0)
       readBatches.setValue(0)
@@ -332,12 +338,18 @@ private[sql] case class InMemoryColumnarTableScan(
           cachedBatchIterator
         }
 
+      // update SQL metrics
+      val withMetrics = cachedBatchesToScan.map { batch =>
+        numOutputRows += batch.numRows
+        batch
+      }
+
       val columnTypes = requestedColumnDataTypes.map {
         case udt: UserDefinedType[_] => udt.sqlType
         case other => other
       }.toArray
       val columnarIterator = GenerateColumnAccessor.generate(columnTypes)
-      columnarIterator.initialize(cachedBatchesToScan, columnTypes, requestedColumnIndices.toArray)
+      columnarIterator.initialize(withMetrics, columnTypes, requestedColumnIndices.toArray)
       if (enableAccumulators && columnarIterator.hasNext) {
         readPartitions += 1
       }

http://git-wip-us.apache.org/repos/asf/spark/blob/8f744fe3/sql/core/src/main/scala/org/apache/spark/sql/execution/joins/BroadcastHashJoin.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/joins/BroadcastHashJoin.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/joins/BroadcastHashJoin.scala
index cbd5497..35c7963 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/joins/BroadcastHashJoin.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/joins/BroadcastHashJoin.scala
@@ -48,8 +48,6 @@ case class BroadcastHashJoin(
   extends BinaryNode with HashJoin with CodegenSupport {
 
   override private[sql] lazy val metrics = Map(
-    "numLeftRows" -> SQLMetrics.createLongMetric(sparkContext, "number of left rows"),
-    "numRightRows" -> SQLMetrics.createLongMetric(sparkContext, "number of right rows"),
     "numOutputRows" -> SQLMetrics.createLongMetric(sparkContext, "number of output rows"))
 
   val timeout: Duration = {
@@ -70,11 +68,6 @@ case class BroadcastHashJoin(
   // for the same query.
   @transient
   private lazy val broadcastFuture = {
-    val numBuildRows = buildSide match {
-      case BuildLeft => longMetric("numLeftRows")
-      case BuildRight => longMetric("numRightRows")
-    }
-
     // broadcastFuture is used in "doExecute". Therefore we can get the execution id correctly here.
     val executionId = sparkContext.getLocalProperty(SQLExecution.EXECUTION_ID_KEY)
     Future {
@@ -84,7 +77,6 @@ case class BroadcastHashJoin(
         // Note that we use .execute().collect() because we don't want to convert data to Scala
         // types
         val input: Array[InternalRow] = buildPlan.execute().map { row =>
-          numBuildRows += 1
           row.copy()
         }.collect()
         // The following line doesn't run in a job so we cannot track the metric value. However, we
@@ -93,10 +85,10 @@ case class BroadcastHashJoin(
         // TODO: move this check into HashedRelation
         val hashed = if (canJoinKeyFitWithinLong) {
           LongHashedRelation(
-            input.iterator, SQLMetrics.nullLongMetric, buildSideKeyGenerator, input.size)
+            input.iterator, buildSideKeyGenerator, input.size)
         } else {
           HashedRelation(
-            input.iterator, SQLMetrics.nullLongMetric, buildSideKeyGenerator, input.size)
+            input.iterator, buildSideKeyGenerator, input.size)
         }
         sparkContext.broadcast(hashed)
       }
@@ -108,10 +100,6 @@ case class BroadcastHashJoin(
   }
 
   protected override def doExecute(): RDD[InternalRow] = {
-    val numStreamedRows = buildSide match {
-      case BuildLeft => longMetric("numRightRows")
-      case BuildRight => longMetric("numLeftRows")
-    }
     val numOutputRows = longMetric("numOutputRows")
 
     val broadcastRelation = Await.result(broadcastFuture, timeout)
@@ -119,7 +107,7 @@ case class BroadcastHashJoin(
     streamedPlan.execute().mapPartitions { streamedIter =>
       val hashedRelation = broadcastRelation.value
       TaskContext.get().taskMetrics().incPeakExecutionMemory(hashedRelation.getMemorySize)
-      hashJoin(streamedIter, numStreamedRows, hashedRelation, numOutputRows)
+      hashJoin(streamedIter, hashedRelation, numOutputRows)
     }
   }
 

http://git-wip-us.apache.org/repos/asf/spark/blob/8f744fe3/sql/core/src/main/scala/org/apache/spark/sql/execution/joins/BroadcastHashOuterJoin.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/joins/BroadcastHashOuterJoin.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/joins/BroadcastHashOuterJoin.scala
index ad32756..5e8c8ca 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/joins/BroadcastHashOuterJoin.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/joins/BroadcastHashOuterJoin.scala
@@ -44,8 +44,6 @@ case class BroadcastHashOuterJoin(
     right: SparkPlan) extends BinaryNode with HashOuterJoin {
 
   override private[sql] lazy val metrics = Map(
-    "numLeftRows" -> SQLMetrics.createLongMetric(sparkContext, "number of left rows"),
-    "numRightRows" -> SQLMetrics.createLongMetric(sparkContext, "number of right rows"),
     "numOutputRows" -> SQLMetrics.createLongMetric(sparkContext, "number of output rows"))
 
   val timeout = {
@@ -66,14 +64,6 @@ case class BroadcastHashOuterJoin(
   // for the same query.
   @transient
   private lazy val broadcastFuture = {
-    val numBuildRows = joinType match {
-      case RightOuter => longMetric("numLeftRows")
-      case LeftOuter => longMetric("numRightRows")
-      case x =>
-        throw new IllegalArgumentException(
-          s"HashOuterJoin should not take $x as the JoinType")
-    }
-
     // broadcastFuture is used in "doExecute". Therefore we can get the execution id correctly here.
     val executionId = sparkContext.getLocalProperty(SQLExecution.EXECUTION_ID_KEY)
     Future {
@@ -83,14 +73,9 @@ case class BroadcastHashOuterJoin(
         // Note that we use .execute().collect() because we don't want to convert data to Scala
         // types
         val input: Array[InternalRow] = buildPlan.execute().map { row =>
-          numBuildRows += 1
           row.copy()
         }.collect()
-        // The following line doesn't run in a job so we cannot track the metric value. However, we
-        // have already tracked it in the above lines. So here we can use
-        // `SQLMetrics.nullLongMetric` to ignore it.
-        val hashed = HashedRelation(
-          input.iterator, SQLMetrics.nullLongMetric, buildKeyGenerator, input.size)
+        val hashed = HashedRelation(input.iterator, buildKeyGenerator, input.size)
         sparkContext.broadcast(hashed)
       }
     }(BroadcastHashJoin.broadcastHashJoinExecutionContext)
@@ -101,13 +86,6 @@ case class BroadcastHashOuterJoin(
   }
 
   override def doExecute(): RDD[InternalRow] = {
-    val numStreamedRows = joinType match {
-      case RightOuter => longMetric("numRightRows")
-      case LeftOuter => longMetric("numLeftRows")
-      case x =>
-        throw new IllegalArgumentException(
-          s"HashOuterJoin should not take $x as the JoinType")
-    }
     val numOutputRows = longMetric("numOutputRows")
 
     val broadcastRelation = Await.result(broadcastFuture, timeout)
@@ -122,7 +100,6 @@ case class BroadcastHashOuterJoin(
       joinType match {
         case LeftOuter =>
           streamedIter.flatMap(currentRow => {
-            numStreamedRows += 1
             val rowKey = keyGenerator(currentRow)
             joinedRow.withLeft(currentRow)
             leftOuterIterator(rowKey, joinedRow, hashTable.get(rowKey), resultProj, numOutputRows)
@@ -130,7 +107,6 @@ case class BroadcastHashOuterJoin(
 
         case RightOuter =>
           streamedIter.flatMap(currentRow => {
-            numStreamedRows += 1
             val rowKey = keyGenerator(currentRow)
             joinedRow.withRight(currentRow)
             rightOuterIterator(rowKey, hashTable.get(rowKey), joinedRow, resultProj, numOutputRows)

http://git-wip-us.apache.org/repos/asf/spark/blob/8f744fe3/sql/core/src/main/scala/org/apache/spark/sql/execution/joins/BroadcastLeftSemiJoinHash.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/joins/BroadcastLeftSemiJoinHash.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/joins/BroadcastLeftSemiJoinHash.scala
index d0e18df..4f1cfd2 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/joins/BroadcastLeftSemiJoinHash.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/joins/BroadcastLeftSemiJoinHash.scala
@@ -36,36 +36,31 @@ case class BroadcastLeftSemiJoinHash(
     condition: Option[Expression]) extends BinaryNode with HashSemiJoin {
 
   override private[sql] lazy val metrics = Map(
-    "numLeftRows" -> SQLMetrics.createLongMetric(sparkContext, "number of left rows"),
-    "numRightRows" -> SQLMetrics.createLongMetric(sparkContext, "number of right rows"),
     "numOutputRows" -> SQLMetrics.createLongMetric(sparkContext, "number of output rows"))
 
   protected override def doExecute(): RDD[InternalRow] = {
-    val numLeftRows = longMetric("numLeftRows")
-    val numRightRows = longMetric("numRightRows")
     val numOutputRows = longMetric("numOutputRows")
 
     val input = right.execute().map { row =>
-      numRightRows += 1
       row.copy()
     }.collect()
 
     if (condition.isEmpty) {
-      val hashSet = buildKeyHashSet(input.toIterator, SQLMetrics.nullLongMetric)
+      val hashSet = buildKeyHashSet(input.toIterator)
       val broadcastedRelation = sparkContext.broadcast(hashSet)
 
       left.execute().mapPartitionsInternal { streamIter =>
-        hashSemiJoin(streamIter, numLeftRows, broadcastedRelation.value, numOutputRows)
+        hashSemiJoin(streamIter, broadcastedRelation.value, numOutputRows)
       }
     } else {
       val hashRelation =
-        HashedRelation(input.toIterator, SQLMetrics.nullLongMetric, rightKeyGenerator, input.size)
+        HashedRelation(input.toIterator, rightKeyGenerator, input.size)
       val broadcastedRelation = sparkContext.broadcast(hashRelation)
 
       left.execute().mapPartitionsInternal { streamIter =>
         val hashedRelation = broadcastedRelation.value
         TaskContext.get().taskMetrics().incPeakExecutionMemory(hashedRelation.getMemorySize)
-        hashSemiJoin(streamIter, numLeftRows, hashedRelation, numOutputRows)
+        hashSemiJoin(streamIter, hashedRelation, numOutputRows)
       }
     }
   }

http://git-wip-us.apache.org/repos/asf/spark/blob/8f744fe3/sql/core/src/main/scala/org/apache/spark/sql/execution/joins/BroadcastNestedLoopJoin.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/joins/BroadcastNestedLoopJoin.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/joins/BroadcastNestedLoopJoin.scala
index e55f869..4585cbd 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/joins/BroadcastNestedLoopJoin.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/joins/BroadcastNestedLoopJoin.scala
@@ -36,8 +36,6 @@ case class BroadcastNestedLoopJoin(
   // TODO: Override requiredChildDistribution.
 
   override private[sql] lazy val metrics = Map(
-    "numLeftRows" -> SQLMetrics.createLongMetric(sparkContext, "number of left rows"),
-    "numRightRows" -> SQLMetrics.createLongMetric(sparkContext, "number of right rows"),
     "numOutputRows" -> SQLMetrics.createLongMetric(sparkContext, "number of output rows"))
 
   /** BuildRight means the right relation <=> the broadcast relation. */
@@ -73,15 +71,10 @@ case class BroadcastNestedLoopJoin(
     newPredicate(condition.getOrElse(Literal(true)), left.output ++ right.output)
 
   protected override def doExecute(): RDD[InternalRow] = {
-    val (numStreamedRows, numBuildRows) = buildSide match {
-      case BuildRight => (longMetric("numLeftRows"), longMetric("numRightRows"))
-      case BuildLeft => (longMetric("numRightRows"), longMetric("numLeftRows"))
-    }
     val numOutputRows = longMetric("numOutputRows")
 
     val broadcastedRelation =
       sparkContext.broadcast(broadcast.execute().map { row =>
-        numBuildRows += 1
         row.copy()
       }.collect().toIndexedSeq)
 
@@ -98,7 +91,6 @@ case class BroadcastNestedLoopJoin(
       streamedIter.foreach { streamedRow =>
         var i = 0
         var streamRowMatched = false
-        numStreamedRows += 1
 
         while (i < broadcastedRelation.value.size) {
           val broadcastedRow = broadcastedRelation.value(i)

http://git-wip-us.apache.org/repos/asf/spark/blob/8f744fe3/sql/core/src/main/scala/org/apache/spark/sql/execution/joins/CartesianProduct.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/joins/CartesianProduct.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/joins/CartesianProduct.scala
index 93d32e1..e417079 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/joins/CartesianProduct.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/joins/CartesianProduct.scala
@@ -82,23 +82,13 @@ case class CartesianProduct(left: SparkPlan, right: SparkPlan) extends BinaryNod
   override def output: Seq[Attribute] = left.output ++ right.output
 
   override private[sql] lazy val metrics = Map(
-    "numLeftRows" -> SQLMetrics.createLongMetric(sparkContext, "number of left rows"),
-    "numRightRows" -> SQLMetrics.createLongMetric(sparkContext, "number of right rows"),
     "numOutputRows" -> SQLMetrics.createLongMetric(sparkContext, "number of output rows"))
 
   protected override def doExecute(): RDD[InternalRow] = {
-    val numLeftRows = longMetric("numLeftRows")
-    val numRightRows = longMetric("numRightRows")
     val numOutputRows = longMetric("numOutputRows")
 
-    val leftResults = left.execute().map { row =>
-      numLeftRows += 1
-      row.asInstanceOf[UnsafeRow]
-    }
-    val rightResults = right.execute().map { row =>
-      numRightRows += 1
-      row.asInstanceOf[UnsafeRow]
-    }
+    val leftResults = left.execute().asInstanceOf[RDD[UnsafeRow]]
+    val rightResults = right.execute().asInstanceOf[RDD[UnsafeRow]]
 
     val pair = new UnsafeCartesianRDD(leftResults, rightResults, right.output.size)
     pair.mapPartitionsInternal { iter =>

http://git-wip-us.apache.org/repos/asf/spark/blob/8f744fe3/sql/core/src/main/scala/org/apache/spark/sql/execution/joins/HashJoin.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/joins/HashJoin.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/joins/HashJoin.scala
index ecbb1ac..332a748 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/joins/HashJoin.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/joins/HashJoin.scala
@@ -99,7 +99,6 @@ trait HashJoin {
 
   protected def hashJoin(
       streamIter: Iterator[InternalRow],
-      numStreamRows: LongSQLMetric,
       hashedRelation: HashedRelation,
       numOutputRows: LongSQLMetric): Iterator[InternalRow] =
   {
@@ -126,7 +125,6 @@ trait HashJoin {
           // find the next match
           while (currentHashMatches == null && streamIter.hasNext) {
             currentStreamedRow = streamIter.next()
-            numStreamRows += 1
             val key = joinKeys(currentStreamedRow)
             if (!key.anyNull) {
               currentHashMatches = hashedRelation.get(key)

http://git-wip-us.apache.org/repos/asf/spark/blob/8f744fe3/sql/core/src/main/scala/org/apache/spark/sql/execution/joins/HashSemiJoin.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/joins/HashSemiJoin.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/joins/HashSemiJoin.scala
index 3e0f74c..0220e0b 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/joins/HashSemiJoin.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/joins/HashSemiJoin.scala
@@ -43,14 +43,13 @@ trait HashSemiJoin {
     newPredicate(condition.getOrElse(Literal(true)), left.output ++ right.output)
 
   protected def buildKeyHashSet(
-      buildIter: Iterator[InternalRow], numBuildRows: LongSQLMetric): java.util.Set[InternalRow] = {
+      buildIter: Iterator[InternalRow]): java.util.Set[InternalRow] = {
     val hashSet = new java.util.HashSet[InternalRow]()
 
     // Create a Hash set of buildKeys
     val rightKey = rightKeyGenerator
     while (buildIter.hasNext) {
       val currentRow = buildIter.next()
-      numBuildRows += 1
       val rowKey = rightKey(currentRow)
       if (!rowKey.anyNull) {
         val keyExists = hashSet.contains(rowKey)
@@ -65,12 +64,10 @@ trait HashSemiJoin {
 
   protected def hashSemiJoin(
     streamIter: Iterator[InternalRow],
-    numStreamRows: LongSQLMetric,
     hashSet: java.util.Set[InternalRow],
     numOutputRows: LongSQLMetric): Iterator[InternalRow] = {
     val joinKeys = leftKeyGenerator
     streamIter.filter(current => {
-      numStreamRows += 1
       val key = joinKeys(current)
       val r = !key.anyNull && hashSet.contains(key)
       if (r) numOutputRows += 1
@@ -80,13 +77,11 @@ trait HashSemiJoin {
 
   protected def hashSemiJoin(
       streamIter: Iterator[InternalRow],
-      numStreamRows: LongSQLMetric,
       hashedRelation: HashedRelation,
       numOutputRows: LongSQLMetric): Iterator[InternalRow] = {
     val joinKeys = leftKeyGenerator
     val joinedRow = new JoinedRow
     streamIter.filter { current =>
-      numStreamRows += 1
       val key = joinKeys(current)
       lazy val rowBuffer = hashedRelation.get(key)
       val r = !key.anyNull && rowBuffer != null && rowBuffer.exists {

http://git-wip-us.apache.org/repos/asf/spark/blob/8f744fe3/sql/core/src/main/scala/org/apache/spark/sql/execution/joins/HashedRelation.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/joins/HashedRelation.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/joins/HashedRelation.scala
index eb6930a..0978570 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/joins/HashedRelation.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/joins/HashedRelation.scala
@@ -159,18 +159,17 @@ private[joins] class UniqueKeyHashedRelation(
 private[execution] object HashedRelation {
 
   def apply(localNode: LocalNode, keyGenerator: Projection): HashedRelation = {
-    apply(localNode.asIterator, SQLMetrics.nullLongMetric, keyGenerator)
+    apply(localNode.asIterator, keyGenerator)
   }
 
   def apply(
       input: Iterator[InternalRow],
-      numInputRows: LongSQLMetric,
       keyGenerator: Projection,
       sizeEstimate: Int = 64): HashedRelation = {
 
     if (keyGenerator.isInstanceOf[UnsafeProjection]) {
       return UnsafeHashedRelation(
-        input, numInputRows, keyGenerator.asInstanceOf[UnsafeProjection], sizeEstimate)
+        input, keyGenerator.asInstanceOf[UnsafeProjection], sizeEstimate)
     }
 
     // TODO: Use Spark's HashMap implementation.
@@ -184,7 +183,6 @@ private[execution] object HashedRelation {
     // Create a mapping of buildKeys -> rows
     while (input.hasNext) {
       currentRow = input.next()
-      numInputRows += 1
       val rowKey = keyGenerator(currentRow)
       if (!rowKey.anyNull) {
         val existingMatchList = hashTable.get(rowKey)
@@ -427,7 +425,6 @@ private[joins] object UnsafeHashedRelation {
 
   def apply(
       input: Iterator[InternalRow],
-      numInputRows: LongSQLMetric,
       keyGenerator: UnsafeProjection,
       sizeEstimate: Int): HashedRelation = {
 
@@ -437,7 +434,6 @@ private[joins] object UnsafeHashedRelation {
     // Create a mapping of buildKeys -> rows
     while (input.hasNext) {
       val unsafeRow = input.next().asInstanceOf[UnsafeRow]
-      numInputRows += 1
       val rowKey = keyGenerator(unsafeRow)
       if (!rowKey.anyNull) {
         val existingMatchList = hashTable.get(rowKey)
@@ -604,7 +600,6 @@ private[joins] object LongHashedRelation {
 
   def apply(
     input: Iterator[InternalRow],
-    numInputRows: LongSQLMetric,
     keyGenerator: Projection,
     sizeEstimate: Int): HashedRelation = {
 
@@ -619,7 +614,6 @@ private[joins] object LongHashedRelation {
     while (input.hasNext) {
       val unsafeRow = input.next().asInstanceOf[UnsafeRow]
       numFields = unsafeRow.numFields()
-      numInputRows += 1
       val rowKey = keyGenerator(unsafeRow)
       if (!rowKey.anyNull) {
         val key = rowKey.getLong(0)

http://git-wip-us.apache.org/repos/asf/spark/blob/8f744fe3/sql/core/src/main/scala/org/apache/spark/sql/execution/joins/LeftSemiJoinBNL.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/joins/LeftSemiJoinBNL.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/joins/LeftSemiJoinBNL.scala
index 82498ee..ce758d6 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/joins/LeftSemiJoinBNL.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/joins/LeftSemiJoinBNL.scala
@@ -34,8 +34,6 @@ case class LeftSemiJoinBNL(
   // TODO: Override requiredChildDistribution.
 
   override private[sql] lazy val metrics = Map(
-    "numLeftRows" -> SQLMetrics.createLongMetric(sparkContext, "number of left rows"),
-    "numRightRows" -> SQLMetrics.createLongMetric(sparkContext, "number of right rows"),
     "numOutputRows" -> SQLMetrics.createLongMetric(sparkContext, "number of output rows"))
 
   override def outputPartitioning: Partitioning = streamed.outputPartitioning
@@ -52,13 +50,10 @@ case class LeftSemiJoinBNL(
     newPredicate(condition.getOrElse(Literal(true)), left.output ++ right.output)
 
   protected override def doExecute(): RDD[InternalRow] = {
-    val numLeftRows = longMetric("numLeftRows")
-    val numRightRows = longMetric("numRightRows")
     val numOutputRows = longMetric("numOutputRows")
 
     val broadcastedRelation =
       sparkContext.broadcast(broadcast.execute().map { row =>
-        numRightRows += 1
         row.copy()
       }.collect().toIndexedSeq)
 
@@ -66,7 +61,6 @@ case class LeftSemiJoinBNL(
       val joinedRow = new JoinedRow
 
       streamedIter.filter(streamedRow => {
-        numLeftRows += 1
         var i = 0
         var matched = false
 

http://git-wip-us.apache.org/repos/asf/spark/blob/8f744fe3/sql/core/src/main/scala/org/apache/spark/sql/execution/joins/LeftSemiJoinHash.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/joins/LeftSemiJoinHash.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/joins/LeftSemiJoinHash.scala
index 25b3b5c..d8d3045 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/joins/LeftSemiJoinHash.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/joins/LeftSemiJoinHash.scala
@@ -36,8 +36,6 @@ case class LeftSemiJoinHash(
     condition: Option[Expression]) extends BinaryNode with HashSemiJoin {
 
   override private[sql] lazy val metrics = Map(
-    "numLeftRows" -> SQLMetrics.createLongMetric(sparkContext, "number of left rows"),
-    "numRightRows" -> SQLMetrics.createLongMetric(sparkContext, "number of right rows"),
     "numOutputRows" -> SQLMetrics.createLongMetric(sparkContext, "number of output rows"))
 
   override def outputPartitioning: Partitioning = left.outputPartitioning
@@ -46,17 +44,15 @@ case class LeftSemiJoinHash(
     ClusteredDistribution(leftKeys) :: ClusteredDistribution(rightKeys) :: Nil
 
   protected override def doExecute(): RDD[InternalRow] = {
-    val numLeftRows = longMetric("numLeftRows")
-    val numRightRows = longMetric("numRightRows")
     val numOutputRows = longMetric("numOutputRows")
 
     right.execute().zipPartitions(left.execute()) { (buildIter, streamIter) =>
       if (condition.isEmpty) {
-        val hashSet = buildKeyHashSet(buildIter, numRightRows)
-        hashSemiJoin(streamIter, numLeftRows, hashSet, numOutputRows)
+        val hashSet = buildKeyHashSet(buildIter)
+        hashSemiJoin(streamIter, hashSet, numOutputRows)
       } else {
-        val hashRelation = HashedRelation(buildIter, numRightRows, rightKeyGenerator)
-        hashSemiJoin(streamIter, numLeftRows, hashRelation, numOutputRows)
+        val hashRelation = HashedRelation(buildIter, rightKeyGenerator)
+        hashSemiJoin(streamIter, hashRelation, numOutputRows)
       }
     }
   }

http://git-wip-us.apache.org/repos/asf/spark/blob/8f744fe3/sql/core/src/main/scala/org/apache/spark/sql/execution/joins/SortMergeJoin.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/joins/SortMergeJoin.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/joins/SortMergeJoin.scala
index 322a954..cd8a567 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/joins/SortMergeJoin.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/joins/SortMergeJoin.scala
@@ -37,8 +37,6 @@ case class SortMergeJoin(
     right: SparkPlan) extends BinaryNode {
 
   override private[sql] lazy val metrics = Map(
-    "numLeftRows" -> SQLMetrics.createLongMetric(sparkContext, "number of left rows"),
-    "numRightRows" -> SQLMetrics.createLongMetric(sparkContext, "number of right rows"),
     "numOutputRows" -> SQLMetrics.createLongMetric(sparkContext, "number of output rows"))
 
   override def output: Seq[Attribute] = left.output ++ right.output
@@ -60,8 +58,6 @@ case class SortMergeJoin(
   }
 
   protected override def doExecute(): RDD[InternalRow] = {
-    val numLeftRows = longMetric("numLeftRows")
-    val numRightRows = longMetric("numRightRows")
     val numOutputRows = longMetric("numOutputRows")
 
     left.execute().zipPartitions(right.execute()) { (leftIter, rightIter) =>
@@ -89,9 +85,7 @@ case class SortMergeJoin(
           rightKeyGenerator,
           keyOrdering,
           RowIterator.fromScala(leftIter),
-          numLeftRows,
-          RowIterator.fromScala(rightIter),
-          numRightRows
+          RowIterator.fromScala(rightIter)
         )
         private[this] val joinRow = new JoinedRow
         private[this] val resultProjection: (InternalRow) => InternalRow =
@@ -157,9 +151,7 @@ private[joins] class SortMergeJoinScanner(
     bufferedKeyGenerator: Projection,
     keyOrdering: Ordering[InternalRow],
     streamedIter: RowIterator,
-    numStreamedRows: LongSQLMetric,
-    bufferedIter: RowIterator,
-    numBufferedRows: LongSQLMetric) {
+    bufferedIter: RowIterator) {
   private[this] var streamedRow: InternalRow = _
   private[this] var streamedRowKey: InternalRow = _
   private[this] var bufferedRow: InternalRow = _
@@ -284,7 +276,6 @@ private[joins] class SortMergeJoinScanner(
     if (streamedIter.advanceNext()) {
       streamedRow = streamedIter.getRow
       streamedRowKey = streamedKeyGenerator(streamedRow)
-      numStreamedRows += 1
       true
     } else {
       streamedRow = null
@@ -302,7 +293,6 @@ private[joins] class SortMergeJoinScanner(
     while (!foundRow && bufferedIter.advanceNext()) {
       bufferedRow = bufferedIter.getRow
       bufferedRowKey = bufferedKeyGenerator(bufferedRow)
-      numBufferedRows += 1
       foundRow = !bufferedRowKey.anyNull
     }
     if (!foundRow) {

http://git-wip-us.apache.org/repos/asf/spark/blob/8f744fe3/sql/core/src/main/scala/org/apache/spark/sql/execution/joins/SortMergeOuterJoin.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/joins/SortMergeOuterJoin.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/joins/SortMergeOuterJoin.scala
index ed41ad2..40a6c93 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/joins/SortMergeOuterJoin.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/joins/SortMergeOuterJoin.scala
@@ -40,8 +40,6 @@ case class SortMergeOuterJoin(
     right: SparkPlan) extends BinaryNode {
 
   override private[sql] lazy val metrics = Map(
-    "numLeftRows" -> SQLMetrics.createLongMetric(sparkContext, "number of left rows"),
-    "numRightRows" -> SQLMetrics.createLongMetric(sparkContext, "number of right rows"),
     "numOutputRows" -> SQLMetrics.createLongMetric(sparkContext, "number of output rows"))
 
   override def output: Seq[Attribute] = {
@@ -96,8 +94,6 @@ case class SortMergeOuterJoin(
     UnsafeProjection.create(rightKeys, right.output)
 
   override def doExecute(): RDD[InternalRow] = {
-    val numLeftRows = longMetric("numLeftRows")
-    val numRightRows = longMetric("numRightRows")
     val numOutputRows = longMetric("numOutputRows")
 
     left.execute().zipPartitions(right.execute()) { (leftIter, rightIter) =>
@@ -119,9 +115,7 @@ case class SortMergeOuterJoin(
             bufferedKeyGenerator = createRightKeyGenerator(),
             keyOrdering,
             streamedIter = RowIterator.fromScala(leftIter),
-            numLeftRows,
-            bufferedIter = RowIterator.fromScala(rightIter),
-            numRightRows
+            bufferedIter = RowIterator.fromScala(rightIter)
           )
           val rightNullRow = new GenericInternalRow(right.output.length)
           new LeftOuterIterator(
@@ -133,9 +127,7 @@ case class SortMergeOuterJoin(
             bufferedKeyGenerator = createLeftKeyGenerator(),
             keyOrdering,
             streamedIter = RowIterator.fromScala(rightIter),
-            numRightRows,
-            bufferedIter = RowIterator.fromScala(leftIter),
-            numLeftRows
+            bufferedIter = RowIterator.fromScala(leftIter)
           )
           val leftNullRow = new GenericInternalRow(left.output.length)
           new RightOuterIterator(
@@ -149,9 +141,7 @@ case class SortMergeOuterJoin(
             rightKeyGenerator = createRightKeyGenerator(),
             keyOrdering,
             leftIter = RowIterator.fromScala(leftIter),
-            numLeftRows,
             rightIter = RowIterator.fromScala(rightIter),
-            numRightRows,
             boundCondition,
             leftNullRow,
             rightNullRow)
@@ -289,9 +279,7 @@ private class SortMergeFullOuterJoinScanner(
     rightKeyGenerator: Projection,
     keyOrdering: Ordering[InternalRow],
     leftIter: RowIterator,
-    numLeftRows: LongSQLMetric,
     rightIter: RowIterator,
-    numRightRows: LongSQLMetric,
     boundCondition: InternalRow => Boolean,
     leftNullRow: InternalRow,
     rightNullRow: InternalRow)  {
@@ -321,7 +309,6 @@ private class SortMergeFullOuterJoinScanner(
     if (leftIter.advanceNext()) {
       leftRow = leftIter.getRow
       leftRowKey = leftKeyGenerator(leftRow)
-      numLeftRows += 1
       true
     } else {
       leftRow = null
@@ -338,7 +325,6 @@ private class SortMergeFullOuterJoinScanner(
     if (rightIter.advanceNext()) {
       rightRow = rightIter.getRow
       rightRowKey = rightKeyGenerator(rightRow)
-      numRightRows += 1
       true
     } else {
       rightRow = null

http://git-wip-us.apache.org/repos/asf/spark/blob/8f744fe3/sql/core/src/test/scala/org/apache/spark/sql/execution/joins/HashedRelationSuite.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/joins/HashedRelationSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/joins/HashedRelationSuite.scala
index f985dfb..04dd809 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/joins/HashedRelationSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/joins/HashedRelationSuite.scala
@@ -36,8 +36,7 @@ class HashedRelationSuite extends SparkFunSuite with SharedSQLContext {
 
   test("GeneralHashedRelation") {
     val data = Array(InternalRow(0), InternalRow(1), InternalRow(2), InternalRow(2))
-    val numDataRows = SQLMetrics.createLongMetric(sparkContext, "data")
-    val hashed = HashedRelation(data.iterator, numDataRows, keyProjection)
+    val hashed = HashedRelation(data.iterator, keyProjection)
     assert(hashed.isInstanceOf[GeneralHashedRelation])
 
     assert(hashed.get(data(0)) === CompactBuffer[InternalRow](data(0)))
@@ -47,13 +46,11 @@ class HashedRelationSuite extends SparkFunSuite with SharedSQLContext {
     val data2 = CompactBuffer[InternalRow](data(2))
     data2 += data(2)
     assert(hashed.get(data(2)) === data2)
-    assert(numDataRows.value.value === data.length)
   }
 
   test("UniqueKeyHashedRelation") {
     val data = Array(InternalRow(0), InternalRow(1), InternalRow(2))
-    val numDataRows = SQLMetrics.createLongMetric(sparkContext, "data")
-    val hashed = HashedRelation(data.iterator, numDataRows, keyProjection)
+    val hashed = HashedRelation(data.iterator, keyProjection)
     assert(hashed.isInstanceOf[UniqueKeyHashedRelation])
 
     assert(hashed.get(data(0)) === CompactBuffer[InternalRow](data(0)))
@@ -66,19 +63,17 @@ class HashedRelationSuite extends SparkFunSuite with SharedSQLContext {
     assert(uniqHashed.getValue(data(1)) === data(1))
     assert(uniqHashed.getValue(data(2)) === data(2))
     assert(uniqHashed.getValue(InternalRow(10)) === null)
-    assert(numDataRows.value.value === data.length)
   }
 
   test("UnsafeHashedRelation") {
     val schema = StructType(StructField("a", IntegerType, true) :: Nil)
     val data = Array(InternalRow(0), InternalRow(1), InternalRow(2), InternalRow(2))
-    val numDataRows = SQLMetrics.createLongMetric(sparkContext, "data")
     val toUnsafe = UnsafeProjection.create(schema)
     val unsafeData = data.map(toUnsafe(_).copy()).toArray
 
     val buildKey = Seq(BoundReference(0, IntegerType, false))
     val keyGenerator = UnsafeProjection.create(buildKey)
-    val hashed = UnsafeHashedRelation(unsafeData.iterator, numDataRows, keyGenerator, 1)
+    val hashed = UnsafeHashedRelation(unsafeData.iterator, keyGenerator, 1)
     assert(hashed.isInstanceOf[UnsafeHashedRelation])
 
     assert(hashed.get(unsafeData(0)) === CompactBuffer[InternalRow](unsafeData(0)))
@@ -100,7 +95,6 @@ class HashedRelationSuite extends SparkFunSuite with SharedSQLContext {
     assert(hashed2.get(unsafeData(1)) === CompactBuffer[InternalRow](unsafeData(1)))
     assert(hashed2.get(toUnsafe(InternalRow(10))) === null)
     assert(hashed2.get(unsafeData(2)) === data2)
-    assert(numDataRows.value.value === data.length)
 
     val os2 = new ByteArrayOutputStream()
     val out2 = new ObjectOutputStream(os2)
@@ -139,7 +133,7 @@ class HashedRelationSuite extends SparkFunSuite with SharedSQLContext {
       Seq(BoundReference(0, IntegerType, false), BoundReference(1, IntegerType, true)))
     val rows = (0 until 100).map(i => unsafeProj(InternalRow(i, i + 1)).copy())
     val keyProj = UnsafeProjection.create(Seq(BoundReference(0, IntegerType, false)))
-    val longRelation = LongHashedRelation(rows.iterator, SQLMetrics.nullLongMetric, keyProj, 100)
+    val longRelation = LongHashedRelation(rows.iterator, keyProj, 100)
     assert(longRelation.isInstanceOf[LongArrayRelation])
     val longArrayRelation = longRelation.asInstanceOf[LongArrayRelation]
     (0 until 100).foreach { i =>

http://git-wip-us.apache.org/repos/asf/spark/blob/8f744fe3/sql/core/src/test/scala/org/apache/spark/sql/execution/metric/SQLMetricsSuite.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/metric/SQLMetricsSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/metric/SQLMetricsSuite.scala
index 2260e48..d24625a 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/metric/SQLMetricsSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/metric/SQLMetricsSuite.scala
@@ -113,23 +113,12 @@ class SQLMetricsSuite extends SparkFunSuite with SharedSQLContext {
     }
   }
 
-  test("Project metrics") {
-    // Assume the execution plan is
-    // PhysicalRDD(nodeId = 1) -> Project(nodeId = 0)
-    val df = person.select('name)
-    testSparkPlanMetrics(df, 1, Map(
-      0L -> ("Project", Map(
-        "number of rows" -> 2L)))
-    )
-  }
-
   test("Filter metrics") {
     // Assume the execution plan is
     // PhysicalRDD(nodeId = 1) -> Filter(nodeId = 0)
     val df = person.filter('age < 25)
     testSparkPlanMetrics(df, 1, Map(
       0L -> ("Filter", Map(
-        "number of input rows" -> 2L,
         "number of output rows" -> 1L)))
     )
   }
@@ -149,10 +138,8 @@ class SQLMetricsSuite extends SparkFunSuite with SharedSQLContext {
     val df = testData2.groupBy().count() // 2 partitions
     testSparkPlanMetrics(df, 1, Map(
       2L -> ("TungstenAggregate", Map(
-        "number of input rows" -> 6L,
         "number of output rows" -> 2L)),
       0L -> ("TungstenAggregate", Map(
-        "number of input rows" -> 2L,
         "number of output rows" -> 1L)))
     )
 
@@ -160,10 +147,8 @@ class SQLMetricsSuite extends SparkFunSuite with SharedSQLContext {
     val df2 = testData2.groupBy('a).count()
     testSparkPlanMetrics(df2, 1, Map(
       2L -> ("TungstenAggregate", Map(
-        "number of input rows" -> 6L,
         "number of output rows" -> 4L)),
       0L -> ("TungstenAggregate", Map(
-        "number of input rows" -> 4L,
         "number of output rows" -> 3L)))
     )
   }
@@ -181,8 +166,6 @@ class SQLMetricsSuite extends SparkFunSuite with SharedSQLContext {
       testSparkPlanMetrics(df, 1, Map(
         1L -> ("SortMergeJoin", Map(
           // It's 4 because we only read 3 rows in the first partition and 1 row in the second one
-          "number of left rows" -> 4L,
-          "number of right rows" -> 2L,
           "number of output rows" -> 4L)))
       )
     }
@@ -201,8 +184,6 @@ class SQLMetricsSuite extends SparkFunSuite with SharedSQLContext {
       testSparkPlanMetrics(df, 1, Map(
         1L -> ("SortMergeOuterJoin", Map(
           // It's 4 because we only read 3 rows in the first partition and 1 row in the second one
-          "number of left rows" -> 6L,
-          "number of right rows" -> 2L,
           "number of output rows" -> 8L)))
       )
 
@@ -211,8 +192,6 @@ class SQLMetricsSuite extends SparkFunSuite with SharedSQLContext {
       testSparkPlanMetrics(df2, 1, Map(
         1L -> ("SortMergeOuterJoin", Map(
           // It's 4 because we only read 3 rows in the first partition and 1 row in the second one
-          "number of left rows" -> 2L,
-          "number of right rows" -> 6L,
           "number of output rows" -> 8L)))
       )
     }
@@ -226,8 +205,6 @@ class SQLMetricsSuite extends SparkFunSuite with SharedSQLContext {
     val df = df1.join(broadcast(df2), "key")
     testSparkPlanMetrics(df, 2, Map(
       1L -> ("BroadcastHashJoin", Map(
-        "number of left rows" -> 2L,
-        "number of right rows" -> 4L,
         "number of output rows" -> 2L)))
     )
   }
@@ -240,16 +217,12 @@ class SQLMetricsSuite extends SparkFunSuite with SharedSQLContext {
     val df = df1.join(broadcast(df2), $"key" === $"key2", "left_outer")
     testSparkPlanMetrics(df, 2, Map(
       0L -> ("BroadcastHashOuterJoin", Map(
-        "number of left rows" -> 3L,
-        "number of right rows" -> 4L,
         "number of output rows" -> 5L)))
     )
 
     val df3 = df1.join(broadcast(df2), $"key" === $"key2", "right_outer")
     testSparkPlanMetrics(df3, 2, Map(
       0L -> ("BroadcastHashOuterJoin", Map(
-        "number of left rows" -> 3L,
-        "number of right rows" -> 4L,
         "number of output rows" -> 6L)))
     )
   }
@@ -265,8 +238,6 @@ class SQLMetricsSuite extends SparkFunSuite with SharedSQLContext {
           "testData2.a * testDataForJoin.a != testData2.a + testDataForJoin.a")
       testSparkPlanMetrics(df, 3, Map(
         1L -> ("BroadcastNestedLoopJoin", Map(
-          "number of left rows" -> 12L, // left needs to be scanned twice
-          "number of right rows" -> 2L,
           "number of output rows" -> 12L)))
       )
     }
@@ -280,8 +251,6 @@ class SQLMetricsSuite extends SparkFunSuite with SharedSQLContext {
     val df = df1.join(broadcast(df2), $"key" === $"key2", "leftsemi")
     testSparkPlanMetrics(df, 2, Map(
       0L -> ("BroadcastLeftSemiJoinHash", Map(
-        "number of left rows" -> 2L,
-        "number of right rows" -> 4L,
         "number of output rows" -> 2L)))
     )
   }
@@ -295,8 +264,6 @@ class SQLMetricsSuite extends SparkFunSuite with SharedSQLContext {
       val df = df1.join(df2, $"key" === $"key2", "leftsemi")
       testSparkPlanMetrics(df, 1, Map(
         0L -> ("LeftSemiJoinHash", Map(
-          "number of left rows" -> 2L,
-          "number of right rows" -> 4L,
           "number of output rows" -> 2L)))
       )
     }
@@ -310,8 +277,6 @@ class SQLMetricsSuite extends SparkFunSuite with SharedSQLContext {
     val df = df1.join(df2, $"key" < $"key2", "leftsemi")
     testSparkPlanMetrics(df, 2, Map(
       0L -> ("LeftSemiJoinBNL", Map(
-        "number of left rows" -> 2L,
-        "number of right rows" -> 4L,
         "number of output rows" -> 2L)))
     )
   }
@@ -326,8 +291,6 @@ class SQLMetricsSuite extends SparkFunSuite with SharedSQLContext {
         "SELECT * FROM testData2 JOIN testDataForJoin")
       testSparkPlanMetrics(df, 1, Map(
         1L -> ("CartesianProduct", Map(
-          "number of left rows" -> 12L, // left needs to be scanned twice
-          "number of right rows" -> 4L, // right is read twice
           "number of output rows" -> 12L)))
       )
     }

http://git-wip-us.apache.org/repos/asf/spark/blob/8f744fe3/sql/core/src/test/scala/org/apache/spark/sql/util/DataFrameCallbackSuite.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/util/DataFrameCallbackSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/util/DataFrameCallbackSuite.scala
index a3e5243..d3191d3 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/util/DataFrameCallbackSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/util/DataFrameCallbackSuite.scala
@@ -92,7 +92,7 @@ class DataFrameCallbackSuite extends QueryTest with SharedSQLContext {
       override def onFailure(funcName: String, qe: QueryExecution, exception: Exception): Unit = {}
 
       override def onSuccess(funcName: String, qe: QueryExecution, duration: Long): Unit = {
-        metrics += qe.executedPlan.longMetric("numInputRows").value.value
+        metrics += qe.executedPlan.longMetric("numOutputRows").value.value
       }
     }
     sqlContext.listenerManager.register(listener)
@@ -105,9 +105,9 @@ class DataFrameCallbackSuite extends QueryTest with SharedSQLContext {
     }
 
     assert(metrics.length == 3)
-    assert(metrics(0) == 1)
-    assert(metrics(1) == 1)
-    assert(metrics(2) == 2)
+    assert(metrics(0) === 1)
+    assert(metrics(1) === 1)
+    assert(metrics(2) === 2)
 
     sqlContext.listenerManager.unregister(listener)
   }

http://git-wip-us.apache.org/repos/asf/spark/blob/8f744fe3/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/HiveTableScan.scala
----------------------------------------------------------------------
diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/HiveTableScan.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/HiveTableScan.scala
index eff8833..235b80b 100644
--- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/HiveTableScan.scala
+++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/HiveTableScan.scala
@@ -30,6 +30,7 @@ import org.apache.spark.rdd.RDD
 import org.apache.spark.sql.catalyst.InternalRow
 import org.apache.spark.sql.catalyst.expressions._
 import org.apache.spark.sql.execution._
+import org.apache.spark.sql.execution.metric.SQLMetrics
 import org.apache.spark.sql.hive._
 import org.apache.spark.sql.types.{BooleanType, DataType}
 import org.apache.spark.util.Utils
@@ -52,6 +53,9 @@ case class HiveTableScan(
   require(partitionPruningPred.isEmpty || relation.hiveQlTable.isPartitioned,
     "Partition pruning predicates only supported for partitioned tables.")
 
+  private[sql] override lazy val metrics = Map(
+    "numOutputRows" -> SQLMetrics.createLongMetric(sparkContext, "number of output rows"))
+
   override def producedAttributes: AttributeSet = outputSet ++
     AttributeSet(partitionPruningPred.flatMap(_.references))
 
@@ -146,9 +150,13 @@ case class HiveTableScan(
           prunePartitions(relation.getHiveQlPartitions(partitionPruningPred)))
       }
     }
+    val numOutputRows = longMetric("numOutputRows")
     rdd.mapPartitionsInternal { iter =>
       val proj = UnsafeProjection.create(schema)
-      iter.map(proj)
+      iter.map { r =>
+        numOutputRows += 1
+        proj(r)
+      }
     }
   }
 


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@spark.apache.org
For additional commands, e-mail: commits-help@spark.apache.org