You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@spark.apache.org by we...@apache.org on 2018/09/19 16:29:57 UTC

spark git commit: [SPARK-25414][SS][TEST] make it clear that the numRows metrics should be counted for each scan of the source

Repository: spark
Updated Branches:
  refs/heads/master 12b1e91e6 -> a71f6a175


[SPARK-25414][SS][TEST] make it clear that the numRows metrics should be counted for each scan of the source

## What changes were proposed in this pull request?

For self-join/self-union, Spark will produce a physical plan which has multiple `DataSourceV2ScanExec` instances referring to the same `ReadSupport` instance. In this case, the streaming source is indeed scanned multiple times, and the `numInputRows` metrics should be counted for each scan.

Actually we already have 2 test cases to verify the behavior:
1. `StreamingQuerySuite.input row calculation with same V2 source used twice in self-join`
2. `KafkaMicroBatchSourceSuiteBase.ensure stream-stream self-join generates only one offset in log and correct metrics`.

However, in these 2 tests, the expected result is different, which is super confusing. It turns out that, the first test doesn't trigger exchange reuse, so the source is scanned twice. The second test triggers exchange reuse, and the source is scanned only once.

This PR proposes to improve these 2 tests, to test with/without exchange reuse.

## How was this patch tested?

test only change

Closes #22402 from cloud-fan/bug.

Authored-by: Wenchen Fan <we...@databricks.com>
Signed-off-by: Wenchen Fan <we...@databricks.com>


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/a71f6a17
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/a71f6a17
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/a71f6a17

Branch: refs/heads/master
Commit: a71f6a1750fd0a29ecae6b98673ee15840da1c62
Parents: 12b1e91
Author: Wenchen Fan <we...@databricks.com>
Authored: Thu Sep 20 00:29:48 2018 +0800
Committer: Wenchen Fan <we...@databricks.com>
Committed: Thu Sep 20 00:29:48 2018 +0800

----------------------------------------------------------------------
 .../kafka010/KafkaMicroBatchSourceSuite.scala   | 39 +++++++++++++----
 .../execution/streaming/ProgressReporter.scala  |  6 +--
 .../sql/streaming/StreamingQuerySuite.scala     | 46 +++++++++++++++-----
 3 files changed, 68 insertions(+), 23 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/a71f6a17/external/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/KafkaMicroBatchSourceSuite.scala
----------------------------------------------------------------------
diff --git a/external/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/KafkaMicroBatchSourceSuite.scala b/external/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/KafkaMicroBatchSourceSuite.scala
index 8e246db..e5f0088 100644
--- a/external/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/KafkaMicroBatchSourceSuite.scala
+++ b/external/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/KafkaMicroBatchSourceSuite.scala
@@ -35,9 +35,11 @@ import org.scalatest.time.SpanSugar._
 
 import org.apache.spark.sql.{ForeachWriter, SparkSession}
 import org.apache.spark.sql.execution.datasources.v2.StreamingDataSourceV2Relation
+import org.apache.spark.sql.execution.exchange.ReusedExchangeExec
 import org.apache.spark.sql.execution.streaming._
 import org.apache.spark.sql.execution.streaming.continuous.ContinuousExecution
 import org.apache.spark.sql.functions.{count, window}
+import org.apache.spark.sql.internal.SQLConf
 import org.apache.spark.sql.kafka010.KafkaSourceProvider._
 import org.apache.spark.sql.sources.v2.DataSourceOptions
 import org.apache.spark.sql.streaming.{ProcessingTime, StreamTest}
