You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@spark.apache.org by td...@apache.org on 2017/01/06 19:29:36 UTC

spark git commit: [SPARK-19074][SS][DOCS] Updated Structured Streaming Programming Guide for update mode and source/sink options

Repository: spark
Updated Branches:
  refs/heads/master a9a137377 -> b59cddaba


[SPARK-19074][SS][DOCS] Updated Structured Streaming Programming Guide for update mode and source/sink options

## What changes were proposed in this pull request?

Updates
- Updated Late Data Handling section by adding a figure for Update Mode. Its more intuitive to explain late data handling with Update Mode, so I added the new figure before the Append Mode figure.
- Updated Output Modes section with Update mode
- Added options for all the sources and sinks

---------------------------
---------------------------

![image](https://cloud.githubusercontent.com/assets/663212/21665176/f150b224-d29f-11e6-8372-14d32da21db9.png)

---------------------------
---------------------------
<img width="931" alt="screen shot 2017-01-03 at 6 09 11 pm" src="https://cloud.githubusercontent.com/assets/663212/21629740/d21c9bb8-d1df-11e6-915b-488a59589fa6.png">
<img width="933" alt="screen shot 2017-01-03 at 6 10 00 pm" src="https://cloud.githubusercontent.com/assets/663212/21629749/e22bdabe-d1df-11e6-86d3-7e51d2f28dbc.png">

---------------------------
---------------------------
![image](https://cloud.githubusercontent.com/assets/663212/21665200/108e18fc-d2a0-11e6-8640-af598cab090b.png)
![image](https://cloud.githubusercontent.com/assets/663212/21665148/cfe414fa-d29f-11e6-9baa-4124ccbab093.png)
![image](https://cloud.githubusercontent.com/assets/663212/21665226/2e8f39e4-d2a0-11e6-85b1-7657e2df5491.png)

Author: Tathagata Das <ta...@gmail.com>

Closes #16468 from tdas/SPARK-19074.


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/b59cddab
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/b59cddab
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/b59cddab

Branch: refs/heads/master
Commit: b59cddaba01cbdf50dbe8fe7ef7b9913bad9552d
Parents: a9a1373
Author: Tathagata Das <ta...@gmail.com>
Authored: Fri Jan 6 11:29:01 2017 -0800
Committer: Tathagata Das <ta...@gmail.com>
Committed: Fri Jan 6 11:29:01 2017 -0800

----------------------------------------------------------------------
 ...ructured-streaming-watermark-append-mode.png | Bin 0 -> 249196 bytes
 ...ructured-streaming-watermark-update-mode.png | Bin 0 -> 299141 bytes
 docs/img/structured-streaming-watermark.png     | Bin 252000 -> 0 bytes
 docs/img/structured-streaming.pptx              | Bin 1113902 -> 1126657 bytes
 docs/structured-streaming-programming-guide.md  | 214 ++++++++++++++-----
 .../spark/sql/streaming/DataStreamWriter.scala  |   6 +-
 6 files changed, 166 insertions(+), 54 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/b59cddab/docs/img/structured-streaming-watermark-append-mode.png
----------------------------------------------------------------------
diff --git a/docs/img/structured-streaming-watermark-append-mode.png b/docs/img/structured-streaming-watermark-append-mode.png
new file mode 100644
index 0000000..541d5bf
Binary files /dev/null and b/docs/img/structured-streaming-watermark-append-mode.png differ

http://git-wip-us.apache.org/repos/asf/spark/blob/b59cddab/docs/img/structured-streaming-watermark-update-mode.png
----------------------------------------------------------------------
diff --git a/docs/img/structured-streaming-watermark-update-mode.png b/docs/img/structured-streaming-watermark-update-mode.png
new file mode 100644
index 0000000..6827849
Binary files /dev/null and b/docs/img/structured-streaming-watermark-update-mode.png differ

http://git-wip-us.apache.org/repos/asf/spark/blob/b59cddab/docs/img/structured-streaming-watermark.png
----------------------------------------------------------------------
diff --git a/docs/img/structured-streaming-watermark.png b/docs/img/structured-streaming-watermark.png
deleted file mode 100644
index f21fbda..0000000
Binary files a/docs/img/structured-streaming-watermark.png and /dev/null differ

http://git-wip-us.apache.org/repos/asf/spark/blob/b59cddab/docs/img/structured-streaming.pptx
----------------------------------------------------------------------
diff --git a/docs/img/structured-streaming.pptx b/docs/img/structured-streaming.pptx
index f5bdfc0..2ffd9f2 100644
Binary files a/docs/img/structured-streaming.pptx and b/docs/img/structured-streaming.pptx differ

http://git-wip-us.apache.org/repos/asf/spark/blob/b59cddab/docs/structured-streaming-programming-guide.md
----------------------------------------------------------------------
diff --git a/docs/structured-streaming-programming-guide.md b/docs/structured-streaming-programming-guide.md
index 6cd050e..52dbbc8 100644
--- a/docs/structured-streaming-programming-guide.md
+++ b/docs/structured-streaming-programming-guide.md
@@ -374,7 +374,7 @@ The "Output" is defined as what gets written out to the external storage. The ou
 
   - *Append Mode* - Only the new rows appended in the Result Table since the last trigger will be written to the external storage. This is applicable only on the queries where existing rows in the Result Table are not expected to change.
   
-  - *Update Mode* - Only the rows that were updated in the Result Table since the last trigger will be written to the external storage (not available yet in Spark 2.0). Note that this is different from the Complete Mode in that this mode does not output the rows that are not changed.
+  - *Update Mode* - Only the rows that were updated in the Result Table since the last trigger will be written to the external storage (available since Spark 2.1.1). Note that this is different from the Complete Mode in that this mode only outputs the rows that have changed since the last trigger.
 
 Note that each mode is applicable on certain types of queries. This is discussed in detail [later](#output-modes).
 
@@ -424,7 +424,7 @@ Streaming DataFrames can be created through the `DataStreamReader` interface
 ([Scala](api/scala/index.html#org.apache.spark.sql.streaming.DataStreamReader)/[Java](api/java/org/apache/spark/sql/streaming/DataStreamReader.html)/[Python](api/python/pyspark.sql.html#pyspark.sql.streaming.DataStreamReader) docs)
 returned by `SparkSession.readStream()`. Similar to the read interface for creating static DataFrame, you can specify the details of the source \u2013 data format, schema, options, etc.
 
-#### Data Sources
+#### Input Sources
 In Spark 2.0, there are a few built-in sources.
 
   - **File source** - Reads files written in a directory as a stream of data. Supported file formats are text, csv, json, parquet. See the docs of the DataStreamReader interface for a more up-to-date list, and supported options for each file format. Note that the files must be atomically placed in the given directory, which in most file systems, can be achieved by file move operations.
@@ -433,6 +433,54 @@ In Spark 2.0, there are a few built-in sources.
 
   - **Socket source (for testing)** - Reads UTF8 text data from a socket connection. The listening server socket is at the driver. Note that this should be used only for testing as this does not provide end-to-end fault-tolerance guarantees. 
 
+Some sources are not fault-tolerant because they do not guarantee that data can be replayed using 
+checkpointed offsets after a failure. See the earlier section on 
+[fault-tolerance semantics](#fault-tolerance-semantics).
+Here are the details of all the sources in Spark.
+
+<table class="table">
+  <tr>
+    <th>Source</th>
+    <th>Options</th>
+    <th>Fault-tolerant</th>
+    <th>Notes</th>
+  </tr>
+  <tr>
+    <td><b>File source</b></td>
+    <td>
+        <code>path</code>: path to the input directory, and common to all file formats.
+        <br/><br/>
+        For file-format-specific options, see the related methods in <code>DataStreamReader</code>
+        (<a href="api/scala/index.html#org.apache.spark.sql.streaming.DataStreamReader">Scala</a>/<a href="api/java/org/apache/spark/sql/streaming/DataStreamReader.html">Java</a>/<a href="api/python/pyspark.sql.html#pyspark.sql.streaming.DataStreamReader">Python</a>).
+        E.g. for "parquet" format options see <code>DataStreamReader.parquet()</code></td>
+    <td>Yes</td>
+    <td>Supports glob paths, but does not support multiple comma-separated paths/globs.</td>
+  </tr>
+  <tr>
+    <td><b>Socket Source</b></td>
+    <td>
+        <code>host</code>: host to connect to, must be specified<br/>
+        <code>port</code>: port to connect to, must be specified
+    </td>
+    <td>No</td>
+    <td></td>
+  </tr>
+  <tr>
+    <td><b>Kafka Source</b></td>
+    <td>
+        See the <a href="structured-streaming-kafka-integration.html">Kafka Integration Guide</a>.
+    </td>
+    <td>Yes</td>
+    <td></td>
+  </tr>
+  <tr>
+    <td></td>
+    <td></td>
+    <td></td>
+    <td></td>
+  </tr>
+</table>
+
 Here are some examples.
 
 <div class="codetabs">
@@ -753,34 +801,47 @@ windowedCounts = words
 
 In this example, we are defining the watermark of the query on the value of the column "timestamp", 
 and also defining "10 minutes" as the threshold of how late is the data allowed to be. If this query 
-is run in Append output mode (discussed later in [Output Modes](#output-modes) section), 
-the engine will track the current event time from the column "timestamp" and wait for additional
-"10 minutes" in event time before finalizing the windowed counts and adding them to the Result Table.
+is run in Update output mode (discussed later in [Output Modes](#output-modes) section), 
+the engine will keep updating counts of a window in the Resule Table until the window is older 
+than the watermark, which lags behind the current event time in column "timestamp" by 10 minutes.
 Here is an illustration. 
 
-![Watermarking in Append Mode](img/structured-streaming-watermark.png)
+![Watermarking in Update Mode](img/structured-streaming-watermark-update-mode.png)
 
 As shown in the illustration, the maximum event time tracked by the engine is the 
 *blue dashed line*, and the watermark set as `(max event time - '10 mins')`
 at the beginning of every trigger is the red line  For example, when the engine observes the data 
 `(12:14, dog)`, it sets the watermark for the next trigger as `12:04`.
-For the window `12:00 - 12:10`, the partial counts are maintained as internal state while the system
-is waiting for late data. After the system finds data (i.e. `(12:21, owl)`) such that the 
-watermark exceeds 12:10, the partial count is finalized and appended to the table. This count will
-not change any further as all "too-late" data older than 12:10 will be ignored.  
-
-Note that in Append output mode, the system has to wait for "late threshold" time 
-before it can output the aggregation of a window. This may not be ideal if data can be very late, 
-(say 1 day) and you like to have partial counts without waiting for a day. In future, we will add
-Update output mode which would allows every update to aggregates to be written to sink every trigger. 
+This watermark lets the engine maintain intermediate state for additional 10 minutes to allow late
+data to be counted. For example, the data `(12:09, cat)` is out of order and late, and it falls in
+windows `12:05 - 12:15` and `12:10 - 12:20`. Since, it is still ahead of the watermark `12:04` in 
+the trigger, the engine still maintains the intermediate counts as state and correctly updates the 
+counts of the related windows. However, when the watermark is updated to 12:11, the intermediate 
+state for window `(12:00 - 12:10)` is cleared, and all subsequent data (e.g. `(12:04, donkey)`) 
+is considered "too late" and therefore ignored. Note that after every trigger, 
+the updated counts (i.e. purple rows) are written to sink as the trigger output, as dictated by 
+the Update mode.
+
+Some sinks (e.g. files) may not supported fine-grained updates that Update Mode requires. To work
+with them, we have also support Append Mode, where only the *final counts* are written to sink.
+This is illustrated below.
+
+![Watermarking in Append Mode](img/structured-streaming-watermark-append-mode.png)
+
+Similar to the Update Mode earlier, the engine maintains intermediate counts for each window. 
+However, the partial counts are not updated to the Result Table and not written to sink. The engine
+waits for "10 mins" for late date to be counted, 
+then drops intermediate state of a window < watermark, and appends the final
+counts to the Result Table/sink. For example, the final counts of window `12:00 - 12:10` is 
+appended to the Result Table only after the watermark is updated to `12:11`. 
 
 **Conditions for watermarking to clean aggregation state**
 It is important to note that the following conditions must be satisfied for the watermarking to 
-clean the state in aggregation queries *(as of Spark 2.1, subject to change in the future)*.
+clean the state in aggregation queries *(as of Spark 2.1.1, subject to change in the future)*.
 
-- **Output mode must be Append.** Complete mode requires all aggregate data to be preserved, and hence 
-cannot use watermarking to drop intermediate state. See the [Output Modes](#output-modes) section 
-for detailed explanation of the semantics of each output mode.
+- **Output mode must be Append or Update.** Complete mode requires all aggregate data to be preserved, 
+and hence cannot use watermarking to drop intermediate state. See the [Output Modes](#output-modes) 
+section for detailed explanation of the semantics of each output mode.
 
 - The aggregation must have either the event-time column, or a `window` on the event-time column. 
 
@@ -835,8 +896,9 @@ streamingDf.join(staticDf, "type", "right_join")  # right outer join with a stat
 </div>
 
 ### Unsupported Operations
-However, note that all of the operations applicable on static DataFrames/Datasets are not supported in streaming DataFrames/Datasets yet. While some of these unsupported operations will be supported in future releases of Spark, there are others which are fundamentally hard to implement on streaming data efficiently. For example, sorting is not supported on the input streaming Dataset, as it requires keeping track of all the data received in the stream. This is therefore fundamentally hard to execute efficiently. As of Spark 2.0, some of the unsupported operations are as follows
-
+There are a few DataFrame/Dataset operations that are not supported with streaming DataFrames/Datasets. 
+Some of them are as follows.
+ 
 - Multiple streaming aggregations (i.e. a chain of aggregations on a streaming DF) are not yet supported on streaming Datasets.
 
 - Limit and take first N rows are not supported on streaming Datasets.
@@ -863,7 +925,12 @@ In addition, there are some Dataset methods that will not work on streaming Data
 
 - `show()` - Instead use the console sink (see next section).
 
-If you try any of these operations, you will see an AnalysisException like "operation XYZ is not supported with streaming DataFrames/Datasets".
+If you try any of these operations, you will see an `AnalysisException` like "operation XYZ is not supported with streaming DataFrames/Datasets".
+While some of them may be supported in future releases of Spark, 
+there are others which are fundamentally hard to implement on streaming data efficiently. 
+For example, sorting on the input stream is not supported, as it requires keeping 
+track of all the data received in the stream. This is therefore fundamentally hard to execute 
+efficiently.
 
 ## Starting Streaming Queries
 Once you have defined the final result DataFrame/Dataset, all that is left is for you start the streaming computation. To do that, you have to use the `DataStreamWriter`
@@ -894,11 +961,11 @@ fault-tolerant sink). For example, queries with only `select`,
 - **Complete mode** - The whole Result Table will be outputted to the sink after every trigger.
  This is supported for aggregation queries.
 
-- **Update mode** - (*not available in Spark 2.1*) Only the rows in the Result Table that were 
+- **Update mode** - (*Available since Spark 2.1.1*) Only the rows in the Result Table that were 
 updated since the last trigger will be outputted to the sink. 
 More information to be added in future releases.
 
-Different types of streaming queries support different output modes. 
+Different types of streaming queries support different output modes.
 Here is the compatibility matrix.
 
 <table class="table">
@@ -909,36 +976,38 @@ Here is the compatibility matrix.
     <th>Notes</th>        
   </tr>
   <tr>
-    <td colspan="2" valign="middle"><br/>Queries without aggregation</td>
-    <td>Append</td>
-    <td>
-        Complete mode note supported as it is infeasible to keep all data in the Result Table.
+    <td colspan="2" style="vertical-align: middle;">Queries without aggregation</td>
+    <td style="vertical-align: middle;">Append</td>
+    <td style="vertical-align: middle;">
+        Complete mode not supported as it is infeasible to keep all data in the Result Table.
     </td>
   </tr>
   <tr>
-    <td rowspan="2">Queries with aggregation</td>
-    <td>Aggregation on event-time with watermark</td>
-    <td>Append, Complete</td>
+    <td rowspan="2" style="vertical-align: middle;">Queries with aggregation</td>
+    <td style="vertical-align: middle;">Aggregation on event-time with watermark</td>
+    <td style="vertical-align: middle;">Append, Update, Complete</td>
     <td>
         Append mode uses watermark to drop old aggregation state. But the output of a 
         windowed aggregation is delayed the late threshold specified in `withWatermark()` as by
         the modes semantics, rows can be added to the Result Table only once after they are 
-        finalized (i.e. after watermark is crossed). See 
-        <a href="#handling-late-data">Late Data</a> section for more details.
+        finalized (i.e. after watermark is crossed). See the
+        <a href="#handling-late-data-and-watermarking">Late Data</a> section for more details.
+        <br/><br/>
+        Update mode uses watermark to drop old aggregation state.
         <br/><br/>
         Complete mode does drop not old aggregation state since by definition this mode
         preserves all data in the Result Table.
     </td>    
   </tr>
   <tr>
-    <td>Other aggregations</td>
-    <td>Complete</td>
+    <td style="vertical-align: middle;">Other aggregations</td>
+    <td style="vertical-align: middle;">Complete, Update</td>
     <td>
+        Since no watermark is defined (only defined in other category), 
+        old aggregation state is not dropped.
+        <br/><br/>
         Append mode is not supported as aggregates can update thus violating the semantics of 
         this mode.
-        <br/><br/>
-        Complete mode does drop not old aggregation state since by definition this mode
-        preserves all data in the Result Table.
     </td>  
   </tr>
   <tr>
@@ -954,49 +1023,94 @@ There are a few types of built-in output sinks.
 
 - **File sink** - Stores the output to a directory. 
 
+{% highlight scala %}
+writeStream
+    .format("parquet")        // can be "orc", "json", "csv", etc.
+    .option("path", "path/to/destination/dir")
+    .start()
+{% endhighlight %}
+
 - **Foreach sink** - Runs arbitrary computation on the records in the output. See later in the section for more details.
 
+{% highlight scala %}
+writeStream
+    .foreach(...)
+    .start()
+{% endhighlight %}
+
 - **Console sink (for debugging)** - Prints the output to the console/stdout every time there is a trigger. Both, Append and Complete output modes, are supported. This should be used for debugging purposes on low data volumes as the entire output is collected and stored in the driver's memory after every trigger.
 
-- **Memory sink (for debugging)** - The output is stored in memory as an in-memory table.  Both, Append and Complete output modes, are supported. This should be used for debugging purposes on low data volumes as the entire output is collected and stored in the driver's memory after every trigger.
+{% highlight scala %}
+writeStream
+    .format("console")
+    .start()
+{% endhighlight %}
+
+- **Memory sink (for debugging)** - The output is stored in memory as an in-memory table.
+Both, Append and Complete output modes, are supported. This should be used for debugging purposes
+on low data volumes as the entire output is collected and stored in the driver's memory.
+Hence, use it with caution.
+
+{% highlight scala %}
+writeStream
+    .format("memory")
+    .queryName("tableName")
+    .start()
+{% endhighlight %}
 
-Here is a table of all the sinks, and the corresponding settings.
+Some sinks are not fault-tolerant because they do not guarantee persistence of the output and are 
+meant for debugging purposes only. See the earlier section on 
+[fault-tolerance semantics](#fault-tolerance-semantics). 
+Here are the details of all the sinks in Spark.
 
 <table class="table">
   <tr>
     <th>Sink</th>
     <th>Supported Output Modes</th>
-    <th style="width:30%">Usage</th>
+    <th>Options</th>
     <th>Fault-tolerant</th>
     <th>Notes</th>
   </tr>
   <tr>
     <td><b>File Sink</b></td>
     <td>Append</td>
-    <td><pre>writeStream<br/>  .format("parquet")<br/>  .option(<br/>    "checkpointLocation",<br/>    "path/to/checkpoint/dir")<br/>  .option(<br/>    "path",<br/>    "path/to/destination/dir")<br/>  .start()</pre></td>
+    <td>
+        <code>path</code>: path to the output directory, must be specified.
+        <code>maxFilesPerTrigger</code>: maximum number of new files to be considered in every trigger (default: no max)
+        <br/>
+        <code>latestFirst</code>: whether to processs the latest new files first, useful when there is a large backlog of files(default: false)
+        <br/><br/>
+        For file-format-specific options, see the related methods in DataFrameWriter
+        (<a href="api/scala/index.html#org.apache.spark.sql.DataFrameWriter">Scala</a>/<a href="api/java/org/apache/spark/sql/DataFrameWriter.html">Java</a>/<a href="api/python/pyspark.sql.html#pyspark.sql.DataFrameWriter">Python</a>).
+        E.g. for "parquet" format options see <code>DataFrameWriter.parquet()</code>
+    </td>
     <td>Yes</td>
     <td>Supports writes to partitioned tables. Partitioning by time may be useful.</td>
   </tr>
   <tr>
     <td><b>Foreach Sink</b></td>
-    <td>All modes</td>
-    <td><pre>writeStream<br/>  .foreach(...)<br/>  .start()</pre></td>
+    <td>Append, Update, Compelete</td>
+    <td>None</td>
     <td>Depends on ForeachWriter implementation</td>
     <td>More details in the <a href="#using-foreach">next section</a></td>
   </tr>
   <tr>
     <td><b>Console Sink</b></td>
-    <td>Append, Complete</td>
-    <td><pre>writeStream<br/>  .format("console")<br/>  .start()</pre></td>
+    <td>Append, Update, Complete</td>
+    <td>
+        <code>numRows</code>: Number of rows to print every trigger (default: 20)
+        <br/>
+        <code>truncate</code>: Whether to truncate the output if too long (default: true)
+    </td>
     <td>No</td>
     <td></td>
   </tr>
   <tr>
     <td><b>Memory Sink</b></td>
     <td>Append, Complete</td>
-    <td><pre>writeStream<br/>  .format("memory")<br/>  .queryName("table")<br/>  .start()</pre></td>
-    <td>No</td>
-    <td>Saves the output data as a table, for interactive querying. Table name is the query name.</td>
+    <td>None</td>
+    <td>No. But in Complete Mode, restarted query will recreate the full table.</td>
+    <td>Table name is the query name.</td>
   </tr>
   <tr>
     <td></td>
@@ -1007,7 +1121,7 @@ Here is a table of all the sinks, and the corresponding settings.
   </tr>
 </table>
 
-Finally, you have to call `start()` to actually start the execution of the query. This returns a StreamingQuery object which is a handle to the continuously running execution. You can use this object to manage the query, which we will discuss in the next subsection. For now, let\u2019s understand all this with a few examples.
+Note that you have to call `start()` to actually start the execution of the query. This returns a StreamingQuery object which is a handle to the continuously running execution. You can use this object to manage the query, which we will discuss in the next subsection. For now, let\u2019s understand all this with a few examples.
 
 
 <div class="codetabs">

http://git-wip-us.apache.org/repos/asf/spark/blob/b59cddab/sql/core/src/main/scala/org/apache/spark/sql/streaming/DataStreamWriter.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/streaming/DataStreamWriter.scala b/sql/core/src/main/scala/org/apache/spark/sql/streaming/DataStreamWriter.scala
index 0b39965..abb00ce 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/streaming/DataStreamWriter.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/streaming/DataStreamWriter.scala
@@ -115,7 +115,7 @@ final class DataStreamWriter[T] private[sql](ds: Dataset[T]) {
   }
 
   /**
-   * Specifies the underlying output data source. Built-in options include "parquet" for now.
+   * Specifies the underlying output data source.
    *
    * @since 2.0.0
    */
@@ -137,9 +137,7 @@ final class DataStreamWriter[T] private[sql](ds: Dataset[T]) {
    * predicates on the partitioned columns. In order for partitioning to work well, the number
    * of distinct values in each column should typically be less than tens of thousands.
    *
-   * This was initially applicable for Parquet but in 1.5+ covers JSON, text, ORC and avro as well.
-   *
-   * @since 1.4.0
+   * @since 2.0.0
    */
   @scala.annotation.varargs
   def partitionBy(colNames: String*): DataStreamWriter[T] = {


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@spark.apache.org
For additional commands, e-mail: commits-help@spark.apache.org