You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@spark.apache.org by br...@apache.org on 2020/04/09 00:00:08 UTC

[spark] branch master updated: [SPARK-29314][SS] Don't overwrite the metric "updated" of state operator to 0 if empty batch is run

This is an automated email from the ASF dual-hosted git repository.

brkyvz pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/master by this push:
     new ca2ba4f  [SPARK-29314][SS] Don't overwrite the metric "updated" of state operator to 0 if empty batch is run
ca2ba4f is described below

commit ca2ba4fe647cd60668410b68014a3991ad7fd5c9
Author: Jungtaek Lim (HeartSaVioR) <ka...@gmail.com>
AuthorDate: Wed Apr 8 16:59:39 2020 -0700

    [SPARK-29314][SS] Don't overwrite the metric "updated" of state operator to 0 if empty batch is run
    
    ### What changes were proposed in this pull request?
    
    This patch fixes the behavior of ProgressReporter, which always overwrites the "updated" metric of a state operator to 0 if there's no new data. That behavior is correct only when we copy the state progress from the "previous" executed plan, meaning no batch has been run. (A nonzero value of "updated" would be odd if no batch ran, so resetting it was correct in that case.)
    
    It used to be safe to assume that no data means no batch, but SPARK-24156 allows a batch to run with empty data when Spark needs to deal with the watermark. After this patch, the value is overwritten only if both conditions are met: 1) no data 2) no batch.
    
    ### Why are the changes needed?
    
    Currently Spark doesn't report correct metrics when an empty batch is run; this patch fixes that.
    
    ### Does this PR introduce any user-facing change?
    
    No.
    
    ### How was this patch tested?
    
    Modified the existing UT. Note that FlatMapGroupsWithState increases the value of "updated" when state rows are removed.
    Also manually tested with the query below (it is not a simple query to test in spark-shell, as you'll hit closure issues in spark-shell when playing with the state function):
    
    > query
    
    ```
    /**
     * Per-key state kept by the example query.
     *
     * @param count value stored for the key; the state function below writes either the
     *              running number of values seen or -1 once the key has timed out
     */
    final case class RunningCount(count: Long)
    
    /**
     * Manual reproduction for SPARK-29314.
     *
     * Runs a `flatMapGroupsWithState` query with a processing-time timeout over a
     * `MemoryStream`. New input is added only every 10 seconds while the trigger fires
     * every 5 seconds, so some triggers see no new input rows; the 1 second state timeout
     * set below is what gives those batches state rows to update.
     */
    object TestFlatMapGroupsWithState {
      /** Starts the streaming query and then feeds it input forever; never returns. */
      def main(args: Array[String]): Unit = {
        import org.apache.spark.sql.{SparkSession, SQLContext}
        import org.apache.spark.sql.execution.streaming.MemoryStream
        import org.apache.spark.sql.streaming.{GroupState, GroupStateTimeout, OutputMode, Trigger}
    
        val ss = SparkSession
          .builder()
          .appName("TestFlatMapGroupsWithState")
          .getOrCreate()
    
        ss.conf.set("spark.sql.shuffle.partitions", "5")
    
        import ss.implicits._
    
        // Emits (key, "-1") when the key has timed out, otherwise (key, running count).
        val stateFunc = (key: String, values: Iterator[String], state: GroupState[RunningCount]) => {
          if (state.hasTimedOut) {
            // End users are not restricted to remove the state here - they can update the
            // state as well. For example, event time session window would have list of
            // sessions here and it cannot remove entire state.
            state.update(RunningCount(-1))
            Iterator((key, "-1"))
          } else {
            val count = state.getOption.map(_.count).getOrElse(0L) + values.size
            state.update(RunningCount(count))
            // Short timeout so the key expires before the next round of input arrives.
            state.setTimeoutDuration("1 seconds")
            Iterator((key, count.toString))
          }
        }
    
        // MemoryStream needs an implicit SQLContext in scope.
        implicit val sqlContext: SQLContext = ss.sqlContext
        val inputData = MemoryStream[String]
    
        val result = inputData
          .toDF()
          .as[String]
          .groupByKey { v => v }
          .flatMapGroupsWithState(
            OutputMode.Append(),
            GroupStateTimeout.ProcessingTimeTimeout())(stateFunc)
    
        val query = result
          .writeStream
          .format("memory")
          .option("queryName", "test")
          .outputMode("append")
          .trigger(Trigger.ProcessingTime("5 second"))
          .start()
    
        Thread.sleep(1000)
    
        var chIdx: Long = 0
    
        while (true) {
          // foreach rather than map: addData is called only for its side effect.
          (chIdx to chIdx + 4).foreach { idx => inputData.addData(idx.toString) }
          chIdx += 5
          // intentionally sleep much more than trigger to enable "empty" batch
          Thread.sleep(10 * 1000)
        }
      }
    }
    ```
    
    > before the patch (batch 3 which was an "empty" batch)
    
    ```
    {
       "id":"de945a5c-882b-4dae-aa58-cb8261cbaf9e",
       "runId":"f1eb6d0d-3cd5-48b2-a03b-5e989b6c151b",
       "name":"test",
       "timestamp":"2019-11-18T07:00:25.005Z",
       "batchId":3,
       "numInputRows":0,
       "inputRowsPerSecond":0.0,
       "processedRowsPerSecond":0.0,
       "durationMs":{
          "addBatch":1664,
          "getBatch":0,
          "latestOffset":0,
          "queryPlanning":29,
          "triggerExecution":1789,
          "walCommit":51
       },
       "stateOperators":[
          {
             "numRowsTotal":10,
             "numRowsUpdated":0,
             "memoryUsedBytes":5130,
             "customMetrics":{
                "loadedMapCacheHitCount":15,
                "loadedMapCacheMissCount":0,
                "stateOnCurrentVersionSizeBytes":2722
             }
          }
       ],
       "sources":[
          {
             "description":"MemoryStream[value#1]",
             "startOffset":9,
             "endOffset":9,
             "numInputRows":0,
             "inputRowsPerSecond":0.0,
             "processedRowsPerSecond":0.0
          }
       ],
       "sink":{
          "description":"MemorySink",
          "numOutputRows":5
       }
    }
    ```
    
    > after the patch (batch 3 which was an "empty" batch)
    
    ```
    {
       "id":"7cb41623-6b9a-408e-ae02-6796ec636fa0",
       "runId":"17847710-ddfe-45f5-a7ab-b160e149382f",
       "name":"test",
       "timestamp":"2019-11-18T07:02:25.005Z",
       "batchId":3,
       "numInputRows":0,
       "inputRowsPerSecond":0.0,
       "processedRowsPerSecond":0.0,
       "durationMs":{
          "addBatch":1196,
          "getBatch":0,
          "latestOffset":0,
          "queryPlanning":30,
          "triggerExecution":1333,
          "walCommit":46
       },
       "stateOperators":[
          {
             "numRowsTotal":10,
             "numRowsUpdated":5,
             "memoryUsedBytes":5130,
             "customMetrics":{
                "loadedMapCacheHitCount":15,
                "loadedMapCacheMissCount":0,
                "stateOnCurrentVersionSizeBytes":2722
             }
          }
       ],
       "sources":[
          {
             "description":"MemoryStream[value#1]",
             "startOffset":9,
             "endOffset":9,
             "numInputRows":0,
             "inputRowsPerSecond":0.0,
             "processedRowsPerSecond":0.0
          }
       ],
       "sink":{
          "description":"MemorySink",
          "numOutputRows":5
       }
    }
    ```
    
    Before the patch, "numRowsUpdated" in "stateOperators" is `0`, which is wrong because the state is updated when the timeout occurs. After the patch, "numRowsUpdated" in "stateOperators" is correctly reported as `5`.
    
    Closes #25987 from HeartSaVioR/SPARK-29314.
    
    Authored-by: Jungtaek Lim (HeartSaVioR) <ka...@gmail.com>
    Signed-off-by: Burak Yavuz <br...@gmail.com>
---
 .../spark/sql/execution/streaming/ProgressReporter.scala     | 12 ++++++------
 .../spark/sql/streaming/FlatMapGroupsWithStateSuite.scala    |  2 +-
 2 files changed, 7 insertions(+), 7 deletions(-)

diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/ProgressReporter.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/ProgressReporter.scala
index d1086cd..0dff1c2 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/ProgressReporter.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/ProgressReporter.scala
@@ -154,7 +154,7 @@ trait ProgressReporter extends Logging {
     assert(currentTriggerStartOffsets != null && currentTriggerEndOffsets != null)
     currentTriggerEndTimestamp = triggerClock.getTimeMillis()
 
-    val executionStats = extractExecutionStats(hasNewData)
+    val executionStats = extractExecutionStats(hasNewData, hasExecuted)
     val processingTimeMills = currentTriggerEndTimestamp - currentTriggerStartTimestamp
     val processingTimeSec = Math.max(1L, processingTimeMills).toDouble / MILLIS_PER_SECOND
 
@@ -215,26 +215,26 @@ trait ProgressReporter extends Logging {
   }
 
   /** Extract statistics about stateful operators from the executed query plan. */
-  private def extractStateOperatorMetrics(hasNewData: Boolean): Seq[StateOperatorProgress] = {
+  private def extractStateOperatorMetrics(hasExecuted: Boolean): Seq[StateOperatorProgress] = {
     if (lastExecution == null) return Nil
-    // lastExecution could belong to one of the previous triggers if `!hasNewData`.
+    // lastExecution could belong to one of the previous triggers if `!hasExecuted`.
     // Walking the plan again should be inexpensive.
     lastExecution.executedPlan.collect {
       case p if p.isInstanceOf[StateStoreWriter] =>
         val progress = p.asInstanceOf[StateStoreWriter].getProgress()
-        if (hasNewData) progress else progress.copy(newNumRowsUpdated = 0)
+        if (hasExecuted) progress else progress.copy(newNumRowsUpdated = 0)
     }
   }
 
   /** Extracts statistics from the most recent query execution. */
-  private def extractExecutionStats(hasNewData: Boolean): ExecutionStats = {
+  private def extractExecutionStats(hasNewData: Boolean, hasExecuted: Boolean): ExecutionStats = {
     val hasEventTime = logicalPlan.collect { case e: EventTimeWatermark => e }.nonEmpty
     val watermarkTimestamp =
       if (hasEventTime) Map("watermark" -> formatTimestamp(offsetSeqMetadata.batchWatermarkMs))
       else Map.empty[String, String]
 
     // SPARK-19378: Still report metrics even though no data was processed while reporting progress.
-    val stateOperators = extractStateOperatorMetrics(hasNewData)
+    val stateOperators = extractStateOperatorMetrics(hasExecuted)
 
     if (!hasNewData) {
       return ExecutionStats(Map.empty, stateOperators, watermarkTimestamp)
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/streaming/FlatMapGroupsWithStateSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/streaming/FlatMapGroupsWithStateSuite.scala
index d36c64f..b04f8b0 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/streaming/FlatMapGroupsWithStateSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/streaming/FlatMapGroupsWithStateSuite.scala
@@ -798,7 +798,7 @@ class FlatMapGroupsWithStateSuite extends StateStoreMetricsTest {
         }
       },
       CheckNewAnswer(("c", "-1")),
-      assertNumStateRows(total = 0, updated = 0)
+      assertNumStateRows(total = 0, updated = 1)
     )
   }
 


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@spark.apache.org
For additional commands, e-mail: commits-help@spark.apache.org