Posted to user@spark.apache.org by Daniel de Oliveira Mantovani <da...@gmail.com> on 2017/11/02 15:36:34 UTC

Getting Message From Structured Streaming Format Kafka

Hello, I'm trying to run the following code:

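import org.apache.spark.SparkConf
import org.apache.spark.sql.SparkSession
import org.apache.spark.streaming.{Duration, Minutes, Seconds, StreamingContext}

var newContextCreated = false // Flag to detect whether a new context was created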
val kafkaBrokers = "localhost:9092" // comma-separated list of host:port pairs

private val batchDuration: Duration = Seconds(3)
private val master: String = "local[2]"
private val appName: String = this.getClass().getSimpleName()
private val checkpointDir: String = "/tmp/spark-streaming-amqp-tests"

// Create a Spark configuration

val sparkConf = new SparkConf().setMaster(master).setAppName(appName)
sparkConf.set("spark.streaming.receiver.writeAheadLog.enable", "true")

val ssc = new StreamingContext(sparkConf, batchDuration)
ssc.checkpoint(checkpointDir)
ssc.remember(Minutes(1)) // To make sure data is not deleted by the time we query it interactively

val spark = SparkSession
  .builder
  .config(sparkConf)
  .getOrCreate()

val lines = spark
  .readStream
  .format("kafka")
  .option("kafka.bootstrap.servers", "localhost:9092")
  .option("subscribe", "evil_queue")
  .load()

lines.printSchema()

import spark.implicits._
val noAggDF = lines.select("key")

noAggDF
  .writeStream
  .format("console")
  .start()


But I get the following error:

http://paste.scsys.co.uk/565658


How do I read my messages from Kafka using the "kafka" format in Structured Streaming?


Thank you


-- 

--
Daniel de Oliveira Mantovani
Perl Evangelist/Data Hacker
+1 786 459 1341

Re: Getting Message From Structured Streaming Format Kafka

Posted by Daniel de Oliveira Mantovani <da...@gmail.com>.
Hello Burak,

Sorry for the delayed answer; you were right.

1) I changed the sql-kafka connector version, and that fixed it.
2) The purpose was just a test, and I was also using regular Spark Streaming
for something else.
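For reference, a minimal build.sbt sketch that keeps the connector on the same version as Spark. The concrete versions (Spark 2.2.0, Scala 2.11.8) are illustrative assumptions, not taken from this thread; the point is that a single value drives both artifacts.

```scala
// build.sbt sketch: one sparkVersion value drives both Spark artifacts so they cannot drift apart.
// Spark 2.2.0 / Scala 2.11.8 are example values; use the versions your cluster actually runs.
scalaVersion := "2.11.8"

val sparkVersion = "2.2.0"

libraryDependencies ++= Seq(
  // Supplied by the Spark runtime (spark-submit / spark-shell), so not bundled into the jar.
  "org.apache.spark" %% "spark-sql"            % sparkVersion % "provided",
  // The Kafka source for Structured Streaming ships as a separate artifact.
  "org.apache.spark" %% "spark-sql-kafka-0-10" % sparkVersion
)
```

When launching with spark-submit or spark-shell instead of a build file, the equivalent is `--packages org.apache.spark:spark-sql-kafka-0-10_2.11:2.2.0`, again with the Scala suffix and version matching the Spark installation.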

I was wondering how you knew from reading the logs that it was the sql-kafka
connector version. I couldn't find anything useful there.
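Independently of what the logs show, this kind of mismatch can be caught by comparing coordinates up front. The helper below is hypothetical (not part of Spark or sbt): it parses a Maven coordinate of the form `group:artifact_scalaBinary:version` and checks it against the Spark version and Scala binary version you run. In a live job, the Spark version could be taken from `spark.version`.

```scala
object ConnectorVersionCheck {
  // Hypothetical representation of "group:artifact_scalaBinary:version".
  final case class Coordinate(group: String, artifact: String, scalaBinary: String, version: String)

  // Returns None if the string is not in the expected three-part form with a Scala suffix.
  def parse(coordinate: String): Option[Coordinate] =
    coordinate.split(":") match {
      case Array(group, artifactWithScala, version) =>
        val idx = artifactWithScala.lastIndexOf('_')
        if (idx > 0)
          Some(Coordinate(group, artifactWithScala.substring(0, idx), artifactWithScala.substring(idx + 1), version))
        else None
      case _ => None
    }

  // True only when the connector targets exactly this Spark version and Scala binary version.
  def matches(sparkVersion: String, scalaBinary: String, connector: String): Boolean =
    parse(connector).exists(c => c.version == sparkVersion && c.scalaBinary == scalaBinary)

  def main(args: Array[String]): Unit = {
    println(matches("2.2.0", "2.11", "org.apache.spark:spark-sql-kafka-0-10_2.11:2.2.0")) // prints true
    println(matches("2.2.0", "2.11", "org.apache.spark:spark-sql-kafka-0-10_2.11:2.1.1")) // prints false
  }
}
```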

Thank you very much!

On Thu, Nov 2, 2017 at 12:04 PM, Burak Yavuz <br...@gmail.com> wrote:

> Hi Daniel,
>
> Several things:
>  1) Your error suggests that your Spark version and your sql-kafka connector
> version do not match. Could you make sure both are built for the same Spark
> version?
>  2) With Structured Streaming, you can remove everything related to
> StreamingContext.
>
> val sparkConf = new SparkConf().setMaster(master).setAppName(appName)
> sparkConf.set("spark.streaming.receiver.writeAheadLog.enable", "true")
>
> val ssc = new StreamingContext(sparkConf, batchDuration)
> ssc.checkpoint(checkpointDir)
> ssc.remember(Minutes(1))
>
> These lines are not doing anything for Structured Streaming.
>
>
> Best,
> Burak
>


-- 

--
Daniel de Oliveira Mantovani
Perl Evangelist/Data Hacker
+1 786 459 1341

Re: Getting Message From Structured Streaming Format Kafka

Posted by Burak Yavuz <br...@gmail.com>.
Hi Daniel,

Several things:
 1) Your error suggests that your Spark version and your sql-kafka connector
version do not match. Could you make sure both are built for the same Spark
version?
 2) With Structured Streaming, you can remove everything related to
StreamingContext.

val sparkConf = new SparkConf().setMaster(master).setAppName(appName)
sparkConf.set("spark.streaming.receiver.writeAheadLog.enable", "true")

val ssc = new StreamingContext(sparkConf, batchDuration)
ssc.checkpoint(checkpointDir)
ssc.remember(Minutes(1))

These lines are not doing anything for Structured Streaming.
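A minimal sketch of the same job with no SparkConf or StreamingContext, assuming spark-sql and a matching spark-sql-kafka-0-10 are on the classpath and a broker is reachable at localhost:9092. The object name and checkpoint path are hypothetical. It also casts the binary `key` and `value` columns to strings, which is usually what you want when printing messages to the console.

```scala
import org.apache.spark.sql.SparkSession

object KafkaConsoleSketch {
  // Broker and topic taken from the original post; adjust for your setup.
  val bootstrapServers = "localhost:9092" // comma-separated host:port pairs
  val topic = "evil_queue"
  // Hypothetical path; Structured Streaming records its progress (offsets) here.
  val checkpointDir = "/tmp/structured-streaming-kafka-checkpoint"

  def main(args: Array[String]): Unit = {
    // Master and app name are set directly on the builder.
    val spark = SparkSession
      .builder
      .master("local[2]")
      .appName("KafkaConsoleSketch")
      .getOrCreate()

    val lines = spark
      .readStream
      .format("kafka")
      .option("kafka.bootstrap.servers", bootstrapServers)
      .option("subscribe", topic)
      .load()

    // The Kafka source exposes key and value as binary columns; cast them to read text.
    val messages = lines.selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)")

    val query = messages
      .writeStream
      .format("console")
      .option("checkpointLocation", checkpointDir)
      .start()

    // Block the driver until the query stops; otherwise main returns and the application exits.
    query.awaitTermination()
  }
}
```

The `checkpointLocation` option is the Structured Streaming counterpart of `ssc.checkpoint(...)`; the write-ahead-log setting and `ssc.remember(...)` have no equivalent to carry over.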


Best,
Burak
