Posted to user@spark.apache.org by Sateesh Karuturi <sa...@gmail.com> on 2016/09/19 02:56:07 UTC

Getting empty values while receiving from kafka Spark streaming

I am very new to Spark Streaming, and I am working through a small
exercise: sending XML data through Kafka and receiving that streaming data
in Spark Streaming. I have tried every approach I can think of, but every
time I get empty values.


There is no problem on the Kafka side; the only problem is receiving the
streaming data on the Spark side. Here is how I am implementing it:

    package com.example; // "package" is a Java keyword, so a name like com.package would not compile

    import java.util.Collections;
    import java.util.HashMap;
    import java.util.Map;
    import java.util.Set;
    import kafka.serializer.StringDecoder;
    import org.apache.spark.SparkConf;
    import org.apache.spark.api.java.JavaSparkContext;
    import org.apache.spark.streaming.Duration;
    import org.apache.spark.streaming.api.java.JavaPairInputDStream;
    import org.apache.spark.streaming.api.java.JavaStreamingContext;
    import org.apache.spark.streaming.kafka.KafkaUtils;

    public class SparkStringConsumer {
        public static void main(String[] args) {
            SparkConf conf = new SparkConf()
                    .setAppName("kafka-sandbox")
                    .setMaster("local[*]");
            JavaSparkContext sc = new JavaSparkContext(conf);
            JavaStreamingContext ssc = new JavaStreamingContext(sc, new Duration(2000));

            // Direct (receiver-less) Kafka stream via the 0.8 integration.
            Map<String, String> kafkaParams = new HashMap<>();
            kafkaParams.put("metadata.broker.list", "localhost:9092");
            Set<String> topics = Collections.singleton("mytopic");
            JavaPairInputDStream<String, String> directKafkaStream =
                    KafkaUtils.createDirectStream(ssc, String.class, String.class,
                            StringDecoder.class, StringDecoder.class, kafkaParams, topics);

            directKafkaStream.foreachRDD(rdd -> {
                System.out.println("--- New RDD with " + rdd.partitions().size()
                        + " partitions and " + rdd.count() + " records");
                // Tuple2 values are read with the _2() accessor from Java.
                rdd.foreach(record -> System.out.println(record._2()));
            });

            ssc.start();
            ssc.awaitTermination();
        }
    }

And I am using the following versions:

    Zookeeper 3.4.6
    Scala 2.11
    Spark 2.0
    Kafka 0.8.2
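One way to check whether the direct stream is fetching anything at all is
to log the Kafka offset ranges each batch covers. A minimal sketch, using
the HasOffsetRanges interface that the 0.8 direct stream's RDDs implement
(the logging format here is just illustrative): a batch that fetched no
messages shows identical from and until offsets for every partition.

    import org.apache.spark.streaming.kafka.HasOffsetRanges;
    import org.apache.spark.streaming.kafka.OffsetRange;

    directKafkaStream.foreachRDD(rdd -> {
        // The direct stream's underlying RDDs carry their Kafka offset ranges.
        OffsetRange[] ranges = ((HasOffsetRanges) rdd.rdd()).offsetRanges();
        for (OffsetRange r : ranges) {
            // If fromOffset equals untilOffset, nothing was fetched for
            // that partition in this batch.
            System.out.println(r.topic() + "[" + r.partition() + "] offsets "
                    + r.fromOffset() + " to " + r.untilOffset());
        }
    });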

Re: Getting empty values while receiving from kafka Spark streaming

Posted by "Chawla,Sumit " <su...@gmail.com>.
How are you producing data? I just tested your code, and I can receive the
messages from Kafka.
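For comparison, here is a minimal producer sketch that would feed the
consumer above. This is an illustrative addition, assuming the new Java
producer client shipped with Kafka 0.8.2 and the same broker address and
topic name as in the original code; the class name XmlProducer and the XML
payload are made up for the example.

    import java.util.Properties;
    import org.apache.kafka.clients.producer.KafkaProducer;
    import org.apache.kafka.clients.producer.ProducerRecord;

    public class XmlProducer {
        public static void main(String[] args) {
            Properties props = new Properties();
            // Same broker the consumer points at.
            props.put("bootstrap.servers", "localhost:9092");
            props.put("key.serializer",
                    "org.apache.kafka.common.serialization.StringSerializer");
            props.put("value.serializer",
                    "org.apache.kafka.common.serialization.StringSerializer");

            try (KafkaProducer<String, String> producer = new KafkaProducer<>(props)) {
                // Publish a small XML document as a plain string value.
                producer.send(new ProducerRecord<>("mytopic",
                        "<note><body>hello from kafka</body></note>"));
            } // close() flushes any buffered records before returning
        }
    }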



Regards
Sumit Chawla


On Sun, Sep 18, 2016 at 7:56 PM, Sateesh Karuturi <
sateesh.karuturi9@gmail.com> wrote:

> I am very new to Spark Streaming, and I am working through a small
> exercise: sending XML data through Kafka and receiving that streaming
> data in Spark Streaming. I have tried every approach I can think of,
> but every time I get empty values.

Re: Getting empty values while receiving from kafka Spark streaming

Posted by ayan guha <gu...@gmail.com>.
An empty RDD generally means Kafka is not producing messages during those
intervals. For example, with a batch duration of 10 seconds, if no messages
arrive within a given 10-second window, the RDD corresponding to that
window will be empty.
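
A minimal sketch of how to make those empty intervals explicit in the
consumer, reusing the directKafkaStream from the code above; isEmpty()
detects an empty batch without triggering a full count of the RDD.

    directKafkaStream.foreachRDD(rdd -> {
        if (rdd.isEmpty()) {
            // No messages arrived on the topic during this batch interval.
            System.out.println("--- Empty batch: no messages in this interval");
        } else {
            System.out.println("--- Batch with " + rdd.count() + " records");
            rdd.foreach(record -> System.out.println(record._2()));
        }
    });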

On Mon, Sep 19, 2016 at 12:56 PM, Sateesh Karuturi <
sateesh.karuturi9@gmail.com> wrote:

> I am very new to Spark Streaming, and I am working through a small
> exercise: sending XML data through Kafka and receiving that streaming
> data in Spark Streaming. I have tried every approach I can think of,
> but every time I get empty values.



-- 
Best Regards,
Ayan Guha