Posted to commits@spark.apache.org by td...@apache.org on 2016/10/19 00:31:25 UTC

spark git commit: [SPARK-17731][SQL][STREAMING][FOLLOWUP] Refactored StreamingQueryListener APIs for branch-2.0

Repository: spark
Updated Branches:
  refs/heads/branch-2.0 f6b87939c -> 99943bf69


[SPARK-17731][SQL][STREAMING][FOLLOWUP] Refactored StreamingQueryListener APIs for branch-2.0

This is the branch-2.0 PR of #15530 to make the APIs consistent with master. Since these APIs are experimental and not directly user-facing (StreamingQueryListener is an advanced Structured Streaming API), it's okay to change them in branch-2.0.

## What changes were proposed in this pull request?

As per rxin's request, here are the further API changes (a rough usage sketch follows the list):
- Renamed the `Query(Started/Progress/Terminated)` events to `Query*Event`
- Renamed the parameters of the `StreamingQueryListener.on***` methods from `query*` to `event`
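
For illustration only (not part of this patch), a listener written against the renamed API would look roughly like the sketch below. The class names, `event` parameters, and the `queryStatus`/`exception` fields come from the diff in this commit; the application name and the `println` bodies are invented for the example.

```scala
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.streaming.StreamingQueryListener
import org.apache.spark.sql.streaming.StreamingQueryListener._

object ListenerSketch {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder().appName("listener-sketch").getOrCreate()

    // Register a listener; the callbacks now take the renamed *Event classes
    // and the parameter is called `event` rather than `queryStarted` etc.
    spark.streams.addListener(new StreamingQueryListener {
      override def onQueryStarted(event: QueryStartedEvent): Unit =
        println(s"Query started: ${event.queryStatus}")

      override def onQueryProgress(event: QueryProgressEvent): Unit =
        println(s"Query progress: ${event.queryStatus}")

      override def onQueryTerminated(event: QueryTerminatedEvent): Unit =
        println(s"Query terminated, exception = ${event.exception}")
    })
  }
}
```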

## How was this patch tested?
Existing unit tests.

Author: Tathagata Das <ta...@gmail.com>

Closes #15535 from tdas/SPARK-17731-1-branch-2.0.


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/99943bf6
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/99943bf6
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/99943bf6

Branch: refs/heads/branch-2.0
Commit: 99943bf6905ca82a2c3e16e5d807fb572fa3dd3b
Parents: f6b8793
Author: Tathagata Das <ta...@gmail.com>
Authored: Tue Oct 18 17:31:21 2016 -0700
Committer: Tathagata Das <ta...@gmail.com>
Committed: Tue Oct 18 17:31:21 2016 -0700

----------------------------------------------------------------------
 project/MimaExcludes.scala                           | 12 +++++++++++-
 .../sql/execution/streaming/StreamExecution.scala    | 15 ++++++++-------
 .../streaming/StreamingQueryListenerBus.scala        |  8 ++++----
 .../spark/sql/streaming/StreamingQueryListener.scala | 14 +++++++-------
 .../org/apache/spark/sql/streaming/StreamTest.scala  |  6 +++---
 .../sql/streaming/StreamingQueryListenerSuite.scala  | 13 +++++++------
 .../spark/sql/streaming/StreamingQuerySuite.scala    |  6 +++---
 7 files changed, 43 insertions(+), 31 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/99943bf6/project/MimaExcludes.scala
----------------------------------------------------------------------
diff --git a/project/MimaExcludes.scala b/project/MimaExcludes.scala
index ddf53bb..ee6e31a0 100644
--- a/project/MimaExcludes.scala
+++ b/project/MimaExcludes.scala
@@ -788,6 +788,7 @@ object MimaExcludes {
       // SPARK-16240: ML persistence backward compatibility for LDA
       ProblemFilters.exclude[MissingTypesProblem]("org.apache.spark.ml.clustering.LDA$")
     ) ++ Seq(
+      // [SPARK-17731][SQL][Streaming] Metrics for structured streaming
       ProblemFilters.exclude[MissingClassProblem]("org.apache.spark.sql.streaming.StreamingQueryInfo"),
       ProblemFilters.exclude[IncompatibleMethTypeProblem]("org.apache.spark.sql.streaming.StreamingQueryListener#QueryStarted.this"),
       ProblemFilters.exclude[DirectMissingMethodProblem]("org.apache.spark.sql.streaming.StreamingQueryListener#QueryStarted.queryInfo"),
@@ -796,7 +797,16 @@ object MimaExcludes {
       ProblemFilters.exclude[DirectMissingMethodProblem]("org.apache.spark.sql.streaming.StreamingQueryListener#QueryTerminated.queryInfo"),
       ProblemFilters.exclude[IncompatibleResultTypeProblem]("org.apache.spark.sql.streaming.SourceStatus.offsetDesc"),
       ProblemFilters.exclude[DirectMissingMethodProblem]("org.apache.spark.sql.streaming.SourceStatus.this"),
-      ProblemFilters.exclude[ReversedMissingMethodProblem]("org.apache.spark.sql.streaming.StreamingQuery.status")
+      ProblemFilters.exclude[ReversedMissingMethodProblem]("org.apache.spark.sql.streaming.StreamingQuery.status"),
+      ProblemFilters.exclude[MissingClassProblem]("org.apache.spark.sql.streaming.StreamingQueryListener$QueryStarted"),
+      ProblemFilters.exclude[MissingClassProblem]("org.apache.spark.sql.streaming.StreamingQueryListener$QueryProgress"),
+      ProblemFilters.exclude[MissingClassProblem]("org.apache.spark.sql.streaming.StreamingQueryListener$QueryTerminated"),
+      ProblemFilters.exclude[IncompatibleMethTypeProblem]("org.apache.spark.sql.streaming.StreamingQueryListener.onQueryStarted"),
+      ProblemFilters.exclude[ReversedMissingMethodProblem]("org.apache.spark.sql.streaming.StreamingQueryListener.onQueryStarted"),
+      ProblemFilters.exclude[IncompatibleMethTypeProblem]("org.apache.spark.sql.streaming.StreamingQueryListener.onQueryProgress"),
+      ProblemFilters.exclude[ReversedMissingMethodProblem]("org.apache.spark.sql.streaming.StreamingQueryListener.onQueryProgress"),
+      ProblemFilters.exclude[IncompatibleMethTypeProblem]("org.apache.spark.sql.streaming.StreamingQueryListener.onQueryTerminated"),
+      ProblemFilters.exclude[ReversedMissingMethodProblem]("org.apache.spark.sql.streaming.StreamingQueryListener.onQueryTerminated")
     )
   }
 

http://git-wip-us.apache.org/repos/asf/spark/blob/99943bf6/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamExecution.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamExecution.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamExecution.scala
index 6330e0a..627b87b 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamExecution.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamExecution.scala
@@ -165,7 +165,7 @@ class StreamExecution(
     new Path(new Path(checkpointRoot), name).toUri.toString
 
   /**
-   * Starts the execution. This returns only after the thread has started and [[QueryStarted]] event
+   * Starts the execution. This returns only after the thread has started and [[QueryStartedEvent]]
    * has been posted to all the listeners.
    */
   def start(): Unit = {
@@ -177,9 +177,10 @@ class StreamExecution(
   /**
    * Repeatedly attempts to run batches as data arrives.
    *
-   * Note that this method ensures that [[QueryStarted]] and [[QueryTerminated]] events are posted
-   * such that listeners are guaranteed to get a start event before a termination. Furthermore, this
-   * method also ensures that [[QueryStarted]] event is posted before the `start()` method returns.
+   * Note that this method ensures that [[QueryStartedEvent]] and [[QueryTerminatedEvent]] are
+   * posted such that listeners are guaranteed to get a start event before a termination.
+   * Furthermore, this method also ensures that [[QueryStartedEvent]] event is posted before the
+   * `start()` method returns.
    */
   private def runBatches(): Unit = {
     try {
@@ -190,7 +191,7 @@ class StreamExecution(
         sparkSession.sparkContext.env.metricsSystem.registerSource(streamMetrics)
       }
       updateStatus()
-      postEvent(new QueryStarted(currentStatus)) // Assumption: Does not throw exception.
+      postEvent(new QueryStartedEvent(currentStatus)) // Assumption: Does not throw exception.
 
       // Unblock starting thread
       startLatch.countDown()
@@ -232,7 +233,7 @@ class StreamExecution(
         // Update metrics and notify others
         streamMetrics.reportTriggerFinished()
         updateStatus()
-        postEvent(new QueryProgress(currentStatus))
+        postEvent(new QueryProgressEvent(currentStatus))
         isTerminated
       })
     } catch {
@@ -260,7 +261,7 @@ class StreamExecution(
       // Notify others
       sparkSession.streams.notifyQueryTermination(StreamExecution.this)
       postEvent(
-        new QueryTerminated(currentStatus, exception.map(_.cause).map(Utils.exceptionString)))
+        new QueryTerminatedEvent(currentStatus, exception.map(_.cause).map(Utils.exceptionString)))
       terminationLatch.countDown()
     }
   }

http://git-wip-us.apache.org/repos/asf/spark/blob/99943bf6/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamingQueryListenerBus.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamingQueryListenerBus.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamingQueryListenerBus.scala
index 1e66395..fc2190d 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamingQueryListenerBus.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamingQueryListenerBus.scala
@@ -40,7 +40,7 @@ class StreamingQueryListenerBus(sparkListenerBus: LiveListenerBus)
    */
   def post(event: StreamingQueryListener.Event) {
     event match {
-      case s: QueryStarted =>
+      case s: QueryStartedEvent =>
         postToAll(s)
       case _ =>
         sparkListenerBus.post(event)
@@ -59,11 +59,11 @@ class StreamingQueryListenerBus(sparkListenerBus: LiveListenerBus)
       listener: StreamingQueryListener,
       event: StreamingQueryListener.Event): Unit = {
     event match {
-      case queryStarted: QueryStarted =>
+      case queryStarted: QueryStartedEvent =>
         listener.onQueryStarted(queryStarted)
-      case queryProgress: QueryProgress =>
+      case queryProgress: QueryProgressEvent =>
         listener.onQueryProgress(queryProgress)
-      case queryTerminated: QueryTerminated =>
+      case queryTerminated: QueryTerminatedEvent =>
         listener.onQueryTerminated(queryTerminated)
       case _ =>
     }

http://git-wip-us.apache.org/repos/asf/spark/blob/99943bf6/sql/core/src/main/scala/org/apache/spark/sql/streaming/StreamingQueryListener.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/streaming/StreamingQueryListener.scala b/sql/core/src/main/scala/org/apache/spark/sql/streaming/StreamingQueryListener.scala
index 69790e3..9e311fa 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/streaming/StreamingQueryListener.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/streaming/StreamingQueryListener.scala
@@ -41,7 +41,7 @@ abstract class StreamingQueryListener {
    *       don't block this method as it will block your query.
    * @since 2.0.0
    */
-  def onQueryStarted(queryStarted: QueryStarted): Unit
+  def onQueryStarted(event: QueryStartedEvent): Unit
 
   /**
    * Called when there is some status update (ingestion rate updated, etc.)
@@ -49,16 +49,16 @@ abstract class StreamingQueryListener {
    * @note This method is asynchronous. The status in [[StreamingQuery]] will always be
    *       latest no matter when this method is called. Therefore, the status of [[StreamingQuery]]
    *       may be changed before/when you process the event. E.g., you may find [[StreamingQuery]]
-   *       is terminated when you are processing [[QueryProgress]].
+   *       is terminated when you are processing [[QueryProgressEvent]].
    * @since 2.0.0
    */
-  def onQueryProgress(queryProgress: QueryProgress): Unit
+  def onQueryProgress(event: QueryProgressEvent): Unit
 
   /**
    * Called when a query is stopped, with or without error.
    * @since 2.0.0
    */
-  def onQueryTerminated(queryTerminated: QueryTerminated): Unit
+  def onQueryTerminated(event: QueryTerminatedEvent): Unit
 }
 
 
@@ -84,7 +84,7 @@ object StreamingQueryListener {
    * @since 2.0.0
    */
   @Experimental
-  class QueryStarted private[sql](val queryStatus: StreamingQueryStatus) extends Event
+  class QueryStartedEvent private[sql](val queryStatus: StreamingQueryStatus) extends Event
 
   /**
    * :: Experimental ::
@@ -92,7 +92,7 @@ object StreamingQueryListener {
    * @since 2.0.0
    */
   @Experimental
-  class QueryProgress private[sql](val queryStatus: StreamingQueryStatus) extends Event
+  class QueryProgressEvent private[sql](val queryStatus: StreamingQueryStatus) extends Event
 
   /**
    * :: Experimental ::
@@ -104,7 +104,7 @@ object StreamingQueryListener {
    * @since 2.0.0
    */
   @Experimental
-  class QueryTerminated private[sql](
+  class QueryTerminatedEvent private[sql](
       val queryStatus: StreamingQueryStatus,
       val exception: Option[String]) extends Event
 }

http://git-wip-us.apache.org/repos/asf/spark/blob/99943bf6/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamTest.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamTest.scala b/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamTest.scala
index 5aa363e..ec49e1a 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamTest.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamTest.scala
@@ -676,20 +676,20 @@ trait StreamTest extends QueryTest with SharedSQLContext with Timeouts {
     }
 
 
-    override def onQueryStarted(queryStarted: QueryStarted): Unit = {
+    override def onQueryStarted(queryStarted: QueryStartedEvent): Unit = {
       asyncTestWaiter {
         startStatus = queryStarted.queryStatus
       }
     }
 
-    override def onQueryProgress(queryProgress: QueryProgress): Unit = {
+    override def onQueryProgress(queryProgress: QueryProgressEvent): Unit = {
       asyncTestWaiter {
         assert(startStatus != null, "onQueryProgress called before onQueryStarted")
         synchronized { progressStatuses += queryProgress.queryStatus }
       }
     }
 
-    override def onQueryTerminated(queryTerminated: QueryTerminated): Unit = {
+    override def onQueryTerminated(queryTerminated: QueryTerminatedEvent): Unit = {
       asyncTestWaiter {
         assert(startStatus != null, "onQueryTerminated called before onQueryStarted")
         terminationStatus = queryTerminated.queryStatus

http://git-wip-us.apache.org/repos/asf/spark/blob/99943bf6/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingQueryListenerSuite.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingQueryListenerSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingQueryListenerSuite.scala
index 623f66a..ff84386 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingQueryListenerSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingQueryListenerSuite.scala
@@ -177,30 +177,31 @@ class StreamingQueryListenerSuite extends StreamTest with BeforeAndAfter {
   }
 
   test("QueryStarted serialization") {
-    val queryStarted = new StreamingQueryListener.QueryStarted(StreamingQueryStatus.testStatus)
+    val queryStarted = new StreamingQueryListener.QueryStartedEvent(StreamingQueryStatus.testStatus)
     val json = JsonProtocol.sparkEventToJson(queryStarted)
     val newQueryStarted = JsonProtocol.sparkEventFromJson(json)
-      .asInstanceOf[StreamingQueryListener.QueryStarted]
+      .asInstanceOf[StreamingQueryListener.QueryStartedEvent]
     assertStreamingQueryInfoEquals(queryStarted.queryStatus, newQueryStarted.queryStatus)
   }
 
   test("QueryProgress serialization") {
-    val queryProcess = new StreamingQueryListener.QueryProgress(StreamingQueryStatus.testStatus)
+    val queryProcess = new StreamingQueryListener.QueryProgressEvent(
+      StreamingQueryStatus.testStatus)
     val json = JsonProtocol.sparkEventToJson(queryProcess)
     val newQueryProcess = JsonProtocol.sparkEventFromJson(json)
-      .asInstanceOf[StreamingQueryListener.QueryProgress]
+      .asInstanceOf[StreamingQueryListener.QueryProgressEvent]
     assertStreamingQueryInfoEquals(queryProcess.queryStatus, newQueryProcess.queryStatus)
   }
 
   test("QueryTerminated serialization") {
     val exception = new RuntimeException("exception")
-    val queryQueryTerminated = new StreamingQueryListener.QueryTerminated(
+    val queryQueryTerminated = new StreamingQueryListener.QueryTerminatedEvent(
       StreamingQueryStatus.testStatus,
       Some(exception.getMessage))
     val json =
       JsonProtocol.sparkEventToJson(queryQueryTerminated)
     val newQueryTerminated = JsonProtocol.sparkEventFromJson(json)
-      .asInstanceOf[StreamingQueryListener.QueryTerminated]
+      .asInstanceOf[StreamingQueryListener.QueryTerminatedEvent]
     assertStreamingQueryInfoEquals(queryQueryTerminated.queryStatus, newQueryTerminated.queryStatus)
     assert(queryQueryTerminated.exception === newQueryTerminated.exception)
   }

http://git-wip-us.apache.org/repos/asf/spark/blob/99943bf6/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingQuerySuite.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingQuerySuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingQuerySuite.scala
index 9f8e2db..92020be 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingQuerySuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingQuerySuite.scala
@@ -290,11 +290,11 @@ class StreamingQuerySuite extends StreamTest with BeforeAndAfter with Logging {
     // A StreamingQueryListener that gets the query status after the first completed trigger
     val listener = new StreamingQueryListener {
       @volatile var firstStatus: StreamingQueryStatus = null
-      override def onQueryStarted(queryStarted: QueryStarted): Unit = { }
-      override def onQueryProgress(queryProgress: QueryProgress): Unit = {
+      override def onQueryStarted(queryStarted: QueryStartedEvent): Unit = { }
+      override def onQueryProgress(queryProgress: QueryProgressEvent): Unit = {
        if (firstStatus == null) firstStatus = queryProgress.queryStatus
       }
-      override def onQueryTerminated(queryTerminated: QueryTerminated): Unit = { }
+      override def onQueryTerminated(queryTerminated: QueryTerminatedEvent): Unit = { }
     }
 
     try {

