Posted to commits@spark.apache.org by we...@apache.org on 2018/09/11 11:38:55 UTC

spark git commit: [SPARK-25278][SQL][FOLLOWUP] remove the hack in ProgressReporter

Repository: spark
Updated Branches:
  refs/heads/master 0736e72a6 -> 0e680dcf1


[SPARK-25278][SQL][FOLLOWUP] remove the hack in ProgressReporter

## What changes were proposed in this pull request?

It turns out that a `DataSourceV2ScanExec` instance being referenced multiple times in the execution plan is itself a bug. That bug was fixed by https://github.com/apache/spark/pull/22284, so SQL metrics are now correct for batch queries.

The hack in `ProgressReporter`, which worked around the same metrics problem for streaming queries, is therefore no longer needed, and this PR removes it.
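
To illustrate why the identity-based deduplication was only needed while a scan instance could appear in the plan more than once, here is a minimal, self-contained sketch. It does not use Spark's classes: `Node`, `Scan`, `Union`, and `MetricsSketch` are hypothetical stand-ins, and the `numOutputRows` field stands in for the SQL metric of the same name.

```scala
import java.util.IdentityHashMap

// Hypothetical stand-ins for plan nodes; these are NOT Spark's classes.
sealed trait Node
final case class Scan(source: String, numOutputRows: Long) extends Node
final case class Union(children: Seq[Node]) extends Node

object MetricsSketch {
  // Collect every scan leaf in plan order, including repeated references.
  def scans(plan: Node): Seq[Scan] = plan match {
    case s: Scan         => Seq(s)
    case Union(children) => children.flatMap(scans)
  }

  // Roughly what the removed workaround did: keep one entry per object
  // reference. Equal-but-distinct instances are all kept.
  def uniqueByIdentity(leaves: Seq[Scan]): Seq[Scan] = {
    val seen = new IdentityHashMap[Scan, Scan]()
    // put returns null the first time a given reference is inserted.
    leaves.filter(s => seen.put(s, s) == null)
  }

  // Add up the rows read for each source.
  def sumPerSource(leaves: Seq[Scan]): Map[String, Long] =
    leaves.groupBy(_.source).map { case (src, ss) =>
      src -> ss.map(_.numOutputRows).sum
    }

  def main(args: Array[String]): Unit = {
    // The buggy shape: the same instance is referenced twice in one plan.
    val shared = Scan("src", 10L)
    val sharedPlan = Union(Seq(shared, shared))
    assert(sumPerSource(scans(sharedPlan)) == Map("src" -> 20L)) // double counted
    assert(sumPerSource(uniqueByIdentity(scans(sharedPlan))) == Map("src" -> 10L))

    // The expected shape: a self-union yields two distinct instances, so a
    // plain collect-and-sum needs no deduplication step.
    val distinctPlan = Union(Seq(Scan("src", 10L), Scan("src", 10L)))
    assert(sumPerSource(scans(distinctPlan)) == Map("src" -> 20L))
    assert(uniqueByIdentity(scans(distinctPlan)).size == 2)
  }
}
```

In this sketch, once every reference in the plan is a distinct instance, deduplicating by identity is a no-op, which matches the reasoning for dropping it from `ProgressReporter`.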

## How was this patch tested?

Existing tests.

Closes #22380 from cloud-fan/followup.

Authored-by: Wenchen Fan <we...@databricks.com>
Signed-off-by: Wenchen Fan <we...@databricks.com>


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/0e680dcf
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/0e680dcf
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/0e680dcf

Branch: refs/heads/master
Commit: 0e680dcf1e20c5632b9451adce4079bf57107dbc
Parents: 0736e72
Author: Wenchen Fan <we...@databricks.com>
Authored: Tue Sep 11 19:38:45 2018 +0800
Committer: Wenchen Fan <we...@databricks.com>
Committed: Tue Sep 11 19:38:45 2018 +0800

----------------------------------------------------------------------
 .../execution/streaming/ProgressReporter.scala  | 36 +++-----------------
 1 file changed, 4 insertions(+), 32 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/0e680dcf/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/ProgressReporter.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/ProgressReporter.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/ProgressReporter.scala
index d4b5065..73b1804 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/ProgressReporter.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/ProgressReporter.scala
@@ -255,40 +255,12 @@ trait ProgressReporter extends Logging {
     }
 
     if (onlyDataSourceV2Sources) {
-      // DataSourceV2ScanExec is the execution plan leaf that is responsible for reading data
-      // from a V2 source and has a direct reference to the V2 source that generated it. Each
-      // DataSourceV2ScanExec records the number of rows it has read using SQLMetrics. However,
-      // just collecting all DataSourceV2ScanExec nodes and getting the metric is not correct as
-      // a DataSourceV2ScanExec instance may be referred to in the execution plan from two (or
-      // even multiple times) points and considering it twice will lead to double counting. We
-      // can't dedup them using their hashcode either because two different instances of
-      // DataSourceV2ScanExec can have the same hashcode but account for separate sets of
-      // records read, and deduping them to consider only one of them would be undercounting the
-      // records read. Therefore the right way to do this is to consider the unique instances of
-      // DataSourceV2ScanExec (using their identity hash codes) and get metrics from them.
-      // Hence we calculate in the following way.
-      //
-      // 1. Collect all the unique DataSourceV2ScanExec instances using IdentityHashMap.
-      //
-      // 2. Extract the source and the number of rows read from the DataSourceV2ScanExec instanes.
-      //
-      // 3. Multiple DataSourceV2ScanExec instance may refer to the same source (can happen with
-      //    self-unions or self-joins). Add up the number of rows for each unique source.
-      val uniqueStreamingExecLeavesMap =
-        new IdentityHashMap[DataSourceV2ScanExec, DataSourceV2ScanExec]()
-
-      lastExecution.executedPlan.collectLeaves().foreach {
+      val sourceToInputRowsTuples = lastExecution.executedPlan.collect {
         case s: DataSourceV2ScanExec if s.readSupport.isInstanceOf[BaseStreamingSource] =>
-          uniqueStreamingExecLeavesMap.put(s, s)
-        case _ =>
-      }
-
-      val sourceToInputRowsTuples =
-        uniqueStreamingExecLeavesMap.values.asScala.map { execLeaf =>
-          val numRows = execLeaf.metrics.get("numOutputRows").map(_.value).getOrElse(0L)
-          val source = execLeaf.readSupport.asInstanceOf[BaseStreamingSource]
+          val numRows = s.metrics.get("numOutputRows").map(_.value).getOrElse(0L)
+          val source = s.readSupport.asInstanceOf[BaseStreamingSource]
           source -> numRows
-        }.toSeq
+      }
       logDebug("Source -> # input rows\n\t" + sourceToInputRowsTuples.mkString("\n\t"))
       sumRows(sourceToInputRowsTuples)
     } else {

