You are viewing a plain text version of this content. The canonical link for it is here.
Posted to user@spark.apache.org by Pankaj Wahane <pa...@live.com> on 2019/03/06 16:58:58 UTC

Structured Streaming to Kafka Topic

Hi,

I am using structured streaming for ETL.


val data_stream = spark
  .readStream // constantly expanding dataframe
  .format("kafka")
  .option("kafka.bootstrap.servers", "localhost:9092")
  .option("subscribe", "sms_history")
  .option("startingOffsets", "earliest") // begin from start of topic
  .option("failOnDataLoss", "false")
  .load()

I transform this into a DataSet with following schema.

root
 |-- accountId: long (nullable = true)
 |-- countryId: long (nullable = true)
 |-- credits: double (nullable = true)
 |-- deliveryStatus: string (nullable = true)
 |-- senderId: string (nullable = true)
 |-- sentStatus: string (nullable = true)
 |-- source: integer (nullable = true)
 |-- createdOn: timestamp (nullable = true)
 |-- send_success_credits: double (nullable = true)
 |-- send_error_credits: double (nullable = true)
 |-- delivered_credits: double (nullable = true)
 |-- invalid_sd_credits: double (nullable = true)
 |-- undelivered_credits: double (nullable = true)
 |-- unknown_credits: double (nullable = true)


Now I want to write this transformed stream to another Kafka topic. I have temporarily used a UDF that accepts all these columns as parameters and create a json string for adding a column "value" for writing to Kafka.

Is there easier and cleaner way to do the same?


Thanks,
Pankaj


Re: Structured Streaming to Kafka Topic

Posted by Akshay Bhardwaj <ak...@gmail.com>.
Hi Pankaj,

What version of Spark are you using?

If you are using 2.4+ then there is an inbuilt function "to_json" which
converts the columns of your dataset to JSON format.
https://spark.apache.org/docs/2.4.0/api/sql/#to_json

Akshay Bhardwaj
+91-97111-33849


On Wed, Mar 6, 2019 at 10:29 PM Pankaj Wahane <pa...@live.com> wrote:

> Hi,
>
> I am using structured streaming for ETL.
>
> val data_stream = spark
>   .readStream // constantly expanding dataframe
>   .format("kafka")
>   .option("kafka.bootstrap.servers", "localhost:9092")
>   .option("subscribe", "sms_history")
>   .option("startingOffsets", "earliest") // begin from start of topic
>   .option("failOnDataLoss", "false")
>   .load()
>
> I transform this into a DataSet with following schema.
>
> root
>  |-- accountId: long (nullable = true)
>  |-- countryId: long (nullable = true)
>  |-- credits: double (nullable = true)
>  |-- deliveryStatus: string (nullable = true)
>  |-- senderId: string (nullable = true)
>  |-- sentStatus: string (nullable = true)
>  |-- source: integer (nullable = true)
>  |-- createdOn: timestamp (nullable = true)
>  |-- send_success_credits: double (nullable = true)
>  |-- send_error_credits: double (nullable = true)
>  |-- delivered_credits: double (nullable = true)
>  |-- invalid_sd_credits: double (nullable = true)
>  |-- undelivered_credits: double (nullable = true)
>  |-- unknown_credits: double (nullable = true)
>
>
> Now I want to write this transformed stream to another Kafka topic. I have
> temporarily used a UDF that accepts all these columns as parameters and
> create a json string for adding a column "value" for writing to Kafka.
>
> Is there easier and cleaner way to do the same?
>
>
> Thanks,
> Pankaj
>
>