Posted to commits@spark.apache.org by td...@apache.org on 2017/03/21 00:16:54 UTC

spark git commit: [SPARK-19906][SS][DOCS] Documentation describing how to write queries to Kafka

Repository: spark
Updated Branches:
  refs/heads/master bec6b16c1 -> c2d1761a5


[SPARK-19906][SS][DOCS] Documentation describing how to write queries to Kafka

## What changes were proposed in this pull request?

Add documentation that describes how to write streaming and batch queries to Kafka.

zsxwing tdas

Please review http://spark.apache.org/contributing.html before opening a pull request.

Author: Tyson Condie <tc...@gmail.com>

Closes #17246 from tcondie/kafka-write-docs.


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/c2d1761a
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/c2d1761a
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/c2d1761a

Branch: refs/heads/master
Commit: c2d1761a57f5d175913284533b3d0417e8718688
Parents: bec6b16
Author: Tyson Condie <tc...@gmail.com>
Authored: Mon Mar 20 17:18:59 2017 -0700
Committer: Tathagata Das <ta...@gmail.com>
Committed: Mon Mar 20 17:18:59 2017 -0700

----------------------------------------------------------------------
 docs/structured-streaming-kafka-integration.md | 321 ++++++++++++++++----
 1 file changed, 264 insertions(+), 57 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/c2d1761a/docs/structured-streaming-kafka-integration.md
----------------------------------------------------------------------
diff --git a/docs/structured-streaming-kafka-integration.md b/docs/structured-streaming-kafka-integration.md
index 522e669..217c1a9 100644
--- a/docs/structured-streaming-kafka-integration.md
+++ b/docs/structured-streaming-kafka-integration.md
@@ -3,9 +3,9 @@ layout: global
 title: Structured Streaming + Kafka Integration Guide (Kafka broker version 0.10.0 or higher)
 ---
 
-Structured Streaming integration for Kafka 0.10 to poll data from Kafka.
+Structured Streaming integration for Kafka 0.10 to read data from and write data to Kafka.
 
-### Linking
+## Linking
 For Scala/Java applications using SBT/Maven project definitions, link your application with the following artifact:
 
     groupId = org.apache.spark
@@ -15,40 +15,42 @@ For Scala/Java applications using SBT/Maven project definitions, link your appli
 For Python applications, you need to add this above library and its dependencies when deploying your
 application. See the [Deploying](#deploying) subsection below.
 
-### Creating a Kafka Source Stream
+## Reading Data from Kafka
+
+### Creating a Kafka Source for Streaming Queries
 
 <div class="codetabs">
 <div data-lang="scala" markdown="1">
 {% highlight scala %}
 
 // Subscribe to 1 topic
-val ds1 = spark
+val df = spark
   .readStream
   .format("kafka")
   .option("kafka.bootstrap.servers", "host1:port1,host2:port2")
   .option("subscribe", "topic1")
   .load()
-ds1.selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)")
+df.selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)")
   .as[(String, String)]
 
 // Subscribe to multiple topics
-val ds2 = spark
+val df = spark
   .readStream
   .format("kafka")
   .option("kafka.bootstrap.servers", "host1:port1,host2:port2")
   .option("subscribe", "topic1,topic2")
   .load()
-ds2.selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)")
+df.selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)")
   .as[(String, String)]
 
 // Subscribe to a pattern
-val ds3 = spark
+val df = spark
   .readStream
   .format("kafka")
   .option("kafka.bootstrap.servers", "host1:port1,host2:port2")
   .option("subscribePattern", "topic.*")
   .load()
-ds3.selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)")
+df.selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)")
   .as[(String, String)]
 
 {% endhighlight %}
@@ -57,31 +59,31 @@ ds3.selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)")
 {% highlight java %}
 
 // Subscribe to 1 topic
-Dataset<Row> ds1 = spark
+Dataset<Row> df = spark
   .readStream()
   .format("kafka")
   .option("kafka.bootstrap.servers", "host1:port1,host2:port2")
   .option("subscribe", "topic1")
   .load()
-ds1.selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)")
+df.selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)")
 
 // Subscribe to multiple topics
-Dataset<Row> ds2 = spark
+Dataset<Row> df = spark
   .readStream()
   .format("kafka")
   .option("kafka.bootstrap.servers", "host1:port1,host2:port2")
   .option("subscribe", "topic1,topic2")
   .load()
-ds2.selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)")
+df.selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)")
 
 // Subscribe to a pattern
-Dataset<Row> ds3 = spark
+Dataset<Row> df = spark
   .readStream()
   .format("kafka")
   .option("kafka.bootstrap.servers", "host1:port1,host2:port2")
   .option("subscribePattern", "topic.*")
   .load()
-ds3.selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)")
+df.selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)")
 
 {% endhighlight %}
 </div>
@@ -89,37 +91,37 @@ ds3.selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)")
 {% highlight python %}
 
 # Subscribe to 1 topic
-ds1 = spark
-  .readStream
-  .format("kafka")
-  .option("kafka.bootstrap.servers", "host1:port1,host2:port2")
-  .option("subscribe", "topic1")
+df = spark \
+  .readStream \
+  .format("kafka") \
+  .option("kafka.bootstrap.servers", "host1:port1,host2:port2") \
+  .option("subscribe", "topic1") \
   .load()
-ds1.selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)")
+df.selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)")
 
 # Subscribe to multiple topics
-ds2 = spark
-  .readStream
-  .format("kafka")
-  .option("kafka.bootstrap.servers", "host1:port1,host2:port2")
-  .option("subscribe", "topic1,topic2")
+df = spark \
+  .readStream \
+  .format("kafka") \
+  .option("kafka.bootstrap.servers", "host1:port1,host2:port2") \
+  .option("subscribe", "topic1,topic2") \
   .load()
-ds2.selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)")
+df.selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)")
 
 # Subscribe to a pattern
-ds3 = spark
-  .readStream
-  .format("kafka")
-  .option("kafka.bootstrap.servers", "host1:port1,host2:port2")
-  .option("subscribePattern", "topic.*")
+df = spark \
+  .readStream \
+  .format("kafka") \
+  .option("kafka.bootstrap.servers", "host1:port1,host2:port2") \
+  .option("subscribePattern", "topic.*") \
   .load()
-ds3.selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)")
+df.selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)")
 
 {% endhighlight %}
 </div>
 </div>
 
-### Creating a Kafka Source Batch
+### Creating a Kafka Source for Batch Queries 
 If you have a use case that is better suited to batch processing,
 you can create an Dataset/DataFrame for a defined range of offsets.
 
@@ -128,17 +130,17 @@ you can create an Dataset/DataFrame for a defined range of offsets.
 {% highlight scala %}
 
 // Subscribe to 1 topic defaults to the earliest and latest offsets
-val ds1 = spark
+val df = spark
   .read
   .format("kafka")
   .option("kafka.bootstrap.servers", "host1:port1,host2:port2")
   .option("subscribe", "topic1")
   .load()
-ds1.selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)")
+df.selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)")
   .as[(String, String)]
 
 // Subscribe to multiple topics, specifying explicit Kafka offsets
-val ds2 = spark
+val df = spark
   .read
   .format("kafka")
   .option("kafka.bootstrap.servers", "host1:port1,host2:port2")
@@ -146,11 +148,11 @@ val ds2 = spark
   .option("startingOffsets", """{"topic1":{"0":23,"1":-2},"topic2":{"0":-2}}""")
   .option("endingOffsets", """{"topic1":{"0":50,"1":-1},"topic2":{"0":-1}}""")
   .load()
-ds2.selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)")
+df.selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)")
   .as[(String, String)]
 
 // Subscribe to a pattern, at the earliest and latest offsets
-val ds3 = spark
+val df = spark
   .read
   .format("kafka")
   .option("kafka.bootstrap.servers", "host1:port1,host2:port2")
@@ -158,7 +160,7 @@ val ds3 = spark
   .option("startingOffsets", "earliest")
   .option("endingOffsets", "latest")
   .load()
-ds3.selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)")
+df.selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)")
   .as[(String, String)]
 
 {% endhighlight %}
@@ -167,16 +169,16 @@ ds3.selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)")
 {% highlight java %}
 
 // Subscribe to 1 topic defaults to the earliest and latest offsets
-Dataset<Row> ds1 = spark
+Dataset<Row> df = spark
   .read()
   .format("kafka")
   .option("kafka.bootstrap.servers", "host1:port1,host2:port2")
   .option("subscribe", "topic1")
   .load();
-ds1.selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)");
+df.selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)");
 
 // Subscribe to multiple topics, specifying explicit Kafka offsets
-Dataset<Row> ds2 = spark
+Dataset<Row> df = spark
   .read()
   .format("kafka")
   .option("kafka.bootstrap.servers", "host1:port1,host2:port2")
@@ -184,10 +186,10 @@ Dataset<Row> ds2 = spark
   .option("startingOffsets", "{\"topic1\":{\"0\":23,\"1\":-2},\"topic2\":{\"0\":-2}}")
   .option("endingOffsets", "{\"topic1\":{\"0\":50,\"1\":-1},\"topic2\":{\"0\":-1}}")
   .load();
-ds2.selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)");
+df.selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)");
 
 // Subscribe to a pattern, at the earliest and latest offsets
-Dataset<Row> ds3 = spark
+Dataset<Row> df = spark
   .read()
   .format("kafka")
   .option("kafka.bootstrap.servers", "host1:port1,host2:port2")
@@ -195,7 +197,7 @@ Dataset<Row> ds3 = spark
   .option("startingOffsets", "earliest")
   .option("endingOffsets", "latest")
   .load();
-ds3.selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)");
+df.selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)");
 
 {% endhighlight %}
 </div>
@@ -203,16 +205,16 @@ ds3.selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)");
 {% highlight python %}
 
 # Subscribe to 1 topic defaults to the earliest and latest offsets
-ds1 = spark \
+df = spark \
   .read \
   .format("kafka") \
   .option("kafka.bootstrap.servers", "host1:port1,host2:port2") \
   .option("subscribe", "topic1") \
   .load()
-ds1.selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)")
+df.selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)")
 
 # Subscribe to multiple topics, specifying explicit Kafka offsets
-ds2 = spark \
+df = spark \
   .read \
   .format("kafka") \
   .option("kafka.bootstrap.servers", "host1:port1,host2:port2") \
@@ -220,10 +222,10 @@ ds2 = spark \
   .option("startingOffsets", """{"topic1":{"0":23,"1":-2},"topic2":{"0":-2}}""") \
   .option("endingOffsets", """{"topic1":{"0":50,"1":-1},"topic2":{"0":-1}}""") \
   .load()
-ds2.selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)")
+df.selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)")
 
 # Subscribe to a pattern, at the earliest and latest offsets
-ds3 = spark \
+df = spark \
   .read \
   .format("kafka") \
   .option("kafka.bootstrap.servers", "host1:port1,host2:port2") \
@@ -231,8 +233,7 @@ ds3 = spark \
   .option("startingOffsets", "earliest") \
   .option("endingOffsets", "latest") \
   .load()
-ds3.selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)")
-
+df.selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)")
 {% endhighlight %}
 </div>
 </div>
@@ -373,11 +374,213 @@ The following configurations are optional:
 </tr>
 </table>
 
+## Writing Data to Kafka
+
+Here, we describe the support for writing Streaming Queries and Batch Queries to Apache Kafka. Take note that
+Apache Kafka only supports at-least-once write semantics. Consequently, when writing either Streaming Queries
+or Batch Queries to Kafka, some records may be duplicated; this can happen, for example, if Kafka needs
+to retry a message that was not acknowledged by a Broker, even though that Broker received and wrote the message record.
+Because of these Kafka write semantics, Structured Streaming cannot prevent such duplicates from occurring. However,
+if the query is written successfully, then you can assume that the query output was written at least once. One possible
+way to remove duplicates when reading the written data is to introduce a primary (unique) key
+that can be used to perform de-duplication when reading.
+
+The DataFrame being written to Kafka should have the following columns in its schema:
+<table class="table">
+<tr><th>Column</th><th>Type</th></tr>
+<tr>
+  <td>key (optional)</td>
+  <td>string or binary</td>
+</tr>
+<tr>
+  <td>value (required)</td>
+  <td>string or binary</td>
+</tr>
+<tr>
+  <td>topic (*optional)</td>
+  <td>string</td>
+</tr>
+</table>
+\* The topic column is required if the "topic" configuration option is not specified.<br>
+
+The value column is the only column that is always required. If a key column is not specified, then
+a ```null``` valued key column will be automatically added (see Kafka semantics on
+how ```null``` valued keys are handled). If a topic column exists, then its value
+is used as the topic when writing the given row to Kafka, unless the "topic" configuration
+option is set, i.e., the "topic" configuration option overrides the topic column.
+
+The following options must be set for the Kafka sink
+for both batch and streaming queries.
+
+<table class="table">
+<tr><th>Option</th><th>value</th><th>meaning</th></tr>
+<tr>
+  <td>kafka.bootstrap.servers</td>
+  <td>A comma-separated list of host:port</td>
+  <td>The Kafka "bootstrap.servers" configuration.</td>
+</tr>
+</table>
+
+The following configurations are optional:
+
+<table class="table">
+<tr><th>Option</th><th>value</th><th>default</th><th>query type</th><th>meaning</th></tr>
+<tr>
+  <td>topic</td>
+  <td>string</td>
+  <td>none</td>
+  <td>streaming and batch</td>
+  <td>Sets the topic that all rows will be written to in Kafka. This option overrides any
+  topic column that may exist in the data.</td>
+</tr>
+</table>
+
+### Creating a Kafka Sink for Streaming Queries
+
+<div class="codetabs">
+<div data-lang="scala" markdown="1">
+{% highlight scala %}
+
+// Write key-value data from a DataFrame to a specific Kafka topic specified in an option
+val ds = df
+  .selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)")
+  .writeStream
+  .format("kafka")
+  .option("kafka.bootstrap.servers", "host1:port1,host2:port2")
+  .option("topic", "topic1")
+  .start()
+
+// Write key-value data from a DataFrame to Kafka using a topic specified in the data
+val ds = df
+  .selectExpr("topic", "CAST(key AS STRING)", "CAST(value AS STRING)")
+  .writeStream
+  .format("kafka")
+  .option("kafka.bootstrap.servers", "host1:port1,host2:port2")
+  .start()
+
+{% endhighlight %}
+</div>
+<div data-lang="java" markdown="1">
+{% highlight java %}
+
+// Write key-value data from a DataFrame to a specific Kafka topic specified in an option
+StreamingQuery ds = df
+  .selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)")
+  .writeStream()
+  .format("kafka")
+  .option("kafka.bootstrap.servers", "host1:port1,host2:port2")
+  .option("topic", "topic1")
+  .start();
+
+// Write key-value data from a DataFrame to Kafka using a topic specified in the data
+StreamingQuery ds = df
+  .selectExpr("topic", "CAST(key AS STRING)", "CAST(value AS STRING)")
+  .writeStream()
+  .format("kafka")
+  .option("kafka.bootstrap.servers", "host1:port1,host2:port2")
+  .start();
+
+{% endhighlight %}
+</div>
+<div data-lang="python" markdown="1">
+{% highlight python %}
+
+# Write key-value data from a DataFrame to a specific Kafka topic specified in an option
+ds = df \
+  .selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)") \
+  .writeStream \
+  .format("kafka") \
+  .option("kafka.bootstrap.servers", "host1:port1,host2:port2") \
+  .option("topic", "topic1") \
+  .start()
+
+# Write key-value data from a DataFrame to Kafka using a topic specified in the data
+ds = df \
+  .selectExpr("topic", "CAST(key AS STRING)", "CAST(value AS STRING)") \
+  .writeStream \
+  .format("kafka") \
+  .option("kafka.bootstrap.servers", "host1:port1,host2:port2") \
+  .start()
+
+{% endhighlight %}
+</div>
+</div>
+
+### Writing the output of Batch Queries to Kafka
+
+<div class="codetabs">
+<div data-lang="scala" markdown="1">
+{% highlight scala %}
+
+// Write key-value data from a DataFrame to a specific Kafka topic specified in an option
+df.selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)")
+  .write
+  .format("kafka")
+  .option("kafka.bootstrap.servers", "host1:port1,host2:port2")
+  .option("topic", "topic1")
+  .save()
+
+// Write key-value data from a DataFrame to Kafka using a topic specified in the data
+df.selectExpr("topic", "CAST(key AS STRING)", "CAST(value AS STRING)")
+  .write
+  .format("kafka")
+  .option("kafka.bootstrap.servers", "host1:port1,host2:port2")
+  .save()
+
+{% endhighlight %}
+</div>
+<div data-lang="java" markdown="1">
+{% highlight java %}
+
+// Write key-value data from a DataFrame to a specific Kafka topic specified in an option
+df.selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)")
+  .write()
+  .format("kafka")
+  .option("kafka.bootstrap.servers", "host1:port1,host2:port2")
+  .option("topic", "topic1")
+  .save();
+
+// Write key-value data from a DataFrame to Kafka using a topic specified in the data
+df.selectExpr("topic", "CAST(key AS STRING)", "CAST(value AS STRING)")
+  .write()
+  .format("kafka")
+  .option("kafka.bootstrap.servers", "host1:port1,host2:port2")
+  .save();
+
+{% endhighlight %}
+</div>
+<div data-lang="python" markdown="1">
+{% highlight python %}
+
+# Write key-value data from a DataFrame to a specific Kafka topic specified in an option
+df.selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)") \
+  .write \
+  .format("kafka") \
+  .option("kafka.bootstrap.servers", "host1:port1,host2:port2") \
+  .option("topic", "topic1") \
+  .save()
+
+# Write key-value data from a DataFrame to Kafka using a topic specified in the data
+df.selectExpr("topic", "CAST(key AS STRING)", "CAST(value AS STRING)") \
+  .write \
+  .format("kafka") \
+  .option("kafka.bootstrap.servers", "host1:port1,host2:port2") \
+  .save()
+  
+{% endhighlight %}
+</div>
+</div>
+
+
+## Kafka Specific Configurations
+
 Kafka's own configurations can be set via `DataStreamReader.option` with `kafka.` prefix, e.g, 
-`stream.option("kafka.bootstrap.servers", "host:port")`. For possible kafkaParams, see 
-[Kafka consumer config docs](http://kafka.apache.org/documentation.html#newconsumerconfigs).
+`stream.option("kafka.bootstrap.servers", "host:port")`. For possible Kafka parameters, see
+[Kafka consumer config docs](http://kafka.apache.org/documentation.html#newconsumerconfigs) for
+parameters related to reading data, and [Kafka producer config docs](http://kafka.apache.org/documentation/#producerconfigs)
+for parameters related to writing data.
 
-Note that the following Kafka params cannot be set and the Kafka source will throw an exception:
+Note that the following Kafka params cannot be set and the Kafka source or sink will throw an exception:
 
 - **group.id**: Kafka source will create a unique group id for each query automatically.
 - **auto.offset.reset**: Set the source option `startingOffsets` to specify
@@ -389,11 +592,15 @@ Note that the following Kafka params cannot be set and the Kafka source will thr
  DataFrame operations to explicitly deserialize the keys.
 - **value.deserializer**: Values are always deserialized as byte arrays with ByteArrayDeserializer. 
  Use DataFrame operations to explicitly deserialize the values.
+- **key.serializer**: Keys are always serialized with ByteArraySerializer or StringSerializer. Use
+DataFrame operations to explicitly serialize the keys into either strings or byte arrays.
+- **value.serializer**: Values are always serialized with ByteArraySerializer or StringSerializer. Use
+DataFrame operations to explicitly serialize the values into either strings or byte arrays.
 - **enable.auto.commit**: Kafka source doesn't commit any offset.
 - **interceptor.classes**: Kafka source always read keys and values as byte arrays. It's not safe to
  use ConsumerInterceptor as it may break the query.
 
-### Deploying
+## Deploying
 
 As with any Spark applications, `spark-submit` is used to launch your application. `spark-sql-kafka-0-10_{{site.SCALA_BINARY_VERSION}}`
 and its dependencies can be directly added to `spark-submit` using `--packages`, such as,

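The "Writing Data to Kafka" section in the diff above suggests removing duplicates at read time by means of a primary (unique) key. A minimal sketch of that idea in plain Python follows; it does not use Spark or Kafka, and the `unique_id` field, the `deduplicate` helper, and the sample records are all hypothetical names introduced only for illustration.

```python
from typing import Iterable, List, Tuple

# A record as it might be read back from Kafka: (unique_id, payload).
# unique_id is a hypothetical primary key that the writer embedded in each record.
Record = Tuple[str, str]


def deduplicate(records: Iterable[Record]) -> List[Record]:
    """Keep the first occurrence of each unique_id, preserving read order."""
    seen = set()
    unique = []
    for unique_id, payload in records:
        if unique_id in seen:
            continue  # this id was already emitted, e.g. after a retried write
        seen.add(unique_id)
        unique.append((unique_id, payload))
    return unique


# Simulated topic contents in which the record "order-2" was written twice
read_back = [("order-1", "a"), ("order-2", "b"), ("order-2", "b"), ("order-3", "c")]
deduped = deduplicate(read_back)
```

In a Spark job the same idea could probably be expressed with `dropDuplicates` on the chosen key column; the sketch only shows why a unique key makes at-least-once output safe to consume.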

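The column rules described for the Kafka sink (value is required, a missing key becomes `null`, and the "topic" option overrides a topic column) can be modelled in a few lines of plain Python. This is only a sketch of the documented rules, not the sink's implementation; `resolve_record` and its error messages are hypothetical, and treating a `None` topic value as "no topic" is an assumption.

```python
from typing import Any, Dict, Optional


def resolve_record(row: Dict[str, Any], topic_option: Optional[str] = None) -> Dict[str, Any]:
    """Return the topic, key and value a row would be written with under the documented rules."""
    if "value" not in row:
        raise ValueError("the value column is required")
    # The "topic" option, when set, overrides any topic column present in the row
    topic = topic_option if topic_option is not None else row.get("topic")
    if topic is None:
        # Assumption: a missing or None topic with no option set is an error
        raise ValueError("no topic: set the 'topic' option or provide a topic column")
    # A missing key column is treated as a null key
    return {"topic": topic, "key": row.get("key"), "value": row["value"]}


# The option wins even though the row carries its own topic column
resolved = resolve_record({"topic": "topic2", "value": "v"}, topic_option="topic1")
```

Here `resolved["topic"]` is `"topic1"` and `resolved["key"]` is `None`, mirroring the override and the automatically added null key.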