Posted to users@kafka.apache.org by Armaan Esfahani <ar...@advancedopen.com> on 2017/03/10 18:34:59 UTC

Not Serializable Result Error

Hello, I have been trying to set up a SMACK stack to learn the basics of Kafka Streams and Spark, yet I keep coming across the following error:

org.apache.spark.SparkException: Job aborted due to stage failure: Task 0.0 in stage 0.0 (TID 0) had a not serializable result: org.apache.kafka.clients.consumer.ConsumerRecord

 

I have a “Tweet” object, a simple POJO with a Date and a String, along with its own Serializer and Deserializer classes.

I have tested creating an object, serializing it to a local file, then reading it back with the deserializer, and that works fine. Over the stream, however, it fails.

To read the data from the Kafka stream, I have set up an input stream using the following code:

 

Map<String, Object> kafkaParams = new HashMap<>();
kafkaParams.put("bootstrap.servers", brokers);
kafkaParams.put("key.deserializer", StringDeserializer.class);
kafkaParams.put("value.deserializer", TweetDeserializer.class);
kafkaParams.put("group.id", "use_a_separate_group_id_for_each_stream");

JavaInputDStream<ConsumerRecord<String, Tweet>> tweets = KafkaUtils.createDirectStream(
        jssc,
        LocationStrategies.PreferConsistent(),
        ConsumerStrategies.<String, Tweet>Subscribe(topicsSet, kafkaParams)
);

 

To send a sample object to Kafka, I have the following for testing:

                                

Properties props = new Properties();
props.put("bootstrap.servers", "192.168.194.194:9092");
props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
props.put("value.serializer", "com.armaanaki.smack.tweet.TweetSerializer");

final KafkaProducer<String, Tweet> kafkaProducer = new KafkaProducer<String, Tweet>(props);

ProducerRecord<String, Tweet> record = new ProducerRecord<String, Tweet>("tweets1", "1", new Tweet(new Date(), "Test"));
kafkaProducer.send(record);

 

 

Can anyone explain my error? Thanks!


Re: Not Serializable Result Error

Posted by Michael Noll <mi...@confluent.io>.
Hi Armaan,

> org.apache.spark.SparkException: Job aborted due to stage failure:
>    Task 0.0 in stage 0.0 (TID 0) had a not serializable result:
>    org.apache.kafka.clients.consumer.ConsumerRecord

Perhaps you should ask this question on the Spark mailing list, which
should increase your chances of getting a good response for this Spark
error.  You should also share the Spark and Kafka versions you use.

-Michael
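
For readers who land on this thread: the error message names ConsumerRecord itself, not Tweet, as the object that could not be serialized. ConsumerRecord does not implement java.io.Serializable, so one plausible cause is that whole records are being shipped back from a task as its result. The sketch below reproduces that mechanism with the JDK only. RecordStandIn and this Tweet class are hypothetical stand-ins (the poster's real classes are not shown in the thread), and the check uses plain Java serialization, which is Spark's default serializer unless configured otherwise.

```java
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.NotSerializableException;
import java.io.ObjectOutputStream;
import java.io.Serializable;
import java.io.UncheckedIOException;
import java.util.Date;

public class NotSerializableSketch {

    // Hypothetical stand-in for the poster's Tweet POJO (assumed to be Serializable).
    static final class Tweet implements Serializable {
        private static final long serialVersionUID = 1L;
        final Date createdAt;
        final String text;

        Tweet(Date createdAt, String text) {
            this.createdAt = createdAt;
            this.text = text;
        }
    }

    // Hypothetical stand-in for ConsumerRecord: a wrapper that does NOT
    // implement Serializable, even though its payload does.
    static final class RecordStandIn<K, V> {
        final K key;
        final V value;

        RecordStandIn(K key, V value) {
            this.key = key;
            this.value = value;
        }

        V value() {
            return value;
        }
    }

    // Returns true if Java serialization accepts the object, false if it
    // rejects it with NotSerializableException.
    static boolean isJavaSerializable(Object candidate) {
        try (ObjectOutputStream out = new ObjectOutputStream(new ByteArrayOutputStream())) {
            out.writeObject(candidate);
            return true;
        } catch (NotSerializableException e) {
            return false;
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    public static void main(String[] args) {
        RecordStandIn<String, Tweet> record =
                new RecordStandIn<>("1", new Tweet(new Date(), "Test"));

        // The wrapper is rejected; the extracted value is accepted.
        System.out.println("wrapper serializable: " + isJavaSerializable(record));
        System.out.println("value serializable:   " + isJavaSerializable(record.value()));
    }
}
```

If this is indeed the cause, the analogous step in the Spark job would be to extract the payload before any output operation, for example tweets.map(record -> record.value()), with Tweet implementing java.io.Serializable. This has not been tested against the poster's setup, and the Spark and Kafka versions in use are not stated in the thread.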


