Posted to commits@spark.apache.org by td...@apache.org on 2018/04/25 19:21:57 UTC

spark git commit: [SPARK-24050][SS] Calculate input / processing rates correctly for DataSourceV2 streaming sources

Repository: spark
Updated Branches:
  refs/heads/master 20ca208bc -> 396938ef0


[SPARK-24050][SS] Calculate input / processing rates correctly for DataSourceV2 streaming sources

## What changes were proposed in this pull request?

In some streaming queries, the input and processing rates are not calculated at all (they show up as zero) because MicroBatchExecution fails to associate metrics from the executed plan of a trigger with the sources in the logical plan of the trigger. The executed-plan-leaf-to-logical-source attribution works as follows. With V1 sources, there was no way to identify which execution plan leaves were generated by a streaming source, so we made a best-effort attempt to match logical and execution plan leaves when the number of leaves was the same. In cases where the number of leaves is different, we just give up and report zero rates. An example where this may happen is as follows.

```
val cachedStaticDF = someStaticDF.union(anotherStaticDF).cache()
val streamingInputDF = ...

val query = streamingInputDF.join(cachedStaticDF).writeStream....
```
In this case, `cachedStaticDF` has multiple logical leaves, but in the trigger's execution plan it has only one leaf, because a cached subplan is represented as a single InMemoryTableScanExec leaf. This leads to a mismatch in the number of leaves, causing the input rates to be computed as zero.
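
For illustration, a minimal sketch of how this mismatch can be observed, assuming a `StreamTest`-style test where `AssertOnQuery` is available (the regression test added to `StreamingQuerySuite` in the diff below performs the same check):

```
// The trigger's logical plan keeps both leaves of the cached union, but the executed
// plan collapses the cached subplan into a single InMemoryTableScanExec leaf.
AssertOnQuery { q =>
  q.processAllAvailable()
  q.lastExecution.logical.collectLeaves().length !=
    q.lastExecution.executedPlan.collectLeaves().length
}
```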

With DataSourceV2, all inputs are represented in the executed plan using `DataSourceV2ScanExec`, each of which has a reference to the associated logical `DataSource` and `DataSourceReader`. So it is easy to associate the metrics with the original streaming sources.

In this PR, the solution is as follows. If all the streaming sources in a streaming query are v2 sources, then a new code path is used where the execution-metrics-to-source mapping is done directly. Otherwise, we fall back to the existing mapping logic.
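
The core of the new V2 attribution path can be condensed as follows. This is only a sketch of the logic added to `ProgressReporter` (the full implementation is in the diff below, and is used only when every streaming leaf in the logical plan is a `MicroBatchReader`); the helper name `v2InputRows` is illustrative, not part of the patch:

```
import java.util.IdentityHashMap
import scala.collection.JavaConverters._
import org.apache.spark.sql.execution.SparkPlan
import org.apache.spark.sql.execution.datasources.v2.DataSourceV2ScanExec
import org.apache.spark.sql.execution.streaming.BaseStreamingSource

// Sum the numOutputRows metric per streaming source, deduplicating scan nodes by
// reference identity: the same DataSourceV2ScanExec instance can appear at multiple
// points in the plan (self-union / self-join) and must be counted only once.
def v2InputRows(executedLeaves: Seq[SparkPlan]): Map[BaseStreamingSource, Long] = {
  val unique = new IdentityHashMap[DataSourceV2ScanExec, DataSourceV2ScanExec]()
  executedLeaves.foreach {
    case s: DataSourceV2ScanExec if s.reader.isInstanceOf[BaseStreamingSource] =>
      unique.put(s, s)
    case _ =>
  }
  // Multiple distinct scan instances can still read from the same source, so group and sum.
  unique.values.asScala.toSeq
    .map { scan =>
      val rows = scan.metrics.get("numOutputRows").map(_.value).getOrElse(0L)
      scan.reader.asInstanceOf[BaseStreamingSource] -> rows
    }
    .groupBy(_._1)
    .mapValues(_.map(_._2).sum)
}
```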

## How was this patch tested?
- New unit tests using V2 memory source
- Existing unit tests using V1 source

Author: Tathagata Das <ta...@gmail.com>

Closes #21126 from tdas/SPARK-24050.


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/396938ef
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/396938ef
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/396938ef

Branch: refs/heads/master
Commit: 396938ef02c70468e1695872f96b1e9aff28b7ea
Parents: 20ca208
Author: Tathagata Das <ta...@gmail.com>
Authored: Wed Apr 25 12:21:55 2018 -0700
Committer: Tathagata Das <ta...@gmail.com>
Committed: Wed Apr 25 12:21:55 2018 -0700

----------------------------------------------------------------------
 .../kafka010/KafkaMicroBatchSourceSuite.scala   |   9 +-
 .../execution/streaming/ProgressReporter.scala  | 146 ++++++++++++++-----
 .../sql/streaming/StreamingQuerySuite.scala     | 134 ++++++++++++++++-
 3 files changed, 245 insertions(+), 44 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/396938ef/external/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/KafkaMicroBatchSourceSuite.scala
----------------------------------------------------------------------
diff --git a/external/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/KafkaMicroBatchSourceSuite.scala b/external/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/KafkaMicroBatchSourceSuite.scala
index e017fd9..d2d04b6 100644
--- a/external/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/KafkaMicroBatchSourceSuite.scala
+++ b/external/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/KafkaMicroBatchSourceSuite.scala
@@ -563,7 +563,7 @@ abstract class KafkaMicroBatchSourceSuiteBase extends KafkaSourceSuiteBase {
     )
   }
 
-  test("ensure stream-stream self-join generates only one offset in offset log") {
+  test("ensure stream-stream self-join generates only one offset in log and correct metrics") {
     val topic = newTopic()
     testUtils.createTopic(topic, partitions = 2)
     require(testUtils.getLatestOffsets(Set(topic)).size === 2)
@@ -587,7 +587,12 @@ abstract class KafkaMicroBatchSourceSuiteBase extends KafkaSourceSuiteBase {
       AddKafkaData(Set(topic), 1, 2),
       CheckAnswer((1, 1, 1), (2, 2, 2)),
       AddKafkaData(Set(topic), 6, 3),
-      CheckAnswer((1, 1, 1), (2, 2, 2), (3, 3, 3), (1, 6, 1), (1, 1, 6), (1, 6, 6))
+      CheckAnswer((1, 1, 1), (2, 2, 2), (3, 3, 3), (1, 6, 1), (1, 1, 6), (1, 6, 6)),
+      AssertOnQuery { q =>
+        assert(q.availableOffsets.iterator.size == 1)
+        assert(q.recentProgress.map(_.numInputRows).sum == 4)
+        true
+      }
     )
   }
 }

http://git-wip-us.apache.org/repos/asf/spark/blob/396938ef/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/ProgressReporter.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/ProgressReporter.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/ProgressReporter.scala
index d1e5be9..16ad3ef 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/ProgressReporter.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/ProgressReporter.scala
@@ -28,6 +28,8 @@ import org.apache.spark.sql.{DataFrame, SparkSession}
 import org.apache.spark.sql.catalyst.plans.logical.{EventTimeWatermark, LogicalPlan}
 import org.apache.spark.sql.catalyst.util.DateTimeUtils
 import org.apache.spark.sql.execution.QueryExecution
+import org.apache.spark.sql.execution.datasources.v2.DataSourceV2ScanExec
+import org.apache.spark.sql.sources.v2.reader.streaming.MicroBatchReader
 import org.apache.spark.sql.streaming._
 import org.apache.spark.sql.streaming.StreamingQueryListener.QueryProgressEvent
 import org.apache.spark.util.Clock
@@ -141,7 +143,7 @@ trait ProgressReporter extends Logging {
     }
     logDebug(s"Execution stats: $executionStats")
 
-    val sourceProgress = sources.map { source =>
+    val sourceProgress = sources.distinct.map { source =>
       val numRecords = executionStats.inputRows.getOrElse(source, 0L)
       new SourceProgress(
         description = source.toString,
@@ -207,62 +209,126 @@ trait ProgressReporter extends Logging {
       return ExecutionStats(Map.empty, stateOperators, watermarkTimestamp)
     }
 
-    // We want to associate execution plan leaves to sources that generate them, so that we match
-    // the their metrics (e.g. numOutputRows) to the sources. To do this we do the following.
-    // Consider the translation from the streaming logical plan to the final executed plan.
-    //
-    //  streaming logical plan (with sources) <==> trigger's logical plan <==> executed plan
-    //
-    // 1. We keep track of streaming sources associated with each leaf in the trigger's logical plan
-    //    - Each logical plan leaf will be associated with a single streaming source.
-    //    - There can be multiple logical plan leaves associated with a streaming source.
-    //    - There can be leaves not associated with any streaming source, because they were
-    //      generated from a batch source (e.g. stream-batch joins)
-    //
-    // 2. Assuming that the executed plan has same number of leaves in the same order as that of
-    //    the trigger logical plan, we associate executed plan leaves with corresponding
-    //    streaming sources.
-    //
-    // 3. For each source, we sum the metrics of the associated execution plan leaves.
-    //
-    val logicalPlanLeafToSource = newData.flatMap { case (source, logicalPlan) =>
-      logicalPlan.collectLeaves().map { leaf => leaf -> source }
+    val numInputRows = extractSourceToNumInputRows()
+
+    val eventTimeStats = lastExecution.executedPlan.collect {
+      case e: EventTimeWatermarkExec if e.eventTimeStats.value.count > 0 =>
+        val stats = e.eventTimeStats.value
+        Map(
+          "max" -> stats.max,
+          "min" -> stats.min,
+          "avg" -> stats.avg.toLong).mapValues(formatTimestamp)
+    }.headOption.getOrElse(Map.empty) ++ watermarkTimestamp
+
+    ExecutionStats(numInputRows, stateOperators, eventTimeStats)
+  }
+
+  /** Extract number of input rows for each streaming source in the plan */
+  private def extractSourceToNumInputRows(): Map[BaseStreamingSource, Long] = {
+
+    import java.util.IdentityHashMap
+    import scala.collection.JavaConverters._
+
+    def sumRows(tuples: Seq[(BaseStreamingSource, Long)]): Map[BaseStreamingSource, Long] = {
+      tuples.groupBy(_._1).mapValues(_.map(_._2).sum) // sum up rows for each source
     }
-    val allLogicalPlanLeaves = lastExecution.logical.collectLeaves() // includes non-streaming
-    val allExecPlanLeaves = lastExecution.executedPlan.collectLeaves()
-    val numInputRows: Map[BaseStreamingSource, Long] =
+
+    val onlyDataSourceV2Sources = {
+      // Check whether the streaming query's logical plan has only V2 data sources
+      val allStreamingLeaves =
+        logicalPlan.collect { case s: StreamingExecutionRelation => s }
+      allStreamingLeaves.forall { _.source.isInstanceOf[MicroBatchReader] }
+    }
+
+    if (onlyDataSourceV2Sources) {
+      // DataSourceV2ScanExec is the execution plan leaf that is responsible for reading data
+      // from a V2 source and has a direct reference to the V2 source that generated it. Each
+      // DataSourceV2ScanExec records the number of rows it has read using SQLMetrics. However,
+      // just collecting all DataSourceV2ScanExec nodes and getting the metric is not correct as
+      // a DataSourceV2ScanExec instance may be referred to from two (or even more) points in
+      // the execution plan, and considering it twice will lead to double counting. We
+      // can't dedup them using their hashcode either because two different instances of
+      // DataSourceV2ScanExec can have the same hashcode but account for separate sets of
+      // records read, and deduping them to consider only one of them would be undercounting the
+      // records read. Therefore the right way to do this is to consider the unique instances of
+      // DataSourceV2ScanExec (using their identity hash codes) and get metrics from them.
+      // Hence we calculate in the following way.
+      //
+      // 1. Collect all the unique DataSourceV2ScanExec instances using IdentityHashMap.
+      //
+      // 2. Extract the source and the number of rows read from the DataSourceV2ScanExec instances.
+      //
+      // 3. Multiple DataSourceV2ScanExec instances may refer to the same source (can happen with
+      //    self-unions or self-joins). Add up the number of rows for each unique source.
+      val uniqueStreamingExecLeavesMap =
+        new IdentityHashMap[DataSourceV2ScanExec, DataSourceV2ScanExec]()
+
+      lastExecution.executedPlan.collectLeaves().foreach {
+        case s: DataSourceV2ScanExec if s.reader.isInstanceOf[BaseStreamingSource] =>
+          uniqueStreamingExecLeavesMap.put(s, s)
+        case _ =>
+      }
+
+      val sourceToInputRowsTuples =
+        uniqueStreamingExecLeavesMap.values.asScala.map { execLeaf =>
+          val numRows = execLeaf.metrics.get("numOutputRows").map(_.value).getOrElse(0L)
+          val source = execLeaf.reader.asInstanceOf[BaseStreamingSource]
+          source -> numRows
+        }.toSeq
+      logDebug("Source -> # input rows\n\t" + sourceToInputRowsTuples.mkString("\n\t"))
+      sumRows(sourceToInputRowsTuples)
+    } else {
+
+      // Since V1 sources do not generate execution plan leaves that directly link with the
+      // source that generated them, we can only do a best-effort association between execution
+      // plan leaves and the sources. This is known to fail in a few cases; see SPARK-24050.
+      //
+      // We want to associate execution plan leaves to sources that generate them, so that we match
+      // their metrics (e.g. numOutputRows) to the sources. To do this we do the following.
+      // Consider the translation from the streaming logical plan to the final executed plan.
+      //
+      // streaming logical plan (with sources) <==> trigger's logical plan <==> executed plan
+      //
+      // 1. We keep track of streaming sources associated with each leaf in trigger's logical plan
+      //  - Each logical plan leaf will be associated with a single streaming source.
+      //  - There can be multiple logical plan leaves associated with a streaming source.
+      //  - There can be leaves not associated with any streaming source, because they were
+      //      generated from a batch source (e.g. stream-batch joins)
+      //
+      // 2. Assuming that the executed plan has same number of leaves in the same order as that of
+      //    the trigger logical plan, we associate executed plan leaves with corresponding
+      //    streaming sources.
+      //
+      // 3. For each source, we sum the metrics of the associated execution plan leaves.
+      //
+      val logicalPlanLeafToSource = newData.flatMap { case (source, logicalPlan) =>
+        logicalPlan.collectLeaves().map { leaf => leaf -> source }
+      }
+      val allLogicalPlanLeaves = lastExecution.logical.collectLeaves() // includes non-streaming
+      val allExecPlanLeaves = lastExecution.executedPlan.collectLeaves()
       if (allLogicalPlanLeaves.size == allExecPlanLeaves.size) {
         val execLeafToSource = allLogicalPlanLeaves.zip(allExecPlanLeaves).flatMap {
           case (lp, ep) => logicalPlanLeafToSource.get(lp).map { source => ep -> source }
         }
-        val sourceToNumInputRows = execLeafToSource.map { case (execLeaf, source) =>
+        val sourceToInputRowsTuples = execLeafToSource.map { case (execLeaf, source) =>
           val numRows = execLeaf.metrics.get("numOutputRows").map(_.value).getOrElse(0L)
           source -> numRows
         }
-        sourceToNumInputRows.groupBy(_._1).mapValues(_.map(_._2).sum) // sum up rows for each source
+        sumRows(sourceToInputRowsTuples)
       } else {
         if (!metricWarningLogged) {
           def toString[T](seq: Seq[T]): String = s"(size = ${seq.size}), ${seq.mkString(", ")}"
+
           logWarning(
             "Could not report metrics as number leaves in trigger logical plan did not match that" +
-                s" of the execution plan:\n" +
-                s"logical plan leaves: ${toString(allLogicalPlanLeaves)}\n" +
-                s"execution plan leaves: ${toString(allExecPlanLeaves)}\n")
+              s" of the execution plan:\n" +
+              s"logical plan leaves: ${toString(allLogicalPlanLeaves)}\n" +
+              s"execution plan leaves: ${toString(allExecPlanLeaves)}\n")
           metricWarningLogged = true
         }
         Map.empty
       }
-
-    val eventTimeStats = lastExecution.executedPlan.collect {
-      case e: EventTimeWatermarkExec if e.eventTimeStats.value.count > 0 =>
-        val stats = e.eventTimeStats.value
-        Map(
-          "max" -> stats.max,
-          "min" -> stats.min,
-          "avg" -> stats.avg.toLong).mapValues(formatTimestamp)
-    }.headOption.getOrElse(Map.empty) ++ watermarkTimestamp
-
-    ExecutionStats(numInputRows, stateOperators, eventTimeStats)
+    }
   }
 
   /** Records the duration of running `body` for the next query progress update. */

http://git-wip-us.apache.org/repos/asf/spark/blob/396938ef/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingQuerySuite.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingQuerySuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingQuerySuite.scala
index 20942ed..390d67d 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingQuerySuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingQuerySuite.scala
@@ -466,7 +466,17 @@ class StreamingQuerySuite extends StreamTest with BeforeAndAfter with Logging wi
     }
   }
 
-  test("input row calculation with mixed batch and streaming sources") {
+  test("input row calculation with same V1 source used twice in self-join") {
+    val streamingTriggerDF = spark.createDataset(1 to 10).toDF
+    val streamingInputDF = createSingleTriggerStreamingDF(streamingTriggerDF).toDF("value")
+
+    val progress = getFirstProgress(streamingInputDF.join(streamingInputDF, "value"))
+    assert(progress.numInputRows === 20) // data is read multiple times in self-joins
+    assert(progress.sources.size === 1)
+    assert(progress.sources(0).numInputRows === 20)
+  }
+
+  test("input row calculation with mixed batch and streaming V1 sources") {
     val streamingTriggerDF = spark.createDataset(1 to 10).toDF
     val streamingInputDF = createSingleTriggerStreamingDF(streamingTriggerDF).toDF("value")
     val staticInputDF = spark.createDataFrame(Seq(1 -> "1", 2 -> "2")).toDF("value", "anotherValue")
@@ -479,7 +489,7 @@ class StreamingQuerySuite extends StreamTest with BeforeAndAfter with Logging wi
     assert(progress.sources(0).numInputRows === 10)
   }
 
-  test("input row calculation with trigger input DF having multiple leaves") {
+  test("input row calculation with trigger input DF having multiple leaves in V1 source") {
     val streamingTriggerDF =
       spark.createDataset(1 to 5).toDF.union(spark.createDataset(6 to 10).toDF)
     require(streamingTriggerDF.logicalPlan.collectLeaves().size > 1)
@@ -492,6 +502,121 @@ class StreamingQuerySuite extends StreamTest with BeforeAndAfter with Logging wi
     assert(progress.sources(0).numInputRows === 10)
   }
 
+  test("input row calculation with same V2 source used twice in self-union") {
+    val streamInput = MemoryStream[Int]
+
+    testStream(streamInput.toDF().union(streamInput.toDF()), useV2Sink = true)(
+      AddData(streamInput, 1, 2, 3),
+      CheckAnswer(1, 1, 2, 2, 3, 3),
+      AssertOnQuery { q =>
+        val lastProgress = getLastProgressWithData(q)
+        assert(lastProgress.nonEmpty)
+        assert(lastProgress.get.numInputRows == 6)
+        assert(lastProgress.get.sources.length == 1)
+        assert(lastProgress.get.sources(0).numInputRows == 6)
+        true
+      }
+    )
+  }
+
+  test("input row calculation with same V2 source used twice in self-join") {
+    val streamInput = MemoryStream[Int]
+    val df = streamInput.toDF()
+    testStream(df.join(df, "value"), useV2Sink = true)(
+      AddData(streamInput, 1, 2, 3),
+      CheckAnswer(1, 2, 3),
+      AssertOnQuery { q =>
+        val lastProgress = getLastProgressWithData(q)
+        assert(lastProgress.nonEmpty)
+        assert(lastProgress.get.numInputRows == 6)
+        assert(lastProgress.get.sources.length == 1)
+        assert(lastProgress.get.sources(0).numInputRows == 6)
+        true
+      }
+    )
+  }
+
+  test("input row calculation with trigger having data for only one of two V2 sources") {
+    val streamInput1 = MemoryStream[Int]
+    val streamInput2 = MemoryStream[Int]
+
+    testStream(streamInput1.toDF().union(streamInput2.toDF()), useV2Sink = true)(
+      AddData(streamInput1, 1, 2, 3),
+      CheckLastBatch(1, 2, 3),
+      AssertOnQuery { q =>
+        val lastProgress = getLastProgressWithData(q)
+        assert(lastProgress.nonEmpty)
+        assert(lastProgress.get.numInputRows == 3)
+        assert(lastProgress.get.sources.length == 2)
+        assert(lastProgress.get.sources(0).numInputRows == 3)
+        assert(lastProgress.get.sources(1).numInputRows == 0)
+        true
+      },
+      AddData(streamInput2, 4, 5),
+      CheckLastBatch(4, 5),
+      AssertOnQuery { q =>
+        val lastProgress = getLastProgressWithData(q)
+        assert(lastProgress.nonEmpty)
+        assert(lastProgress.get.numInputRows == 2)
+        assert(lastProgress.get.sources.length == 2)
+        assert(lastProgress.get.sources(0).numInputRows == 0)
+        assert(lastProgress.get.sources(1).numInputRows == 2)
+        true
+      }
+    )
+  }
+
+  test("input row calculation with mixed batch and streaming V2 sources") {
+
+    val streamInput = MemoryStream[Int]
+    val staticInputDF = spark.createDataFrame(Seq(1 -> "1", 2 -> "2")).toDF("value", "anotherValue")
+
+    testStream(streamInput.toDF().join(staticInputDF, "value"), useV2Sink = true)(
+      AddData(streamInput, 1, 2, 3),
+      AssertOnQuery { q =>
+        q.processAllAvailable()
+
+        // The number of leaves in the trigger's logical plan should be same as the executed plan.
+        require(
+          q.lastExecution.logical.collectLeaves().length ==
+            q.lastExecution.executedPlan.collectLeaves().length)
+
+        val lastProgress = getLastProgressWithData(q)
+        assert(lastProgress.nonEmpty)
+        assert(lastProgress.get.numInputRows == 3)
+        assert(lastProgress.get.sources.length == 1)
+        assert(lastProgress.get.sources(0).numInputRows == 3)
+        true
+      }
+    )
+
+    val streamInput2 = MemoryStream[Int]
+    val staticInputDF2 = staticInputDF.union(staticInputDF).cache()
+
+    testStream(streamInput2.toDF().join(staticInputDF2, "value"), useV2Sink = true)(
+      AddData(streamInput2, 1, 2, 3),
+      AssertOnQuery { q =>
+        q.processAllAvailable()
+        // The number of leaves in the trigger's logical plan should be different from
+        // the executed plan. The static input will have two leaves in the logical plan
+        // (due to the union), but will be converted to a single leaf in the executed plan
+        // (due to the caching, the cached subplan is replaced by a single InMemoryTableScanExec).
+        require(
+          q.lastExecution.logical.collectLeaves().length !=
+            q.lastExecution.executedPlan.collectLeaves().length)
+
+        // Despite the mismatch in total number of leaves in the logical and executed plans,
+        // we should be able to attribute streaming input metrics to the streaming sources.
+        val lastProgress = getLastProgressWithData(q)
+        assert(lastProgress.nonEmpty)
+        assert(lastProgress.get.numInputRows == 3)
+        assert(lastProgress.get.sources.length == 1)
+        assert(lastProgress.get.sources(0).numInputRows == 3)
+        true
+      }
+    )
+  }
+
   testQuietly("StreamExecution metadata garbage collection") {
     val inputData = MemoryStream[Int]
     val mapped = inputData.toDS().map(6 / _)
@@ -733,6 +858,11 @@ class StreamingQuerySuite extends StreamTest with BeforeAndAfter with Logging wi
     }
   }
 
+  /** Returns the last query progress from query.recentProgress where numInputRows is positive */
+  def getLastProgressWithData(q: StreamingQuery): Option[StreamingQueryProgress] = {
+    q.recentProgress.filter(_.numInputRows > 0).lastOption
+  }
+
   /**
    * A [[StreamAction]] to test the behavior of `StreamingQuery.awaitTermination()`.
    *


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@spark.apache.org
For additional commands, e-mail: commits-help@spark.apache.org