Posted to issues@spark.apache.org by "zhangxinyu (JIRA)" <ji...@apache.org> on 2016/10/26 03:26:58 UTC

[jira] [Comment Edited] (SPARK-17935) Add KafkaForeachWriter in external kafka-0.8.0 for structured streaming module

    [ https://issues.apache.org/jira/browse/SPARK-17935?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15601778#comment-15601778 ] 

zhangxinyu edited comment on SPARK-17935 at 10/26/16 3:26 AM:
--------------------------------------------------------------

h2. KafkaSink Design Doc

h4. Goal
Write the results of a query in the structured streaming module to a Kafka cluster (version 0.10.0.0).

h4. Implementation
Four classes are implemented to write data to a Kafka cluster from the structured streaming module:
* *KafkaSinkProvider*
This class extends the traits *StreamSinkProvider* and *DataSourceRegister* and overrides the functions *shortName* and *createSink*. The function *createSink* creates a *KafkaSink*.
* *KafkaSink*
*KafkaSink* extends *Sink* and overrides the function *addBatch*, which creates a *KafkaSinkRDD* for each batch.
* *KafkaSinkRDD*
*KafkaSinkRDD* extends *RDD* and sends the results to the Kafka cluster in parallel, one partition per task. In the function *compute*, it calls *CachedKafkaProducer* to get or create a producer and uses it to send the data.
* *CachedKafkaProducer*
*CachedKafkaProducer* caches producers in the executors so that they can be reused.
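The producer-reuse part of this design can be illustrated with a minimal, Spark-free sketch. It assumes that a producer is identified by its full configuration map and that each task writes one partition, with a plain String standing in for a row. All names here (SketchProducer, SketchCachedKafkaProducer, SketchKafkaSinkPartition) are hypothetical, and SketchProducer only records messages in memory instead of calling the real Kafka client.

```scala
import scala.collection.mutable

// Hypothetical stand-in for a Kafka producer so the sketch runs without
// kafka-clients; a real sink would wrap
// org.apache.kafka.clients.producer.KafkaProducer instead.
final class SketchProducer(val config: Map[String, String]) {
  private val records = mutable.ArrayBuffer.empty[(String, String)]

  // Record (topic, value) in memory instead of sending it over the network.
  def send(topic: String, value: String): Unit =
    records.synchronized { records += ((topic, value)); () }

  def sent: List[(String, String)] = records.synchronized(records.toList)
}

// Role of CachedKafkaProducer (assumed): a JVM-wide, i.e. per-executor,
// cache that returns the existing producer for a configuration or creates one.
object SketchCachedKafkaProducer {
  private val cache = mutable.HashMap.empty[Map[String, String], SketchProducer]

  def getOrCreate(config: Map[String, String]): SketchProducer =
    cache.synchronized {
      cache.getOrElseUpdate(config, new SketchProducer(config))
    }
}

// Role of KafkaSinkRDD.compute (assumed): each task sends the rows of its
// own partition through the cached producer and reports how many it sent.
object SketchKafkaSinkPartition {
  def write(rows: Iterator[String], config: Map[String, String], topic: String): Int = {
    val producer = SketchCachedKafkaProducer.getOrCreate(config)
    var count = 0
    rows.foreach { row =>
      producer.send(topic, row)
      count += 1
    }
    count
  }
}
```

Keying the cache on the whole configuration means two tasks with identical settings on the same executor share one producer, while different settings get separate producers. A production version would also have to close producers that are no longer used, which this sketch does not do.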

h4. Configuration
* *Kafka Producer Configuration*
Kafka producer configurations are set with "*.option()*" using keys that start with "*kafka.*". For example, the producer configuration *bootstrap.servers* is set with *.option("kafka.bootstrap.servers", kafkaServers)*.
* *Other Configuration*
Other configurations, such as *topic*, are also set with "*.option()*", but their keys do not start with "*kafka.*".
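A minimal sketch of this prefix convention follows. It assumes the sink receives all ".option()" values as one Map[String, String]; the object name SketchSinkOptions is hypothetical and not part of the design. It separates the producer configurations, with the "kafka." prefix removed, from the remaining sink options such as topic.

```scala
// Hypothetical helper: split sink options into Kafka producer configs
// (keys prefixed with "kafka.", prefix stripped) and all other options.
object SketchSinkOptions {
  private val KafkaPrefix = "kafka."

  def split(options: Map[String, String]): (Map[String, String], Map[String, String]) = {
    val (kafka, other) = options.partition { case (k, _) => k.startsWith(KafkaPrefix) }
    // "kafka.bootstrap.servers" becomes the producer key "bootstrap.servers".
    val producerConfig = kafka.map { case (k, v) => k.stripPrefix(KafkaPrefix) -> v }
    (producerConfig, other)
  }
}
```

With this split, the producer configuration map can be passed unchanged to the producer, and options like topic stay available to the sink itself.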

h4. Usage
// input is a streaming DataFrame; kafkaServers and topic are Strings
val query = input.writeStream
  .format("kafka-sink-10")
  .outputMode("append")
  .option("kafka.bootstrap.servers", kafkaServers)
  .option("topic", topic)
  .start()




was (Author: zhangxinyu):
h2. KafkaSink Design Doc

h4. Goal
Write the results of a query in the structured streaming module to a Kafka cluster (version 0.10.0.0).

h4. Implementation
Four classes are implemented to write data to a Kafka cluster from the structured streaming module:
* *KafkaSinkProvider*
This class extends the traits *StreamSinkProvider* and *DataSourceRegister* and overrides the functions *shortName* and *createSink*. The function *createSink* creates a *KafkaSink*.
* *KafkaSink*
*KafkaSink* extends *Sink* and overrides the function *addBatch*, which creates a *KafkaSinkRDD* for each batch.
* *KafkaSinkRDD*
*KafkaSinkRDD* extends *RDD* and sends the results to the Kafka cluster in parallel, one partition per task. In the function *compute*, it calls *CachedKafkaProducer* to get or create a producer and uses it to send the data.
* *CachedKafkaProducer*
*CachedKafkaProducer* caches producers in the executors so that they can be reused.

h4. Configuration
* *Kafka Producer Configuration*
Kafka producer configurations are set with "*.option()*" using keys that start with "*kafka.*". For example, the producer configuration *bootstrap.servers* is set with *.option("kafka.bootstrap.servers", kafkaServers)*.
* *Other Configuration*
Other configurations, such as *topic*, are also set with "*.option()*", but their keys do not start with "*kafka.*".

h4. Usage
// input is a streaming DataFrame; kafkaServers and topic are Strings
val query = input.writeStream
  .format("kafkaSink")
  .outputMode("append")
  .option("kafka.bootstrap.servers", kafkaServers)
  .option("topic", topic)
  .start()



> Add KafkaForeachWriter in external kafka-0.8.0 for structured streaming module
> ------------------------------------------------------------------------------
>
>                 Key: SPARK-17935
>                 URL: https://issues.apache.org/jira/browse/SPARK-17935
>             Project: Spark
>          Issue Type: Improvement
>          Components: SQL, Streaming
>    Affects Versions: 2.0.0
>            Reporter: zhangxinyu
>
> Spark already supports kafkaInputStream. It would be useful to add `KafkaForeachWriter` to output results to Kafka in the structured streaming module.
> `KafkaForeachWriter.scala` would be placed in external kafka-0.8.0.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)
