Posted to user@spark.apache.org by kant kodali <ka...@gmail.com> on 2017/05/16 07:36:06 UTC

How to print data to console in structured streaming using Spark 2.1.0?

Hi All,

I have the following code.

 val ds = sparkSession.readStream
                .format("kafka")
                .option("kafka.bootstrap.servers", bootstrapServers)
                .option("subscribe", topicName)
                .option("checkpointLocation", hdfsCheckPointDir)
                .load()

 import sparkSession.implicits._   // needed for the $"value" column syntax

 val ds1 = ds.select($"value")
 val query = ds1.writeStream.outputMode("append").format("console").start()
 query.awaitTermination()

This code runs without errors, but I don't see any data printed to the
console. When I run my standalone test Kafka consumer jar, I can see that it
is receiving messages, so I am not sure what is going on with the above
code. Any ideas?

Thanks!

Re: How to print data to console in structured streaming using Spark 2.1.0?

Posted by kant kodali <ka...@gmail.com>.
This isn't Structured Streaming, right?

On Tue, May 16, 2017 at 4:15 AM, Didac Gil <di...@gmail.com> wrote:

> From what I know, you would have to iterate over each RDD. When you are
> reading from the stream, Spark collects the data as a mini RDD for each
> batch interval.
>
> I hope this helps.
>
> ds.foreachRDD { rdd =>
>
>   // toDF on an RDD assumes `import spark.implicits._` is in scope
>   val newNames = Seq("Field1", "Field2", "Field3")
>   val mydataDF = rdd.toDF(newNames: _*)
>
>   mydataDF.createOrReplaceTempView("myTempTable")
>   // Add a timestamp column via SQL, save the batch, and print its row count
>   val wordCountsDataFrame = spark.sql("select *, now() as TStamp from myTempTable")
>   wordCountsDataFrame.write.mode(mode).save(output)
>   val lines = wordCountsDataFrame.count().toInt
> //      wordCountsDataFrame.show(20, false)
>   println("Total entries in this batch: " + lines)
>
> }
>
> On 16 May 2017, at 09:36, kant kodali <ka...@gmail.com> wrote:
>
> Hi All,
>
> I have the following code.
>
>  val ds = sparkSession.readStream
>                 .format("kafka")
>                 .option("kafka.bootstrap.servers", bootstrapServers)
>                 .option("subscribe", topicName)
>                 .option("checkpointLocation", hdfsCheckPointDir)
>                 .load()
>
>  val ds1 = ds.select($"value")
>  val query = ds1.writeStream.outputMode("append").format("console").start()
>  query.awaitTermination()
>
> This code runs without errors, but I don't see any data printed to the
> console. When I run my standalone test Kafka consumer jar, I can see that
> it is receiving messages, so I am not sure what is going on with the above
> code. Any ideas?
>
> Thanks!
>
>
> Didac Gil de la Iglesia
> PhD in Computer Science
> didacgil9@gmail.com
> Spain:     +34 696 285 544
> Sweden: +46 (0)730229737
> Skype: didac.gil.de.la.iglesia
>
>

Re: How to print data to console in structured streaming using Spark 2.1.0?

Posted by Didac Gil <di...@gmail.com>.
From what I know, you would have to iterate over each RDD. When you read from the stream, Spark collects the data as a mini RDD for each batch interval.

I hope this helps.
ds.foreachRDD { rdd =>
  // toDF on an RDD assumes `import spark.implicits._` is in scope
  val newNames = Seq("Field1", "Field2", "Field3")
  val mydataDF = rdd.toDF(newNames: _*)
  mydataDF.createOrReplaceTempView("myTempTable")
  // Add a timestamp column via SQL, save the batch, and print its row count
  val wordCountsDataFrame = spark.sql("select *, now() as TStamp from myTempTable")
  wordCountsDataFrame.write.mode(mode).save(output)
  val lines = wordCountsDataFrame.count().toInt
//      wordCountsDataFrame.show(20, false)
  println("Total entries in this batch: " + lines)
}
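
Note that foreachRDD belongs to the older DStream API, not Structured
Streaming, so the snippet above assumes a DStream-based setup roughly like
the following sketch (broker address, topic, group id, batch interval, and
app name are all hypothetical placeholders):

import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}
import org.apache.spark.streaming.kafka010.{ConsumerStrategies, KafkaUtils, LocationStrategies}
import org.apache.kafka.common.serialization.StringDeserializer

val conf = new SparkConf().setAppName("dstream-example")
val ssc = new StreamingContext(conf, Seconds(10))   // placeholder batch interval

val kafkaParams = Map[String, Object](
  "bootstrap.servers" -> "localhost:9092",          // placeholder broker
  "key.deserializer" -> classOf[StringDeserializer],
  "value.deserializer" -> classOf[StringDeserializer],
  "group.id" -> "example-group")                    // placeholder group id

val ds = KafkaUtils.createDirectStream[String, String](
  ssc,
  LocationStrategies.PreferConsistent,
  ConsumerStrategies.Subscribe[String, String](Seq("myTopic"), kafkaParams))

// Each batch arrives as an RDD of ConsumerRecord[String, String]; the record
// values would still need parsing into columns before a toDF call like the one above.
ds.foreachRDD { rdd =>
  rdd.map(_.value()).take(10).foreach(println)
}

ssc.start()
ssc.awaitTermination()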

> On 16 May 2017, at 09:36, kant kodali <ka...@gmail.com> wrote:
> 
> Hi All,
> 
> I have the following code.
> 
>  val ds = sparkSession.readStream
>                 .format("kafka")
>                 .option("kafka.bootstrap.servers", bootstrapServers)
>                 .option("subscribe", topicName)
>                 .option("checkpointLocation", hdfsCheckPointDir)
>                 .load()
>  val ds1 = ds.select($"value")
>  val query = ds1.writeStream.outputMode("append").format("console").start()
>  query.awaitTermination()
> This code runs without errors, but I don't see any data printed to the console. When I run my standalone test Kafka consumer jar, I can see that it is receiving messages, so I am not sure what is going on with the above code. Any ideas?
> 
> Thanks!

Didac Gil de la Iglesia
PhD in Computer Science
didacgil9@gmail.com
Spain:     +34 696 285 544
Sweden: +46 (0)730229737
Skype: didac.gil.de.la.iglesia


Re: How to print data to console in structured streaming using Spark 2.1.0?

Posted by kant kodali <ka...@gmail.com>.
Thanks Mike & Ryan. Now I can finally see my 5KB messages. However, I am
running into the following error.

OpenJDK 64-Bit Server VM warning: INFO:
os::commit_memory(0x0000000734700000, 530579456, 0) failed; error='Cannot
allocate memory' (errno=12)

# There is insufficient memory for the Java Runtime Environment to continue.

# Native memory allocation (mmap) failed to map 530579456 bytes for
committing reserved memory.
# An error report file with more information is saved as:


I am running the Spark driver program in client mode on a standalone
cluster using Spark 2.1.1. When things like this happen, I wonder which
memory I need to increase and how. Should I increase the driver JVM memory
or the executor JVM memory?
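
For reference, both heaps can be set when the job is submitted. In client
mode the driver JVM is already running before any application code executes,
so the driver heap must be set at launch time (via spark-submit or
spark-defaults.conf) rather than through SparkConf in the program. A sketch
with placeholder sizes, master URL, and class/jar names:

spark-submit \
  --master spark://master-host:7077 \
  --deploy-mode client \
  --driver-memory 4g \
  --executor-memory 4g \
  --class com.example.MyStreamingApp \
  my-streaming-app.jar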

On Tue, May 16, 2017 at 4:34 PM, Michael Armbrust <mi...@databricks.com>
wrote:

> I mean the actual kafka client:
>
> <dependency>
>   <groupId>org.apache.kafka</groupId>
>   <artifactId>kafka-clients</artifactId>
>   <version>0.10.0.1</version>
> </dependency>
>
>
> On Tue, May 16, 2017 at 4:29 PM, kant kodali <ka...@gmail.com> wrote:
>
>> Hi Michael,
>>
>> Thanks for the catch. I assume you meant
>> *spark-streaming-kafka-0-10_2.11-2.1.0.jar*
>>
>> I added this on all Spark machines under SPARK_HOME/jars.
>>
>> The same error still persists. Is that the right jar, or is there
>> anything else I need to add?
>>
>> Thanks!
>>
>>
>>
>> On Tue, May 16, 2017 at 1:40 PM, Michael Armbrust <michael@databricks.com
>> > wrote:
>>
>>> Looks like you are missing the kafka dependency.
>>>
>>> On Tue, May 16, 2017 at 1:04 PM, kant kodali <ka...@gmail.com> wrote:
>>>
>>>> Looks like I am getting the following runtime exception. I am using
>>>> Spark 2.1.0 and the following jars
>>>>
>>>> *spark-sql_2.11-2.1.0.jar*
>>>>
>>>> *spark-sql-kafka-0-10_2.11-2.1.0.jar*
>>>>
>>>> *spark-streaming_2.11-2.1.0.jar*
>>>>
>>>>
>>>> Exception in thread "stream execution thread for [id = fcfe1fa6-dab3-4769-9e15-e074af622cc1, runId = 7c54940a-e453-41de-b256-049b539b59b1]"
>>>>
>>>> java.lang.NoClassDefFoundError: org/apache/kafka/common/serialization/ByteArrayDeserializer
>>>>     at org.apache.spark.sql.kafka010.KafkaSourceProvider.createSource(KafkaSourceProvider.scala:74)
>>>>     at org.apache.spark.sql.execution.datasources.DataSource.createSource(DataSource.scala:245)
>>>>     at org.apache.spark.sql.execution.streaming.StreamExecution$$anonfun$logicalPlan$1.applyOrElse(StreamExecution.scala:127)
>>>>     at org.apache.spark.sql.execution.streaming.StreamExecution$$anonfun$logicalPlan$1.applyOrElse(StreamExecution.scala:123)
>>>>     at org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$3.apply(TreeNode.scala:288)
>>>>     at org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$3.apply(TreeNode.scala:288)
>>>>     at org.apache.spark.sql.catalyst.trees.CurrentOrigin$.withOrigin(TreeNode.scala:70)
>>>>
>>>>
>>>> On Tue, May 16, 2017 at 10:30 AM, Shixiong(Ryan) Zhu <
>>>> shixiong@databricks.com> wrote:
>>>>
>>>>> The default "startingOffsets" is "latest". If you don't push any data
>>>>> after starting the query, it won't fetch anything. You can set it to
>>>>> "earliest" like ".option("startingOffsets", "earliest")" to start the
>>>>> stream from the beginning.
>>>>>
>>>>> On Tue, May 16, 2017 at 12:36 AM, kant kodali <ka...@gmail.com>
>>>>> wrote:
>>>>>
>>>>>> Hi All,
>>>>>>
>>>>>> I have the following code.
>>>>>>
>>>>>>  val ds = sparkSession.readStream
>>>>>>                 .format("kafka")
>>>>>>                 .option("kafka.bootstrap.servers", bootstrapServers)
>>>>>>                 .option("subscribe", topicName)
>>>>>>                 .option("checkpointLocation", hdfsCheckPointDir)
>>>>>>                 .load()
>>>>>>
>>>>>>  val ds1 = ds.select($"value")
>>>>>>  val query = ds1.writeStream.outputMode("append").format("console").start()
>>>>>>  query.awaitTermination()
>>>>>>
>>>>>> This code runs without errors, but I don't see any data printed to the
>>>>>> console. When I run my standalone test Kafka consumer jar, I can see
>>>>>> that it is receiving messages, so I am not sure what is going on with
>>>>>> the above code. Any ideas?
>>>>>>
>>>>>> Thanks!
>>>>>>
>>>>>
>>>>>
>>>>
>>>
>>
>

Re: How to print data to console in structured streaming using Spark 2.1.0?

Posted by Michael Armbrust <mi...@databricks.com>.
I mean the actual kafka client:

<dependency>
  <groupId>org.apache.kafka</groupId>
  <artifactId>kafka-clients</artifactId>
  <version>0.10.0.1</version>
</dependency>
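
If you build with sbt instead of Maven, the equivalent line (same version)
would be:

libraryDependencies += "org.apache.kafka" % "kafka-clients" % "0.10.0.1"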


On Tue, May 16, 2017 at 4:29 PM, kant kodali <ka...@gmail.com> wrote:

> Hi Michael,
>
> Thanks for the catch. I assume you meant
> *spark-streaming-kafka-0-10_2.11-2.1.0.jar*
>
> I added this on all Spark machines under SPARK_HOME/jars.
>
> The same error still persists. Is that the right jar, or is there
> anything else I need to add?
>
> Thanks!
>
>
>
> On Tue, May 16, 2017 at 1:40 PM, Michael Armbrust <mi...@databricks.com>
> wrote:
>
>> Looks like you are missing the kafka dependency.
>>
>> On Tue, May 16, 2017 at 1:04 PM, kant kodali <ka...@gmail.com> wrote:
>>
>>> Looks like I am getting the following runtime exception. I am using
>>> Spark 2.1.0 and the following jars
>>>
>>> *spark-sql_2.11-2.1.0.jar*
>>>
>>> *spark-sql-kafka-0-10_2.11-2.1.0.jar*
>>>
>>> *spark-streaming_2.11-2.1.0.jar*
>>>
>>>
>>> Exception in thread "stream execution thread for [id = fcfe1fa6-dab3-4769-9e15-e074af622cc1, runId = 7c54940a-e453-41de-b256-049b539b59b1]"
>>>
>>> java.lang.NoClassDefFoundError: org/apache/kafka/common/serialization/ByteArrayDeserializer
>>>     at org.apache.spark.sql.kafka010.KafkaSourceProvider.createSource(KafkaSourceProvider.scala:74)
>>>     at org.apache.spark.sql.execution.datasources.DataSource.createSource(DataSource.scala:245)
>>>     at org.apache.spark.sql.execution.streaming.StreamExecution$$anonfun$logicalPlan$1.applyOrElse(StreamExecution.scala:127)
>>>     at org.apache.spark.sql.execution.streaming.StreamExecution$$anonfun$logicalPlan$1.applyOrElse(StreamExecution.scala:123)
>>>     at org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$3.apply(TreeNode.scala:288)
>>>     at org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$3.apply(TreeNode.scala:288)
>>>     at org.apache.spark.sql.catalyst.trees.CurrentOrigin$.withOrigin(TreeNode.scala:70)
>>>
>>>
>>> On Tue, May 16, 2017 at 10:30 AM, Shixiong(Ryan) Zhu <
>>> shixiong@databricks.com> wrote:
>>>
>>>> The default "startingOffsets" is "latest". If you don't push any data
>>>> after starting the query, it won't fetch anything. You can set it to
>>>> "earliest" like ".option("startingOffsets", "earliest")" to start the
>>>> stream from the beginning.
>>>>
>>>> On Tue, May 16, 2017 at 12:36 AM, kant kodali <ka...@gmail.com>
>>>> wrote:
>>>>
>>>>> Hi All,
>>>>>
>>>>> I have the following code.
>>>>>
>>>>>  val ds = sparkSession.readStream
>>>>>                 .format("kafka")
>>>>>                 .option("kafka.bootstrap.servers", bootstrapServers)
>>>>>                 .option("subscribe", topicName)
>>>>>                 .option("checkpointLocation", hdfsCheckPointDir)
>>>>>                 .load()
>>>>>
>>>>>  val ds1 = ds.select($"value")
>>>>>  val query = ds1.writeStream.outputMode("append").format("console").start()
>>>>>  query.awaitTermination()
>>>>>
>>>>> This code runs without errors, but I don't see any data printed to the
>>>>> console. When I run my standalone test Kafka consumer jar, I can see
>>>>> that it is receiving messages, so I am not sure what is going on with
>>>>> the above code. Any ideas?
>>>>>
>>>>> Thanks!
>>>>>
>>>>
>>>>
>>>
>>
>

Re: How to print data to console in structured streaming using Spark 2.1.0?

Posted by kant kodali <ka...@gmail.com>.
Hi Michael,

Thanks for the catch. I assume you meant
*spark-streaming-kafka-0-10_2.11-2.1.0.jar*

I added this on all Spark machines under SPARK_HOME/jars.

The same error still persists. Is that the right jar, or is there anything
else I need to add?

Thanks!



On Tue, May 16, 2017 at 1:40 PM, Michael Armbrust <mi...@databricks.com>
wrote:

> Looks like you are missing the kafka dependency.
>
> On Tue, May 16, 2017 at 1:04 PM, kant kodali <ka...@gmail.com> wrote:
>
>> Looks like I am getting the following runtime exception. I am using Spark
>> 2.1.0 and the following jars
>>
>> *spark-sql_2.11-2.1.0.jar*
>>
>> *spark-sql-kafka-0-10_2.11-2.1.0.jar*
>>
>> *spark-streaming_2.11-2.1.0.jar*
>>
>>
>> Exception in thread "stream execution thread for [id = fcfe1fa6-dab3-4769-9e15-e074af622cc1, runId = 7c54940a-e453-41de-b256-049b539b59b1]"
>>
>> java.lang.NoClassDefFoundError: org/apache/kafka/common/serialization/ByteArrayDeserializer
>>     at org.apache.spark.sql.kafka010.KafkaSourceProvider.createSource(KafkaSourceProvider.scala:74)
>>     at org.apache.spark.sql.execution.datasources.DataSource.createSource(DataSource.scala:245)
>>     at org.apache.spark.sql.execution.streaming.StreamExecution$$anonfun$logicalPlan$1.applyOrElse(StreamExecution.scala:127)
>>     at org.apache.spark.sql.execution.streaming.StreamExecution$$anonfun$logicalPlan$1.applyOrElse(StreamExecution.scala:123)
>>     at org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$3.apply(TreeNode.scala:288)
>>     at org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$3.apply(TreeNode.scala:288)
>>     at org.apache.spark.sql.catalyst.trees.CurrentOrigin$.withOrigin(TreeNode.scala:70)
>>
>>
>> On Tue, May 16, 2017 at 10:30 AM, Shixiong(Ryan) Zhu <
>> shixiong@databricks.com> wrote:
>>
>>> The default "startingOffsets" is "latest". If you don't push any data
>>> after starting the query, it won't fetch anything. You can set it to
>>> "earliest" like ".option("startingOffsets", "earliest")" to start the
>>> stream from the beginning.
>>>
>>> On Tue, May 16, 2017 at 12:36 AM, kant kodali <ka...@gmail.com>
>>> wrote:
>>>
>>>> Hi All,
>>>>
>>>> I have the following code.
>>>>
>>>>  val ds = sparkSession.readStream
>>>>                 .format("kafka")
>>>>                 .option("kafka.bootstrap.servers", bootstrapServers)
>>>>                 .option("subscribe", topicName)
>>>>                 .option("checkpointLocation", hdfsCheckPointDir)
>>>>                 .load()
>>>>
>>>>  val ds1 = ds.select($"value")
>>>>  val query = ds1.writeStream.outputMode("append").format("console").start()
>>>>  query.awaitTermination()
>>>>
>>>> This code runs without errors, but I don't see any data printed to the
>>>> console. When I run my standalone test Kafka consumer jar, I can see
>>>> that it is receiving messages, so I am not sure what is going on with
>>>> the above code. Any ideas?
>>>>
>>>> Thanks!
>>>>
>>>
>>>
>>
>

Re: How to print data to console in structured streaming using Spark 2.1.0?

Posted by Michael Armbrust <mi...@databricks.com>.
Looks like you are missing the kafka dependency.
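
One way to pull it in along with its transitive dependencies (including
kafka-clients) is spark-submit's --packages flag. A sketch, assuming Spark
2.1.0 on Scala 2.11; the application class and jar names are placeholders:

spark-submit \
  --packages org.apache.spark:spark-sql-kafka-0-10_2.11:2.1.0 \
  --class com.example.MyStreamingApp \
  my-streaming-app.jar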

On Tue, May 16, 2017 at 1:04 PM, kant kodali <ka...@gmail.com> wrote:

> Looks like I am getting the following runtime exception. I am using Spark
> 2.1.0 and the following jars
>
> *spark-sql_2.11-2.1.0.jar*
>
> *spark-sql-kafka-0-10_2.11-2.1.0.jar*
>
> *spark-streaming_2.11-2.1.0.jar*
>
>
> Exception in thread "stream execution thread for [id = fcfe1fa6-dab3-4769-9e15-e074af622cc1, runId = 7c54940a-e453-41de-b256-049b539b59b1]"
>
> java.lang.NoClassDefFoundError: org/apache/kafka/common/serialization/ByteArrayDeserializer
>     at org.apache.spark.sql.kafka010.KafkaSourceProvider.createSource(KafkaSourceProvider.scala:74)
>     at org.apache.spark.sql.execution.datasources.DataSource.createSource(DataSource.scala:245)
>     at org.apache.spark.sql.execution.streaming.StreamExecution$$anonfun$logicalPlan$1.applyOrElse(StreamExecution.scala:127)
>     at org.apache.spark.sql.execution.streaming.StreamExecution$$anonfun$logicalPlan$1.applyOrElse(StreamExecution.scala:123)
>     at org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$3.apply(TreeNode.scala:288)
>     at org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$3.apply(TreeNode.scala:288)
>     at org.apache.spark.sql.catalyst.trees.CurrentOrigin$.withOrigin(TreeNode.scala:70)
>
>
> On Tue, May 16, 2017 at 10:30 AM, Shixiong(Ryan) Zhu <
> shixiong@databricks.com> wrote:
>
>> The default "startingOffsets" is "latest". If you don't push any data
>> after starting the query, it won't fetch anything. You can set it to
>> "earliest" like ".option("startingOffsets", "earliest")" to start the
>> stream from the beginning.
>>
>> On Tue, May 16, 2017 at 12:36 AM, kant kodali <ka...@gmail.com> wrote:
>>
>>> Hi All,
>>>
>>> I have the following code.
>>>
>>>  val ds = sparkSession.readStream
>>>                 .format("kafka")
>>>                 .option("kafka.bootstrap.servers", bootstrapServers)
>>>                 .option("subscribe", topicName)
>>>                 .option("checkpointLocation", hdfsCheckPointDir)
>>>                 .load()
>>>
>>>  val ds1 = ds.select($"value")
>>>  val query = ds1.writeStream.outputMode("append").format("console").start()
>>>  query.awaitTermination()
>>>
>>> This code runs without errors, but I don't see any data printed to the
>>> console. When I run my standalone test Kafka consumer jar, I can see
>>> that it is receiving messages, so I am not sure what is going on with
>>> the above code. Any ideas?
>>>
>>> Thanks!
>>>
>>
>>
>

Re: How to print data to console in structured streaming using Spark 2.1.0?

Posted by kant kodali <ka...@gmail.com>.
Looks like I am getting the following runtime exception. I am using Spark
2.1.0 and the following jars

*spark-sql_2.11-2.1.0.jar*

*spark-sql-kafka-0-10_2.11-2.1.0.jar*

*spark-streaming_2.11-2.1.0.jar*


Exception in thread "stream execution thread for [id =
fcfe1fa6-dab3-4769-9e15-e074af622cc1, runId =
7c54940a-e453-41de-b256-049b539b59b1]"

java.lang.NoClassDefFoundError:
org/apache/kafka/common/serialization/ByteArrayDeserializer
    at org.apache.spark.sql.kafka010.KafkaSourceProvider.createSource(KafkaSourceProvider.scala:74)
    at org.apache.spark.sql.execution.datasources.DataSource.createSource(DataSource.scala:245)
    at org.apache.spark.sql.execution.streaming.StreamExecution$$anonfun$logicalPlan$1.applyOrElse(StreamExecution.scala:127)
    at org.apache.spark.sql.execution.streaming.StreamExecution$$anonfun$logicalPlan$1.applyOrElse(StreamExecution.scala:123)
    at org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$3.apply(TreeNode.scala:288)
    at org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$3.apply(TreeNode.scala:288)
    at org.apache.spark.sql.catalyst.trees.CurrentOrigin$.withOrigin(TreeNode.scala:70)


On Tue, May 16, 2017 at 10:30 AM, Shixiong(Ryan) Zhu <
shixiong@databricks.com> wrote:

> The default "startingOffsets" is "latest". If you don't push any data
> after starting the query, it won't fetch anything. You can set it to
> "earliest" like ".option("startingOffsets", "earliest")" to start the
> stream from the beginning.
>
> On Tue, May 16, 2017 at 12:36 AM, kant kodali <ka...@gmail.com> wrote:
>
>> Hi All,
>>
>> I have the following code.
>>
>>  val ds = sparkSession.readStream
>>                 .format("kafka")
>>                 .option("kafka.bootstrap.servers", bootstrapServers)
>>                 .option("subscribe", topicName)
>>                 .option("checkpointLocation", hdfsCheckPointDir)
>>                 .load()
>>
>>  val ds1 = ds.select($"value")
>>  val query = ds1.writeStream.outputMode("append").format("console").start()
>>  query.awaitTermination()
>>
>> This code runs without errors, but I don't see any data printed to the
>> console. When I run my standalone test Kafka consumer jar, I can see that
>> it is receiving messages, so I am not sure what is going on with the above
>> code. Any ideas?
>>
>> Thanks!
>>
>
>

Re: How to print data to console in structured streaming using Spark 2.1.0?

Posted by "Shixiong(Ryan) Zhu" <sh...@databricks.com>.
The default "startingOffsets" is "latest". If you don't push any data after
starting the query, it won't fetch anything. You can set it to "earliest"
like ".option("startingOffsets", "earliest")" to start the stream from the
beginning.
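
Applied to the snippet from your question, the whole pipeline would look
roughly like the sketch below. One assumption beyond your code: the Kafka
value column is binary, so it is cast to a string to make the console output
readable. Also note that "checkpointLocation" is a write-side option, so the
reader side simply ignores it.

 val ds = sparkSession.readStream
   .format("kafka")
   .option("kafka.bootstrap.servers", bootstrapServers)
   .option("subscribe", topicName)
   .option("startingOffsets", "earliest")   // read the topic from the beginning
   .load()

 // Kafka's value column is binary; cast it so the console sink prints text
 val ds1 = ds.selectExpr("CAST(value AS STRING)")

 val query = ds1.writeStream.outputMode("append").format("console").start()
 query.awaitTermination()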

On Tue, May 16, 2017 at 12:36 AM, kant kodali <ka...@gmail.com> wrote:

> Hi All,
>
> I have the following code.
>
>  val ds = sparkSession.readStream
>                 .format("kafka")
>                 .option("kafka.bootstrap.servers", bootstrapServers)
>                 .option("subscribe", topicName)
>                 .option("checkpointLocation", hdfsCheckPointDir)
>                 .load()
>
>  val ds1 = ds.select($"value")
>  val query = ds1.writeStream.outputMode("append").format("console").start()
>  query.awaitTermination()
>
> This code runs without errors, but I don't see any data printed to the
> console. When I run my standalone test Kafka consumer jar, I can see that
> it is receiving messages, so I am not sure what is going on with the above
> code. Any ideas?
>
> Thanks!
>