@@ -598,18 +600,37 @@ abstract class KafkaMicroBatchSourceSuiteBase extends KafkaSourceSuiteBase {
 
     val join = values.join(values, "key")
 
-    testStream(join)(
-      makeSureGetOffsetCalled,
-      AddKafkaData(Set(topic), 1, 2),
-      CheckAnswer((1, 1, 1), (2, 2, 2)),
-      AddKafkaData(Set(topic), 6, 3),
-      CheckAnswer((1, 1, 1), (2, 2, 2), (3, 3, 3), (1, 6, 1), (1, 1, 6), (1, 6, 6)),
-      AssertOnQuery { q =>
+    def checkQuery(check: AssertOnQuery): Unit = {
+      testStream(join)(
+        makeSureGetOffsetCalled,
+        AddKafkaData(Set(topic), 1, 2),
+        CheckAnswer((1, 1, 1), (2, 2, 2)),
+        AddKafkaData(Set(topic), 6, 3),
+        CheckAnswer((1, 1, 1), (2, 2, 2), (3, 3, 3), (1, 6, 1), (1, 1, 6), (1, 6, 6)),
+        check
+      )
+    }
+
+    withSQLConf(SQLConf.EXCHANGE_REUSE_ENABLED.key -> "false") {
+      checkQuery(AssertOnQuery { q =>
         assert(q.availableOffsets.iterator.size == 1)
+        // The kafka source is scanned twice because of self-join
+        assert(q.recentProgress.map(_.numInputRows).sum == 8)
+        true
+      })
+    }
+
+    withSQLConf(SQLConf.EXCHANGE_REUSE_ENABLED.key -> "true") {
+      checkQuery(AssertOnQuery { q =>
+        assert(q.availableOffsets.iterator.size == 1)
+        assert(q.lastExecution.executedPlan.collect {
+          case r: ReusedExchangeExec => r
+        }.length == 1)
+        // The kafka source is scanned only once because of exchange reuse.
         assert(q.recentProgress.map(_.numInputRows).sum == 4)
         true
-      }
-    )
+      })
+    }
   }
 
   test("read Kafka transactional messages: read_committed") {

http://git-wip-us.apache.org/repos/asf/spark/blob/a71f6a17/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/ProgressReporter.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/ProgressReporter.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/ProgressReporter.scala
index 73b1804..392229b 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/ProgressReporter.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/ProgressReporter.scala
@@ -240,9 +240,6 @@ trait ProgressReporter extends Logging {
   /** Extract number of input sources for each streaming source in plan */
   private def extractSourceToNumInputRows(): Map[BaseStreamingSource, Long] = {
 
-    import java.util.IdentityHashMap
-    import scala.collection.JavaConverters._
-
     def sumRows(tuples: Seq[(BaseStreamingSource, Long)]): Map[BaseStreamingSource, Long] = {
       tuples.groupBy(_._1).mapValues(_.map(_._2).sum) // sum up rows for each source
     }
@@ -255,6 +252,9 @@ trait ProgressReporter extends Logging {
     }
 
     if (onlyDataSourceV2Sources) {
+      // It's possible that multiple DataSourceV2ScanExec instances may refer to the same source
+      // (can happen with self-unions or self-joins). This means the source is scanned multiple
+      // times in the query, we should count the numRows for each scan.
       val sourceToInputRowsTuples = lastExecution.executedPlan.collect {
         case s: DataSourceV2ScanExec if s.readSupport.isInstanceOf[BaseStreamingSource] =>
           val numRows = s.metrics.get("numOutputRows").map(_.value).getOrElse(0L)

http://git-wip-us.apache.org/repos/asf/spark/blob/a71f6a17/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingQuerySuite.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingQuerySuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingQuerySuite.scala
index 1dd8175..c170641 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingQuerySuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingQuerySuite.scala
@@ -31,6 +31,7 @@ import org.apache.spark.SparkException
 import org.apache.spark.internal.Logging
 import org.apache.spark.sql.{Column, DataFrame, Dataset, Row}
 import org.apache.spark.sql.catalyst.expressions.{Literal, Rand, Randn, Shuffle, Uuid}
+import org.apache.spark.sql.execution.exchange.ReusedExchangeExec
 import org.apache.spark.sql.execution.streaming._
 import org.apache.spark.sql.execution.streaming.sources.TestForeachWriter
 import org.apache.spark.sql.functions._
@@ -500,29 +501,52 @@ class StreamingQuerySuite extends StreamTest with BeforeAndAfter with Logging wi
       AssertOnQuery { q =>
         val lastProgress = getLastProgressWithData(q)
         assert(lastProgress.nonEmpty)
-        assert(lastProgress.get.numInputRows == 6)
         assert(lastProgress.get.sources.length == 1)
-        assert(lastProgress.get.sources(0).numInputRows == 6)
+        // The source is scanned twice because of self-union
+        assert(lastProgress.get.numInputRows == 6)
         true
       }
     )
   }
 
   test("input row calculation with same V2 source used twice in self-join") {
-    val streamInput = MemoryStream[Int]
-    val df = streamInput.toDF()
-    testStream(df.join(df, "value"), useV2Sink = true)(
-      AddData(streamInput, 1, 2, 3),
-      CheckAnswer(1, 2, 3),
-      AssertOnQuery { q =>
+    def checkQuery(check: AssertOnQuery): Unit = {
+      val memoryStream = MemoryStream[Int]
+      // TODO: currently the streaming framework always add a dummy Project above streaming source
+      // relation, which breaks exchange reuse, as the optimizer will remove Project from one side.
+      // Here we manually add a useful Project, to trigger exchange reuse.
+      val streamDF = memoryStream.toDF().select('value + 0 as "v")
+      testStream(streamDF.join(streamDF, "v"), useV2Sink = true)(
+        AddData(memoryStream, 1, 2, 3),
+        CheckAnswer(1, 2, 3),
+        check
+      )
+    }
+
+    withSQLConf(SQLConf.EXCHANGE_REUSE_ENABLED.key -> "false") {
+      checkQuery(AssertOnQuery { q =>
         val lastProgress = getLastProgressWithData(q)
         assert(lastProgress.nonEmpty)
+        assert(lastProgress.get.sources.length == 1)
+        // The source is scanned twice because of self-join
         assert(lastProgress.get.numInputRows == 6)
+        true
+      })
+    }
+
+    withSQLConf(SQLConf.EXCHANGE_REUSE_ENABLED.key -> "true") {
+      checkQuery(AssertOnQuery { q =>
+        val lastProgress = getLastProgressWithData(q)
+        assert(lastProgress.nonEmpty)
         assert(lastProgress.get.sources.length == 1)
-        assert(lastProgress.get.sources(0).numInputRows == 6)
+        assert(q.lastExecution.executedPlan.collect {
+          case r: ReusedExchangeExec => r
+        }.length == 1)
+        // The source is scanned only once because of exchange reuse
+        assert(lastProgress.get.numInputRows == 3)
         true
-      }
-    )
+      })
+    }
   }
 
   test("input row calculation with trigger having data for only one of two V2 sources") {


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@spark.apache.org
For additional commands, e-mail: commits-help@spark.apache.org