Posted to commits@spark.apache.org by we...@apache.org on 2018/09/11 11:38:55 UTC
spark git commit: [SPARK-25278][SQL][FOLLOWUP] remove the hack in ProgressReporter
Repository: spark
Updated Branches:
refs/heads/master 0736e72a6 -> 0e680dcf1
[SPARK-25278][SQL][FOLLOWUP] remove the hack in ProgressReporter
## What changes were proposed in this pull request?
It turns out it was a bug that a `DataSourceV2ScanExec` instance could be referred to multiple times in the execution plan. This bug was fixed by https://github.com/apache/spark/pull/22284, and batch queries now report correct SQL metrics.
Thus we no longer need the hack in `ProgressReporter`, which worked around the same metrics problem for streaming queries.
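After the fix, the metrics can be gathered with a plain `collect` over the plan, summing rows per source because self-joins and self-unions can legitimately yield several scan nodes over the same source. A minimal sketch of that aggregation, with a hypothetical `Source` case class standing in for `BaseStreamingSource` and `sumRows` simplified from the trait's helper:

```scala
// Hedged sketch: `Source` and this `sumRows` are simplified stand-ins,
// not the actual Spark types. It mirrors how (source, numRows) tuples
// collected from DataSourceV2ScanExec leaves are summed per source.
object SumRowsSketch {
  case class Source(name: String)

  // Multiple scan nodes may refer to the same source; add their rows up.
  def sumRows(tuples: Seq[(Source, Long)]): Map[Source, Long] =
    tuples.groupBy(_._1).map { case (src, pairs) => src -> pairs.map(_._2).sum }

  def main(args: Array[String]): Unit = {
    val kafka = Source("kafka")
    val rate  = Source("rate")
    // A self-join produces two scan nodes over the same kafka source.
    val collected = Seq(kafka -> 10L, kafka -> 10L, rate -> 5L)
    println(sumRows(collected)) // e.g. Map(Source(kafka) -> 20, Source(rate) -> 5)
  }
}
```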
## How was this patch tested?
existing tests
Closes #22380 from cloud-fan/followup.
Authored-by: Wenchen Fan <we...@databricks.com>
Signed-off-by: Wenchen Fan <we...@databricks.com>
Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/0e680dcf
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/0e680dcf
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/0e680dcf
Branch: refs/heads/master
Commit: 0e680dcf1e20c5632b9451adce4079bf57107dbc
Parents: 0736e72
Author: Wenchen Fan <we...@databricks.com>
Authored: Tue Sep 11 19:38:45 2018 +0800
Committer: Wenchen Fan <we...@databricks.com>
Committed: Tue Sep 11 19:38:45 2018 +0800
----------------------------------------------------------------------
.../execution/streaming/ProgressReporter.scala | 36 +++-----------------
1 file changed, 4 insertions(+), 32 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/spark/blob/0e680dcf/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/ProgressReporter.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/ProgressReporter.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/ProgressReporter.scala
index d4b5065..73b1804 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/ProgressReporter.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/ProgressReporter.scala
@@ -255,40 +255,12 @@ trait ProgressReporter extends Logging {
}
if (onlyDataSourceV2Sources) {
- // DataSourceV2ScanExec is the execution plan leaf that is responsible for reading data
- // from a V2 source and has a direct reference to the V2 source that generated it. Each
- // DataSourceV2ScanExec records the number of rows it has read using SQLMetrics. However,
- // just collecting all DataSourceV2ScanExec nodes and getting the metric is not correct as
- // a DataSourceV2ScanExec instance may be referred to from two (or even more) points in
- // the execution plan, and considering it twice will lead to double counting. We
- // can't dedup them using their hashcode either because two different instances of
- // DataSourceV2ScanExec can have the same hashcode but account for separate sets of
- // records read, and deduping them to consider only one of them would be undercounting the
- // records read. Therefore the right way to do this is to consider the unique instances of
- // DataSourceV2ScanExec (using their identity hash codes) and get metrics from them.
- // Hence we calculate in the following way.
- //
- // 1. Collect all the unique DataSourceV2ScanExec instances using IdentityHashMap.
- //
- // 2. Extract the source and the number of rows read from the DataSourceV2ScanExec instances.
- //
- // 3. Multiple DataSourceV2ScanExec instance may refer to the same source (can happen with
- // self-unions or self-joins). Add up the number of rows for each unique source.
- val uniqueStreamingExecLeavesMap =
- new IdentityHashMap[DataSourceV2ScanExec, DataSourceV2ScanExec]()
-
- lastExecution.executedPlan.collectLeaves().foreach {
+ val sourceToInputRowsTuples = lastExecution.executedPlan.collect {
case s: DataSourceV2ScanExec if s.readSupport.isInstanceOf[BaseStreamingSource] =>
- uniqueStreamingExecLeavesMap.put(s, s)
- case _ =>
- }
-
- val sourceToInputRowsTuples =
- uniqueStreamingExecLeavesMap.values.asScala.map { execLeaf =>
- val numRows = execLeaf.metrics.get("numOutputRows").map(_.value).getOrElse(0L)
- val source = execLeaf.readSupport.asInstanceOf[BaseStreamingSource]
+ val numRows = s.metrics.get("numOutputRows").map(_.value).getOrElse(0L)
+ val source = s.readSupport.asInstanceOf[BaseStreamingSource]
source -> numRows
- }.toSeq
+ }
logDebug("Source -> # input rows\n\t" + sourceToInputRowsTuples.mkString("\n\t"))
sumRows(sourceToInputRowsTuples)
} else {
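The comment block removed above explains why the hack deduplicated plan leaves by object identity rather than by equality: two distinct `DataSourceV2ScanExec` instances can be equal (same hash code) yet account for separate sets of rows read, while the very same instance appearing twice in the plan must count only once. A hedged sketch of that identity-based dedup, with a hypothetical `ScanNode` case class in place of the real operator:

```scala
// Hedged sketch: `ScanNode` is a made-up stand-in for DataSourceV2ScanExec.
// Case classes have value equality, so an ordinary Set would wrongly merge
// `a` and `b`; IdentityHashMap keys on reference identity instead.
import java.util.IdentityHashMap
import scala.jdk.CollectionConverters._

object IdentityDedupSketch {
  case class ScanNode(sourceName: String, numRows: Long)

  def main(args: Array[String]): Unit = {
    val a = ScanNode("kafka", 10L)
    val b = ScanNode("kafka", 10L) // equal to `a`, but a distinct instance
    // `a` appears twice in the plan and must count once; `b` counts separately.
    val leaves = Seq(a, a, b)
    val unique = new IdentityHashMap[ScanNode, ScanNode]()
    leaves.foreach(s => unique.put(s, s))
    println(unique.values.asScala.map(_.numRows).sum) // 20, not 10 or 30
  }
}
```

With PR 22284 guaranteeing each scan instance appears at most once in the plan, this machinery became unnecessary, which is exactly what this commit removes.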