Posted to user@spark.apache.org by Hemanth Yamijala <yh...@gmail.com> on 2014/09/08 18:48:37 UTC

Setting Kafka parameters in Spark Streaming

Hi,

I am using Spark 0.8.1 with Kafka 0.7. I am trying to set the
parameter fetch.message.max.bytes when creating the Kafka DStream. The only
API that seems to allow this is the following:

kafkaStream[T, D <: kafka.serializer.Decoder[_]](typeClass: Class[T],
decoderClass: Class[D], kafkaParams: Map[String, String], topics:
Map[String, Integer], storageLevel: StorageLevel)

I tried to call it as follows:

context.kafkaStream(String.class, StringDecoder.class, kafkaParams, topics,
StorageLevel.MEMORY_AND_DISK())
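
For completeness, a minimal sketch of how the two maps passed to that call could be populated (all values are illustrative, "context" is the JavaStreamingContext, and exact consumer property names vary between Kafka versions):

import java.util.HashMap;
import java.util.Map;
import kafka.serializer.StringDecoder;
import org.apache.spark.storage.StorageLevel;

Map<String, String> kafkaParams = new HashMap<String, String>();
kafkaParams.put("zk.connect", "localhost:2181");          // ZooKeeper connection (illustrative)
kafkaParams.put("groupid", "spark-streaming-consumer");   // consumer group (illustrative)
kafkaParams.put("fetch.message.max.bytes", "10485760");   // the parameter in question, 10 MB

Map<String, Integer> topics = new HashMap<String, Integer>();
topics.put("my-topic", 1);                                 // topic name and receiver thread count (illustrative)

context.kafkaStream(String.class, StringDecoder.class, kafkaParams, topics,
    StorageLevel.MEMORY_AND_DISK());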

However, this is causing an exception like:

java.lang.ClassCastException: java.lang.Object cannot be cast to kafka.serializer.Decoder
    at org.apache.spark.streaming.dstream.KafkaReceiver.onStart(KafkaInputDStream.scala:105)
    at org.apache.spark.streaming.dstream.NetworkReceiver.start(NetworkInputDStream.scala:125)
    at org.apache.spark.streaming.NetworkInputTracker$ReceiverExecutor$$anonfun$8.apply(NetworkInputTracker.scala:158)
    at org.apache.spark.streaming.NetworkInputTracker$ReceiverExecutor$$anonfun$8.apply(NetworkInputTracker.scala:154)

Can anyone provide help on how to set these parameters?

Thanks

Hemanth

RE: Setting Kafka parameters in Spark Streaming

Posted by "Shao, Saisai" <sa...@intel.com>.
As you mentioned in another mail that you hope to make the latest version of Spark work with Kafka 0.7, there are some notes you should keep in mind:


1. Kafka 0.7 can only be compiled with Scala 2.8, while Spark is now compiled with Scala 2.10, and there is no binary compatibility between these two Scala versions. So you have to modify the Kafka code, as Spark previously did, to fix the Scala problem.

2. The high-level consumer API changed between Kafka 0.7 and 0.8, so you have to modify KafkaInputDStream in Spark Streaming accordingly; see the sketch after this list for the 0.8 form of the consumer API it builds on.
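
For orientation, a rough sketch of the high-level consumer API in its Kafka 0.8 form, which is what KafkaInputDStream wraps; the Kafka 0.7 API differs in class and property names, and those differences are what would need to be adapted. All property values and the topic name are illustrative:

import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Properties;
import kafka.consumer.Consumer;
import kafka.consumer.ConsumerConfig;
import kafka.consumer.KafkaStream;
import kafka.javaapi.consumer.ConsumerConnector;
import kafka.serializer.StringDecoder;

// Consumer configuration; property names follow Kafka 0.8 (0.7 uses different names).
Properties props = new Properties();
props.put("zookeeper.connect", "localhost:2181");      // illustrative
props.put("group.id", "spark-streaming-consumer");     // illustrative
props.put("fetch.message.max.bytes", "10485760");      // illustrative, 10 MB

ConsumerConnector connector =
    Consumer.createJavaConsumerConnector(new ConsumerConfig(props));

// One stream (thread) per topic; the Spark receiver does the equivalent
// for each entry in the topics map it is given.
Map<String, Integer> topicCountMap = new HashMap<String, Integer>();
topicCountMap.put("my-topic", 1);                       // illustrative topic

Map<String, List<KafkaStream<String, String>>> streams =
    connector.createMessageStreams(topicCountMap,
        new StringDecoder(null), new StringDecoder(null));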

Thanks
Jerry

From: Hemanth Yamijala [mailto:yhemanth@gmail.com]
Sent: Tuesday, September 09, 2014 1:19 PM
To: Shao, Saisai
Cc: user@spark.apache.org
Subject: Re: Setting Kafka parameters in Spark Streaming

Thanks, Shao, for providing the necessary information.

Hemanth

On Tue, Sep 9, 2014 at 8:21 AM, Shao, Saisai <sa...@intel.com> wrote:
Hi Hemanth,

I think there is a bug in this API in Spark 0.8.1, so you will hit this exception when calling it from Java code. The bug is fixed in the latest version; see the patch (https://github.com/apache/spark/pull/1508). However, the fix is only for Kafka 0.8+; since you are still using Kafka 0.7, you can modify the Spark code according to this patch and rebuild. I would still highly recommend moving to the latest versions of Spark and Kafka, as there have been many improvements on the streaming side.

Thanks
Jerry

From: Hemanth Yamijala [mailto:yhemanth@gmail.com]
Sent: Tuesday, September 09, 2014 12:49 AM
To: user@spark.apache.org
Subject: Setting Kafka parameters in Spark Streaming

Hi,

I am using Spark 0.8.1 with Kafka 0.7. I am trying to set the parameter fetch.message.max.bytes when creating the Kafka DStream. The only API that seems to allow this is the following:

kafkaStream[T, D <: kafka.serializer.Decoder[_]](typeClass: Class[T], decoderClass: Class[D], kafkaParams: Map[String, String], topics: Map[String, Integer], storageLevel: StorageLevel)
I tried to call this as so:
context.kafkaStream(String.class, StringDecoder.class, kafkaParams, topics, StorageLevel.MEMORY_AND_DISK())
However, this is causing an exception like:
java.lang.ClassCastException: java.lang.Object cannot be cast to kafka.serializer.Decoder
    at org.apache.spark.streaming.dstream.KafkaReceiver.onStart(KafkaInputDStream.scala:105)
    at org.apache.spark.streaming.dstream.NetworkReceiver.start(NetworkInputDStream.scala:125)
    at org.apache.spark.streaming.NetworkInputTracker$ReceiverExecutor$$anonfun$8.apply(NetworkInputTracker.scala:158)
    at org.apache.spark.streaming.NetworkInputTracker$ReceiverExecutor$$anonfun$8.apply(NetworkInputTracker.scala:154)
Can anyone provide help on how to set these parameters ?
Thanks
Hemanth


Re: Setting Kafka parameters in Spark Streaming

Posted by Hemanth Yamijala <yh...@gmail.com>.
Thanks, Shao, for providing the necessary information.

Hemanth

On Tue, Sep 9, 2014 at 8:21 AM, Shao, Saisai <sa...@intel.com> wrote:

>  Hi Hemanth,
>
>
>
> I think there is a bug in this API in Spark 0.8.1, so you will hit this
> exception when calling it from Java code. The bug is fixed in the latest
> version; see the patch (https://github.com/apache/spark/pull/1508).
> However, the fix is only for Kafka 0.8+; since you are still using Kafka
> 0.7, you can modify the Spark code according to this patch and rebuild. I
> would still highly recommend moving to the latest versions of Spark and
> Kafka, as there have been many improvements on the streaming side.
>
>
>
> Thanks
>
> Jerry
>
>
>
> From: Hemanth Yamijala [mailto:yhemanth@gmail.com]
> Sent: Tuesday, September 09, 2014 12:49 AM
> To: user@spark.apache.org
> Subject: Setting Kafka parameters in Spark Streaming
>
>
>
> Hi,
>
>
>
> I am using Spark 0.8.1 with Kafka 0.7. I am trying to set the
> parameter fetch.message.max.bytes when creating the Kafka DStream. The only
> API that seems to allow this is the following:
>
>
>
> kafkaStream[T, D <: kafka.serializer.Decoder[_]](typeClass: Class[T],
> decoderClass: Class[D], kafkaParams: Map[String, String], topics:
> Map[String, Integer], storageLevel: StorageLevel)
>
> I tried to call this as so:
>
> context.kafkaStream(String.class, StringDecoder.class, kafkaParams,
> topics, StorageLevel.MEMORY_AND_DISK())
>
> However, this is causing an exception like:
>
> java.lang.ClassCastException: java.lang.Object cannot be cast to kafka.serializer.Decoder
>     at org.apache.spark.streaming.dstream.KafkaReceiver.onStart(KafkaInputDStream.scala:105)
>     at org.apache.spark.streaming.dstream.NetworkReceiver.start(NetworkInputDStream.scala:125)
>     at org.apache.spark.streaming.NetworkInputTracker$ReceiverExecutor$$anonfun$8.apply(NetworkInputTracker.scala:158)
>     at org.apache.spark.streaming.NetworkInputTracker$ReceiverExecutor$$anonfun$8.apply(NetworkInputTracker.scala:154)
>
> Can anyone provide help on how to set these parameters ?
>
> Thanks
>
> Hemanth
>

RE: Setting Kafka parameters in Spark Streaming

Posted by "Shao, Saisai" <sa...@intel.com>.
Hi Hemanth,

I think there is a bug in this API in Spark 0.8.1, so you will hit this exception when calling it from Java code. The bug is fixed in the latest version; see the patch (https://github.com/apache/spark/pull/1508). However, the fix is only for Kafka 0.8+; since you are still using Kafka 0.7, you can modify the Spark code according to this patch and rebuild. I would still highly recommend moving to the latest versions of Spark and Kafka, as there have been many improvements on the streaming side.
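
As a pointer for the recommended upgrade path, a rough sketch of what the equivalent call looks like in Java against a current Spark release (1.x), where KafkaUtils in the spark-streaming-kafka module accepts the kafkaParams map directly; jssc, the property values, and the topic name are illustrative:

import java.util.HashMap;
import java.util.Map;
import kafka.serializer.StringDecoder;
import org.apache.spark.storage.StorageLevel;
import org.apache.spark.streaming.api.java.JavaPairDStream;
import org.apache.spark.streaming.api.java.JavaStreamingContext;
import org.apache.spark.streaming.kafka.KafkaUtils;

// jssc is an already-created JavaStreamingContext (illustrative).
Map<String, String> kafkaParams = new HashMap<String, String>();
kafkaParams.put("zookeeper.connect", "localhost:2181");   // illustrative
kafkaParams.put("group.id", "spark-streaming-consumer");  // illustrative
kafkaParams.put("fetch.message.max.bytes", "10485760");   // illustrative, 10 MB

Map<String, Integer> topics = new HashMap<String, Integer>();
topics.put("my-topic", 1);                                 // illustrative topic

JavaPairDStream<String, String> stream = KafkaUtils.createStream(
    jssc, String.class, String.class, StringDecoder.class, StringDecoder.class,
    kafkaParams, topics, StorageLevel.MEMORY_AND_DISK());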

Thanks
Jerry

From: Hemanth Yamijala [mailto:yhemanth@gmail.com]
Sent: Tuesday, September 09, 2014 12:49 AM
To: user@spark.apache.org
Subject: Setting Kafka parameters in Spark Streaming

Hi,

I am using Spark 0.8.1 with Kafka 0.7. I am trying to set the parameter fetch.message.max.bytes when creating the Kafka DStream. The only API that seems to allow this is the following:

kafkaStream[T, D <: kafka.serializer.Decoder[_]](typeClass: Class[T], decoderClass: Class[D], kafkaParams: Map[String, String], topics: Map[String, Integer], storageLevel: StorageLevel)
I tried to call this as so:
context.kafkaStream(String.class, StringDecoder.class, kafkaParams, topics, StorageLevel.MEMORY_AND_DISK())
However, this is causing an exception like:
java.lang.ClassCastException: java.lang.Object cannot be cast to kafka.serializer.Decoder
    at org.apache.spark.streaming.dstream.KafkaReceiver.onStart(KafkaInputDStream.scala:105)
    at org.apache.spark.streaming.dstream.NetworkReceiver.start(NetworkInputDStream.scala:125)
    at org.apache.spark.streaming.NetworkInputTracker$ReceiverExecutor$$anonfun$8.apply(NetworkInputTracker.scala:158)
    at org.apache.spark.streaming.NetworkInputTracker$ReceiverExecutor$$anonfun$8.apply(NetworkInputTracker.scala:154)
Can anyone provide help on how to set these parameters ?
Thanks
Hemanth