Posted to commits@spark.apache.org by ma...@apache.org on 2016/11/16 19:03:10 UTC

spark git commit: [SPARK-18461][DOCS][STRUCTUREDSTREAMING] Added more information about monitoring streaming queries

Repository: spark
Updated Branches:
  refs/heads/master 0048ce7ce -> bb6cdfd9a


[SPARK-18461][DOCS][STRUCTUREDSTREAMING] Added more information about monitoring streaming queries

## What changes were proposed in this pull request?
<img width="941" alt="screen shot 2016-11-15 at 6 27 32 pm" src="https://cloud.githubusercontent.com/assets/663212/20332521/4190b858-ab61-11e6-93a6-4bdc05105ed9.png">
<img width="940" alt="screen shot 2016-11-15 at 6 27 45 pm" src="https://cloud.githubusercontent.com/assets/663212/20332525/44a0d01e-ab61-11e6-8668-47f925490d4f.png">

Author: Tathagata Das <ta...@gmail.com>

Closes #15897 from tdas/SPARK-18461.


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/bb6cdfd9
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/bb6cdfd9
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/bb6cdfd9

Branch: refs/heads/master
Commit: bb6cdfd9a6a6b6c91aada7c3174436146045ed1e
Parents: 0048ce7
Author: Tathagata Das <ta...@gmail.com>
Authored: Wed Nov 16 11:03:10 2016 -0800
Committer: Michael Armbrust <mi...@databricks.com>
Committed: Wed Nov 16 11:03:10 2016 -0800

----------------------------------------------------------------------
 docs/structured-streaming-programming-guide.md | 182 +++++++++++++++++++-
 1 file changed, 179 insertions(+), 3 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/bb6cdfd9/docs/structured-streaming-programming-guide.md
----------------------------------------------------------------------
diff --git a/docs/structured-streaming-programming-guide.md b/docs/structured-streaming-programming-guide.md
index d254558..77b66b3 100644
--- a/docs/structured-streaming-programming-guide.md
+++ b/docs/structured-streaming-programming-guide.md
@@ -1087,9 +1087,185 @@ spark.streams().awaitAnyTermination()  # block until any one of them terminates
 </div>
 </div>
 
-Finally, for asynchronous monitoring of streaming queries, you can create and attach a `StreamingQueryListener`
-([Scala](api/scala/index.html#org.apache.spark.sql.streaming.StreamingQueryListener)/[Java](api/java/org/apache/spark/sql/streaming/StreamingQueryListener.html) docs),
-which will give you regular callback-based updates when queries are started and terminated.
+
+## Monitoring Streaming Queries
+There are two ways to monitor active streaming queries. First, you can directly get the current status
+of an active query using `streamingQuery.status`, which returns a `StreamingQueryStatus` object
+([Scala](api/scala/index.html#org.apache.spark.sql.streaming.StreamingQueryStatus)/[Java](api/java/org/apache/spark/sql/streaming/StreamingQueryStatus.html)/[Python](api/python/pyspark.sql.html#pyspark.sql.streaming.StreamingQueryStatus) docs)
+containing details such as the current ingestion and processing rates, average latency,
+and details of the currently active trigger.
+
+<div class="codetabs">
+<div data-lang="scala"  markdown="1">
+
+{% highlight scala %}
+val query: StreamingQuery = ...
+
+println(query.status)
+
+/* Will print the current status of the query
+
+Status of query 'queryName'
+    Query id: 1
+    Status timestamp: 123
+    Input rate: 15.5 rows/sec
+    Processing rate: 23.5 rows/sec
+    Latency: 345.0 ms
+    Trigger details:
+        batchId: 5
+        isDataPresentInTrigger: true
+        isTriggerActive: true
+        latency.getBatch.total: 20
+        latency.getOffset.total: 10
+        numRows.input.total: 100
+    Source statuses [1 source]:
+        Source 1 - MySource1
+            Available offset: 0
+            Input rate: 15.5 rows/sec
+            Processing rate: 23.5 rows/sec
+            Trigger details:
+                numRows.input.source: 100
+                latency.getOffset.source: 10
+                latency.getBatch.source: 20
+    Sink status - MySink
+        Committed offsets: [1, -]
+*/
+{% endhighlight %}
+
+</div>
+<div data-lang="java"  markdown="1">
+
+{% highlight java %}
+StreamingQuery query = ...
+
+System.out.println(query.status());
+
+/* Will print the current status of the query
+
+Status of query 'queryName'
+    Query id: 1
+    Status timestamp: 123
+    Input rate: 15.5 rows/sec
+    Processing rate: 23.5 rows/sec
+    Latency: 345.0 ms
+    Trigger details:
+        batchId: 5
+        isDataPresentInTrigger: true
+        isTriggerActive: true
+        latency.getBatch.total: 20
+        latency.getOffset.total: 10
+        numRows.input.total: 100
+    Source statuses [1 source]:
+        Source 1 - MySource1
+            Available offset: 0
+            Input rate: 15.5 rows/sec
+            Processing rate: 23.5 rows/sec
+            Trigger details:
+                numRows.input.source: 100
+                latency.getOffset.source: 10
+                latency.getBatch.source: 20
+    Sink status - MySink
+        Committed offsets: [1, -]
+*/
+{% endhighlight %}
+
+</div>
+<div data-lang="python"  markdown="1">
+
+{% highlight python %}
+query = ...  # a StreamingQuery
+
+print(query.status)
+
+'''
+Will print the current status of the query
+
+Status of query 'queryName'
+    Query id: 1
+    Status timestamp: 123
+    Input rate: 15.5 rows/sec
+    Processing rate: 23.5 rows/sec
+    Latency: 345.0 ms
+    Trigger details:
+        batchId: 5
+        isDataPresentInTrigger: true
+        isTriggerActive: true
+        latency.getBatch.total: 20
+        latency.getOffset.total: 10
+        numRows.input.total: 100
+    Source statuses [1 source]:
+        Source 1 - MySource1
+            Available offset: 0
+            Input rate: 15.5 rows/sec
+            Processing rate: 23.5 rows/sec
+            Trigger details:
+                numRows.input.source: 100
+                latency.getOffset.source: 10
+                latency.getBatch.source: 20
+    Sink status - MySink
+        Committed offsets: [1, -]
+'''
+{% endhighlight %}
+
+</div>
+</div>
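+
+The metrics in this summary are also exposed as fields on the returned `StreamingQueryStatus`
+object, so you can read them programmatically, for example, to ship them to an external
+monitoring system. Here is a minimal sketch in Scala; the field names below (e.g. `inputRate`,
+`processingRate`, `sourceStatuses`) are assumed to mirror the printed output, so check the
+`StreamingQueryStatus` API docs for your Spark version:
+
+{% highlight scala %}
+val status = query.status
+
+// Assumed field names that mirror the printed summary above.
+println(s"overall input rate: ${status.inputRate} rows/sec")
+println(s"overall processing rate: ${status.processingRate} rows/sec")
+
+// Per-source metrics, one entry per source in the query.
+status.sourceStatuses.foreach { s =>
+  println(s"source ${s.description}: ${s.inputRate} rows/sec in")
+}
+{% endhighlight %}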
+
+
+Second, you can asynchronously monitor all queries associated with a
+`SparkSession` by attaching a `StreamingQueryListener`
+([Scala](api/scala/index.html#org.apache.spark.sql.streaming.StreamingQueryListener)/[Java](api/java/org/apache/spark/sql/streaming/StreamingQueryListener.html) docs).
+Once you attach your custom `StreamingQueryListener` object with
+`sparkSession.streams.addListener()`, you will get callbacks when a query is started and
+terminated, and when progress is made in an active query. Here is an example:
+
+<div class="codetabs">
+<div data-lang="scala"  markdown="1">
+
+{% highlight scala %}
+val spark: SparkSession = ...
+
+spark.streams.addListener(new StreamingQueryListener() {
+
+    override def onQueryStarted(queryStarted: QueryStartedEvent): Unit = {
+        println("Query started: " + queryTerminated.queryStatus.name)
+    }
+    override def onQueryTerminated(queryTerminated: QueryTerminatedEvent): Unit = {
+        println("Query terminated: " + queryTerminated.queryStatus.name)
+    }
+    override def onQueryProgress(queryProgress: QueryProgressEvent): Unit = {
+        println("Query made progress: " + queryProgress.queryStatus)
+    }
+})
+{% endhighlight %}
+
+</div>
+<div data-lang="java"  markdown="1">
+
+{% highlight java %}
+SparkSession spark = ...
+
+spark.streams().addListener(new StreamingQueryListener() {
+
+    @Override
+    public void onQueryStarted(QueryStartedEvent queryStarted) {
+        System.out.println("Query started: " + queryStarted.queryStatus().name());
+    }
+    @Override
+    public void onQueryTerminated(QueryTerminatedEvent queryTerminated) {
+        System.out.println("Query terminated: " + queryTerminated.queryStatus().name());
+    }
+    @Override
+    public void onQueryProgress(QueryProgressEvent queryProgress) {
+        System.out.println("Query made progress: " + queryProgress.queryStatus());
+    }
+});
+{% endhighlight %}
+
+</div>
+<div data-lang="python"  markdown="1">
+{% highlight bash %}
+Not available in Python.
+{% endhighlight %}
+
+</div>
+</div>
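+
+A listener attached this way receives callbacks for as long as the `SparkSession` is alive.
+If you want to stop receiving them (for example, during application shutdown), you can detach
+the listener again. A short sketch, assuming you kept a reference to the listener object and
+that the query manager exposes a matching `removeListener` method:
+
+{% highlight scala %}
+val myListener = new StreamingQueryListener() {
+    override def onQueryStarted(queryStarted: QueryStartedEvent): Unit = {}
+    override def onQueryTerminated(queryTerminated: QueryTerminatedEvent): Unit = {}
+    override def onQueryProgress(queryProgress: QueryProgressEvent): Unit = {}
+}
+
+spark.streams.addListener(myListener)
+// ... later, when callbacks are no longer needed
+spark.streams.removeListener(myListener)
+{% endhighlight %}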
 
 ## Recovering from Failures with Checkpointing 
 In case of a failure or intentional shutdown, you can recover the previous progress and state of a previous query, and continue where it left off. This is done using checkpointing and write ahead logs. You can configure a query with a checkpoint location, and the query will save all the progress information (i.e. range of offsets processed in each trigger) and the running aggregates (e.g. word counts in the [quick example](#quick-example)) to the checkpoint location. As of Spark 2.0, this checkpoint location has to be a path in an HDFS compatible file system, and can be set as an option in the DataStreamWriter when [starting a query](#starting-streaming-queries). 
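
For example, a minimal sketch in Scala (the `aggDF` streaming DataFrame and the checkpoint path are placeholders):

{% highlight scala %}
aggDF
  .writeStream
  .outputMode("complete")
  .option("checkpointLocation", "path/to/HDFS/dir")  // must be an HDFS-compatible path
  .format("memory")
  .start()
{% endhighlight %}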

