Posted to user@spark.apache.org by Revin Chalil <rc...@expedia.com> on 2017/05/14 16:31:40 UTC
RE: Spark SQL DataFrame to Kafka Topic
Hi TD / Michael,
I am trying to use the foreach sink to write to Kafka, following this post <https://databricks.com/blog/2017/04/04/real-time-end-to-end-integration-with-apache-kafka-in-apache-sparks-structured-streaming.html> from the Databricks blog by Sunil Sitaula <https://databricks.com/blog/author/sunil-sitaula>. When using a simple DF, I get the error below with DF.writeStream.foreach(writer).outputMode("update").start():
Type mismatch, expected: ForeachWriter[Row], actual: KafkaSink
Cannot resolve reference foreach with such signature
Below is the snippet:
import session.implicits._ // provides $"..." and the Dataset encoders
val data = session
  .readStream
  .format("kafka")
  .option("kafka.bootstrap.servers", KafkaBroker)
  .option("subscribe", InTopic)
  .load()
  .select($"value".as[Array[Byte]])
  .flatMap { d =>
    val events = AvroHelper.readEvents(d)
    events.map { (event: HdfsEvent) =>
      val payload = EventPayloadParser.read(event.getPayload)
      KafkaMessage(payload)
    }
  }
case class KafkaMessage(payload: String)
This is where I use the foreach sink:
val writer = new KafkaSink("kafka-topic", KafkaBroker)
val query = data.writeStream.foreach(writer).outputMode("update").start()
In this case, it shows:
Type mismatch, expected: ForeachWriter[Main.KafkaMessage], actual: Main.KafkaSink
Cannot resolve reference foreach with such signature
Any help is much appreciated. Thank you.
From: Tathagata Das [mailto:tathagata.das1565@gmail.com]
Sent: Friday, January 13, 2017 3:31 PM
To: Koert Kuipers <ko...@tresata.com>
Cc: Peyman Mohajerian <mo...@gmail.com>; Senthil Kumar <se...@gmail.com>; User <us...@spark.apache.org>; senthilec566@apache.org
Subject: Re: Spark SQL DataFrame to Kafka Topic
Structured Streaming has a foreach sink, where you can essentially do whatever you want with your data. It's easy to create a Kafka producer and write the data out to Kafka.
http://spark.apache.org/docs/latest/structured-streaming-programming-guide.html#using-foreach
On Fri, Jan 13, 2017 at 8:28 AM, Koert Kuipers <ko...@tresata.com> wrote:
How do you do this with Structured Streaming? I see no mention of writing to Kafka.
On Fri, Jan 13, 2017 at 10:30 AM, Peyman Mohajerian <mo...@gmail.com> wrote:
Yes, it is called Structured Streaming: https://docs.databricks.com/_static/notebooks/structured-streaming-kafka.html
http://spark.apache.org/docs/latest/structured-streaming-programming-guide.html
On Fri, Jan 13, 2017 at 3:32 AM, Senthil Kumar <se...@gmail.com> wrote:
Hi Team,
Sorry if this question has already been asked in this forum.
Can we ingest data into an Apache Kafka topic from a Spark SQL DataFrame?
Here is my code, which reads a Parquet file:
val sqlContext = new org.apache.spark.sql.SQLContext(sc)
val df = sqlContext.read.parquet("..../temp/*.parquet")
df.registerTempTable("beacons")
I want to ingest the df DataFrame directly into Kafka. Is there any way to achieve this?
Cheers,
Senthil
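For a static DataFrame like the one above, Spark 2.2 and later can also write to Kafka in batch mode through the built-in kafka data source, without any streaming. The sketch below assumes the spark-sql-kafka-0-10 package is on the classpath and that serializing each row as one JSON string is acceptable; the object name ParquetToKafka and its three command-line arguments are hypothetical.

```scala
import org.apache.spark.sql.{DataFrame, SparkSession}
import org.apache.spark.sql.functions.{col, struct, to_json}

object ParquetToKafka {
  // Packs all columns of each row into one JSON string in a column named
  // "value", which is where the Kafka sink takes the record body from.
  def toKafkaValues(df: DataFrame): DataFrame =
    df.select(to_json(struct(df.columns.map(c => col(c)): _*)).alias("value"))

  def main(args: Array[String]): Unit = {
    // Hypothetical arguments: Parquet input path, bootstrap servers, topic.
    val Array(inputPath, bootstrapServers, topic) = args
    val spark = SparkSession.builder.appName("parquet-to-kafka").getOrCreate()

    toKafkaValues(spark.read.parquet(inputPath))
      .write
      .format("kafka") // batch writes to Kafka need Spark 2.2+ and spark-sql-kafka-0-10
      .option("kafka.bootstrap.servers", bootstrapServers)
      .option("topic", topic)
      .save()

    spark.stop()
  }
}
```

The sink only requires a value column; an optional key column can be added the same way if records should be partitioned by key.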
RE: Spark SQL DataFrame to Kafka Topic
Posted by Revin Chalil <rc...@expedia.com>.
I haven't been able to get this working yet. If anyone has successfully used the foreach sink for Kafka with Structured Streaming, please share. Thanks.
From: Revin Chalil [mailto:rchalil@expedia.com]
Sent: Sunday, May 14, 2017 9:32 AM
To: Tathagata Das <ta...@gmail.com>; michael@databricks.com
Cc: Peyman Mohajerian <mo...@gmail.com>; Senthil Kumar <se...@gmail.com>; User <us...@spark.apache.org>; senthilec566@apache.org; Ofir Manor <of...@equalum.io>; Hemanth Gudela <he...@qvantel.com>; lucas.gary@gmail.com; Koert Kuipers <ko...@tresata.com>; silvio.fiorito@granturing.com
Subject: RE: Spark SQL DataFrame to Kafka Topic
RE: Spark SQL DataFrame to Kafka Topic
Posted by Revin Chalil <rc...@expedia.com>.
Thanks Michael, that worked, appreciate your help.
From: Michael Armbrust [mailto:michael@databricks.com]
Sent: Monday, May 15, 2017 11:45 AM
To: Revin Chalil <rc...@expedia.com>
Cc: User <us...@spark.apache.org>
Subject: Re: Spark SQL DataFrame to Kafka Topic
The foreach sink from that blog post requires that you have a DataFrame with two columns in the form of a Tuple2, (String, String), whereas your DataFrame has only a single column `payload`. You could change the KafkaSink to extend ForeachWriter[KafkaMessage] and then it would work.
I'd also suggest you just try the native KafkaSink <https://databricks.com/blog/2017/04/26/processing-data-in-apache-kafka-with-structured-streaming-in-apache-spark-2-2.html> that is part of Spark 2.2 <http://apache-spark-developers-list.1001551.n3.nabble.com/VOTE-Apache-Spark-2-2-0-RC2-td21497.html>.
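A minimal sketch of the ForeachWriter[KafkaMessage] change, assuming the KafkaMessage case class from the question and plain string serialization. KafkaSink keeps the same (topic, servers) constructor the question calls, but its type parameter now matches the Dataset produced by flatMap, so data.writeStream.foreach(writer) type-checks. The producer settings are illustrative assumptions, not the blog post's code.

```scala
import java.util.Properties

import org.apache.kafka.clients.producer.{KafkaProducer, ProducerRecord}
import org.apache.kafka.common.serialization.StringSerializer
import org.apache.spark.sql.ForeachWriter

case class KafkaMessage(payload: String)

// Typed to the element type of the Dataset produced by flatMap, so that
// data.writeStream.foreach(new KafkaSink(...)) type-checks.
class KafkaSink(topic: String, servers: String) extends ForeachWriter[KafkaMessage] {
  // Built on the executor in open(): KafkaProducer is not serializable.
  @transient private var producer: KafkaProducer[String, String] = _

  override def open(partitionId: Long, version: Long): Boolean = {
    val props = new Properties()
    props.put("bootstrap.servers", servers)
    props.put("key.serializer", classOf[StringSerializer].getName)
    props.put("value.serializer", classOf[StringSerializer].getName)
    producer = new KafkaProducer[String, String](props)
    true // process every row of this partition
  }

  // Sends the payload as the record value, with no key.
  override def process(msg: KafkaMessage): Unit =
    producer.send(new ProducerRecord[String, String](topic, msg.payload))

  // Flushes pending records and releases the producer, if one was opened.
  override def close(errorOrNull: Throwable): Unit =
    if (producer != null) producer.close()
}
```

The producer is created in open() rather than in the constructor because the writer object is serialized and shipped to the executors, and a KafkaProducer cannot be serialized.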
Re: Spark SQL DataFrame to Kafka Topic
Posted by Michael Armbrust <mi...@databricks.com>.
The foreach sink from that blog post requires that you have a DataFrame with two columns in the form of a Tuple2, (String, String), whereas your DataFrame has only a single column `payload`. You could change the KafkaSink to extend ForeachWriter[KafkaMessage] and then it would work.
I'd also suggest you just try the native KafkaSink <https://databricks.com/blog/2017/04/26/processing-data-in-apache-kafka-with-structured-streaming-in-apache-spark-2-2.html> that is part of Spark 2.2 <http://apache-spark-developers-list.1001551.n3.nabble.com/VOTE-Apache-Spark-2-2-0-RC2-td21497.html>.
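A sketch of the native-sink alternative, assuming Spark 2.2+ with the spark-sql-kafka-0-10 package and the Dataset[KafkaMessage] built in the question. The Kafka sink reads the record body from a column named value, so the single payload column is simply renamed; the object name, the helper names and the checkpoint directory are hypothetical.

```scala
import org.apache.spark.sql.{DataFrame, Dataset}
import org.apache.spark.sql.streaming.StreamingQuery

case class KafkaMessage(payload: String)

object NativeKafkaSink {
  // The Kafka sink takes the record body from a string or binary column
  // named "value", so the single payload column is renamed to it.
  def toKafkaFrame(data: Dataset[KafkaMessage]): DataFrame =
    data.toDF("value")

  // Starts the streaming query; requires the spark-sql-kafka-0-10 package.
  def start(data: Dataset[KafkaMessage],
            bootstrapServers: String,
            topic: String,
            checkpointDir: String): StreamingQuery =
    toKafkaFrame(data)
      .writeStream
      .format("kafka")
      .option("kafka.bootstrap.servers", bootstrapServers)
      .option("topic", topic)
      .option("checkpointLocation", checkpointDir) // needed unless spark.sql.streaming.checkpointLocation is set
      .outputMode("append") // stateless query, so append mode suffices
      .start()
}
```

With the names from the question, this would be called as NativeKafkaSink.start(data, KafkaBroker, "kafka-topic", checkpointDir), where checkpointDir is any durable directory you choose; no custom writer class is needed.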