Posted to reviews@spark.apache.org by tcondie <gi...@git.apache.org> on 2017/03/10 19:19:16 UTC

[GitHub] spark pull request #17246: [SPARK-19906][SS][DOCS] Documentation describing ...

GitHub user tcondie opened a pull request:

    https://github.com/apache/spark/pull/17246

    [SPARK-19906][SS][DOCS] Documentation describing how to write queries to Kafka

    ## What changes were proposed in this pull request?
    
    Add documentation that describes how to write streaming and batch queries to Kafka.
    
    @zsxwing @tdas 
    
    Please review http://spark.apache.org/contributing.html before opening a pull request.


You can merge this pull request into a Git repository by running:

    $ git pull https://github.com/tcondie/spark kafka-write-docs

Alternatively you can review and apply these changes as the patch at:

    https://github.com/apache/spark/pull/17246.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

    This closes #17246
    
----
commit 172d4505e5583c541e4644b1eeb12f853bf638cd
Author: Tyson Condie <tc...@gmail.com>
Date:   2017-03-10T19:14:33Z

    update

----




[GitHub] spark pull request #17246: [SPARK-19906][SS][DOCS] Documentation describing ...

Posted by tdas <gi...@git.apache.org>.
Github user tdas commented on a diff in the pull request:

    https://github.com/apache/spark/pull/17246#discussion_r106720282
  
    --- Diff: docs/structured-streaming-kafka-integration.md ---
    @@ -373,11 +375,204 @@ The following configurations are optional:
     </tr>
     </table>
     
    +## Producing Data to Kafka
    +
    +### Writing Streaming Queries to Kafka
    +
    +<div class="codetabs">
    +<div data-lang="scala" markdown="1">
    +{% highlight scala %}
    +
    +// Write key-value data from a DataFrame to a specific Kafka topic specified in an option
    +val s1 = df1
    +  .selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)")
    +  .writeStream
    +  .format("kafka")
    +  .option("kafka.bootstrap.servers", "host1:port1,host2:port2")
    +  .option("topic", "topic1")
    +  .start()
    +
    +// Write key-value data from a DataFrame to Kafka using a topic specified in the data
    +val s2 = df2
    +  .selectExpr("topic", "CAST(key AS STRING)", "CAST(value AS STRING)")
    +  .writeStream
    +  .format("kafka")
    +  .option("kafka.bootstrap.servers", "host1:port1,host2:port2")
    +  .start()
    +
    +{% endhighlight %}
    +</div>
    +<div data-lang="java" markdown="1">
    +{% highlight java %}
    +
    +// Write key-value data from a DataFrame to a specific Kafka topic specified in an option
    +StreamingQuery s1 = df1
    +  .selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)")
    +  .writeStream()
    +  .format("kafka")
    +  .option("kafka.bootstrap.servers", "host1:port1,host2:port2")
    +  .option("topic", "topic1")
    +  .start()
    +
    +// Write key-value data from a DataFrame to Kafka using a topic specified in the data
    +StreamingQuery s2 = df1
    +  .selectExpr("topic", "CAST(key AS STRING)", "CAST(value AS STRING)")
    +  .writeStream()
    +  .format("kafka")
    +  .option("kafka.bootstrap.servers", "host1:port1,host2:port2")
    +  .start()
    +
    +{% endhighlight %}
    +</div>
    +<div data-lang="python" markdown="1">
    +{% highlight python %}
    +
    +# Write key-value data from a DataFrame to a specific Kafka topic specified in an option
    +s1 = df1 \
    +  .selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)") \
    +  .writeStream \
    +  .format("kafka") \
    +  .option("kafka.bootstrap.servers", "host1:port1,host2:port2") \
    +  .option("topic", "topic1") \
    +  .start()
    +
    +# Write key-value data from a DataFrame to Kafka using a topic specified in the data
    +s2 = df2 \
    +  .selectExpr("topic", "CAST(key AS STRING)", "CAST(value AS STRING)") \
    +  .writeStream \
    +  .format("kafka") \
    +  .option("kafka.bootstrap.servers", "host1:port1,host2:port2") \
    +  .start()
    +
    +{% endhighlight %}
    +</div>
    +</div>
    +
    +### Writing Batch Queries to Kafka
    +
    +<div class="codetabs">
    +<div data-lang="scala" markdown="1">
    +{% highlight scala %}
    +
    +// Write key-value data from a DataFrame to a specific Kafka topic specified in an option
    +df1.selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)")
    +  .write
    +  .format("kafka")
    +  .option("kafka.bootstrap.servers", "host1:port1,host2:port2")
    +  .option("topic", "topic1")
    +  .save()
    +
    +// Write key-value data from a DataFrame to Kafka using a topic specified in the data
    +df2.selectExpr("topic", "CAST(key AS STRING)", "CAST(value AS STRING)")
    +  .write
    +  .format("kafka")
    +  .option("kafka.bootstrap.servers", "host1:port1,host2:port2")
    +  .save()
    +
    +{% endhighlight %}
    +</div>
    +<div data-lang="java" markdown="1">
    +{% highlight java %}
    +
    +// Write key-value data from a DataFrame to a specific Kafka topic specified in an option
    +df1.selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)")
    +  .write()
    +  .format("kafka")
    +  .option("kafka.bootstrap.servers", "host1:port1,host2:port2")
    +  .option("topic", "topic1")
    +  .save()
    +
    +// Write key-value data from a DataFrame to Kafka using a topic specified in the data
    +df1.selectExpr("topic", "CAST(key AS STRING)", "CAST(value AS STRING)")
    +  .write()
    +  .format("kafka")
    +  .option("kafka.bootstrap.servers", "host1:port1,host2:port2")
    +  .save()
    +
    +{% endhighlight %}
    +</div>
    +<div data-lang="python" markdown="1">
    +{% highlight python %}
    +
    +# Write key-value data from a DataFrame to a specific Kafka topic specified in an option
    +df1.selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)") \
    +  .write \
    +  .format("kafka") \
    +  .option("kafka.bootstrap.servers", "host1:port1,host2:port2") \
    +  .option("topic", "topic1") \
    +  .save()
    --- End diff --
    
    It's not. Kafka does not support transactions yet.




[GitHub] spark issue #17246: [SPARK-19906][SS][DOCS] Documentation describing how to ...

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the issue:

    https://github.com/apache/spark/pull/17246
  
    Test PASSed.
    Refer to this link for build results (access rights to CI server needed): 
    https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder/74789/
    Test PASSed.




[GitHub] spark issue #17246: [SPARK-19906][SS][DOCS] Documentation describing how to ...

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the issue:

    https://github.com/apache/spark/pull/17246
  
    Merged build finished. Test PASSed.




[GitHub] spark issue #17246: [SPARK-19906][SS][DOCS] Documentation describing how to ...

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the issue:

    https://github.com/apache/spark/pull/17246
  
    Merged build finished. Test PASSed.




[GitHub] spark issue #17246: [SPARK-19906][SS][DOCS] Documentation describing how to ...

Posted by SparkQA <gi...@git.apache.org>.
Github user SparkQA commented on the issue:

    https://github.com/apache/spark/pull/17246
  
    **[Test build #74331 has started](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/74331/testReport)** for PR 17246 at commit [`0efbca4`](https://github.com/apache/spark/commit/0efbca47aee2d4ffa33f874e2c513a6f916bb264).




[GitHub] spark issue #17246: [SPARK-19906][SS][DOCS] Documentation describing how to ...

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the issue:

    https://github.com/apache/spark/pull/17246
  
    Test PASSed.
    Refer to this link for build results (access rights to CI server needed): 
    https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder/74913/
    Test PASSed.




[GitHub] spark issue #17246: [SPARK-19906][SS][DOCS] Documentation describing how to ...

Posted by SparkQA <gi...@git.apache.org>.
Github user SparkQA commented on the issue:

    https://github.com/apache/spark/pull/17246
  
    **[Test build #74756 has finished](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/74756/testReport)** for PR 17246 at commit [`26e3f8b`](https://github.com/apache/spark/commit/26e3f8b6f5d821af7b5dc399d9aa0387d8131e8c).
     * This patch passes all tests.
     * This patch merges cleanly.
     * This patch adds no public classes.




[GitHub] spark issue #17246: [SPARK-19906][SS][DOCS] Documentation describing how to ...

Posted by SparkQA <gi...@git.apache.org>.
Github user SparkQA commented on the issue:

    https://github.com/apache/spark/pull/17246
  
    **[Test build #74913 has finished](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/74913/testReport)** for PR 17246 at commit [`6b5ca9d`](https://github.com/apache/spark/commit/6b5ca9df36bbd8c287f6b979bfaf5e9dbf568190).
     * This patch passes all tests.
     * This patch merges cleanly.
     * This patch adds no public classes.




[GitHub] spark pull request #17246: [SPARK-19906][SS][DOCS] Documentation describing ...

Posted by tdas <gi...@git.apache.org>.
Github user tdas commented on a diff in the pull request:

    https://github.com/apache/spark/pull/17246#discussion_r106720832
  
    --- Diff: docs/structured-streaming-kafka-integration.md ---
    @@ -373,11 +375,204 @@ The following configurations are optional:
     </tr>
     </table>
     
    +## Producing Data to Kafka
    +
    +### Writing Streaming Queries to Kafka
    --- End diff --
    
    Writing output of...




[GitHub] spark issue #17246: [SPARK-19906][SS][DOCS] Documentation describing how to ...

Posted by tdas <gi...@git.apache.org>.
Github user tdas commented on the issue:

    https://github.com/apache/spark/pull/17246
  
    A few minor points.




[GitHub] spark issue #17246: [SPARK-19906][SS][DOCS] Documentation describing how to ...

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the issue:

    https://github.com/apache/spark/pull/17246
  
    Merged build finished. Test PASSed.




[GitHub] spark pull request #17246: [SPARK-19906][SS][DOCS] Documentation describing ...

Posted by tdas <gi...@git.apache.org>.
Github user tdas commented on a diff in the pull request:

    https://github.com/apache/spark/pull/17246#discussion_r106798211
  
    --- Diff: docs/structured-streaming-kafka-integration.md ---
    @@ -15,40 +15,42 @@ For Scala/Java applications using SBT/Maven project definitions, link your appli
     For Python applications, you need to add this above library and its dependencies when deploying your
     application. See the [Deploying](#deploying) subsection below.
     
    -### Creating a Kafka Source Stream
    +## Reading Data from Kafka
    +
    +### Creating a Kafka Source for Streaming Queries
     
     <div class="codetabs">
     <div data-lang="scala" markdown="1">
     {% highlight scala %}
     
     // Subscribe to 1 topic
    -val ds1 = spark
    +val ds = spark
    --- End diff --
    
    Hey ... `load()` will return a DataFrame, not a Dataset, so the name `ds` may be a little confusing.
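
    As an aside, a minimal Scala sketch of the distinction being raised (variable names and the topic are illustrative): `load()` always returns an untyped DataFrame, and an explicit `.as[...]` conversion is needed to get a typed Dataset.

        import org.apache.spark.sql.{DataFrame, Dataset, SparkSession}

        val spark = SparkSession.builder().appName("kafka-read-sketch").getOrCreate()
        import spark.implicits._

        // load() returns a DataFrame (i.e. Dataset[Row]), whatever the variable is named
        val df: DataFrame = spark
          .readStream
          .format("kafka")
          .option("kafka.bootstrap.servers", "host1:port1,host2:port2")
          .option("subscribe", "topic1")
          .load()

        // A typed Dataset requires casting the columns and an explicit as[...] call
        val ds: Dataset[(String, String)] = df
          .selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)")
          .as[(String, String)]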




[GitHub] spark issue #17246: [SPARK-19906][SS][DOCS] Documentation describing how to ...

Posted by SparkQA <gi...@git.apache.org>.
Github user SparkQA commented on the issue:

    https://github.com/apache/spark/pull/17246
  
    **[Test build #74913 has started](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/74913/testReport)** for PR 17246 at commit [`6b5ca9d`](https://github.com/apache/spark/commit/6b5ca9df36bbd8c287f6b979bfaf5e9dbf568190).




[GitHub] spark pull request #17246: [SPARK-19906][SS][DOCS] Documentation describing ...

Posted by tdas <gi...@git.apache.org>.
Github user tdas commented on a diff in the pull request:

    https://github.com/apache/spark/pull/17246#discussion_r106798265
  
    --- Diff: docs/structured-streaming-kafka-integration.md ---
    @@ -373,11 +374,213 @@ The following configurations are optional:
     </tr>
     </table>
     
    +## Writing Data to Kafka
    +
    +Here, we describe the support for writing Streaming Queries and Batch Queries to Apache Kafka. Take note that 
    +Apache Kafka only supports at least once write semantics. Consequently, when writing---either Streaming Queries
    +or Batch Queries---to Kafka, some records may be duplicated; this can happen, for example, if Kafka needs
    +to retry a message that was not acknowledged by a Broker, even though that Broker received and wrote the message record.
    +Structured Streaming cannot prevent such duplicates from occurring due to these Kafka write semantics. However, 
    +if writing the query is successful, then you can assume that the query output was written at least once. A possible
    +solution to remove duplicates when reading the written data could be to introduce a primary (unique) key 
    +that can be used to perform de-duplication when reading.
    +
    +Each row being written to Kafka has the following schema:
    --- End diff --
    
    The DataFrame being written to Kafka should have the following columns in the schema.
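
    The preamble quoted above also suggests de-duplicating on read using a primary (unique) key. A minimal Scala sketch of that idea, assuming the writer encoded a unique id followed by the payload in the value column (the encoding and the names uniqueId/payload are illustrative, not part of this PR):

        import org.apache.spark.sql.SparkSession

        val spark = SparkSession.builder().appName("dedup-on-read-sketch").getOrCreate()

        // Read back data that was written with at-least-once semantics
        val raw = spark
          .read
          .format("kafka")
          .option("kafka.bootstrap.servers", "host1:port1,host2:port2")
          .option("subscribe", "topic1")
          .load()

        // Each value is assumed to be "<uniqueId>,<payload>"; duplicate deliveries share the same uniqueId
        val deduped = raw
          .selectExpr("CAST(value AS STRING) AS value")
          .selectExpr("split(value, ',')[0] AS uniqueId", "split(value, ',')[1] AS payload")
          .dropDuplicates("uniqueId")

    Any column that uniquely identifies a record would serve; the point is only that duplicate deliveries collapse to a single row when read back.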




[GitHub] spark pull request #17246: [SPARK-19906][SS][DOCS] Documentation describing ...

Posted by asfgit <gi...@git.apache.org>.
Github user asfgit closed the pull request at:

    https://github.com/apache/spark/pull/17246




[GitHub] spark issue #17246: [SPARK-19906][SS][DOCS] Documentation describing how to ...

Posted by SparkQA <gi...@git.apache.org>.
Github user SparkQA commented on the issue:

    https://github.com/apache/spark/pull/17246
  
    **[Test build #74789 has finished](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/74789/testReport)** for PR 17246 at commit [`a3f769c`](https://github.com/apache/spark/commit/a3f769c3aef000c1950022ed93c5a610bd0df585).
     * This patch passes all tests.
     * This patch merges cleanly.
     * This patch adds no public classes.




[GitHub] spark issue #17246: [SPARK-19906][SS][DOCS] Documentation describing how to ...

Posted by SparkQA <gi...@git.apache.org>.
Github user SparkQA commented on the issue:

    https://github.com/apache/spark/pull/17246
  
    **[Test build #74331 has finished](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/74331/testReport)** for PR 17246 at commit [`0efbca4`](https://github.com/apache/spark/commit/0efbca47aee2d4ffa33f874e2c513a6f916bb264).
     * This patch passes all tests.
     * This patch merges cleanly.
     * This patch adds no public classes.




[GitHub] spark pull request #17246: [SPARK-19906][SS][DOCS] Documentation describing ...

Posted by tdas <gi...@git.apache.org>.
Github user tdas commented on a diff in the pull request:

    https://github.com/apache/spark/pull/17246#discussion_r106722120
  
    --- Diff: docs/structured-streaming-kafka-integration.md ---
    @@ -373,11 +375,204 @@ The following configurations are optional:
     </tr>
     </table>
     
    +## Producing Data to Kafka
    +
    +### Writing Streaming Queries to Kafka
    +
    +<div class="codetabs">
    +<div data-lang="scala" markdown="1">
    +{% highlight scala %}
    +
    +// Write key-value data from a DataFrame to a specific Kafka topic specified in an option
    +val s1 = df1
    +  .selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)")
    +  .writeStream
    +  .format("kafka")
    +  .option("kafka.bootstrap.servers", "host1:port1,host2:port2")
    +  .option("topic", "topic1")
    +  .start()
    +
    +// Write key-value data from a DataFrame to Kafka using a topic specified in the data
    +val s2 = df2
    +  .selectExpr("topic", "CAST(key AS STRING)", "CAST(value AS STRING)")
    +  .writeStream
    +  .format("kafka")
    +  .option("kafka.bootstrap.servers", "host1:port1,host2:port2")
    +  .start()
    +
    +{% endhighlight %}
    +</div>
    +<div data-lang="java" markdown="1">
    +{% highlight java %}
    +
    +// Write key-value data from a DataFrame to a specific Kafka topic specified in an option
    +StreamingQuery s1 = df1
    +  .selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)")
    +  .writeStream()
    +  .format("kafka")
    +  .option("kafka.bootstrap.servers", "host1:port1,host2:port2")
    +  .option("topic", "topic1")
    +  .start()
    +
    +// Write key-value data from a DataFrame to Kafka using a topic specified in the data
    +StreamingQuery s2 = df1
    +  .selectExpr("topic", "CAST(key AS STRING)", "CAST(value AS STRING)")
    +  .writeStream()
    +  .format("kafka")
    +  .option("kafka.bootstrap.servers", "host1:port1,host2:port2")
    +  .start()
    +
    +{% endhighlight %}
    +</div>
    +<div data-lang="python" markdown="1">
    +{% highlight python %}
    +
    +# Write key-value data from a DataFrame to a specific Kafka topic specified in an option
    +s1 = df1 \
    +  .selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)") \
    +  .writeStream \
    +  .format("kafka") \
    +  .option("kafka.bootstrap.servers", "host1:port1,host2:port2") \
    +  .option("topic", "topic1") \
    +  .start()
    +
    +# Write key-value data from a DataFrame to Kafka using a topic specified in the data
    +s2 = df2 \
    +  .selectExpr("topic", "CAST(key AS STRING)", "CAST(value AS STRING)") \
    +  .writeStream \
    +  .format("kafka") \
    +  .option("kafka.bootstrap.servers", "host1:port1,host2:port2") \
    +  .start()
    +
    +{% endhighlight %}
    +</div>
    +</div>
    +
    +### Writing Batch Queries to Kafka
    +
    +<div class="codetabs">
    +<div data-lang="scala" markdown="1">
    +{% highlight scala %}
    +
    +// Write key-value data from a DataFrame to a specific Kafka topic specified in an option
    +df1.selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)")
    +  .write
    +  .format("kafka")
    +  .option("kafka.bootstrap.servers", "host1:port1,host2:port2")
    +  .option("topic", "topic1")
    +  .save()
    +
    +// Write key-value data from a DataFrame to Kafka using a topic specified in the data
    +df2.selectExpr("topic", "CAST(key AS STRING)", "CAST(value AS STRING)")
    +  .write
    +  .format("kafka")
    +  .option("kafka.bootstrap.servers", "host1:port1,host2:port2")
    +  .save()
    +
    +{% endhighlight %}
    +</div>
    +<div data-lang="java" markdown="1">
    +{% highlight java %}
    +
    +// Write key-value data from a DataFrame to a specific Kafka topic specified in an option
    +df1.selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)")
    +  .write()
    +  .format("kafka")
    +  .option("kafka.bootstrap.servers", "host1:port1,host2:port2")
    +  .option("topic", "topic1")
    +  .save()
    +
    +// Write key-value data from a DataFrame to Kafka using a topic specified in the data
    +df1.selectExpr("topic", "CAST(key AS STRING)", "CAST(value AS STRING)")
    +  .write()
    +  .format("kafka")
    +  .option("kafka.bootstrap.servers", "host1:port1,host2:port2")
    +  .save()
    +
    +{% endhighlight %}
    +</div>
    +<div data-lang="python" markdown="1">
    +{% highlight python %}
    +
    +# Write key-value data from a DataFrame to a specific Kafka topic specified in an option
    +df1.selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)") \
    +  .write \
    +  .format("kafka") \
    +  .option("kafka.bootstrap.servers", "host1:port1,host2:port2") \
    +  .option("topic", "topic1") \
    +  .save()
    +
    +# Write key-value data from a DataFrame to Kafka using a topic specified in the data
    +df2.selectExpr("topic", "CAST(key AS STRING)", "CAST(value AS STRING)") \
    +  .write \
    +  .format("kafka") \
    +  .option("kafka.bootstrap.servers", "host1:port1,host2:port2") \
    +  .save()
    +
    +{% endhighlight %}
    +</div>
    +</div>
    +
    +Each row being written to Kafka has the following schema:
    +<table class="table">
    +<tr><th>Column</th><th>Type</th></tr>
    +<tr>
    +  <td>key (optional)</td>
    +  <td>string or binary</td>
    +</tr>
    +<tr>
    +  <td>value (required)</td>
    +  <td>string or binary</td>
    +</tr>
    +<tr>
    +  <td>topic (*optional)</td>
    +  <td>string</td>
    +</tr>
    +</table>
    +\* The topic column is required if the "topic" configuration option is not specified.<br>
    +
    +The value column is the only required option. If a key column is not specified then 
    +a ```null``` valued key column will be automatically added (see Kafka semantics on 
    +how ```null``` valued key values are handled). If a topic column exists then its value
    +is used as the topic when writing the given row to Kafka, unless the "topic" configuration
    +option is set i.e., the "topic" configuration option overrides the topic column.
    +
    +The following options must be set for the Kafka sink
    +for both batch and streaming queries.
    +
    +<table class="table">
    +<tr><th>Option</th><th>value</th><th>meaning</th></tr>
    +<tr>
    +  <td>kafka.bootstrap.servers</td>
    +  <td>A comma-separated list of host:port</td>
    +  <td>The Kafka "bootstrap.servers" configuration.</td>
    +</tr>
    +</table>
    +
    +The following configurations are optional:
    +
    +<table class="table">
    +<tr><th>Option</th><th>value</th><th>default</th><th>query type</th><th>meaning</th></tr>
    +<tr>
    +  <td>topic</td>
    +  <td>string</td>
    +  <td>none</td>
    +  <td>streaming and batch</td>
    +  <td>Sets the topic that all rows will be written to in Kafka. This option overrides any
    +  topic column that may exist in the data.</td>
    +</tr>
    +</table>
    +
    +
    +## Kafka Specific Configurations
    +
     Kafka's own configurations can be set via `DataStreamReader.option` with `kafka.` prefix, e.g, 
     `stream.option("kafka.bootstrap.servers", "host:port")`. For possible kafkaParams, see 
    --- End diff --
    
    kafkaParams -> kafka parameters
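
    To illustrate the passthrough being discussed, a minimal Scala sketch (servers, topic, and the checkpoint path are placeholders): options prefixed with `kafka.` are handed to the underlying Kafka consumer on the read side and to the Kafka producer on the write side.

        import org.apache.spark.sql.SparkSession

        val spark = SparkSession.builder().appName("kafka-config-sketch").getOrCreate()

        // Read side: "kafka."-prefixed options are passed through to the Kafka consumer
        val input = spark
          .readStream
          .format("kafka")
          .option("kafka.bootstrap.servers", "host1:port1,host2:port2")
          .option("subscribe", "topic1")
          .load()

        // Write side: "kafka."-prefixed options are passed through to the Kafka producer
        val query = input
          .selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)")
          .writeStream
          .format("kafka")
          .option("kafka.bootstrap.servers", "host1:port1,host2:port2")
          .option("topic", "topic1")
          .option("checkpointLocation", "/tmp/kafka-sink-checkpoint")
          .start()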




[GitHub] spark pull request #17246: [SPARK-19906][SS][DOCS] Documentation describing ...

Posted by tdas <gi...@git.apache.org>.
Github user tdas commented on a diff in the pull request:

    https://github.com/apache/spark/pull/17246#discussion_r106722231
  
    --- Diff: docs/structured-streaming-kafka-integration.md ---
    @@ -373,11 +375,204 @@ The following configurations are optional:
     </tr>
     </table>
     
    +## Producing Data to Kafka
    +
    +### Writing Streaming Queries to Kafka
    +
    +<div class="codetabs">
    +<div data-lang="scala" markdown="1">
    +{% highlight scala %}
    +
    +// Write key-value data from a DataFrame to a specific Kafka topic specified in an option
    +val s1 = df1
    +  .selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)")
    +  .writeStream
    +  .format("kafka")
    +  .option("kafka.bootstrap.servers", "host1:port1,host2:port2")
    +  .option("topic", "topic1")
    +  .start()
    +
    +// Write key-value data from a DataFrame to Kafka using a topic specified in the data
    +val s2 = df2
    +  .selectExpr("topic", "CAST(key AS STRING)", "CAST(value AS STRING)")
    +  .writeStream
    +  .format("kafka")
    +  .option("kafka.bootstrap.servers", "host1:port1,host2:port2")
    +  .start()
    +
    +{% endhighlight %}
    +</div>
    +<div data-lang="java" markdown="1">
    +{% highlight java %}
    +
    +// Write key-value data from a DataFrame to a specific Kafka topic specified in an option
    +StreamingQuery s1 = df1
    +  .selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)")
    +  .writeStream()
    +  .format("kafka")
    +  .option("kafka.bootstrap.servers", "host1:port1,host2:port2")
    +  .option("topic", "topic1")
    +  .start()
    +
    +// Write key-value data from a DataFrame to Kafka using a topic specified in the data
    +StreamingQuery s2 = df1
    +  .selectExpr("topic", "CAST(key AS STRING)", "CAST(value AS STRING)")
    +  .writeStream()
    +  .format("kafka")
    +  .option("kafka.bootstrap.servers", "host1:port1,host2:port2")
    +  .start()
    +
    +{% endhighlight %}
    +</div>
    +<div data-lang="python" markdown="1">
    +{% highlight python %}
    +
    +# Write key-value data from a DataFrame to a specific Kafka topic specified in an option
    +s1 = df1 \
    +  .selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)") \
    +  .writeStream \
    +  .format("kafka") \
    +  .option("kafka.bootstrap.servers", "host1:port1,host2:port2") \
    +  .option("topic", "topic1") \
    +  .start()
    +
    +# Write key-value data from a DataFrame to Kafka using a topic specified in the data
    +s2 = df2 \
    +  .selectExpr("topic", "CAST(key AS STRING)", "CAST(value AS STRING)") \
    +  .writeStream \
    +  .format("kafka") \
    +  .option("kafka.bootstrap.servers", "host1:port1,host2:port2") \
    +  .start()
    +
    +{% endhighlight %}
    +</div>
    +</div>
    +
    +### Writing Batch Queries to Kafka
    +
    +<div class="codetabs">
    +<div data-lang="scala" markdown="1">
    +{% highlight scala %}
    +
    +// Write key-value data from a DataFrame to a specific Kafka topic specified in an option
    +df1.selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)")
    +  .write
    +  .format("kafka")
    +  .option("kafka.bootstrap.servers", "host1:port1,host2:port2")
    +  .option("topic", "topic1")
    +  .save()
    +
    +// Write key-value data from a DataFrame to Kafka using a topic specified in the data
    +df2.selectExpr("topic", "CAST(key AS STRING)", "CAST(value AS STRING)")
    +  .write
    +  .format("kafka")
    +  .option("kafka.bootstrap.servers", "host1:port1,host2:port2")
    +  .save()
    +
    +{% endhighlight %}
    +</div>
    +<div data-lang="java" markdown="1">
    +{% highlight java %}
    +
    +// Write key-value data from a DataFrame to a specific Kafka topic specified in an option
    +df1.selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)")
    +  .write()
    +  .format("kafka")
    +  .option("kafka.bootstrap.servers", "host1:port1,host2:port2")
    +  .option("topic", "topic1")
    +  .save()
    +
    +// Write key-value data from a DataFrame to Kafka using a topic specified in the data
    +df1.selectExpr("topic", "CAST(key AS STRING)", "CAST(value AS STRING)")
    +  .write()
    +  .format("kafka")
    +  .option("kafka.bootstrap.servers", "host1:port1,host2:port2")
    +  .save()
    +
    +{% endhighlight %}
    +</div>
    +<div data-lang="python" markdown="1">
    +{% highlight python %}
    +
    +# Write key-value data from a DataFrame to a specific Kafka topic specified in an option
    +df1.selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)") \
    +  .write \
    +  .format("kafka") \
    +  .option("kafka.bootstrap.servers", "host1:port1,host2:port2") \
    +  .option("topic", "topic1") \
    +  .save()
    +
    +# Write key-value data from a DataFrame to Kafka using a topic specified in the data
    +df2.selectExpr("topic", "CAST(key AS STRING)", "CAST(value AS STRING)") \
    +  .write \
    +  .format("kafka") \
    +  .option("kafka.bootstrap.servers", "host1:port1,host2:port2") \
    +  .save()
    +
    +{% endhighlight %}
    +</div>
    +</div>
    +
    +Each row being written to Kafka has the following schema:
    +<table class="table">
    +<tr><th>Column</th><th>Type</th></tr>
    +<tr>
    +  <td>key (optional)</td>
    +  <td>string or binary</td>
    +</tr>
    +<tr>
    +  <td>value (required)</td>
    +  <td>string or binary</td>
    +</tr>
    +<tr>
    +  <td>topic (*optional)</td>
    +  <td>string</td>
    +</tr>
    +</table>
    +\* The topic column is required if the "topic" configuration option is not specified.<br>
    +
    +The value column is the only required option. If a key column is not specified then 
    +a ```null``` valued key column will be automatically added (see Kafka semantics on 
    +how ```null``` valued key values are handled). If a topic column exists then its value
    +is used as the topic when writing the given row to Kafka, unless the "topic" configuration
    +option is set i.e., the "topic" configuration option overrides the topic column.
    +
    +The following options must be set for the Kafka sink
    +for both batch and streaming queries.
    +
    +<table class="table">
    +<tr><th>Option</th><th>value</th><th>meaning</th></tr>
    +<tr>
    +  <td>kafka.bootstrap.servers</td>
    +  <td>A comma-separated list of host:port</td>
    +  <td>The Kafka "bootstrap.servers" configuration.</td>
    +</tr>
    +</table>
    +
    +The following configurations are optional:
    +
    +<table class="table">
    +<tr><th>Option</th><th>value</th><th>default</th><th>query type</th><th>meaning</th></tr>
    +<tr>
    +  <td>topic</td>
    +  <td>string</td>
    +  <td>none</td>
    +  <td>streaming and batch</td>
    +  <td>Sets the topic that all rows will be written to in Kafka. This option overrides any
    +  topic column that may exist in the data.</td>
    +</tr>
    +</table>
    +
    +
    +## Kafka Specific Configurations
    +
     Kafka's own configurations can be set via `DataStreamReader.option` with `kafka.` prefix, e.g, 
     `stream.option("kafka.bootstrap.servers", "host:port")`. For possible kafkaParams, see 
    -[Kafka consumer config docs](http://kafka.apache.org/documentation.html#newconsumerconfigs).
    +[Kafka consumer config docs](http://kafka.apache.org/documentation.html#newconsumerconfigs) for
    +read related parameters, and [Kafka producer config docs](http://kafka.apache.org/documentation/#producerconfigs)
    --- End diff --
    
    parameters related to reading
    parameters related to writing
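
    Related to the schema section quoted above, a small Scala sketch of the topic precedence it describes (topic names and rows are placeholders): when both a per-row topic column and the "topic" option are present, the option wins and every row goes to that topic.

        import org.apache.spark.sql.SparkSession

        val spark = SparkSession.builder().appName("topic-precedence-sketch").getOrCreate()
        import spark.implicits._

        // A small DataFrame with a per-row topic column
        val df = Seq(
          ("topicA", "k1", "v1"),
          ("topicB", "k2", "v2")
        ).toDF("topic", "key", "value")

        // The "topic" option overrides the topic column: both rows are written to "override-topic"
        df.selectExpr("topic", "CAST(key AS STRING)", "CAST(value AS STRING)")
          .write
          .format("kafka")
          .option("kafka.bootstrap.servers", "host1:port1,host2:port2")
          .option("topic", "override-topic")
          .save()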




[GitHub] spark issue #17246: [SPARK-19906][SS][DOCS] Documentation describing how to ...

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the issue:

    https://github.com/apache/spark/pull/17246
  
    Merged build finished. Test PASSed.




[GitHub] spark pull request #17246: [SPARK-19906][SS][DOCS] Documentation describing ...

Posted by tdas <gi...@git.apache.org>.
Github user tdas commented on a diff in the pull request:

    https://github.com/apache/spark/pull/17246#discussion_r106722579
  
    --- Diff: docs/structured-streaming-kafka-integration.md ---
    @@ -373,11 +375,204 @@ The following configurations are optional:
     </tr>
     </table>
     
    +## Producing Data to Kafka
    +
    +### Writing Streaming Queries to Kafka
    +
    +<div class="codetabs">
    +<div data-lang="scala" markdown="1">
    +{% highlight scala %}
    +
    +// Write key-value data from a DataFrame to a specific Kafka topic specified in an option
    +val s1 = df1
    +  .selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)")
    +  .writeStream
    +  .format("kafka")
    +  .option("kafka.bootstrap.servers", "host1:port1,host2:port2")
    +  .option("topic", "topic1")
    +  .start()
    +
    +// Write key-value data from a DataFrame to Kafka using a topic specified in the data
    +val s2 = df2
    +  .selectExpr("topic", "CAST(key AS STRING)", "CAST(value AS STRING)")
    +  .writeStream
    +  .format("kafka")
    +  .option("kafka.bootstrap.servers", "host1:port1,host2:port2")
    +  .start()
    +
    +{% endhighlight %}
    +</div>
    +<div data-lang="java" markdown="1">
    +{% highlight java %}
    +
    +// Write key-value data from a DataFrame to a specific Kafka topic specified in an option
    +StreamingQuery s1 = df1
    +  .selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)")
    +  .writeStream()
    +  .format("kafka")
    +  .option("kafka.bootstrap.servers", "host1:port1,host2:port2")
    +  .option("topic", "topic1")
    +  .start()
    +
    +// Write key-value data from a DataFrame to Kafka using a topic specified in the data
    +StreamingQuery s2 = df1
    +  .selectExpr("topic", "CAST(key AS STRING)", "CAST(value AS STRING)")
    +  .writeStream()
    +  .format("kafka")
    +  .option("kafka.bootstrap.servers", "host1:port1,host2:port2")
    +  .start()
    +
    +{% endhighlight %}
    +</div>
    +<div data-lang="python" markdown="1">
    +{% highlight python %}
    +
    +# Write key-value data from a DataFrame to a specific Kafka topic specified in an option
    +s1 = df1 \
    +  .selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)") \
    +  .writeStream \
    +  .format("kafka") \
    +  .option("kafka.bootstrap.servers", "host1:port1,host2:port2") \
    +  .option("topic", "topic1") \
    +  .start()
    +
    +# Write key-value data from a DataFrame to Kafka using a topic specified in the data
    +s2 = df2 \
    +  .selectExpr("topic", "CAST(key AS STRING)", "CAST(value AS STRING)") \
    +  .writeStream \
    +  .format("kafka") \
    +  .option("kafka.bootstrap.servers", "host1:port1,host2:port2") \
    +  .start()
    +
    +{% endhighlight %}
    +</div>
    +</div>
    +
    +### Writing Batch Queries to Kafka
    +
    +<div class="codetabs">
    +<div data-lang="scala" markdown="1">
    +{% highlight scala %}
    +
    +// Write key-value data from a DataFrame to a specific Kafka topic specified in an option
    +df1.selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)")
    +  .write
    +  .format("kafka")
    +  .option("kafka.bootstrap.servers", "host1:port1,host2:port2")
    +  .option("topic", "topic1")
    +  .save()
    +
    +// Write key-value data from a DataFrame to Kafka using a topic specified in the data
    +df2.selectExpr("topic", "CAST(key AS STRING)", "CAST(value AS STRING)")
    +  .write
    +  .format("kafka")
    +  .option("kafka.bootstrap.servers", "host1:port1,host2:port2")
    +  .save()
    +
    +{% endhighlight %}
    +</div>
    +<div data-lang="java" markdown="1">
    +{% highlight java %}
    +
    +// Write key-value data from a DataFrame to a specific Kafka topic specified in an option
    +df1.selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)")
    +  .write()
    +  .format("kafka")
    +  .option("kafka.bootstrap.servers", "host1:port1,host2:port2")
    +  .option("topic", "topic1")
    +  .save()
    +
    +// Write key-value data from a DataFrame to Kafka using a topic specified in the data
    +df1.selectExpr("topic", "CAST(key AS STRING)", "CAST(value AS STRING)")
    +  .write()
    +  .format("kafka")
    +  .option("kafka.bootstrap.servers", "host1:port1,host2:port2")
    +  .save()
    +
    +{% endhighlight %}
    +</div>
    +<div data-lang="python" markdown="1">
    +{% highlight python %}
    +
    +# Write key-value data from a DataFrame to a specific Kafka topic specified in an option
    +df1.selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)") \
    +  .write \
    +  .format("kafka") \
    +  .option("kafka.bootstrap.servers", "host1:port1,host2:port2") \
    +  .option("topic", "topic1") \
    +  .save()
    +
    +# Write key-value data from a DataFrame to Kafka using a topic specified in the data
    +df2.selectExpr("topic", "CAST(key AS STRING)", "CAST(value AS STRING)") \
    +  .write \
    +  .format("kafka") \
    +  .option("kafka.bootstrap.servers", "host1:port1,host2:port2") \
    +  .save()
    +
    +{% endhighlight %}
    +</div>
    +</div>
    +
    +Each row being written to Kafka has the following schema:
    --- End diff --
    
    I think it's better to put this section on necessary and optional columns before the examples, right after "Producing Data to Kafka". The sink options can stay here after the examples.




[GitHub] spark pull request #17246: [SPARK-19906][SS][DOCS] Documentation describing ...

Posted by tdas <gi...@git.apache.org>.
Github user tdas commented on a diff in the pull request:

    https://github.com/apache/spark/pull/17246#discussion_r106721109
  
    --- Diff: docs/structured-streaming-kafka-integration.md ---
    @@ -373,11 +375,204 @@ The following configurations are optional:
     </tr>
     </table>
     
    +## Producing Data to Kafka
    --- End diff --
    
    I think this section needs a big caveat regarding the lack of guarantees of Kafka writes: since Kafka does not support transactions, data may be duplicated due to failures and re-executions.




[GitHub] spark issue #17246: [SPARK-19906][SS][DOCS] Documentation describing how to ...

Posted by SparkQA <gi...@git.apache.org>.
Github user SparkQA commented on the issue:

    https://github.com/apache/spark/pull/17246
  
    **[Test build #74330 has finished](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/74330/testReport)** for PR 17246 at commit [`172d450`](https://github.com/apache/spark/commit/172d4505e5583c541e4644b1eeb12f853bf638cd).
     * This patch passes all tests.
     * This patch merges cleanly.
     * This patch adds no public classes.




[GitHub] spark issue #17246: [SPARK-19906][SS][DOCS] Documentation describing how to ...

Posted by SparkQA <gi...@git.apache.org>.
Github user SparkQA commented on the issue:

    https://github.com/apache/spark/pull/17246
  
    **[Test build #74756 has started](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/74756/testReport)** for PR 17246 at commit [`26e3f8b`](https://github.com/apache/spark/commit/26e3f8b6f5d821af7b5dc399d9aa0387d8131e8c).




[GitHub] spark pull request #17246: [SPARK-19906][SS][DOCS] Documentation describing ...

Posted by tdas <gi...@git.apache.org>.
Github user tdas commented on a diff in the pull request:

    https://github.com/apache/spark/pull/17246#discussion_r106798256
  
    --- Diff: docs/structured-streaming-kafka-integration.md ---
    @@ -373,11 +374,213 @@ The following configurations are optional:
     </tr>
     </table>
     
    +## Writing Data to Kafka
    +
    +Here, we describe the support for writing Streaming Queries and Batch Queries to Apache Kafka. Take note that 
    +Apache Kafka only supports at least once write semantics. Consequently, when writing---either Streaming Queries
    +or Batch Queries---to Kafka, some records may be duplicated; this can happen, for example, if Kafka needs
    +to retry a message that was not acknowledged by a Broker, even though that Broker received and wrote the message record.
    +Structured Streaming cannot prevent such duplicates from occurring due to these Kafka write semantics. However, 
    +if writing the query is successful, then you can assume that the query output was written at least once. A possible
    +solution to remove duplicates when reading the written data could be to introduce a primary (unique) key 
    +that can be used to perform de-duplication when reading.
    --- End diff --
    
    +1 for this suggestion!




[GitHub] spark pull request #17246: [SPARK-19906][SS][DOCS] Documentation describing ...

Posted by tdas <gi...@git.apache.org>.
Github user tdas commented on a diff in the pull request:

    https://github.com/apache/spark/pull/17246#discussion_r106798236
  
    --- Diff: docs/structured-streaming-kafka-integration.md ---
    @@ -15,40 +15,42 @@ For Scala/Java applications using SBT/Maven project definitions, link your appli
     For Python applications, you need to add this above library and its dependencies when deploying your
     application. See the [Deploying](#deploying) subsection below.
     
    -### Creating a Kafka Source Stream
    +## Reading Data from Kafka
    +
    +### Creating a Kafka Source for Streaming Queries
     
     <div class="codetabs">
     <div data-lang="scala" markdown="1">
     {% highlight scala %}
     
     // Subscribe to 1 topic
    -val ds1 = spark
    +val ds = spark
    --- End diff --
    
    nvm. minor point.




[GitHub] spark issue #17246: [SPARK-19906][SS][DOCS] Documentation describing how to ...

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the issue:

    https://github.com/apache/spark/pull/17246
  
    Test PASSed.
    Refer to this link for build results (access rights to CI server needed): 
    https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder/74330/
    Test PASSed.




[GitHub] spark issue #17246: [SPARK-19906][SS][DOCS] Documentation describing how to ...

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the issue:

    https://github.com/apache/spark/pull/17246
  
    Test PASSed.
    Refer to this link for build results (access rights to CI server needed): 
    https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder/74331/
    Test PASSed.




[GitHub] spark issue #17246: [SPARK-19906][SS][DOCS] Documentation describing how to ...

Posted by SparkQA <gi...@git.apache.org>.
Github user SparkQA commented on the issue:

    https://github.com/apache/spark/pull/17246
  
    **[Test build #74789 has started](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/74789/testReport)** for PR 17246 at commit [`a3f769c`](https://github.com/apache/spark/commit/a3f769c3aef000c1950022ed93c5a610bd0df585).




[GitHub] spark issue #17246: [SPARK-19906][SS][DOCS] Documentation describing how to ...

Posted by SparkQA <gi...@git.apache.org>.
Github user SparkQA commented on the issue:

    https://github.com/apache/spark/pull/17246
  
    **[Test build #74330 has started](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/74330/testReport)** for PR 17246 at commit [`172d450`](https://github.com/apache/spark/commit/172d4505e5583c541e4644b1eeb12f853bf638cd).




[GitHub] spark pull request #17246: [SPARK-19906][SS][DOCS] Documentation describing ...

Posted by tdas <gi...@git.apache.org>.
Github user tdas commented on a diff in the pull request:

    https://github.com/apache/spark/pull/17246#discussion_r106721185
  
    --- Diff: docs/structured-streaming-kafka-integration.md ---
    @@ -373,11 +375,204 @@ The following configurations are optional:
     </tr>
     </table>
     
    +## Producing Data to Kafka
    +
    +### Writing Streaming Queries to Kafka
    +
    +<div class="codetabs">
    +<div data-lang="scala" markdown="1">
    +{% highlight scala %}
    +
    +// Write key-value data from a DataFrame to a specific Kafka topic specified in an option
    +val s1 = df1
    +  .selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)")
    +  .writeStream
    +  .format("kafka")
    +  .option("kafka.bootstrap.servers", "host1:port1,host2:port2")
    +  .option("topic", "topic1")
    +  .start()
    +
    +// Write key-value data from a DataFrame to Kafka using a topic specified in the data
    +val s2 = df2
    +  .selectExpr("topic", "CAST(key AS STRING)", "CAST(value AS STRING)")
    +  .writeStream
    +  .format("kafka")
    +  .option("kafka.bootstrap.servers", "host1:port1,host2:port2")
    +  .start()
    +
    +{% endhighlight %}
    +</div>
    +<div data-lang="java" markdown="1">
    +{% highlight java %}
    +
    +// Write key-value data from a DataFrame to a specific Kafka topic specified in an option
    +StreamingQuery s1 = df1
    +  .selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)")
    +  .writeStream()
    +  .format("kafka")
    +  .option("kafka.bootstrap.servers", "host1:port1,host2:port2")
    +  .option("topic", "topic1")
    +  .start()
    +
    +// Write key-value data from a DataFrame to Kafka using a topic specified in the data
    +StreamingQuery s2 = df1
    +  .selectExpr("topic", "CAST(key AS STRING)", "CAST(value AS STRING)")
    +  .writeStream()
    +  .format("kafka")
    +  .option("kafka.bootstrap.servers", "host1:port1,host2:port2")
    +  .start()
    +
    +{% endhighlight %}
    +</div>
    +<div data-lang="python" markdown="1">
    +{% highlight python %}
    +
    +# Write key-value data from a DataFrame to a specific Kafka topic specified in an option
    +s1 = df1 \
    +  .selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)") \
    +  .writeStream \
    +  .format("kafka") \
    +  .option("kafka.bootstrap.servers", "host1:port1,host2:port2") \
    +  .option("topic", "topic1") \
    +  .start()
    +
    +# Write key-value data from a DataFrame to Kafka using a topic specified in the data
    +s2 = df2 \
    +  .selectExpr("topic", "CAST(key AS STRING)", "CAST(value AS STRING)") \
    +  .writeStream \
    +  .format("kafka") \
    +  .option("kafka.bootstrap.servers", "host1:port1,host2:port2") \
    +  .start()
    +
    +{% endhighlight %}
    +</div>
    +</div>
    +
    +### Writing Batch Queries to Kafka
    +
    +<div class="codetabs">
    +<div data-lang="scala" markdown="1">
    +{% highlight scala %}
    +
    +// Write key-value data from a DataFrame to a specific Kafka topic specified in an option
    +df1.selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)")
    +  .write
    +  .format("kafka")
    +  .option("kafka.bootstrap.servers", "host1:port1,host2:port2")
    +  .option("topic", "topic1")
    +  .save()
    +
    +// Write key-value data from a DataFrame to Kafka using a topic specified in the data
    +df2.selectExpr("topic", "CAST(key AS STRING)", "CAST(value AS STRING)")
    +  .write
    +  .format("kafka")
    +  .option("kafka.bootstrap.servers", "host1:port1,host2:port2")
    +  .save()
    +
    +{% endhighlight %}
    +</div>
    +<div data-lang="java" markdown="1">
    +{% highlight java %}
    +
    +// Write key-value data from a DataFrame to a specific Kafka topic specified in an option
    +df1.selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)")
    +  .write()
    +  .format("kafka")
    +  .option("kafka.bootstrap.servers", "host1:port1,host2:port2")
    +  .option("topic", "topic1")
    +  .save()
    +
    +// Write key-value data from a DataFrame to Kafka using a topic specified in the data
    +df1.selectExpr("topic", "CAST(key AS STRING)", "CAST(value AS STRING)")
    +  .write()
    +  .format("kafka")
    +  .option("kafka.bootstrap.servers", "host1:port1,host2:port2")
    +  .save()
    +
    +{% endhighlight %}
    +</div>
    +<div data-lang="python" markdown="1">
    +{% highlight python %}
    +
    +# Write key-value data from a DataFrame to a specific Kafka topic specified in an option
    +df1.selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)") \
    +  .write \
    +  .format("kafka") \
    +  .option("kafka.bootstrap.servers", "host1:port1,host2:port2") \
    +  .option("topic", "topic1") \
    +  .save()
    --- End diff --
    
    Thanks for highlighting this though. This should be clearly documented.
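Concretely, the caveat under discussion is that the Kafka sink has no atomic commit: if a batch or task is re-executed, some rows may be written to the topic more than once, so the effective guarantee is at-least-once rather than exactly-once. A minimal reader-side workaround, sketched below in Scala under the assumption that each record carries a unique identifier in the Kafka message key and that `spark` is an existing SparkSession, is to deduplicate on that key after reading the topic back:

    // Sketch only: batch-read the topic back and drop records that share the same key.
    // Assumes the producer put a unique identifier in the Kafka message key.
    val deduped = spark.read
      .format("kafka")
      .option("kafka.bootstrap.servers", "host1:port1,host2:port2")
      .option("subscribe", "topic1")
      .load()
      .selectExpr("CAST(key AS STRING) AS key", "CAST(value AS STRING) AS value")
      .dropDuplicates("key")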




[GitHub] spark issue #17246: [SPARK-19906][SS][DOCS] Documentation describing how to ...

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the issue:

    https://github.com/apache/spark/pull/17246
  
    Test PASSed.
    Refer to this link for build results (access rights to CI server needed): 
    https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder/74756/
    Test PASSed.




[GitHub] spark pull request #17246: [SPARK-19906][SS][DOCS] Documentation describing ...

Posted by tdas <gi...@git.apache.org>.
Github user tdas commented on a diff in the pull request:

    https://github.com/apache/spark/pull/17246#discussion_r106720658
  
    --- Diff: docs/structured-streaming-kafka-integration.md ---
    @@ -373,11 +375,204 @@ The following configurations are optional:
     </tr>
     </table>
     
    +## Producing Data to Kafka
    +
    +### Writing Streaming Queries to Kafka
    +
    +<div class="codetabs">
    +<div data-lang="scala" markdown="1">
    +{% highlight scala %}
    +
    +// Write key-value data from a DataFrame to a specific Kafka topic specified in an option
    +val s1 = df1
    --- End diff --
    
    nit: you can make all of the `df1` and `df2` just `df`
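Applied to the first Scala snippet, the suggestion would read roughly as follows (a sketch only; the query handle name is arbitrary):

    // Write key-value data from a DataFrame to a specific Kafka topic specified in an option
    val query = df
      .selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)")
      .writeStream
      .format("kafka")
      .option("kafka.bootstrap.servers", "host1:port1,host2:port2")
      .option("topic", "topic1")
      .start()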




[GitHub] spark pull request #17246: [SPARK-19906][SS][DOCS] Documentation describing ...

Posted by tdas <gi...@git.apache.org>.
Github user tdas commented on a diff in the pull request:

    https://github.com/apache/spark/pull/17246#discussion_r106720890
  
    --- Diff: docs/structured-streaming-kafka-integration.md ---
    @@ -373,11 +375,204 @@ The following configurations are optional:
     </tr>
     </table>
     
    +## Producing Data to Kafka
    +
    +### Writing Streaming Queries to Kafka
    +
    +<div class="codetabs">
    +<div data-lang="scala" markdown="1">
    +{% highlight scala %}
    +
    +// Write key-value data from a DataFrame to a specific Kafka topic specified in an option
    +val s1 = df1
    +  .selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)")
    +  .writeStream
    +  .format("kafka")
    +  .option("kafka.bootstrap.servers", "host1:port1,host2:port2")
    +  .option("topic", "topic1")
    +  .start()
    +
    +// Write key-value data from a DataFrame to Kafka using a topic specified in the data
    +val s2 = df2
    +  .selectExpr("topic", "CAST(key AS STRING)", "CAST(value AS STRING)")
    +  .writeStream
    +  .format("kafka")
    +  .option("kafka.bootstrap.servers", "host1:port1,host2:port2")
    +  .start()
    +
    +{% endhighlight %}
    +</div>
    +<div data-lang="java" markdown="1">
    +{% highlight java %}
    +
    +// Write key-value data from a DataFrame to a specific Kafka topic specified in an option
    +StreamingQuery s1 = df1
    +  .selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)")
    +  .writeStream()
    +  .format("kafka")
    +  .option("kafka.bootstrap.servers", "host1:port1,host2:port2")
    +  .option("topic", "topic1")
    +  .start()
    +
    +// Write key-value data from a DataFrame to Kafka using a topic specified in the data
    +StreamingQuery s2 = df1
    +  .selectExpr("topic", "CAST(key AS STRING)", "CAST(value AS STRING)")
    +  .writeStream()
    +  .format("kafka")
    +  .option("kafka.bootstrap.servers", "host1:port1,host2:port2")
    +  .start()
    +
    +{% endhighlight %}
    +</div>
    +<div data-lang="python" markdown="1">
    +{% highlight python %}
    +
    +# Write key-value data from a DataFrame to a specific Kafka topic specified in an option
    +s1 = df1 \
    +  .selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)") \
    +  .writeStream \
    +  .format("kafka") \
    +  .option("kafka.bootstrap.servers", "host1:port1,host2:port2") \
    +  .option("topic", "topic1") \
    +  .start()
    +
    +# Write key-value data from a DataFrame to Kafka using a topic specified in the data
    +s2 = df2 \
    +  .selectExpr("topic", "CAST(key AS STRING)", "CAST(value AS STRING)") \
    +  .writeStream \
    +  .format("kafka") \
    +  .option("kafka.bootstrap.servers", "host1:port1,host2:port2") \
    +  .start()
    +
    +{% endhighlight %}
    +</div>
    +</div>
    +
    +### Writing Batch Queries to Kafka
    --- End diff --
    
    Writing output of ...




[GitHub] spark pull request #17246: [SPARK-19906][SS][DOCS] Documentation describing ...

Posted by CodingCat <gi...@git.apache.org>.
Github user CodingCat commented on a diff in the pull request:

    https://github.com/apache/spark/pull/17246#discussion_r105523893
  
    --- Diff: docs/structured-streaming-kafka-integration.md ---
    @@ -373,11 +375,204 @@ The following configurations are optional:
     </tr>
     </table>
     
    +## Producing Data to Kafka
    +
    +### Writing Streaming Queries to Kafka
    +
    +<div class="codetabs">
    +<div data-lang="scala" markdown="1">
    +{% highlight scala %}
    +
    +// Write key-value data from a DataFrame to a specific Kafka topic specified in an option
    +val s1 = df1
    +  .selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)")
    +  .writeStream
    +  .format("kafka")
    +  .option("kafka.bootstrap.servers", "host1:port1,host2:port2")
    +  .option("topic", "topic1")
    +  .start()
    +
    +// Write key-value data from a DataFrame to Kafka using a topic specified in the data
    +val s2 = df2
    +  .selectExpr("topic", "CAST(key AS STRING)", "CAST(value AS STRING)")
    +  .writeStream
    +  .format("kafka")
    +  .option("kafka.bootstrap.servers", "host1:port1,host2:port2")
    +  .start()
    +
    +{% endhighlight %}
    +</div>
    +<div data-lang="java" markdown="1">
    +{% highlight java %}
    +
    +// Write key-value data from a DataFrame to a specific Kafka topic specified in an option
    +StreamingQuery s1 = df1
    +  .selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)")
    +  .writeStream()
    +  .format("kafka")
    +  .option("kafka.bootstrap.servers", "host1:port1,host2:port2")
    +  .option("topic", "topic1")
    +  .start()
    +
    +// Write key-value data from a DataFrame to Kafka using a topic specified in the data
    +StreamingQuery s2 = df1
    +  .selectExpr("topic", "CAST(key AS STRING)", "CAST(value AS STRING)")
    +  .writeStream()
    +  .format("kafka")
    +  .option("kafka.bootstrap.servers", "host1:port1,host2:port2")
    +  .start()
    +
    +{% endhighlight %}
    +</div>
    +<div data-lang="python" markdown="1">
    +{% highlight python %}
    +
    +# Write key-value data from a DataFrame to a specific Kafka topic specified in an option
    +s1 = df1 \
    +  .selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)") \
    +  .writeStream \
    +  .format("kafka") \
    +  .option("kafka.bootstrap.servers", "host1:port1,host2:port2") \
    +  .option("topic", "topic1") \
    +  .start()
    +
    +# Write key-value data from a DataFrame to Kafka using a topic specified in the data
    +s2 = df2 \
    +  .selectExpr("topic", "CAST(key AS STRING)", "CAST(value AS STRING)") \
    +  .writeStream \
    +  .format("kafka") \
    +  .option("kafka.bootstrap.servers", "host1:port1,host2:port2") \
    +  .start()
    +
    +{% endhighlight %}
    +</div>
    +</div>
    +
    +### Writing Batch Queries to Kafka
    +
    +<div class="codetabs">
    +<div data-lang="scala" markdown="1">
    +{% highlight scala %}
    +
    +// Write key-value data from a DataFrame to a specific Kafka topic specified in an option
    +df1.selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)")
    +  .write
    +  .format("kafka")
    +  .option("kafka.bootstrap.servers", "host1:port1,host2:port2")
    +  .option("topic", "topic1")
    +  .save()
    +
    +// Write key-value data from a DataFrame to Kafka using a topic specified in the data
    +df2.selectExpr("topic", "CAST(key AS STRING)", "CAST(value AS STRING)")
    +  .write
    +  .format("kafka")
    +  .option("kafka.bootstrap.servers", "host1:port1,host2:port2")
    +  .save()
    +
    +{% endhighlight %}
    +</div>
    +<div data-lang="java" markdown="1">
    +{% highlight java %}
    +
    +// Write key-value data from a DataFrame to a specific Kafka topic specified in an option
    +df1.selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)")
    +  .write()
    +  .format("kafka")
    +  .option("kafka.bootstrap.servers", "host1:port1,host2:port2")
    +  .option("topic", "topic1")
    +  .save()
    +
    +// Write key-value data from a DataFrame to Kafka using a topic specified in the data
    +df1.selectExpr("topic", "CAST(key AS STRING)", "CAST(value AS STRING)")
    +  .write()
    +  .format("kafka")
    +  .option("kafka.bootstrap.servers", "host1:port1,host2:port2")
    +  .save()
    +
    +{% endhighlight %}
    +</div>
    +<div data-lang="python" markdown="1">
    +{% highlight python %}
    +
    +# Write key-value data from a DataFrame to a specific Kafka topic specified in an option
    +df1.selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)") \
    +  .write \
    +  .format("kafka") \
    +  .option("kafka.bootstrap.servers", "host1:port1,host2:port2") \
    +  .option("topic", "topic1") \
    +  .save()
    --- End diff --
    
    Is save() atomic?
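The short answer, per the follow-up above, is no: the sink provides at-least-once delivery, so a failed and re-executed save() can leave duplicate records in the topic. One write-side mitigation, sketched below and assuming the DataFrame already carries a unique `id` column (a hypothetical name), is to ship that identifier as the Kafka message key so that readers can deduplicate later:

    // Sketch: carry an existing unique identifier as the Kafka message key so that
    // duplicates produced by retried writes can be identified downstream.
    // "id" is a hypothetical column name; substitute a real unique key.
    df.selectExpr("CAST(id AS STRING) AS key", "CAST(value AS STRING) AS value")
      .write
      .format("kafka")
      .option("kafka.bootstrap.servers", "host1:port1,host2:port2")
      .option("topic", "topic1")
      .save()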




[GitHub] spark pull request #17246: [SPARK-19906][SS][DOCS] Documentation describing ...

Posted by tdas <gi...@git.apache.org>.
Github user tdas commented on a diff in the pull request:

    https://github.com/apache/spark/pull/17246#discussion_r106721924
  
    --- Diff: docs/structured-streaming-kafka-integration.md ---
    @@ -3,9 +3,9 @@ layout: global
     title: Structured Streaming + Kafka Integration Guide (Kafka broker version 0.10.0 or higher)
     ---
     
    -Structured Streaming integration for Kafka 0.10 to poll data from Kafka.
    +Structured Streaming integration for Kafka 0.10 to consume data from and produce data to Kafka.
    --- End diff --
    
    Let's simply use "read data" and "write data" here, as well as in the titles.




[GitHub] spark pull request #17246: [SPARK-19906][SS][DOCS] Documentation describing ...

Posted by tdas <gi...@git.apache.org>.
Github user tdas commented on a diff in the pull request:

    https://github.com/apache/spark/pull/17246#discussion_r106720722
  
    --- Diff: docs/structured-streaming-kafka-integration.md ---
    @@ -373,11 +375,204 @@ The following configurations are optional:
     </tr>
     </table>
     
    +## Producing Data to Kafka
    +
    +### Writing Streaming Queries to Kafka
    +
    +<div class="codetabs">
    +<div data-lang="scala" markdown="1">
    +{% highlight scala %}
    +
    +// Write key-value data from a DataFrame to a specific Kafka topic specified in an option
    +val s1 = df1
    +  .selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)")
    +  .writeStream
    +  .format("kafka")
    +  .option("kafka.bootstrap.servers", "host1:port1,host2:port2")
    +  .option("topic", "topic1")
    +  .start()
    +
    +// Write key-value data from a DataFrame to Kafka using a topic specified in the data
    +val s2 = df2
    +  .selectExpr("topic", "CAST(key AS STRING)", "CAST(value AS STRING)")
    +  .writeStream
    +  .format("kafka")
    +  .option("kafka.bootstrap.servers", "host1:port1,host2:port2")
    +  .start()
    +
    +{% endhighlight %}
    +</div>
    +<div data-lang="java" markdown="1">
    +{% highlight java %}
    +
    +// Write key-value data from a DataFrame to a specific Kafka topic specified in an option
    +StreamingQuery s1 = df1
    +  .selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)")
    +  .writeStream()
    +  .format("kafka")
    +  .option("kafka.bootstrap.servers", "host1:port1,host2:port2")
    +  .option("topic", "topic1")
    +  .start()
    +
    +// Write key-value data from a DataFrame to Kafka using a topic specified in the data
    +StreamingQuery s2 = df1
    --- End diff --
    
    nit: did you mean df2? or just make all of them df?




[GitHub] spark issue #17246: [SPARK-19906][SS][DOCS] Documentation describing how to ...

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the issue:

    https://github.com/apache/spark/pull/17246
  
    Merged build finished. Test PASSed.

