Posted to commits@spark.apache.org by td...@apache.org on 2018/01/19 00:29:50 UTC

spark git commit: [SPARK-23142][SS][DOCS] Added docs for continuous processing

Repository: spark
Updated Branches:
  refs/heads/master 5d7c4ba4d -> 4cd2ecc0c


[SPARK-23142][SS][DOCS] Added docs for continuous processing

## What changes were proposed in this pull request?

Added documentation for continuous processing. Modified two locations.
- Modified the overview to have a mention of Continuous Processing.
- Added a new section on Continuous Processing at the end.

![image](https://user-images.githubusercontent.com/663212/35083551-a3dd23f6-fbd4-11e7-9e7e-90866f131ca9.png)
![image](https://user-images.githubusercontent.com/663212/35083618-d844027c-fbd4-11e7-9fde-75992cc517bd.png)

## How was this patch tested?
N/A

Author: Tathagata Das <ta...@gmail.com>

Closes #20308 from tdas/SPARK-23142.


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/4cd2ecc0
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/4cd2ecc0
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/4cd2ecc0

Branch: refs/heads/master
Commit: 4cd2ecc0c7222fef1337e04f1948333296c3be86
Parents: 5d7c4ba
Author: Tathagata Das <ta...@gmail.com>
Authored: Thu Jan 18 16:29:45 2018 -0800
Committer: Tathagata Das <ta...@gmail.com>
Committed: Thu Jan 18 16:29:45 2018 -0800

----------------------------------------------------------------------
 docs/structured-streaming-programming-guide.md | 98 ++++++++++++++++++++-
 1 file changed, 97 insertions(+), 1 deletion(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/4cd2ecc0/docs/structured-streaming-programming-guide.md
----------------------------------------------------------------------
diff --git a/docs/structured-streaming-programming-guide.md b/docs/structured-streaming-programming-guide.md
index 1779a42..2ddba2f 100644
--- a/docs/structured-streaming-programming-guide.md
+++ b/docs/structured-streaming-programming-guide.md
@@ -10,7 +10,9 @@ title: Structured Streaming Programming Guide
 # Overview
 Structured Streaming is a scalable and fault-tolerant stream processing engine built on the Spark SQL engine. You can express your streaming computation the same way you would express a batch computation on static data. The Spark SQL engine will take care of running it incrementally and continuously and updating the final result as streaming data continues to arrive. You can use the [Dataset/DataFrame API](sql-programming-guide.html) in Scala, Java, Python or R to express streaming aggregations, event-time windows, stream-to-batch joins, etc. The computation is executed on the same optimized Spark SQL engine. Finally, the system ensures end-to-end exactly-once fault-tolerance guarantees through checkpointing and Write Ahead Logs. In short, *Structured Streaming provides fast, scalable, fault-tolerant, end-to-end exactly-once stream processing without the user having to reason about streaming.*
 
-In this guide, we are going to walk you through the programming model and the APIs. First, let's start with a simple example - a streaming word count.
+Internally, by default, Structured Streaming queries are processed using a *micro-batch processing* engine, which processes data streams as a series of small batch jobs, thereby achieving end-to-end latencies as low as 100 milliseconds and exactly-once fault-tolerance guarantees. However, since Spark 2.3, we have introduced a new low-latency processing mode called **Continuous Processing**, which can achieve end-to-end latencies as low as 1 millisecond with at-least-once guarantees. Without changing the Dataset/DataFrame operations in your queries, you will be able to choose the mode based on your application requirements.
+
+In this guide, we are going to walk you through the programming model and the APIs. We are going to explain the concepts mostly using the default micro-batch processing model, and then [later](#continuous-processing-experimental) discuss the Continuous Processing model. First, let's start with a simple example of a Structured Streaming query - a streaming word count.
 
 # Quick Example
 Let’s say you want to maintain a running word count of text data received from a data server listening on a TCP socket. Let’s see how you can express this using Structured Streaming. You can see the full code in
@@ -2434,6 +2436,100 @@ write.stream(aggDF, "memory", outputMode = "complete", checkpointLocation = "pat
 </div>
 </div>
 
+# Continuous Processing [Experimental]
+**Continuous processing** is a new, experimental streaming execution mode introduced in Spark 2.3 that enables low (~1 ms) end-to-end latency with at-least-once fault-tolerance guarantees. Compare this with the default *micro-batch processing* engine, which can achieve exactly-once guarantees but achieves latencies of ~100 ms at best. For some types of queries (discussed below), you can choose which mode to execute them in without modifying the application logic (i.e. without changing the DataFrame/Dataset operations).
+
+To run a supported query in continuous processing mode, all you need to do is specify a **continuous trigger** with the desired checkpoint interval as a parameter. For example:
+
+<div class="codetabs">
+<div data-lang="scala"  markdown="1">
+{% highlight scala %}
+import org.apache.spark.sql.streaming.Trigger
+
+spark
+  .readStream
+  .format("rate")
+  .option("rowsPerSecond", "10")
+  .load()
+
+spark
+  .readStream
+  .format("kafka")
+  .option("kafka.bootstrap.servers", "host1:port1,host2:port2")
+  .option("subscribe", "topic1")
+  .load()
+  .selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)")
+  .writeStream
+  .format("kafka")
+  .option("kafka.bootstrap.servers", "host1:port1,host2:port2")
+  .option("topic", "topic1")
+  .trigger(Trigger.Continuous("1 second"))  // only change in query
+  .start()
+{% endhighlight %}
+</div>
+<div data-lang="java"  markdown="1">  
+{% highlight java %}
+import org.apache.spark.sql.streaming.Trigger;
+
+spark
+  .readStream()
+  .format("kafka")
+  .option("kafka.bootstrap.servers", "host1:port1,host2:port2")
+  .option("subscribe", "topic1")
+  .load()
+  .selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)")
+  .writeStream()
+  .format("kafka")
+  .option("kafka.bootstrap.servers", "host1:port1,host2:port2")
+  .option("topic", "topic1")
+  .trigger(Trigger.Continuous("1 second"))  // only change in query
+  .start();
+{% endhighlight %}
+</div>
+<div data-lang="python"  markdown="1">  
+{% highlight python %}
+spark \
+  .readStream \
+  .format("kafka") \
+  .option("kafka.bootstrap.servers", "host1:port1,host2:port2") \
+  .option("subscribe", "topic1") \
+  .load() \
+  .selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)") \
+  .writeStream \
+  .format("kafka") \
+  .option("kafka.bootstrap.servers", "host1:port1,host2:port2") \
+  .option("topic", "topic1") \
+  .trigger(continuous="1 second") \
+  .start()
+# The continuous trigger above is the only change in the query.
+{% endhighlight %}
+</div>
+</div>
+
+A checkpoint interval of 1 second means that the continuous processing engine will record the progress of the query every second. The resulting checkpoints are in a format compatible with the micro-batch engine, hence any query can be restarted with any trigger. For example, a supported query started with the micro-batch mode can be restarted in continuous mode, and vice versa. Note that any time you switch to continuous mode, you will get at-least-once fault-tolerance guarantees.
+
+## Supported Queries
+As of Spark 2.3, only the following types of queries are supported in the continuous processing mode.
+
+- *Operations*: Only map-like Dataset/DataFrame operations are supported in continuous mode, that is, only projections (`select`, `map`, `flatMap`, `mapPartitions`, etc.) and selections (`where`, `filter`, etc.).
+  + All SQL functions are supported except aggregation functions (since aggregations are not yet supported), `current_timestamp()` and `current_date()` (deterministic computations using time are challenging).
+
+- *Sources*:
+  + Kafka source: All options are supported.
+  + Rate source: Good for testing. The only options supported in continuous mode are `numPartitions` and `rowsPerSecond`.
+
+- *Sinks*: 
+  + Kafka sink: All options are supported.
+  + Memory sink: Good for debugging.
+  + Console sink: Good for debugging. All options are supported. Note that the console will print once per checkpoint interval that you have specified in the continuous trigger.
+
+See [Input Sources](#input-sources) and [Output Sinks](#output-sinks) sections for more details on them. While the console sink is good for testing, the end-to-end low-latency processing can be best observed with Kafka as the source and sink, as this allows the engine to process the data and make the results available in the output topic within milliseconds of the input data being available in the input topic.
+
+## Caveats
+- The continuous processing engine launches multiple long-running tasks that continuously read data from sources, process it and continuously write to sinks. The number of tasks required by the query depends on how many partitions the query can read from the sources in parallel. Therefore, before starting a continuous processing query, you must ensure there are enough cores in the cluster to run all the tasks in parallel. For example, if you are reading from a Kafka topic that has 10 partitions, then the cluster must have at least 10 cores for the query to make progress.
+- Stopping a continuous processing stream may produce spurious task termination warnings. These can be safely ignored.
+- There are currently no automatic retries of failed tasks. Any failure will lead to the query being stopped, and it needs to be manually restarted from the checkpoint.
+
 # Additional Information
 
 **Further Reading**

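The core-count caveat in the Continuous Processing section of the diff above can be sketched as a small pre-flight check. This is only an illustration, not part of the patch or of any Spark API: the helper names `required_tasks` and `has_enough_cores` are hypothetical, and the sketch assumes one long-running task per source partition, as the caveat describes.

```python
def required_tasks(partitions_per_source):
    """Total long-running tasks, assuming one task per source partition."""
    return sum(partitions_per_source.values())


def has_enough_cores(available_cores, partitions_per_source):
    """Return True if all continuous tasks can run in parallel."""
    return available_cores >= required_tasks(partitions_per_source)


# Hypothetical query reading a single Kafka topic that has 10 partitions.
sources = {"topic1": 10}
print(has_enough_cores(8, sources))   # False: 8 cores cannot host 10 tasks
print(has_enough_cores(10, sources))  # True: one core per task
```

With fewer cores than partitions, some tasks would never be scheduled, so the query could not make progress.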
