Posted to user@spark.apache.org by Pankaj Wahane <pa...@live.com> on 2019/03/06 16:58:58 UTC
Structured Streaming to Kafka Topic
Hi,
I am using structured streaming for ETL.
val data_stream = spark
  .readStream // constantly expanding dataframe
  .format("kafka")
  .option("kafka.bootstrap.servers", "localhost:9092")
  .option("subscribe", "sms_history")
  .option("startingOffsets", "earliest") // begin from start of topic
  .option("failOnDataLoss", "false")
  .load()
I transform this into a Dataset with the following schema.
root
|-- accountId: long (nullable = true)
|-- countryId: long (nullable = true)
|-- credits: double (nullable = true)
|-- deliveryStatus: string (nullable = true)
|-- senderId: string (nullable = true)
|-- sentStatus: string (nullable = true)
|-- source: integer (nullable = true)
|-- createdOn: timestamp (nullable = true)
|-- send_success_credits: double (nullable = true)
|-- send_error_credits: double (nullable = true)
|-- delivered_credits: double (nullable = true)
|-- invalid_sd_credits: double (nullable = true)
|-- undelivered_credits: double (nullable = true)
|-- unknown_credits: double (nullable = true)
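
For context, a transformation like this is commonly done by parsing the Kafka "value" bytes with the built-in from_json function. A minimal sketch, assuming each Kafka message carries one JSON record with exactly the fields printed above (the variable names are placeholders):

import org.apache.spark.sql.functions.{col, from_json}
import org.apache.spark.sql.types._

// Schema mirroring the printSchema output above.
val smsSchema = new StructType()
  .add("accountId", LongType)
  .add("countryId", LongType)
  .add("credits", DoubleType)
  .add("deliveryStatus", StringType)
  .add("senderId", StringType)
  .add("sentStatus", StringType)
  .add("source", IntegerType)
  .add("createdOn", TimestampType)
  .add("send_success_credits", DoubleType)
  .add("send_error_credits", DoubleType)
  .add("delivered_credits", DoubleType)
  .add("invalid_sd_credits", DoubleType)
  .add("undelivered_credits", DoubleType)
  .add("unknown_credits", DoubleType)

// Kafka delivers "value" as binary: cast to string, parse, then flatten.
val sms_stream = data_stream
  .selectExpr("CAST(value AS STRING) AS json")
  .select(from_json(col("json"), smsSchema).as("rec"))
  .select("rec.*")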
Now I want to write this transformed stream to another Kafka topic. As a temporary measure I have used a UDF that accepts all these columns as parameters and creates a JSON string, which I add as a "value" column for writing to Kafka.
Is there an easier and cleaner way to do the same?
Thanks,
Pankaj
Re: Structured Streaming to Kafka Topic
Posted by Akshay Bhardwaj <ak...@gmail.com>.
Hi Pankaj,
What version of Spark are you using?
If you are using 2.4+ then there is an inbuilt function "to_json" which
converts the columns of your dataset to JSON format.
https://spark.apache.org/docs/2.4.0/api/sql/#to_json
https://spark.apache.org/docs/2.4.0/api/sql/#to_json
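
Applied to the schema in the question, the suggestion boils down to wrapping all columns in a struct and feeding that to to_json as the Kafka "value" column, which replaces the hand-written UDF entirely. A sketch, assuming the transformed Dataset is named sms_stream; the output topic name and checkpoint path are placeholders:

import org.apache.spark.sql.functions.{col, struct, to_json}

val query = sms_stream
  // Serialize every column of each row into one JSON string named "value".
  .select(to_json(struct(sms_stream.columns.map(col): _*)).alias("value"))
  .writeStream
  .format("kafka")
  .option("kafka.bootstrap.servers", "localhost:9092")
  .option("topic", "sms_history_out")              // placeholder topic name
  .option("checkpointLocation", "/tmp/sms_chkpt")  // placeholder path
  .start()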
Akshay Bhardwaj
+91-97111-33849
On Wed, Mar 6, 2019 at 10:29 PM Pankaj Wahane <pa...@live.com> wrote:
> [original message quoted in full above]