Posted to commits@spark.apache.org by td...@apache.org on 2018/07/11 19:44:46 UTC

spark git commit: [SPARK-24697][SS] Fix the reported start offsets in streaming query progress

Repository: spark
Updated Branches:
  refs/heads/master 59c3c233f -> ff7f6ef75


[SPARK-24697][SS] Fix the reported start offsets in streaming query progress

## What changes were proposed in this pull request?

In `ProgressReporter` for streams, we use `committedOffsets` as the start offset and `availableOffsets` as the end offset when reporting the status of a trigger in `finishTrigger`. This is a bad pattern that has existed since the beginning of `ProgressReporter`, and it is bad because it is very hard to reason about when `availableOffsets` and `committedOffsets` are updated versus when they are read for reporting. Case in point: this bug has silently existed in `ContinuousExecution` since before `MicroBatchExecution` was refactored.

The correct fix is to record the offsets explicitly. This PR adds a simple method that `MicroBatchExecution`/`ContinuousExecution` explicitly call before updating `committedOffsets`.
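
Conceptually, the change replaces "read the live shared offset maps at reporting time" with "snapshot the range at trigger start". Below is a minimal, self-contained sketch of the pattern with simplified stand-in types (`Source`, `OffsetJson` are not the real Spark types; the actual code is in the `ProgressReporter.scala` diff below):

```scala
object ProgressSketch {
  type Source = String
  type OffsetJson = String
  type StreamProgress = Map[Source, OffsetJson]

  // Snapshots taken explicitly at trigger start, instead of reading the
  // live committedOffsets/availableOffsets maps in finishTrigger().
  private var currentTriggerStartOffsets: Map[Source, OffsetJson] = _
  private var currentTriggerEndOffsets: Map[Source, OffsetJson] = _

  // Called *before* committedOffsets is advanced, so the reported range
  // cannot drift if the maps are mutated mid-trigger.
  def recordTriggerOffsets(from: StreamProgress, to: StreamProgress): Unit = {
    currentTriggerStartOffsets = from
    currentTriggerEndOffsets = to
  }

  // finishTrigger() then reports from the snapshots, not the live maps.
  def startOffsetFor(source: Source): OffsetJson =
    currentTriggerStartOffsets.get(source).orNull
}
```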

## How was this patch tested?
Added new tests in `StreamingQuerySuite` that check the start and end offsets reported in the query progress.
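
For illustration, a hedged sketch of what the corrected reporting looks like from the user side (the setup names such as `offsets_demo` are hypothetical; the asserted values mirror the updated `StreamingQuerySuite` expectations):

```scala
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.execution.streaming.MemoryStream

val spark = SparkSession.builder().master("local[2]").appName("offsets-demo").getOrCreate()
import spark.implicits._
implicit val sqlCtx = spark.sqlContext

val input = MemoryStream[Int]
val query = input.toDS().writeStream
  .format("memory")
  .queryName("offsets_demo")
  .start()

input.addData(1, 2)
query.processAllAvailable()

val src = query.lastProgress.sources(0)
// First trigger: nothing was committed before it ran...
assert(src.startOffset == null)
// ...and it processed the available data up to offset "0".
assert(src.endOffset == "0")

query.stop()
```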

Author: Tathagata Das <ta...@gmail.com>

Closes #21744 from tdas/SPARK-24697.


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/ff7f6ef7
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/ff7f6ef7
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/ff7f6ef7

Branch: refs/heads/master
Commit: ff7f6ef75c80633480802d537e66432e3bea4785
Parents: 59c3c23
Author: Tathagata Das <ta...@gmail.com>
Authored: Wed Jul 11 12:44:42 2018 -0700
Committer: Tathagata Das <ta...@gmail.com>
Committed: Wed Jul 11 12:44:42 2018 -0700

----------------------------------------------------------------------
 .../streaming/MicroBatchExecution.scala         |  3 +++
 .../execution/streaming/ProgressReporter.scala  | 21 ++++++++++++++++----
 .../continuous/ContinuousExecution.scala        |  3 +++
 .../sql/streaming/StreamingQuerySuite.scala     |  6 ++++--
 4 files changed, 27 insertions(+), 6 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/ff7f6ef7/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/MicroBatchExecution.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/MicroBatchExecution.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/MicroBatchExecution.scala
index 16651dd..45c43f5 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/MicroBatchExecution.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/MicroBatchExecution.scala
@@ -184,6 +184,9 @@ class MicroBatchExecution(
             isCurrentBatchConstructed = constructNextBatch(noDataBatchesEnabled)
           }
 
+          // Record the trigger offset range for progress reporting *before* processing the batch
+          recordTriggerOffsets(from = committedOffsets, to = availableOffsets)
+
           // Remember whether the current batch has data or not. This will be required later
           // for bookkeeping after running the batch, when `isNewDataAvailable` will have changed
           // to false as the batch would have already processed the available data.

http://git-wip-us.apache.org/repos/asf/spark/blob/ff7f6ef7/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/ProgressReporter.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/ProgressReporter.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/ProgressReporter.scala
index 16ad3ef..47f4b52 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/ProgressReporter.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/ProgressReporter.scala
@@ -56,8 +56,6 @@ trait ProgressReporter extends Logging {
   protected def logicalPlan: LogicalPlan
   protected def lastExecution: QueryExecution
   protected def newData: Map[BaseStreamingSource, LogicalPlan]
-  protected def availableOffsets: StreamProgress
-  protected def committedOffsets: StreamProgress
   protected def sources: Seq[BaseStreamingSource]
   protected def sink: BaseStreamingSink
   protected def offsetSeqMetadata: OffsetSeqMetadata
@@ -68,8 +66,11 @@ trait ProgressReporter extends Logging {
   // Local timestamps and counters.
   private var currentTriggerStartTimestamp = -1L
   private var currentTriggerEndTimestamp = -1L
+  private var currentTriggerStartOffsets: Map[BaseStreamingSource, String] = _
+  private var currentTriggerEndOffsets: Map[BaseStreamingSource, String] = _
   // TODO: Restore this from the checkpoint when possible.
   private var lastTriggerStartTimestamp = -1L
+
   private val currentDurationsMs = new mutable.HashMap[String, Long]()
 
   /** Flag that signals whether any error with input metrics have already been logged */
@@ -114,9 +115,20 @@ trait ProgressReporter extends Logging {
     lastTriggerStartTimestamp = currentTriggerStartTimestamp
     currentTriggerStartTimestamp = triggerClock.getTimeMillis()
     currentStatus = currentStatus.copy(isTriggerActive = true)
+    currentTriggerStartOffsets = null
+    currentTriggerEndOffsets = null
     currentDurationsMs.clear()
   }
 
+  /**
+   * Record the offsets range this trigger will process. Call this before updating
+   * `committedOffsets` in `StreamExecution` to make sure that the correct range is recorded.
+   */
+  protected def recordTriggerOffsets(from: StreamProgress, to: StreamProgress): Unit = {
+    currentTriggerStartOffsets = from.mapValues(_.json)
+    currentTriggerEndOffsets = to.mapValues(_.json)
+  }
+
   private def updateProgress(newProgress: StreamingQueryProgress): Unit = {
     progressBuffer.synchronized {
       progressBuffer += newProgress
@@ -130,6 +142,7 @@ trait ProgressReporter extends Logging {
 
   /** Finalizes the query progress and adds it to list of recent status updates. */
   protected def finishTrigger(hasNewData: Boolean): Unit = {
+    assert(currentTriggerStartOffsets != null && currentTriggerEndOffsets != null)
     currentTriggerEndTimestamp = triggerClock.getTimeMillis()
 
     val executionStats = extractExecutionStats(hasNewData)
@@ -147,8 +160,8 @@ trait ProgressReporter extends Logging {
       val numRecords = executionStats.inputRows.getOrElse(source, 0L)
       new SourceProgress(
         description = source.toString,
-        startOffset = committedOffsets.get(source).map(_.json).orNull,
-        endOffset = availableOffsets.get(source).map(_.json).orNull,
+        startOffset = currentTriggerStartOffsets.get(source).orNull,
+        endOffset = currentTriggerEndOffsets.get(source).orNull,
         numInputRows = numRecords,
         inputRowsPerSecond = numRecords / inputTimeSec,
         processedRowsPerSecond = numRecords / processingTimeSec

http://git-wip-us.apache.org/repos/asf/spark/blob/ff7f6ef7/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/continuous/ContinuousExecution.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/continuous/ContinuousExecution.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/continuous/ContinuousExecution.scala
index a0bb829..e991dbc 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/continuous/ContinuousExecution.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/continuous/ContinuousExecution.scala
@@ -309,7 +309,10 @@ class ContinuousExecution(
   def commit(epoch: Long): Unit = {
     assert(continuousSources.length == 1, "only one continuous source supported currently")
     assert(offsetLog.get(epoch).isDefined, s"offset for epoch $epoch not reported before commit")
+
     synchronized {
+      // Record offsets before updating `committedOffsets`
+      recordTriggerOffsets(from = committedOffsets, to = availableOffsets)
       if (queryExecutionThread.isAlive) {
         commitLog.add(epoch)
         val offset =

http://git-wip-us.apache.org/repos/asf/spark/blob/ff7f6ef7/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingQuerySuite.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingQuerySuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingQuerySuite.scala
index dcf6cb5..936a076 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingQuerySuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingQuerySuite.scala
@@ -335,8 +335,8 @@ class StreamingQuerySuite extends StreamTest with BeforeAndAfter with Logging wi
 
         assert(progress.sources.length === 1)
         assert(progress.sources(0).description contains "MemoryStream")
-        assert(progress.sources(0).startOffset === "0")
-        assert(progress.sources(0).endOffset !== null)
+        assert(progress.sources(0).startOffset === null)   // no prior offset
+        assert(progress.sources(0).endOffset === "0")
         assert(progress.sources(0).processedRowsPerSecond === 4.0)  // 2 rows processed in 500 ms
 
         assert(progress.stateOperators.length === 1)
@@ -362,6 +362,8 @@ class StreamingQuerySuite extends StreamTest with BeforeAndAfter with Logging wi
         assert(query.lastProgress.batchId === 1)
         assert(query.lastProgress.inputRowsPerSecond === 2.0)
         assert(query.lastProgress.sources(0).inputRowsPerSecond === 2.0)
+        assert(query.lastProgress.sources(0).startOffset === "0")
+        assert(query.lastProgress.sources(0).endOffset === "1")
         true
       },
 

