Posted to commits@spark.apache.org by td...@apache.org on 2016/12/02 20:42:52 UTC

spark git commit: [SPARK-18670][SS] Limit the number of StreamingQueryListener.StreamProgressEvent when there is no data

Repository: spark
Updated Branches:
  refs/heads/master a985dd8e9 -> 56a503df5


[SPARK-18670][SS] Limit the number of StreamingQueryListener.StreamProgressEvent when there is no data

## What changes were proposed in this pull request?

This PR adds a SQL conf `spark.sql.streaming.noDataProgressEventInterval` to control how long to wait before outputting the next `QueryProgressEvent` when there is no data.
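
As an illustration (not part of the patch), a listener like the sketch below would previously receive a progress event on every empty trigger of an idle query; with this change it receives at most one per interval. The `spark` session value is assumed:

```scala
import org.apache.spark.sql.streaming.StreamingQueryListener
import org.apache.spark.sql.streaming.StreamingQueryListener._

// Minimal listener: on an idle query, onQueryProgress now fires at most
// once per spark.sql.streaming.noDataProgressEventInterval.
val listener = new StreamingQueryListener {
  override def onQueryStarted(event: QueryStartedEvent): Unit = ()
  override def onQueryProgress(event: QueryProgressEvent): Unit =
    println(s"progress: ${event.progress}")
  override def onQueryTerminated(event: QueryTerminatedEvent): Unit = ()
}
spark.streams.addListener(listener)
```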

## How was this patch tested?

The added unit test.

Author: Shixiong Zhu <sh...@databricks.com>

Closes #16108 from zsxwing/SPARK-18670.


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/56a503df
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/56a503df
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/56a503df

Branch: refs/heads/master
Commit: 56a503df5ccbb233ad6569e22002cc989e676337
Parents: a985dd8
Author: Shixiong Zhu <sh...@databricks.com>
Authored: Fri Dec 2 12:42:47 2016 -0800
Committer: Tathagata Das <ta...@gmail.com>
Committed: Fri Dec 2 12:42:47 2016 -0800

----------------------------------------------------------------------
 .../execution/streaming/StreamExecution.scala   | 18 +++++++-
 .../org/apache/spark/sql/internal/SQLConf.scala | 10 +++++
 .../streaming/StreamingQueryListenerSuite.scala | 44 ++++++++++++++++++++
 3 files changed, 71 insertions(+), 1 deletion(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/56a503df/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamExecution.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamExecution.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamExecution.scala
index 6d0e269..8804c64 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamExecution.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamExecution.scala
@@ -63,6 +63,9 @@ class StreamExecution(
 
   private val pollingDelayMs = sparkSession.sessionState.conf.streamingPollingDelay
 
+  private val noDataProgressEventInterval =
+    sparkSession.sessionState.conf.streamingNoDataProgressEventInterval
+
   /**
    * A lock used to wait/notify when batches complete. Use a fair lock to avoid thread starvation.
    */
@@ -196,6 +199,9 @@ class StreamExecution(
       // While active, repeatedly attempt to run batches.
       SparkSession.setActiveSession(sparkSession)
 
+      // The timestamp of the last progress event reported when there was no input data
+      var lastNoDataProgressEventTime = Long.MinValue
+
       triggerExecutor.execute(() => {
         startTrigger()
 
@@ -218,7 +224,17 @@ class StreamExecution(
 
             // Report trigger as finished and construct progress object.
             finishTrigger(dataAvailable)
-            postEvent(new QueryProgressEvent(lastProgress))
+            if (dataAvailable) {
+              // Reset lastNoDataProgressEventTime if we processed any data
+              lastNoDataProgressEventTime = Long.MinValue
+              postEvent(new QueryProgressEvent(lastProgress))
+            } else {
+              val now = triggerClock.getTimeMillis()
+              if (now - noDataProgressEventInterval >= lastNoDataProgressEventTime) {
+                lastNoDataProgressEventTime = now
+                postEvent(new QueryProgressEvent(lastProgress))
+              }
+            }
 
             if (dataAvailable) {
               // We'll increase currentBatchId after we complete processing current batch's data

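The heart of the change is the timestamp check above: always post a progress event when data was processed, otherwise post at most once per `noDataProgressEventInterval`, using `Long.MinValue` as a sentinel so the first idle event fires immediately. A self-contained sketch of the same pattern, with illustrative names that are not from the patch:

```scala
// A minimal mirror of the patch's throttling logic: always emit when data
// was processed, otherwise emit at most once per intervalMs.
class NoDataEventThrottler(intervalMs: Long, clock: () => Long = () => System.currentTimeMillis()) {
  // Long.MinValue guarantees the very first no-data event is emitted.
  private var lastNoDataEventTime = Long.MinValue

  def shouldEmit(dataAvailable: Boolean): Boolean = {
    if (dataAvailable) {
      // Processing data resets the throttle, matching the patch.
      lastNoDataEventTime = Long.MinValue
      true
    } else {
      val now = clock()
      if (now - intervalMs >= lastNoDataEventTime) {
        lastNoDataEventTime = now
        true
      } else {
        false
      }
    }
  }
}

// With a 100ms interval, back-to-back idle triggers collapse into one event.
val throttler = new NoDataEventThrottler(100L)
assert(throttler.shouldEmit(dataAvailable = false))  // first idle trigger: emitted
assert(!throttler.shouldEmit(dataAvailable = false)) // right after: suppressed
```
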
http://git-wip-us.apache.org/repos/asf/spark/blob/56a503df/sql/core/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala b/sql/core/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala
index 200f060..5b45df6 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala
@@ -603,6 +603,13 @@ object SQLConf {
       .timeConf(TimeUnit.MILLISECONDS)
       .createWithDefault(10L)
 
+  val STREAMING_NO_DATA_PROGRESS_EVENT_INTERVAL =
+    SQLConfigBuilder("spark.sql.streaming.noDataProgressEventInterval")
+      .internal()
+      .doc("How long to wait between two progress events when there is no data")
+      .timeConf(TimeUnit.MILLISECONDS)
+      .createWithDefault(10000L)
+
   val STREAMING_METRICS_ENABLED =
     SQLConfigBuilder("spark.sql.streaming.metricsEnabled")
       .doc("Whether Dropwizard/Codahale metrics will be reported for active streaming queries.")
@@ -684,6 +691,9 @@ private[sql] class SQLConf extends Serializable with CatalystConf with Logging {
 
   def streamingPollingDelay: Long = getConf(STREAMING_POLLING_DELAY)
 
+  def streamingNoDataProgressEventInterval: Long =
+    getConf(STREAMING_NO_DATA_PROGRESS_EVENT_INTERVAL)
+
   def streamingMetricsEnabled: Boolean = getConf(STREAMING_METRICS_ENABLED)
 
   def streamingProgressRetention: Int = getConf(STREAMING_PROGRESS_RETENTION)

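Since the entry is declared with `timeConf(TimeUnit.MILLISECONDS)`, it accepts either a bare number of milliseconds or a suffixed duration string. A quick sketch, assuming a `spark` session (and noting the conf is marked `internal()`):

```scala
// Both forms resolve to milliseconds under timeConf(TimeUnit.MILLISECONDS).
spark.conf.set("spark.sql.streaming.noDataProgressEventInterval", "30s")    // 30000 ms
spark.conf.set("spark.sql.streaming.noDataProgressEventInterval", "30000")  // bare number = ms
```
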
http://git-wip-us.apache.org/repos/asf/spark/blob/56a503df/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingQueryListenerSuite.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingQueryListenerSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingQueryListenerSuite.scala
index 07a13a4..3086abf 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingQueryListenerSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingQueryListenerSuite.scala
@@ -31,6 +31,7 @@ import org.scalatest.PrivateMethodTester._
 import org.apache.spark.SparkException
 import org.apache.spark.scheduler._
 import org.apache.spark.sql.execution.streaming._
+import org.apache.spark.sql.internal.SQLConf
 import org.apache.spark.sql.streaming.StreamingQueryListener._
 import org.apache.spark.util.JsonProtocol
 
@@ -46,6 +47,7 @@ class StreamingQueryListenerSuite extends StreamTest with BeforeAndAfter {
     assert(spark.streams.active.isEmpty)
     assert(addedListeners.isEmpty)
     // Make sure we don't leak any events to the next test
+    spark.sparkContext.listenerBus.waitUntilEmpty(10000)
   }
 
   testQuietly("single listener, check trigger events are generated correctly") {
@@ -191,6 +193,48 @@ class StreamingQueryListenerSuite extends StreamTest with BeforeAndAfter {
     assert(queryQueryTerminated.exception === newQueryTerminated.exception)
   }
 
+  test("only one progress event per interval when no data") {
+    // This test starts a query without pushing any data, and then checks that we don't post too many events
+    withSQLConf(SQLConf.STREAMING_NO_DATA_PROGRESS_EVENT_INTERVAL.key -> "100ms") {
+      @volatile var numProgressEvent = 0
+      val listener = new StreamingQueryListener {
+        override def onQueryStarted(event: QueryStartedEvent): Unit = {}
+        override def onQueryProgress(event: QueryProgressEvent): Unit = {
+          numProgressEvent += 1
+        }
+        override def onQueryTerminated(event: QueryTerminatedEvent): Unit = {}
+      }
+      spark.streams.addListener(listener)
+      try {
+        val input = new MemoryStream[Int](0, sqlContext) {
+          @volatile var numTriggers = 0
+          override def getOffset: Option[Offset] = {
+            numTriggers += 1
+            super.getOffset
+          }
+        }
+        val clock = new StreamManualClock()
+        val actions = mutable.ArrayBuffer[StreamAction]()
+        actions += StartStream(trigger = ProcessingTime(10), triggerClock = clock)
+        for (_ <- 1 to 100) {
+          actions += AdvanceManualClock(10)
+        }
+        actions += AssertOnQuery { _ =>
+          eventually(timeout(streamingTimeout)) {
+            assert(input.numTriggers > 100) // at least 100 triggers have occurred
+          }
+          true
+        }
+        testStream(input.toDS)(actions: _*)
+        spark.sparkContext.listenerBus.waitUntilEmpty(10000)
+        // At most 11 events: one initial trigger plus 1000ms of clock advances / 100ms interval.
+        assert(numProgressEvent > 1 && numProgressEvent <= 11)
+      } finally {
+        spark.streams.removeListener(listener)
+      }
+    }
+  }
+
   testQuietly("ReplayListenerBus should ignore broken event jsons generated in 2.0.0") {
     // query-event-logs-version-2.0.0.txt has all types of events generated by
     // Structured Streaming in Spark 2.0.0.

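Outside the test harness, the behavior can be observed end-to-end with an idle in-memory query. A hedged sketch (the `spark` session, the short interval, the query name, and the sleep duration are all illustrative):

```scala
import org.apache.spark.sql.SQLContext
import org.apache.spark.sql.execution.streaming.MemoryStream
import org.apache.spark.sql.streaming.ProcessingTime

spark.conf.set("spark.sql.streaming.noDataProgressEventInterval", "1s")

implicit val sqlCtx: SQLContext = spark.sqlContext
import spark.implicits._

// A query that triggers every 10ms but never receives data: listeners now
// see roughly one progress event per second instead of one per trigger.
val input = MemoryStream[Int]
val query = input.toDS.writeStream
  .format("memory")
  .queryName("idle_demo")
  .trigger(ProcessingTime("10 milliseconds"))
  .start()
Thread.sleep(3000)  // observe the throttled events, then stop
query.stop()
```
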
