Posted to user@spark.apache.org by Daniel de Oliveira Mantovani <da...@gmail.com> on 2017/11/02 15:36:34 UTC
Getting Message From Structured Streaming Format Kafka
Hello, I'm trying to run the following code:

import org.apache.spark.SparkConf
import org.apache.spark.sql.SparkSession
import org.apache.spark.streaming.{Duration, Minutes, Seconds, StreamingContext}

var newContextCreated = false // Flag to detect whether a new context was created or not
val kafkaBrokers = "localhost:9092" // Comma-separated list of host:port pairs

private val batchDuration: Duration = Seconds(3)
private val master: String = "local[2]"
private val appName: String = this.getClass().getSimpleName()
private val checkpointDir: String = "/tmp/spark-streaming-amqp-tests"

// Create a Spark configuration
val sparkConf = new SparkConf().setMaster(master).setAppName(appName)
sparkConf.set("spark.streaming.receiver.writeAheadLog.enable", "true")

val ssc = new StreamingContext(sparkConf, batchDuration)
ssc.checkpoint(checkpointDir)
ssc.remember(Minutes(1)) // To make sure data is not deleted by the time we query it interactively

val spark = SparkSession
  .builder
  .config(sparkConf)
  .getOrCreate()

val lines = spark
  .readStream
  .format("kafka")
  .option("kafka.bootstrap.servers", kafkaBrokers)
  .option("subscribe", "evil_queue")
  .load()

lines.printSchema()

import spark.implicits._
val noAggDF = lines.select("key")

noAggDF
  .writeStream
  .format("console")
  .start()
But I'm getting this error:
http://paste.scsys.co.uk/565658
How do I read my messages with the kafka format in Structured Streaming?
Thank you
--
Daniel de Oliveira Mantovani
Perl Evangelist/Data Hacker
+1 786 459 1341
Re: Getting Message From Structured Streaming Format Kafka
Posted by Daniel de Oliveira Mantovani <da...@gmail.com>.
Hello Burak,
Sorry for the delayed answer; you were right.
1) I changed the sql-kafka connector version, and that fixed it.
2) The purpose was just testing, and I was also using regular Spark Streaming
for something else.
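One way to keep the connector on the same version as Spark is to let a single value drive every Spark artifact in the build. A minimal build.sbt sketch, assuming sbt with Spark 2.2.0 on Scala 2.11 (these version numbers are assumptions, not taken from the thread; substitute the ones you actually run against):

```scala
// build.sbt (sketch): every Spark artifact shares one version value.
scalaVersion := "2.11.12"

val sparkVersion = "2.2.0" // assumed; must match the Spark you run on

libraryDependencies ++= Seq(
  "org.apache.spark" %% "spark-sql"            % sparkVersion,
  "org.apache.spark" %% "spark-sql-kafka-0-10" % sparkVersion,
  // Only needed for the DStream (StreamingContext) based code.
  "org.apache.spark" %% "spark-streaming"      % sparkVersion
)
```

Because `%%` appends the Scala binary version to each artifact name, the Scala version also has to match the one your Spark distribution was built with.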
I was wondering how you knew it was the sql-kafka connector version from
reading the logs. I couldn't find anything useful there.
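The pasted log is not reproduced here, so the exact clue in it is unknown. A hedged way to check for this kind of mismatch yourself is to print, at runtime, the Spark version on the classpath and the jar that supplies the Kafka source, then compare them; with Maven-style jar names such as spark-sql-kafka-0-10_2.11-2.2.0.jar, the connector's Scala and Spark versions typically appear in the file name. The class name below is assumed to be the Kafka source of the spark-sql-kafka-0-10 connector, and `VersionCheck`/`jarOf` are hypothetical helpers:

```scala
import scala.util.Try

// Sketch: print the Spark version and the jar that provides the Kafka source.
object VersionCheck {
  // Assumed class name of the Kafka data source in spark-sql-kafka-0-10.
  // Looked up by name so this compiles without the connector on the classpath.
  val KafkaProviderClass = "org.apache.spark.sql.kafka010.KafkaSourceProvider"

  /** Location of the jar (or directory) containing `className`, if it loads. */
  def jarOf(className: String): Option[String] =
    Try(Class.forName(className)).toOption
      .flatMap(c => Option(c.getProtectionDomain.getCodeSource))
      .map(_.getLocation.toString)

  def main(args: Array[String]): Unit = {
    println(s"Spark version: ${org.apache.spark.SPARK_VERSION}")
    println(s"Kafka source jar: ${jarOf(KafkaProviderClass).getOrElse("not on the classpath")}")
  }
}
```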
Thank you very much!
On Thu, Nov 2, 2017 at 12:04 PM, Burak Yavuz <br...@gmail.com> wrote:
> Hi Daniel,
>
> Several things:
> 1) Your error seems to suggest you're using one version of Spark
> and a different version of the sql-kafka connector. Could you make sure
> they are on the same Spark version?
> 2) With Structured Streaming, you can remove everything related to
> StreamingContext:
>
> val sparkConf = new SparkConf().setMaster(master).setAppName(appName)
> sparkConf.set("spark.streaming.receiver.writeAheadLog.enable", "true")
>
> val ssc = new StreamingContext(sparkConf, batchDuration)
> ssc.checkpoint(checkpointDir)
> ssc.remember(Minutes(1))
>
> These lines have no effect on Structured Streaming.
>
>
> Best,
> Burak
Re: Getting Message From Structured Streaming Format Kafka
Posted by Burak Yavuz <br...@gmail.com>.
Hi Daniel,
Several things:
1) Your error seems to suggest you're using one version of Spark
and a different version of the sql-kafka connector. Could you make sure
they are on the same Spark version?
2) With Structured Streaming, you can remove everything related to
StreamingContext:
val sparkConf = new SparkConf().setMaster(master).setAppName(appName)
sparkConf.set("spark.streaming.receiver.writeAheadLog.enable", "true")
val ssc = new StreamingContext(sparkConf, batchDuration)
ssc.checkpoint(checkpointDir)
ssc.remember(Minutes(1))
These lines have no effect on Structured Streaming.
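With the StreamingContext lines dropped, what remains can be sketched as a standalone Structured Streaming job. This is a minimal sketch under stated assumptions: a broker at localhost:9092, the topic evil_queue from the question, and a hypothetical checkpoint path and helper name (`KafkaConsoleSketch.messages`). Since Kafka delivers key and value as binary columns, both are cast to strings so the console output is readable, and `awaitTermination()` keeps the driver running after `start()`:

```scala
import org.apache.spark.sql.{DataFrame, SparkSession}

// Sketch: the Kafka-to-console job with no StreamingContext at all.
object KafkaConsoleSketch {

  // Hypothetical helper: builds the streaming DataFrame of messages.
  // Kafka delivers key and value as binary, so both are cast to strings.
  def messages(spark: SparkSession, brokers: String, topic: String): DataFrame =
    spark
      .readStream
      .format("kafka")
      .option("kafka.bootstrap.servers", brokers)
      .option("subscribe", topic)
      .load()
      .selectExpr("CAST(key AS STRING) AS key", "CAST(value AS STRING) AS value")

  def main(args: Array[String]): Unit = {
    val spark = SparkSession
      .builder
      .master("local[2]")
      .appName("KafkaConsoleSketch")
      .getOrCreate()

    val query = messages(spark, "localhost:9092", "evil_queue")
      .writeStream
      .format("console")
      // Hypothetical path: checkpointing is configured per query here,
      // instead of through ssc.checkpoint(...) as in the DStream API.
      .option("checkpointLocation", "/tmp/kafka-console-sketch")
      .start()

    // Keep the driver alive; a standalone app would otherwise exit after start().
    query.awaitTermination()
  }
}
```

The broker is only contacted once the query starts, so building the DataFrame itself is cheap and easy to inspect with `printSchema()`.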
Best,
Burak