Posted to users@kafka.apache.org by Armaan Esfahani <ar...@advancedopen.com> on 2017/03/10 18:34:59 UTC
Not Serializable Result Error
Hello, I have been trying to set up a SMACK stack to learn the basics of Kafka Streams and Spark, yet I keep running into the following error:
org.apache.spark.SparkException: Job aborted due to stage failure: Task 0.0 in stage 0.0 (TID 0) had a not serializable result: org.apache.kafka.clients.consumer.ConsumerRecord
I have a “Tweet” class, a simple POJO with a Date and a String, along with a Serializer and a Deserializer class for it.
I have tested creating an object, serializing it to a local file, and reading it back with the deserializer, and that works fine. Over the stream, however, it fails.
To read the data from the Kafka stream, I have set up an input stream using the following code:
Map<String, Object> kafkaParams = new HashMap<>();
kafkaParams.put("bootstrap.servers", brokers);
kafkaParams.put("key.deserializer", StringDeserializer.class);
kafkaParams.put("value.deserializer", TweetDeserializer.class);
kafkaParams.put("group.id", "use_a_separate_group_id_for_each_stream");
JavaInputDStream<ConsumerRecord<String, Tweet>> tweets = KafkaUtils.createDirectStream(
    jssc,
    LocationStrategies.PreferConsistent(),
    ConsumerStrategies.<String, Tweet>Subscribe(topicsSet, kafkaParams)
);
To send a sample object to Kafka, I have the following for testing:
Properties props = new Properties();
props.put("bootstrap.servers", "192.168.194.194:9092");
props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
props.put("value.serializer", "com.armaanaki.smack.tweet.TweetSerializer");
final KafkaProducer<String, Tweet> kafkaProducer = new KafkaProducer<String, Tweet>(props);
ProducerRecord<String, Tweet> record = new ProducerRecord<String, Tweet>("tweets1", "1", new Tweet(new Date(), "Test"));
kafkaProducer.send(record);
Can anyone explain my error? Thanks!
Re: Not Serializable Result Error
Posted by Michael Noll <mi...@confluent.io>.
Hi Armaan,
> org.apache.spark.SparkException: Job aborted due to stage failure:
> Task 0.0 in stage 0.0 (TID 0) had a not serializable result:
> org.apache.kafka.clients.consumer.ConsumerRecord
Perhaps you should ask that question on the Spark mailing list, which
should increase your chances of getting a good response for this Spark
error. You should also share the Spark and Kafka versions you use.
-Michael
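
For context on the exception itself: the message names ConsumerRecord as the result type that could not be serialized, and that Kafka class does not implement java.io.Serializable. One likely trigger, assuming Spark's default Java serialization, is an operation that ships the records themselves between executors and the driver. Mapping each record to its value first, for example tweets.map(record -> record.value()), usually avoids this, provided Tweet itself is serializable. The sketch below reproduces only the mechanism with the JDK, without Spark or Kafka. The Record and Tweet classes are hypothetical stand-ins, not the real ConsumerRecord or the poster's Tweet.

```java
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.NotSerializableException;
import java.io.ObjectOutputStream;
import java.io.Serializable;
import java.util.Date;

public class NotSerializableSketch {

    // Hypothetical stand-in for the poster's Tweet POJO; assumed to implement Serializable.
    static class Tweet implements Serializable {
        private static final long serialVersionUID = 1L;
        final Date date;
        final String text;

        Tweet(Date date, String text) {
            this.date = date;
            this.text = text;
        }
    }

    // Hypothetical stand-in for ConsumerRecord: a wrapper that does NOT implement Serializable.
    static class Record<K, V> {
        final K key;
        final V value;

        Record(K key, V value) {
            this.key = key;
            this.value = value;
        }

        V value() {
            return value;
        }
    }

    // Returns true if Java serialization accepts the object,
    // false if it rejects it with NotSerializableException.
    static boolean canJavaSerialize(Object obj) {
        try (ObjectOutputStream out = new ObjectOutputStream(new ByteArrayOutputStream())) {
            out.writeObject(obj);
            return true;
        } catch (NotSerializableException e) {
            return false;
        } catch (IOException e) {
            throw new IllegalStateException(e);
        }
    }

    public static void main(String[] args) {
        Record<String, Tweet> record = new Record<>("1", new Tweet(new Date(), "Test"));
        // The wrapper is rejected even though the value inside it is serializable.
        System.out.println("wrapper serializable: " + canJavaSerialize(record));
        // Extracting the value first leaves only the serializable payload.
        System.out.println("value serializable:   " + canJavaSerialize(record.value()));
    }
}
```

If the Tweet class in the real project does not implement Serializable, extracting the value alone would presumably not be enough; that detail is not shown in the thread.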