You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@spark.apache.org by td...@apache.org on 2016/12/06 02:51:13 UTC

spark git commit: [SPARK-18722][SS] Move no data rate limit from StreamExecution to ProgressReporter

Repository: spark
Updated Branches:
  refs/heads/master 508de38c9 -> 4af142f55


[SPARK-18722][SS] Move no data rate limit from StreamExecution to ProgressReporter

## What changes were proposed in this pull request?

Move the no-data progress-event rate limit from StreamExecution to ProgressReporter, so that `recentProgresses` and the progress events posted to listeners stay consistent.

## How was this patch tested?

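Jenkins, plus a new assertion in StreamingQueryListenerSuite checking that `recentProgresses` does not accumulate too many no-data events.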

Author: Shixiong Zhu <sh...@databricks.com>

Closes #16155 from zsxwing/SPARK-18722.


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/4af142f5
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/4af142f5
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/4af142f5

Branch: refs/heads/master
Commit: 4af142f55771affa5fc7f2abbbf5e47766194e6e
Parents: 508de38
Author: Shixiong Zhu <sh...@databricks.com>
Authored: Mon Dec 5 18:51:07 2016 -0800
Committer: Tathagata Das <ta...@gmail.com>
Committed: Mon Dec 5 18:51:07 2016 -0800

----------------------------------------------------------------------
 .../execution/streaming/ProgressReporter.scala  | 33 +++++++++++++++++---
 .../execution/streaming/StreamExecution.scala   | 20 +-----------
 .../streaming/StreamingQueryListenerSuite.scala |  4 +++
 3 files changed, 33 insertions(+), 24 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/4af142f5/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/ProgressReporter.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/ProgressReporter.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/ProgressReporter.scala
index 7d0d086..d95f552 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/ProgressReporter.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/ProgressReporter.scala
@@ -27,6 +27,7 @@ import org.apache.spark.sql.{DataFrame, SparkSession}
 import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
 import org.apache.spark.sql.execution.QueryExecution
 import org.apache.spark.sql.streaming._
+import org.apache.spark.sql.streaming.StreamingQueryListener.QueryProgressEvent
 import org.apache.spark.util.Clock
 
 /**
@@ -56,6 +57,7 @@ trait ProgressReporter extends Logging {
   protected def offsetSeqMetadata: OffsetSeqMetadata
   protected def currentBatchId: Long
   protected def sparkSession: SparkSession
+  protected def postEvent(event: StreamingQueryListener.Event): Unit
 
   // Local timestamps and counters.
   private var currentTriggerStartTimestamp = -1L
@@ -70,6 +72,12 @@ trait ProgressReporter extends Logging {
   /** Holds the most recent query progress updates.  Accesses must lock on the queue itself. */
   private val progressBuffer = new mutable.Queue[StreamingQueryProgress]()
 
+  private val noDataProgressEventInterval =
+    sparkSession.sessionState.conf.streamingNoDataProgressEventInterval
+
+  // The timestamp we report an event that has no input data
+  private var lastNoDataProgressEventTime = Long.MinValue
+
   @volatile
   protected var currentStatus: StreamingQueryStatus = {
     new StreamingQueryStatus(
@@ -100,6 +108,17 @@ trait ProgressReporter extends Logging {
     currentDurationsMs.clear()
   }
 
+  private def updateProgress(newProgress: StreamingQueryProgress): Unit = {
+    progressBuffer.synchronized {
+      progressBuffer += newProgress
+      while (progressBuffer.length >= sparkSession.sqlContext.conf.streamingProgressRetention) {
+        progressBuffer.dequeue()
+      }
+    }
+    postEvent(new QueryProgressEvent(newProgress))
+    logInfo(s"Streaming query made progress: $newProgress")
+  }
+
   /** Finalizes the query progress and adds it to list of recent status updates. */
   protected def finishTrigger(hasNewData: Boolean): Unit = {
     currentTriggerEndTimestamp = triggerClock.getTimeMillis()
@@ -145,14 +164,18 @@ trait ProgressReporter extends Logging {
       sources = sourceProgress.toArray,
       sink = sinkProgress)
 
-    progressBuffer.synchronized {
-      progressBuffer += newProgress
-      while (progressBuffer.length >= sparkSession.sqlContext.conf.streamingProgressRetention) {
-        progressBuffer.dequeue()
+    if (hasNewData) {
+      // Reset noDataEventTimestamp if we processed any data
+      lastNoDataProgressEventTime = Long.MinValue
+      updateProgress(newProgress)
+    } else {
+      val now = triggerClock.getTimeMillis()
+      if (now - noDataProgressEventInterval >= lastNoDataProgressEventTime) {
+        lastNoDataProgressEventTime = now
+        updateProgress(newProgress)
       }
     }
 
-    logInfo(s"Streaming query made progress: $newProgress")
     currentStatus = currentStatus.copy(isTriggerActive = false)
   }
 

http://git-wip-us.apache.org/repos/asf/spark/blob/4af142f5/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamExecution.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamExecution.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamExecution.scala
index 083cce8..39be222 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamExecution.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamExecution.scala
@@ -58,9 +58,6 @@ class StreamExecution(
 
   private val pollingDelayMs = sparkSession.sessionState.conf.streamingPollingDelay
 
-  private val noDataProgressEventInterval =
-    sparkSession.sessionState.conf.streamingNoDataProgressEventInterval
-
   /**
    * A lock used to wait/notify when batches complete. Use a fair lock to avoid thread starvation.
    */
@@ -217,9 +214,6 @@ class StreamExecution(
       // While active, repeatedly attempt to run batches.
       SparkSession.setActiveSession(sparkSession)
 
-      // The timestamp we report an event that has no input data
-      var lastNoDataProgressEventTime = Long.MinValue
-
       triggerExecutor.execute(() => {
         startTrigger()
 
@@ -243,18 +237,6 @@ class StreamExecution(
             // Report trigger as finished and construct progress object.
             finishTrigger(dataAvailable)
             if (dataAvailable) {
-              // Reset noDataEventTimestamp if we processed any data
-              lastNoDataProgressEventTime = Long.MinValue
-              postEvent(new QueryProgressEvent(lastProgress))
-            } else {
-              val now = triggerClock.getTimeMillis()
-              if (now - noDataProgressEventInterval >= lastNoDataProgressEventTime) {
-                lastNoDataProgressEventTime = now
-                postEvent(new QueryProgressEvent(lastProgress))
-              }
-            }
-
-            if (dataAvailable) {
               // We'll increase currentBatchId after we complete processing current batch's data
               currentBatchId += 1
             } else {
@@ -504,7 +486,7 @@ class StreamExecution(
     }
   }
 
-  private def postEvent(event: StreamingQueryListener.Event) {
+  override protected def postEvent(event: StreamingQueryListener.Event): Unit = {
     sparkSession.streams.postListenerEvent(event)
   }
 

http://git-wip-us.apache.org/repos/asf/spark/blob/4af142f5/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingQueryListenerSuite.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingQueryListenerSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingQueryListenerSuite.scala
index a38c05e..1cd503c 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingQueryListenerSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingQueryListenerSuite.scala
@@ -237,6 +237,10 @@ class StreamingQueryListenerSuite extends StreamTest with BeforeAndAfter {
           }
           true
         }
+        // `recentProgresses` should not receive too many no data events
+        actions += AssertOnQuery { q =>
+          q.recentProgresses.size > 1 && q.recentProgresses.size <= 11
+        }
         testStream(input.toDS)(actions: _*)
         spark.sparkContext.listenerBus.waitUntilEmpty(10000)
         // 11 is the max value of the possible numbers of events.


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@spark.apache.org
For additional commands, e-mail: commits-help@spark.apache.org