Posted to commits@spark.apache.org by td...@apache.org on 2018/02/21 02:16:14 UTC

spark git commit: [SPARK-23454][SS][DOCS] Added trigger information to the Structured Streaming programming guide

Repository: spark
Updated Branches:
  refs/heads/master 6d398c05c -> 601d653bf


[SPARK-23454][SS][DOCS] Added trigger information to the Structured Streaming programming guide

## What changes were proposed in this pull request?

- Added clear information about triggers
- Made the semantic guarantees of watermarks clearer for streaming aggregations and stream-stream joins.

## How was this patch tested?

(Please explain how this patch was tested. E.g. unit tests, integration tests, manual tests)
(If this patch involves UI changes, please attach a screenshot; otherwise, remove this)

Please review http://spark.apache.org/contributing.html before opening a pull request.

Author: Tathagata Das <ta...@gmail.com>

Closes #20631 from tdas/SPARK-23454.


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/601d653b
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/601d653b
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/601d653b

Branch: refs/heads/master
Commit: 601d653bff9160db8477f86d961e609fc2190237
Parents: 6d398c0
Author: Tathagata Das <ta...@gmail.com>
Authored: Tue Feb 20 18:16:10 2018 -0800
Committer: Tathagata Das <ta...@gmail.com>
Committed: Tue Feb 20 18:16:10 2018 -0800

----------------------------------------------------------------------
 docs/structured-streaming-programming-guide.md | 214 +++++++++++++++++++-
 1 file changed, 207 insertions(+), 7 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/601d653b/docs/structured-streaming-programming-guide.md
----------------------------------------------------------------------
diff --git a/docs/structured-streaming-programming-guide.md b/docs/structured-streaming-programming-guide.md
index 48d6d0b..9a83f15 100644
--- a/docs/structured-streaming-programming-guide.md
+++ b/docs/structured-streaming-programming-guide.md
@@ -904,7 +904,7 @@ windowedCounts <- count(
 </div>
 
 
-### Handling Late Data and Watermarking
+#### Handling Late Data and Watermarking
 Now consider what happens if one of the events arrives late to the application.
 For example, say, a word generated at 12:04 (i.e. event time) could be received by 
 the application at 12:11. The application should use the time 12:04 instead of 12:11
@@ -925,7 +925,9 @@ specifying the event time column and the threshold on how late the data is expec
 event time. For a specific window starting at time `T`, the engine will maintain state and allow late
 data to update the state until `(max event time seen by the engine - late threshold > T)`. 
 In other words, late data within the threshold will be aggregated, 
-but data later than the threshold will be dropped. Let's understand this with an example. We can 
+but data later than the threshold will start getting dropped
+(see [later](#semantic-guarantees-of-aggregation-with-watermarking)
+in the section for the exact guarantees). Let's understand this with an example. We can
 easily define watermarking on the previous example using `withWatermark()` as shown below.
 
 <div class="codetabs">
@@ -1031,7 +1033,9 @@ then drops intermediate state of a window < watermark, and appends the final
 counts to the Result Table/sink. For example, the final counts of window `12:00 - 12:10` is 
 appended to the Result Table only after the watermark is updated to `12:11`. 
 
-**Conditions for watermarking to clean aggregation state**
+##### Conditions for watermarking to clean aggregation state
+{:.no_toc}
+
 It is important to note that the following conditions must be satisfied for the watermarking to 
 clean the state in aggregation queries *(as of Spark 2.1.1, subject to change in the future)*.
 
@@ -1051,6 +1055,16 @@ from the aggregation column.
 For example, `df.groupBy("time").count().withWatermark("time", "1 min")` is invalid in Append 
 output mode.
 
+##### Semantic Guarantees of Aggregation with Watermarking
+{:.no_toc}
+
+- A watermark delay (set with `withWatermark`) of "2 hours" guarantees that the engine will never
+drop any data that is less than 2 hours delayed. In other words, any data less than 2 hours behind
+(in terms of event-time) the latest data processed till then is guaranteed to be aggregated.
+
+- However, the guarantee is strict only in one direction. Data delayed by more than 2 hours is
+not guaranteed to be dropped; it may or may not get aggregated. The more delayed the data is,
+the less likely the engine is to process it.
 
 ### Join Operations
 Structured Streaming supports joining a streaming Dataset/DataFrame with a static Dataset/DataFrame
@@ -1062,7 +1076,7 @@ Dataset/DataFrame will be the exactly the same as if it was with a static Datase
 containing the same data in the stream.
 
 
-#### Stream-static joins
+#### Stream-static Joins
 
 Since the introduction in Spark 2.0, Structured Streaming has supported joins (inner join and some
 type of outer joins) between a streaming and a static DataFrame/Dataset. Here is a simple example.
@@ -1269,6 +1283,12 @@ joined <- join(
 </div>
 </div>
 
+###### Semantic Guarantees of Stream-stream Inner Joins with Watermarking
+{:.no_toc}
+This is similar to the [guarantees provided by watermarking on aggregations](#semantic-guarantees-of-aggregation-with-watermarking).
+A watermark delay of "2 hours" guarantees that the engine will never drop any data that is less than
+2 hours delayed. But data delayed by more than 2 hours may or may not get processed.
+
 ##### Outer Joins with Watermarking
 While the watermark + event-time constraints is optional for inner joins, for left and right outer
 joins they must be specified. This is because for generating the NULL results in outer join, the
@@ -1347,7 +1367,14 @@ joined <- join(
 </div>
 
 
-There are a few points to note regarding outer joins.
+###### Semantic Guarantees of Stream-stream Outer Joins with Watermarking
+{:.no_toc}
+Outer joins have the same guarantees as [inner joins](#semantic-guarantees-of-stream-stream-inner-joins-with-watermarking)
+regarding watermark delays and whether data will be dropped or not.
+
+###### Caveats
+{:.no_toc}
+There are a few important characteristics to note regarding how the outer results are generated.
 
 - *The outer NULL results will be generated with a delay that depends on the specified watermark
 delay and the time range condition.* This is because the engine has to wait for that long to ensure
@@ -1962,7 +1989,7 @@ head(sql("select * from aggregates"))
 </div>
 </div>
 
-#### Using Foreach
+##### Using Foreach
 The `foreach` operation allows arbitrary operations to be computed on the output data. As of Spark 2.1, this is available only for Scala and Java. To use this, you will have to implement the interface `ForeachWriter`
 ([Scala](api/scala/index.html#org.apache.spark.sql.ForeachWriter)/[Java](api/java/org/apache/spark/sql/ForeachWriter.html) docs),
 which has methods that get called whenever there is a sequence of rows generated as output after a trigger. Note the following important points.
@@ -1979,6 +2006,172 @@ which has methods that get called whenever there is a sequence of rows generated
 
 - Whenever `open` is called, `close` will also be called (unless the JVM exits due to some error). This is true even if `open` returns false. If there is any error in processing and writing the data, `close` will be called with the error. It is your responsibility to clean up state (e.g. connections, transactions, etc.) that have been created in `open` such that there are no resource leaks.
 
+#### Triggers
+The trigger settings of a streaming query define the timing of streaming data processing, that is, whether
+the query is going to be executed as a micro-batch query with a fixed batch interval or as a continuous processing query.
+Here are the different kinds of triggers that are supported.
+
+<table class="table">
+  <tr>
+    <th>Trigger Type</th>
+    <th>Description</th>
+  </tr>
+  <tr>
+    <td><i>unspecified (default)</i></td>
+    <td>
+        If no trigger setting is explicitly specified, then by default, the query will be
+        executed in micro-batch mode, where micro-batches will be generated as soon as
+        the previous micro-batch has completed processing.
+    </td>
+  </tr>
+  <tr>
+    <td><b>Fixed interval micro-batches</b></td>
+    <td>
+        The query will be executed in micro-batch mode, where micro-batches will be kicked off
+        at the user-specified intervals.
+        <ul>
+          <li>If the previous micro-batch completes within the interval, then the engine will wait until
+          the interval is over before kicking off the next micro-batch.</li>
+
+          <li>If the previous micro-batch takes longer than the interval to complete (i.e. if an
+          interval boundary is missed), then the next micro-batch will start as soon as the
+          previous one completes (i.e., it will not wait for the next interval boundary).</li>
+
+          <li>If no new data is available, then no micro-batch will be kicked off.</li>
+        </ul>
+    </td>
+  </tr>
+  <tr>
+    <td><b>One-time micro-batch</b></td>
+    <td>
+        The query will execute <i>only one</i> micro-batch to process all the available data and then
+        stop on its own. This is useful in scenarios where you want to periodically spin up a cluster,
+        process everything that is available since the last period, and then shut down the
+        cluster. In some cases, this may lead to significant cost savings.
+    </td>
+  </tr>
+  <tr>
+    <td><b>Continuous with fixed checkpoint interval</b><br/><i>(experimental)</i></td>
+    <td>
+        The query will be executed in the new low-latency, continuous processing mode. Read more
+        about this in the <a href="#continuous-processing-experimental">Continuous Processing section</a> below.
+    </td>
+  </tr>
+</table>
+
+Here are a few code examples.
+
+<div class="codetabs">
+<div data-lang="scala"  markdown="1">
+
+{% highlight scala %}
+import org.apache.spark.sql.streaming.Trigger
+
+// Default trigger (runs micro-batch as soon as it can)
+df.writeStream
+  .format("console")
+  .start()
+
+// ProcessingTime trigger with two-seconds micro-batch interval
+df.writeStream
+  .format("console")
+  .trigger(Trigger.ProcessingTime("2 seconds"))
+  .start()
+
+// One-time trigger
+df.writeStream
+  .format("console")
+  .trigger(Trigger.Once())
+  .start()
+
+// Continuous trigger with one-second checkpointing interval
+df.writeStream
+  .format("console")
+  .trigger(Trigger.Continuous("1 second"))
+  .start()
+
+{% endhighlight %}
+
+
+</div>
+<div data-lang="java"  markdown="1">
+
+{% highlight java %}
+import org.apache.spark.sql.streaming.Trigger;
+
+// Default trigger (runs micro-batch as soon as it can)
+df.writeStream()
+  .format("console")
+  .start();
+
+// ProcessingTime trigger with two-seconds micro-batch interval
+df.writeStream()
+  .format("console")
+  .trigger(Trigger.ProcessingTime("2 seconds"))
+  .start();
+
+// One-time trigger
+df.writeStream()
+  .format("console")
+  .trigger(Trigger.Once())
+  .start();
+
+// Continuous trigger with one-second checkpointing interval
+df.writeStream()
+  .format("console")
+  .trigger(Trigger.Continuous("1 second"))
+  .start();
+
+{% endhighlight %}
+
+</div>
+<div data-lang="python"  markdown="1">
+
+{% highlight python %}
+
+# Default trigger (runs micro-batch as soon as it can)
+df.writeStream \
+  .format("console") \
+  .start()
+
+# ProcessingTime trigger with two-seconds micro-batch interval
+df.writeStream \
+  .format("console") \
+  .trigger(processingTime='2 seconds') \
+  .start()
+
+# One-time trigger
+df.writeStream \
+  .format("console") \
+  .trigger(once=True) \
+  .start()
+
+# Continuous trigger with one-second checkpointing interval
+df.writeStream \
+  .format("console") \
+  .trigger(continuous='1 second') \
+  .start()
+
+{% endhighlight %}
+</div>
+<div data-lang="r"  markdown="1">
+
+{% highlight r %}
+# Default trigger (runs micro-batch as soon as it can)
+write.stream(df, "console")
+
+# ProcessingTime trigger with two-seconds micro-batch interval
+write.stream(df, "console", trigger.processingTime = "2 seconds")
+
+# One-time trigger
+write.stream(df, "console", trigger.once = TRUE)
+
+# Continuous trigger is not yet supported
+{% endhighlight %}
+</div>
+</div>
+
+
 ## Managing Streaming Queries
 The `StreamingQuery` object created when a query is started can be used to monitor and manage the query. 
 
@@ -2516,7 +2709,10 @@ write.stream(aggDF, "memory", outputMode = "complete", checkpointLocation = "pat
 </div>
 </div>
 
-# Continuous Processing [Experimental]
+# Continuous Processing
+## [Experimental]
+{:.no_toc}
+
 **Continuous processing** is a new, experimental streaming execution mode introduced in Spark 2.3 that enables low (~1 ms) end-to-end latency with at-least-once fault-tolerance guarantees. Compare this with the default *micro-batch processing* engine which can achieve exactly-once guarantees but achieve latencies of ~100ms at best. For some types of queries (discussed below), you can choose which mode to execute them in without modifying the application logic (i.e. without changing the DataFrame/Dataset operations). 
 
 To run a supported query in continuous processing mode, all you need to do is specify a **continuous trigger** with the desired checkpoint interval as a parameter. For example, 
@@ -2589,6 +2785,8 @@ spark \
 A checkpoint interval of 1 second means that the continuous processing engine will records the progress of the query every second. The resulting checkpoints are in a format compatible with the micro-batch engine, hence any query can be restarted with any trigger. For example, a supported query started with the micro-batch mode can be restarted in continuous mode, and vice versa. Note that any time you switch to continuous mode, you will get at-least-once fault-tolerance guarantees.
 
 ## Supported Queries
+{:.no_toc}
+
 As of Spark 2.3, only the following type of queries are supported in the continuous processing mode.
 
 - *Operations*: Only map-like Dataset/DataFrame operations are supported in continuous mode, that is, only projections (`select`, `map`, `flatMap`, `mapPartitions`, etc.) and selections (`where`, `filter`, etc.).
@@ -2606,6 +2804,8 @@ As of Spark 2.3, only the following type of queries are supported in the continu
 See [Input Sources](#input-sources) and [Output Sinks](#output-sinks) sections for more details on them. While the console sink is good for testing, the end-to-end low-latency processing can be best observed with Kafka as the source and sink, as this allows the engine to process the data and make the results available in the output topic within milliseconds of the input data being available in the input topic.
 
 ## Caveats
+{:.no_toc}
+
 - Continuous processing engine launches multiple long-running tasks that continuously read data from sources, process it and continuously write to sinks. The number of tasks required by the query depends on how many partitions the query can read from the sources in parallel. Therefore, before starting a continuous processing query, you must ensure there are enough cores in the cluster to all the tasks in parallel. For example, if you are reading from a Kafka topic that has 10 partitions, then the cluster must have at least 10 cores for the query to make progress.
 - Stopping a continuous processing stream may produce spurious task termination warnings. These can be safely ignored.
 - There are currently no automatic retries of failed tasks. Any failure will lead to the query being stopped and it needs to be manually restarted from the checkpoint.
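
Note on the watermark guarantees added in this commit: the one-directional guarantee (data less than the delay behind is always kept, data further behind may or may not be dropped) can be illustrated with a small pure-Python simulation. This is only a sketch under stated assumptions, not Spark code. It assumes the watermark advances only between micro-batches and that events are compared individually rather than by window. `run_batches` and `t` are hypothetical helper names introduced here for illustration.

```python
from datetime import datetime, timedelta


def t(hour, minute):
    """Hypothetical helper: build an event time on an arbitrary fixed day."""
    return datetime(2018, 2, 20, hour, minute)


def run_batches(batches, delay):
    """Simulate which events survive a watermark with the given delay.

    Assumption of this sketch: the watermark is recomputed only after a
    micro-batch finishes, so it lags behind the newest data in the batch.
    """
    watermark = None
    max_event_time = None
    kept, dropped = [], []
    for batch in batches:
        for event_time in batch:
            # Only the watermark from before this batch is used for dropping.
            if watermark is not None and event_time < watermark:
                dropped.append(event_time)
            else:
                kept.append(event_time)
            if max_event_time is None or event_time > max_event_time:
                max_event_time = event_time
        # Advance the watermark at the batch boundary; it never moves backwards.
        if max_event_time is not None:
            candidate = max_event_time - delay
            watermark = candidate if watermark is None else max(watermark, candidate)
    return kept, dropped


batches = [
    [t(12, 0), t(15, 0), t(12, 30)],  # 12:30 is 2.5 hours behind 15:00
    [t(13, 30), t(12, 45)],           # 1.5 hours and 2.25 hours behind 15:00
]
kept, dropped = run_batches(batches, timedelta(hours=2))
```

In this run the 12:30 event is kept although it is more than 2 hours behind, because the watermark had not yet advanced when it arrived. The 12:45 event arrives after the watermark moved to 13:00 and is dropped. The 13:30 event is less than 2 hours behind and is always kept.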

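Note on the fixed-interval trigger described in the new Triggers table: the first two rules (wait for the interval to end if the batch finishes early, start immediately if it overruns) can be sketched as a small scheduling model. This is an illustration of the documented behavior, not Spark's scheduler. It assumes interval boundaries fall on multiples of the interval counted from query start, and it leaves out the "no new data, no micro-batch" rule. `batch_start_times` is a hypothetical name.

```python
def batch_start_times(durations_ms, interval_ms):
    """Return the start time (ms since query start) of each micro-batch.

    durations_ms: how long each micro-batch takes to process, in order.
    interval_ms:  the user-specified trigger interval.
    """
    starts = []
    now = 0
    for duration in durations_ms:
        starts.append(now)
        # Assumed model: boundaries sit at multiples of the interval.
        next_boundary = (now // interval_ms + 1) * interval_ms
        finished = now + duration
        # Early finish: wait for the boundary. Overrun: start right away.
        now = max(finished, next_boundary)
    return starts


# 2-second interval; the second batch overruns and takes 3 seconds.
starts = batch_start_times([500, 3000, 1000, 200], 2000)
print(starts)  # [0, 2000, 5000, 6000]
```

The third batch starts at 5000 ms, as soon as the overrunning second batch completes, rather than waiting for the 6000 ms boundary.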
