Posted to commits@spark.apache.org by zs...@apache.org on 2016/12/28 20:11:29 UTC

spark git commit: [SPARK-18669][SS][DOCS] Update Apache docs for Structured Streaming regarding watermarking and status

Repository: spark
Updated Branches:
  refs/heads/master 6a475ae46 -> 092c6725b


[SPARK-18669][SS][DOCS] Update Apache docs for Structured Streaming regarding watermarking and status

## What changes were proposed in this pull request?

- Extended the Window operation section with code snippet and explanation of watermarking
- Extended the Output Mode section with a table showing the compatibility between query type and output mode
- Rewrote the Monitoring section with updated jsons generated by StreamingQuery.progress/status
- Updated API changes in the StreamingQueryListener example

TODO
- [x] Figure showing the watermarking

## How was this patch tested?

N/A

## Screenshots
### Section: Windowed Aggregation with Event Time

<img width="927" alt="screen shot 2016-12-15 at 3 33 10 pm" src="https://cloud.githubusercontent.com/assets/663212/21246197/0e02cb1a-c2dc-11e6-8816-0cd28d8201d7.png">

![image](https://cloud.githubusercontent.com/assets/663212/21246241/45b0f87a-c2dc-11e6-9c29-d0a89e07bf8d.png)

<img width="929" alt="screen shot 2016-12-15 at 3 33 46 pm" src="https://cloud.githubusercontent.com/assets/663212/21246202/1652cefa-c2dc-11e6-8c64-3c05977fb3fc.png">

----------------------------
### Section: Output Modes
![image](https://cloud.githubusercontent.com/assets/663212/21246276/8ee44948-c2dc-11e6-9fa2-30502fcf9a55.png)

----------------------------
### Section: Monitoring
![image](https://cloud.githubusercontent.com/assets/663212/21246535/3c5baeb2-c2de-11e6-88cd-ca71db7c5cf9.png)
![image](https://cloud.githubusercontent.com/assets/663212/21246574/789492c2-c2de-11e6-8471-7bef884e1837.png)

Author: Tathagata Das <ta...@gmail.com>

Closes #16294 from tdas/SPARK-18669.


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/092c6725
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/092c6725
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/092c6725

Branch: refs/heads/master
Commit: 092c6725bf039bf33299b53791e1958c4ea3f6aa
Parents: 6a475ae
Author: Tathagata Das <ta...@gmail.com>
Authored: Wed Dec 28 12:11:25 2016 -0800
Committer: Shixiong Zhu <sh...@databricks.com>
Committed: Wed Dec 28 12:11:25 2016 -0800

----------------------------------------------------------------------
 docs/img/structured-streaming-watermark.png    | Bin 0 -> 252000 bytes
 docs/img/structured-streaming.pptx             | Bin 1105413 -> 1113902 bytes
 docs/structured-streaming-programming-guide.md | 460 +++++++++++++++-----
 3 files changed, 353 insertions(+), 107 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/092c6725/docs/img/structured-streaming-watermark.png
----------------------------------------------------------------------
diff --git a/docs/img/structured-streaming-watermark.png b/docs/img/structured-streaming-watermark.png
new file mode 100644
index 0000000..f21fbda
Binary files /dev/null and b/docs/img/structured-streaming-watermark.png differ

http://git-wip-us.apache.org/repos/asf/spark/blob/092c6725/docs/img/structured-streaming.pptx
----------------------------------------------------------------------
diff --git a/docs/img/structured-streaming.pptx b/docs/img/structured-streaming.pptx
index 6aad2ed..f5bdfc0 100644
Binary files a/docs/img/structured-streaming.pptx and b/docs/img/structured-streaming.pptx differ

http://git-wip-us.apache.org/repos/asf/spark/blob/092c6725/docs/structured-streaming-programming-guide.md
----------------------------------------------------------------------
diff --git a/docs/structured-streaming-programming-guide.md b/docs/structured-streaming-programming-guide.md
index 77b66b3..3b7d0c4 100644
--- a/docs/structured-streaming-programming-guide.md
+++ b/docs/structured-streaming-programming-guide.md
@@ -10,7 +10,7 @@ title: Structured Streaming Programming Guide
 # Overview
 Structured Streaming is a scalable and fault-tolerant stream processing engine built on the Spark SQL engine. You can express your streaming computation the same way you would express a batch computation on static data. The Spark SQL engine will take care of running it incrementally and continuously and updating the final result as streaming data continues to arrive. You can use the [Dataset/DataFrame API](sql-programming-guide.html) in Scala, Java or Python to express streaming aggregations, event-time windows, stream-to-batch joins, etc. The computation is executed on the same optimized Spark SQL engine. Finally, the system ensures end-to-end exactly-once fault-tolerance guarantees through checkpointing and Write Ahead Logs. In short, *Structured Streaming provides fast, scalable, fault-tolerant, end-to-end exactly-once stream processing without the user having to reason about streaming.*
 
-**Spark 2.0 is the ALPHA RELEASE of Structured Streaming** and the APIs are still experimental. In this guide, we are going to walk you through the programming model and the APIs. First, let's start with a simple example - a streaming word count. 
+**Structured Streaming is still ALPHA in Spark 2.1** and the APIs are still experimental. In this guide, we are going to walk you through the programming model and the APIs. First, let's start with a simple example - a streaming word count. 
 
 # Quick Example
 Let's say you want to maintain a running word count of text data received from a data server listening on a TCP socket. Let's see how you can express this using Structured Streaming. You can see the full code in 
@@ -400,7 +400,14 @@ see how this model handles event-time based processing and late arriving data.
 ## Handling Event-time and Late Data
 Event-time is the time embedded in the data itself. For many applications, you may want to operate on this event-time. For example, if you want to get the number of events generated by IoT devices every minute, then you probably want to use the time when the data was generated (that is, event-time in the data), rather than the time Spark receives them. This event-time is very naturally expressed in this model -- each event from the devices is a row in the table, and event-time is a column value in the row. This allows window-based aggregations (e.g. number of events every minute) to be just a special type of grouping and aggregation on the event-time column -- each time window is a group and each row can belong to multiple windows/groups. Therefore, such event-time-window-based aggregation queries can be defined consistently on both a static dataset (e.g. from collected device event logs) as well as on a data stream, making the life of the user much easier.
 
-Furthermore, this model naturally handles data that has arrived later than expected based on its event-time. Since Spark is updating the Result Table, it has full control over updating/cleaning up the aggregates when there is late data. While not yet implemented in Spark 2.0, event-time watermarking will be used to manage this data. These are explained later in more details in the [Window Operations](#window-operations-on-event-time) section.
+Furthermore, this model naturally handles data that has arrived later than 
+expected based on its event-time. Since Spark is updating the Result Table, 
+it has full control over updating old aggregates when there is late data, 
+as well as cleaning up old aggregates to limit the size of intermediate
+state data. Since Spark 2.1, we have support for watermarking, which 
+allows the user to specify the threshold of late data and allows the engine
+to clean up old state accordingly. This is explained in more 
+detail later, in the [Window Operations](#window-operations-on-event-time) section.
 
 ## Fault Tolerance Semantics
 Delivering end-to-end exactly-once semantics was one of the key goals behind the design of Structured Streaming. To achieve that, we have designed the Structured Streaming sources, the sinks and the execution engine to reliably track the exact progress of the processing so that it can handle any kind of failure by restarting and/or reprocessing. Every streaming source is assumed to have offsets (similar to Kafka offsets, or Kinesis sequence numbers)
@@ -671,12 +678,123 @@ windowedCounts = words.groupBy(
 </div>
 
 
+### Handling Late Data and Watermarking
 Now consider what happens if one of the events arrives late to the application.
-For example, a word that was generated at 12:04 but it was received at 12:11. 
-Since this windowing is based on the time in the data, the time 12:04 should be considered for windowing. This occurs naturally in our window-based grouping \u2013 the late data is automatically placed in the proper windows and the correct aggregates are updated as illustrated below.
+For example, say, a word generated at 12:04 (i.e. event time) could be received by 
+the application at 12:11. The application should use the time 12:04 instead of 12:11
+to update the older counts for the window `12:00 - 12:10`. This occurs 
+naturally in our window-based grouping – Structured Streaming can maintain the intermediate state 
+for partial aggregates for a long period of time so that late data can update aggregates of 
+old windows correctly, as illustrated below.
 
 ![Handling Late Data](img/structured-streaming-late-data.png)
 
+However, to run this query for days, it is necessary for the system to bound the amount of 
+intermediate in-memory state it accumulates. This means the system needs to know when an old 
+aggregate can be dropped from the in-memory state because the application is not going to receive 
+late data for that aggregate any more. To enable this, in Spark 2.1, we have introduced 
+**watermarking**, which lets the engine automatically track the current event time in the data 
+and attempt to clean up old state accordingly. You can define the watermark of a query by 
+specifying the event time column and the threshold on how late the data is expected to be in terms of 
+event time. For a specific window starting at time `T`, the engine will maintain state and allow late
+data to update the state until `(max event time seen by the engine - late threshold > T)`. 
+In other words, late data within the threshold will be aggregated, 
+but data later than the threshold will be dropped. Let's understand this with an example. We can 
+easily define watermarking on the previous example using `withWatermark()` as shown below.
+
+<div class="codetabs">
+<div data-lang="scala"  markdown="1">
+
+{% highlight scala %}
+import spark.implicits._
+
+val words = ... // streaming DataFrame of schema { timestamp: Timestamp, word: String }
+
+// Group the data by window and word and compute the count of each group
+val windowedCounts = words
+    .withWatermark("timestamp", "10 minutes")
+    .groupBy(
+        window($"timestamp", "10 minutes", "5 minutes"),
+        $"word")
+    .count()
+{% endhighlight %}
+
+</div>
+<div data-lang="java"  markdown="1">
+
+{% highlight java %}
+Dataset<Row> words = ... // streaming DataFrame of schema { timestamp: Timestamp, word: String }
+
+// Group the data by window and word and compute the count of each group
+Dataset<Row> windowedCounts = words
+    .withWatermark("timestamp", "10 minutes")
+    .groupBy(
+        functions.window(words.col("timestamp"), "10 minutes", "5 minutes"),
+        words.col("word"))
+    .count();
+{% endhighlight %}
+
+</div>
+<div data-lang="python"  markdown="1">
+{% highlight python %}
+words = ...  # streaming DataFrame of schema { timestamp: Timestamp, word: String }
+
+# Group the data by window and word and compute the count of each group
+windowedCounts = words \
+    .withWatermark("timestamp", "10 minutes") \
+    .groupBy(
+        window(words.timestamp, "10 minutes", "5 minutes"),
+        words.word) \
+    .count()
+{% endhighlight %}
+
+</div>
+</div>
+
+In this example, we are defining the watermark of the query on the value of the column "timestamp", 
+and also defining "10 minutes" as the threshold of how late the data is allowed to be. If this query 
+is run in Append output mode (discussed later in the [Output Modes](#output-modes) section), 
+the engine will track the current event time from the column "timestamp" and wait for an additional
+"10 minutes" in event time before finalizing the windowed counts and adding them to the Result Table.
+Here is an illustration. 
+
+![Watermarking in Append Mode](img/structured-streaming-watermark.png)
+
+As shown in the illustration, the maximum event time tracked by the engine is the 
+*blue dashed line*, and the watermark set as `(max event time - '10 mins')`
+at the beginning of every trigger is the red line. For example, when the engine observes the data 
+`(12:14, dog)`, it sets the watermark for the next trigger as `12:04`.
+For the window `12:00 - 12:10`, the partial counts are maintained as internal state while the system
+is waiting for late data. After the system finds data (i.e. `(12:21, owl)`) such that the 
+watermark exceeds 12:10, the partial count is finalized and appended to the table. This count will
+not change any further, as all "too-late" data older than 12:10 will be ignored.  
+
+Note that in Append output mode, the system has to wait for the "late threshold" time 
+before it can output the aggregation of a window. This may not be ideal if data can be very late 
+(say, 1 day) and you would like to have partial counts without waiting for a day. In the future, we will add
+an Update output mode, which would allow every update to the aggregates to be written to the sink every trigger. 
+
+**Conditions for watermarking to clean aggregation state**
+It is important to note that the following conditions must be satisfied for watermarking to 
+clean the state in aggregation queries *(as of Spark 2.1, subject to change in the future)*. A minimal 
+query that satisfies all of these conditions is sketched after the list.
+
+- **Output mode must be Append.** Complete mode requires all aggregate data to be preserved, and hence 
+cannot use watermarking to drop intermediate state. See the [Output Modes](#output-modes) section 
+for a detailed explanation of the semantics of each output mode.
+
+- The aggregation must have either the event-time column, or a `window` on the event-time column. 
+
+- `withWatermark` must be called on the 
+same column as the timestamp column used in the aggregate. For example, 
+`df.withWatermark("time", "1 min").groupBy("time2").count()` is invalid 
+in Append output mode, as the watermark is defined on a different column
+from the aggregation column.
+
+- `withWatermark` must be called before the aggregation for the watermark details to be used. 
+For example, `df.groupBy("time").count().withWatermark("time", "1 min")` is invalid in Append 
+output mode.
+
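+As a minimal sketch (building on the `words` streaming DataFrame from the earlier example, with 
+hypothetical output and checkpoint paths), a query that satisfies all of these conditions could 
+look like the following:
+
+{% highlight scala %}
+import org.apache.spark.sql.functions.window
+import spark.implicits._
+
+// The watermark and the window use the same event-time column ("timestamp"),
+// withWatermark is called before the aggregation, and the query runs in Append mode.
+val windowedCounts = words
+    .withWatermark("timestamp", "10 minutes")
+    .groupBy(
+        window($"timestamp", "10 minutes", "5 minutes"),
+        $"word")
+    .count()
+
+val query = windowedCounts.writeStream
+    .outputMode("append")
+    .format("parquet")
+    .option("path", "/hypothetical/output/dir")                // hypothetical output directory
+    .option("checkpointLocation", "/hypothetical/checkpoint")  // hypothetical checkpoint directory
+    .start()
+{% endhighlight %}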
+
 ### Join Operations
 Streaming DataFrames can be joined with static DataFrames to create new streaming DataFrames. Here are a few examples.
 
@@ -763,16 +881,78 @@ returned through `Dataset.writeStream()`. You will have to specify one or more o
 - *Checkpoint location:* For some output sinks where the end-to-end fault-tolerance can be guaranteed, specify the location where the system will write all the checkpoint information. This should be a directory in an HDFS-compatible fault-tolerant file system. The semantics of checkpointing is discussed in more detail in the next section.
 
 #### Output Modes
-There are two types of output mode currently implemented.
+There are a few types of output modes.
+
+- **Append mode (default)** - This is the default mode, where only the 
+new rows added to the Result Table since the last trigger will be 
+outputted to the sink. This is supported only for those queries where 
+rows added to the Result Table are never going to change. Hence, this mode 
+guarantees that each row will be output only once (assuming a 
+fault-tolerant sink). For example, queries with only `select`, 
+`where`, `map`, `flatMap`, `filter`, `join`, etc. will support Append mode.
 
-- **Append mode (default)** - This is the default mode, where only the new rows added to the result table since the last trigger will be outputted to the sink. This is only applicable to queries that *do not have any aggregations* (e.g. queries with only `select`, `where`, `map`, `flatMap`, `filter`, `join`, etc.).
+- **Complete mode** - The whole Result Table will be outputted to the sink after every trigger.
+ This is supported for aggregation queries.
 
-- **Complete mode** - The whole result table will be outputted to the sink.This is only applicable to queries that *have aggregations*. 
+- **Update mode** - (*not available in Spark 2.1*) Only the rows in the Result Table that were 
+updated since the last trigger will be outputted to the sink. 
+More information to be added in future releases.
+
+Different types of streaming queries support different output modes. 
+Here is the compatibility matrix; a short sketch of setting the output mode on a query follows the table.
+
+<table class="table">
+  <tr>
+    <th>Query Type</th>
+    <th></th>
+    <th>Supported Output Modes</th>
+    <th>Notes</th>        
+  </tr>
+  <tr>
+    <td colspan="2" valign="middle"><br/>Queries without aggregation</td>
+    <td>Append</td>
+    <td>
+        Complete mode is not supported as it is infeasible to keep all the data in the Result Table.
+    </td>
+  </tr>
+  <tr>
+    <td rowspan="2">Queries with aggregation</td>
+    <td>Aggregation on event-time with watermark</td>
+    <td>Append, Complete</td>
+    <td>
+        Append mode uses the watermark to drop old aggregation state. But the output of a 
+        windowed aggregation is delayed by the late threshold specified in `withWatermark()`, as by
+        the semantics of this mode, rows can be added to the Result Table only once, after they are 
+        finalized (i.e. after the watermark is crossed). See the
+        <a href="#handling-late-data">Late Data</a> section for more details.
+        <br/><br/>
+        Complete mode does not drop old aggregation state since by definition this mode
+        preserves all data in the Result Table.
+    </td>    
+  </tr>
+  <tr>
+    <td>Other aggregations</td>
+    <td>Complete</td>
+    <td>
+        Append mode is not supported as aggregates can update, thus violating the semantics of 
+        this mode.
+        <br/><br/>
+        Complete mode does not drop old aggregation state since by definition this mode
+        preserves all data in the Result Table.
+    </td>  
+  </tr>
+  <tr>
+    <td></td>
+    <td></td>
+    <td></td>
+    <td></td>
+  </tr>
+</table>
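+
+The output mode is specified on the `DataStreamWriter` when starting a query. As a minimal sketch 
+(reusing the `windowedCounts` aggregation from the watermarking example and, purely for 
+illustration, the console sink):
+
+{% highlight scala %}
+// Complete mode: the whole Result Table is written to the sink after every trigger.
+// For the watermarked windowed aggregation, "append" would also be a valid choice.
+val query = windowedCounts.writeStream
+    .outputMode("complete")
+    .format("console")
+    .start()
+{% endhighlight %}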
 
 #### Output Sinks
 There are a few types of built-in output sinks.
 
-- **File sink** - Stores the output to a directory. As of Spark 2.0, this only supports Parquet file format, and Append output mode. 
+- **File sink** - Stores the output to a directory. 
 
 - **Foreach sink** - Runs arbitrary computation on the records in the output. See later in the section for more details.
 
@@ -791,7 +971,7 @@ Here is a table of all the sinks, and the corresponding settings.
     <th>Notes</th>
   </tr>
   <tr>
-    <td><b>File Sink</b><br/>(only parquet in Spark 2.0)</td>
+    <td><b>File Sink</b></td>
     <td>Append</td>
     <td><pre>writeStream<br/>  .format("parquet")<br/>  .start()</pre></td>
     <td>Yes</td>
@@ -817,7 +997,14 @@ Here is a table of all the sinks, and the corresponding settings.
     <td><pre>writeStream<br/>  .format("memory")<br/>  .queryName("table")<br/>  .start()</pre></td>
     <td>No</td>
     <td>Saves the output data as a table, for interactive querying. Table name is the query name.</td>
-  </tr> 
+  </tr>
+  <tr>
+    <td></td>
+    <td></td>
+    <td></td>
+    <td></td>
+    <td></td>
+  </tr>
 </table>
 
 Finally, you have to call `start()` to actually start the execution of the query. This returns a StreamingQuery object which is a handle to the continuously running execution. You can use this object to manage the query, which we will discuss in the next subsection. For now, let's understand all this with a few examples.
@@ -947,7 +1134,7 @@ spark.sql("select * from aggregates").show()   # interactively query in-memory t
 </div>
 
 #### Using Foreach
-The `foreach` operation allows arbitrary operations to be computed on the output data. As of Spark 2.0, this is available only for Scala and Java. To use this, you will have to implement the interface `ForeachWriter`
+The `foreach` operation allows arbitrary operations to be computed on the output data. As of Spark 2.1, this is available only for Scala and Java. To use this, you will have to implement the interface `ForeachWriter`
 ([Scala](api/scala/index.html#org.apache.spark.sql.ForeachWriter)/[Java](api/java/org/apache/spark/sql/ForeachWriter.html) docs),
 which has methods that get called whenever there is a sequence of rows generated as output after a trigger. Note the following important points.
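+
+For illustration, a minimal sketch of such a writer (hypothetical; it simply prints each record to 
+stdout) could look like this:
+
+{% highlight scala %}
+import org.apache.spark.sql.{ForeachWriter, Row}
+
+val printWriter = new ForeachWriter[Row] {
+  // Called when starting to process a partition in a trigger; return true to process it.
+  def open(partitionId: Long, version: Long): Boolean = true
+
+  // Called for each row of the output; replace with writes to an external system.
+  def process(record: Row): Unit = println(record)
+
+  // Called when processing of the partition ends (errorCause is null on success).
+  def close(errorCause: Throwable): Unit = ()
+}
+
+// Hypothetical usage on a streaming DataFrame `df`:
+// df.writeStream.foreach(printWriter).outputMode("append").start()
+{% endhighlight %}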
 
@@ -1089,11 +1276,28 @@ spark.streams().awaitAnyTermination()  # block until any one of them terminates
 
 
 ## Monitoring Streaming Queries
-There are two ways you can monitor queries. You can directly get the current status
-of an active query using `streamingQuery.status`, which will return a `StreamingQueryStatus` object
-([Scala](api/scala/index.html#org.apache.spark.sql.streaming.StreamingQueryStatus)/[Java](api/java/org/apache/spark/sql/streaming/StreamingQueryStatus.html)/[Python](api/python/pyspark.sql.html#pyspark.sql.streaming.StreamingQueryStatus) docs)
-that has all the details like current ingestion rates, processing rates, average latency,
-details of the currently active trigger, etc.
+There are two ways to monitor and debug active queries - 
+interactively and asynchronously.
+
+### Interactive APIs
+
+You can directly get the current status and metrics of an active query using 
+`streamingQuery.lastProgress()` and `streamingQuery.status()`. 
+`lastProgress()` returns a `StreamingQueryProgress` object 
+in [Scala](api/scala/index.html#org.apache.spark.sql.streaming.StreamingQueryProgress) 
+and [Java](api/java/org/apache/spark/sql/streaming/StreamingQueryProgress.html)
+and a dictionary with the same fields in Python. It has all the information about
+the progress made in the last trigger of the stream - what data was processed, 
+what the processing rates and latencies were, etc. There is also 
+`streamingQuery.recentProgress`, which returns an array of the last few progress updates.  
+
+In addition, `streamingQuery.status()` returns a `StreamingQueryStatus` object 
+in [Scala](api/scala/index.html#org.apache.spark.sql.streaming.StreamingQueryStatus) 
+and [Java](api/java/org/apache/spark/sql/streaming/StreamingQueryStatus.html)
+and a dictionary with the same fields in Python. It gives information about
+what the query is doing right now - whether a trigger is active, whether data is being processed, etc.
+
+Here are a few examples.
 
 <div class="codetabs">
 <div data-lang="scala"  markdown="1">
@@ -1101,34 +1305,65 @@ details of the currently active trigger, etc.
 {% highlight scala %}
 val query: StreamingQuery = ...
 
+println(query.lastProgress)
+
+/* Will print something like the following.
+
+{
+  "id" : "ce011fdc-8762-4dcb-84eb-a77333e28109",
+  "runId" : "88e2ff94-ede0-45a8-b687-6316fbef529a",
+  "name" : "MyQuery",
+  "timestamp" : "2016-12-14T18:45:24.873Z",
+  "numInputRows" : 10,
+  "inputRowsPerSecond" : 120.0,
+  "processedRowsPerSecond" : 200.0,
+  "durationMs" : {
+    "triggerExecution" : 3,
+    "getOffset" : 2
+  },
+  "eventTime" : {
+    "watermark" : "2016-12-14T18:45:24.873Z"
+  },
+  "stateOperators" : [ ],
+  "sources" : [ {
+    "description" : "KafkaSource[Subscribe[topic-0]]",
+    "startOffset" : {
+      "topic-0" : {
+        "2" : 0,
+        "4" : 1,
+        "1" : 1,
+        "3" : 1,
+        "0" : 1
+      }
+    },
+    "endOffset" : {
+      "topic-0" : {
+        "2" : 0,
+        "4" : 115,
+        "1" : 134,
+        "3" : 21,
+        "0" : 534
+      }
+    },
+    "numInputRows" : 10,
+    "inputRowsPerSecond" : 120.0,
+    "processedRowsPerSecond" : 200.0
+  } ],
+  "sink" : {
+    "description" : "MemorySink"
+  }
+}
+*/
+
+
 println(query.status)
 
-/* Will print the current status of the query
-
-Status of query 'queryName'
-    Query id: 1
-    Status timestamp: 123
-    Input rate: 15.5 rows/sec
-    Processing rate 23.5 rows/sec
-    Latency: 345.0 ms
-    Trigger details:
-        batchId: 5
-        isDataPresentInTrigger: true
-        isTriggerActive: true
-        latency.getBatch.total: 20
-        latency.getOffset.total: 10
-        numRows.input.total: 100
-    Source statuses [1 source]:
-        Source 1 - MySource1
-            Available offset: 0
-            Input rate: 15.5 rows/sec
-            Processing rate: 23.5 rows/sec
-            Trigger details:
-                numRows.input.source: 100
-                latency.getOffset.source: 10
-                latency.getBatch.source: 20
-    Sink status - MySink
-        Committed offsets: [1, -]
+/*  Will print something like the following.
+{
+  "message" : "Waiting for data to arrive",
+  "isDataAvailable" : false,
+  "isTriggerActive" : false
+}
 */
 {% endhighlight %}
 
@@ -1138,34 +1373,63 @@ Status of query 'queryName'
 {% highlight java %}
 StreamingQuery query = ...
 
-System.out.println(query.status);
-
-/* Will print the current status of the query
-
-Status of query 'queryName'
-    Query id: 1
-    Status timestamp: 123
-    Input rate: 15.5 rows/sec
-    Processing rate 23.5 rows/sec
-    Latency: 345.0 ms
-    Trigger details:
-        batchId: 5
-        isDataPresentInTrigger: true
-        isTriggerActive: true
-        latency.getBatch.total: 20
-        latency.getOffset.total: 10
-        numRows.input.total: 100
-    Source statuses [1 source]:
-        Source 1 - MySource1
-            Available offset: 0
-            Input rate: 15.5 rows/sec
-            Processing rate: 23.5 rows/sec
-            Trigger details:
-                numRows.input.source: 100
-                latency.getOffset.source: 10
-                latency.getBatch.source: 20
-    Sink status - MySink
-        Committed offsets: [1, -]
+System.out.println(query.lastProgress());
+/* Will print something like the following.
+
+{
+  "id" : "ce011fdc-8762-4dcb-84eb-a77333e28109",
+  "runId" : "88e2ff94-ede0-45a8-b687-6316fbef529a",
+  "name" : "MyQuery",
+  "timestamp" : "2016-12-14T18:45:24.873Z",
+  "numInputRows" : 10,
+  "inputRowsPerSecond" : 120.0,
+  "processedRowsPerSecond" : 200.0,
+  "durationMs" : {
+    "triggerExecution" : 3,
+    "getOffset" : 2
+  },
+  "eventTime" : {
+    "watermark" : "2016-12-14T18:45:24.873Z"
+  },
+  "stateOperators" : [ ],
+  "sources" : [ {
+    "description" : "KafkaSource[Subscribe[topic-0]]",
+    "startOffset" : {
+      "topic-0" : {
+        "2" : 0,
+        "4" : 1,
+        "1" : 1,
+        "3" : 1,
+        "0" : 1
+      }
+    },
+    "endOffset" : {
+      "topic-0" : {
+        "2" : 0,
+        "4" : 115,
+        "1" : 134,
+        "3" : 21,
+        "0" : 534
+      }
+    },
+    "numInputRows" : 10,
+    "inputRowsPerSecond" : 120.0,
+    "processedRowsPerSecond" : 200.0
+  } ],
+  "sink" : {
+    "description" : "MemorySink"
+  }
+}
+*/
+
+
+System.out.println(query.status());
+/*  Will print something like the following.
+{
+  "message" : "Waiting for data to arrive",
+  "isDataAvailable" : false,
+  "isTriggerActive" : false
+}
 */
 {% endhighlight %}
 
@@ -1173,43 +1437,27 @@ Status of query 'queryName'
 <div data-lang="python"  markdown="1">
 
 {% highlight python %}
-query = ...  // a StreamingQuery
+query = ...  # a StreamingQuery
+print(query.lastProgress)
 
-print(query.status)
+'''
+Will print something like the following.
 
+{u'stateOperators': [], u'eventTime': {u'watermark': u'2016-12-14T18:45:24.873Z'}, u'name': u'MyQuery', u'timestamp': u'2016-12-14T18:45:24.873Z', u'processedRowsPerSecond': 200.0, u'inputRowsPerSecond': 120.0, u'numInputRows': 10, u'sources': [{u'description': u'KafkaSource[Subscribe[topic-0]]', u'endOffset': {u'topic-0': {u'1': 134, u'0': 534, u'3': 21, u'2': 0, u'4': 115}}, u'processedRowsPerSecond': 200.0, u'inputRowsPerSecond': 120.0, u'numInputRows': 10, u'startOffset': {u'topic-0': {u'1': 1, u'0': 1, u'3': 1, u'2': 0, u'4': 1}}}], u'durationMs': {u'getOffset': 2, u'triggerExecution': 3}, u'runId': u'88e2ff94-ede0-45a8-b687-6316fbef529a', u'id': u'ce011fdc-8762-4dcb-84eb-a77333e28109', u'sink': {u'description': u'MemorySink'}}
 '''
-Will print the current status of the query
-
-Status of query 'queryName'
-    Query id: 1
-    Status timestamp: 123
-    Input rate: 15.5 rows/sec
-    Processing rate 23.5 rows/sec
-    Latency: 345.0 ms
-    Trigger details:
-        batchId: 5
-        isDataPresentInTrigger: true
-        isTriggerActive: true
-        latency.getBatch.total: 20
-        latency.getOffset.total: 10
-        numRows.input.total: 100
-    Source statuses [1 source]:
-        Source 1 - MySource1
-            Available offset: 0
-            Input rate: 15.5 rows/sec
-            Processing rate: 23.5 rows/sec
-            Trigger details:
-                numRows.input.source: 100
-                latency.getOffset.source: 10
-                latency.getBatch.source: 20
-    Sink status - MySink
-        Committed offsets: [1, -]
+
+print(query.status)
+''' 
+Will print something like the following.
+
+{u'message': u'Waiting for data to arrive', u'isTriggerActive': False, u'isDataAvailable': False}
 '''
 {% endhighlight %}
 
 </div>
 </div>
 
+### Asynchronous API
 
 You can also asynchronously monitor all queries associated with a
 `SparkSession` by attaching a `StreamingQueryListener`
@@ -1225,15 +1473,14 @@ stopped and when there is progress made in an active query. Here is an example,
 val spark: SparkSession = ...
 
 spark.streams.addListener(new StreamingQueryListener() {
-
     override def onQueryStarted(queryStarted: QueryStartedEvent): Unit = {
-        println("Query started: " + queryTerminated.queryStatus.name)
+        println("Query started: " + queryStarted.id)
     }
     override def onQueryTerminated(queryTerminated: QueryTerminatedEvent): Unit = {
-        println("Query terminated: " + queryTerminated.queryStatus.name)
+        println("Query terminated: " + queryTerminated.id)
     }
     override def onQueryProgress(queryProgress: QueryProgressEvent): Unit = {
-        println("Query made progress: " + queryProgress.queryStatus)
+        println("Query made progress: " + queryProgress.progress)
     }
 })
 {% endhighlight %}
@@ -1245,15 +1492,14 @@ spark.streams.addListener(new StreamingQueryListener() {
 SparkSession spark = ...
 
 spark.streams().addListener(new StreamingQueryListener() {
-
     @Override public void onQueryStarted(QueryStartedEvent queryStarted) {
-        System.out.println("Query started: " + queryTerminated.queryStatus.name);
+        System.out.println("Query started: " + queryStarted.id());
     }
     @Override public void onQueryTerminated(QueryTerminatedEvent queryTerminated) {
-        System.out.println("Query terminated: " + queryTerminated.queryStatus.name);
+        System.out.println("Query terminated: " + queryTerminated.id());
     }
     @Override public void onQueryProgress(QueryProgressEvent queryProgress) {
-        System.out.println("Query made progress: " + queryProgress.queryStatus);
+        System.out.println("Query made progress: " + queryProgress.progress());
     }
 });
 {% endhighlight %}
@@ -1268,7 +1514,7 @@ Not available in Python.
 </div>
 
 ## Recovering from Failures with Checkpointing 
-In case of a failure or intentional shutdown, you can recover the previous progress and state of a previous query, and continue where it left off. This is done using checkpointing and write ahead logs. You can configure a query with a checkpoint location, and the query will save all the progress information (i.e. range of offsets processed in each trigger) and the running aggregates (e.g. word counts in the [quick example](#quick-example)) to the checkpoint location. As of Spark 2.0, this checkpoint location has to be a path in an HDFS compatible file system, and can be set as an option in the DataStreamWriter when [starting a query](#starting-streaming-queries). 
+In case of a failure or intentional shutdown, you can recover the previous progress and state of a previous query, and continue where it left off. This is done using checkpointing and write ahead logs. You can configure a query with a checkpoint location, and the query will save all the progress information (i.e. range of offsets processed in each trigger) and the running aggregates (e.g. word counts in the [quick example](#quick-example)) to the checkpoint location. This checkpoint location has to be a path in an HDFS compatible file system, and can be set as an option in the DataStreamWriter when [starting a query](#starting-streaming-queries). 
 
 <div class="codetabs">
 <div data-lang="scala"  markdown="1">


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@spark.apache.org
For additional commands, e-mail: commits-help@spark.apache.org