You are viewing a plain text version of this content. The canonical link for it is here.
Posted to user@spark.apache.org by Phil Kallos <ph...@gmail.com> on 2015/10/15 07:49:54 UTC

Spark 1.5 Streaming and Kinesis

Hi,

We are trying to migrate from Spark1.4 to Spark1.5 for our Kinesis
streaming applications, to take advantage of the new Kinesis checkpointing
improvements in 1.5.

However after upgrading, we are consistently seeing the following error:

java.lang.ClassCastException: scala.collection.mutable.HashMap cannot be
cast to scala.collection.mutable.SynchronizedMap
at
org.apache.spark.streaming.kinesis.KinesisReceiver.onStart(KinesisReceiver.scala:175)
at
org.apache.spark.streaming.receiver.ReceiverSupervisor.startReceiver(ReceiverSupervisor.scala:148)
at
org.apache.spark.streaming.receiver.ReceiverSupervisor.start(ReceiverSupervisor.scala:130)
at
org.apache.spark.streaming.scheduler.ReceiverTracker$ReceiverTrackerEndpoint$$anonfun$9.apply(ReceiverTracker.scala:542)
at
org.apache.spark.streaming.scheduler.ReceiverTracker$ReceiverTrackerEndpoint$$anonfun$9.apply(ReceiverTracker.scala:532)
at org.apache.spark.SparkContext$$anonfun$37.apply(SparkContext.scala:1984)
at org.apache.spark.SparkContext$$anonfun$37.apply(SparkContext.scala:1984)
at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:66)
at org.apache.spark.scheduler.Task.run(Task.scala:88)
at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:214)
at
java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
at
java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
at java.lang.Thread.run(Thread.java:745)

I even get this when running the Kinesis examples :
http://spark.apache.org/docs/latest/streaming-kinesis-integration.html with

bin/run-example streaming.KinesisWordCountASL

Am I doing something incorrect?

Re: Spark 1.5 Streaming and Kinesis

Posted by Jean-Baptiste Onofré <jb...@nanthrax.net>.
By correct, I mean: the map declaration looks good to me, so the 
ClassCastException is weird ;)

I'm trying to reproduce the issue in order to investigate.

Regards
JB

On 10/15/2015 08:03 AM, Jean-Baptiste Onofré wrote:
> Hi Phil,
>
> KinesisReceiver is part of extra. Just a dumb question: did you update
> all, including the Spark Kinesis extra containing the KinesisReceiver ?
>
> I checked on tag v1.5.0, and at line 175 of the KinesisReceiver, we see:
>
> blockIdToSeqNumRanges.clear()
>
> which is a:
>
> private val blockIdToSeqNumRanges = new mutable.HashMap[StreamBlockId,
> SequenceNumberRanges]
>      with mutable.SynchronizedMap[StreamBlockId, SequenceNumberRanges]
>
> So, it doesn't look fully correct to me.
>
> Let me investigate a bit this morning.
>
> Regards
> JB
>
> On 10/15/2015 07:49 AM, Phil Kallos wrote:
>> Hi,
>>
>> We are trying to migrate from Spark1.4 to Spark1.5 for our Kinesis
>> streaming applications, to take advantage of the new Kinesis
>> checkpointing improvements in 1.5.
>>
>> However after upgrading, we are consistently seeing the following error:
>>
>> java.lang.ClassCastException: scala.collection.mutable.HashMap cannot be
>> cast to scala.collection.mutable.SynchronizedMap
>> at
>> org.apache.spark.streaming.kinesis.KinesisReceiver.onStart(KinesisReceiver.scala:175)
>>
>> at
>> org.apache.spark.streaming.receiver.ReceiverSupervisor.startReceiver(ReceiverSupervisor.scala:148)
>>
>> at
>> org.apache.spark.streaming.receiver.ReceiverSupervisor.start(ReceiverSupervisor.scala:130)
>>
>> at
>> org.apache.spark.streaming.scheduler.ReceiverTracker$ReceiverTrackerEndpoint$$anonfun$9.apply(ReceiverTracker.scala:542)
>>
>> at
>> org.apache.spark.streaming.scheduler.ReceiverTracker$ReceiverTrackerEndpoint$$anonfun$9.apply(ReceiverTracker.scala:532)
>>
>> at
>> org.apache.spark.SparkContext$$anonfun$37.apply(SparkContext.scala:1984)
>> at
>> org.apache.spark.SparkContext$$anonfun$37.apply(SparkContext.scala:1984)
>> at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:66)
>> at org.apache.spark.scheduler.Task.run(Task.scala:88)
>> at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:214)
>> at
>> java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
>>
>> at
>> java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
>>
>> at java.lang.Thread.run(Thread.java:745)
>>
>> I even get this when running the Kinesis examples :
>> http://spark.apache.org/docs/latest/streaming-kinesis-integration.html
>> with
>>
>> bin/run-example streaming.KinesisWordCountASL
>>
>> Am I doing something incorrect?
>>
>>
>

-- 
Jean-Baptiste Onofré
jbonofre@apache.org
http://blog.nanthrax.net
Talend - http://www.talend.com

---------------------------------------------------------------------
To unsubscribe, e-mail: user-unsubscribe@spark.apache.org
For additional commands, e-mail: user-help@spark.apache.org


Re: Spark 1.5 Streaming and Kinesis

Posted by Jean-Baptiste Onofré <jb...@nanthrax.net>.
Hi Phil,

KinesisReceiver is part of extra. Just a dumb question: did you update 
all, including the Spark Kinesis extra containing the KinesisReceiver ?

I checked on tag v1.5.0, and at line 175 of the KinesisReceiver, we see:

blockIdToSeqNumRanges.clear()

which is a:

private val blockIdToSeqNumRanges = new mutable.HashMap[StreamBlockId, 
SequenceNumberRanges]
     with mutable.SynchronizedMap[StreamBlockId, SequenceNumberRanges]

So, it doesn't look fully correct to me.

Let me investigate a bit this morning.

Regards
JB

On 10/15/2015 07:49 AM, Phil Kallos wrote:
> Hi,
>
> We are trying to migrate from Spark1.4 to Spark1.5 for our Kinesis
> streaming applications, to take advantage of the new Kinesis
> checkpointing improvements in 1.5.
>
> However after upgrading, we are consistently seeing the following error:
>
> java.lang.ClassCastException: scala.collection.mutable.HashMap cannot be
> cast to scala.collection.mutable.SynchronizedMap
> at
> org.apache.spark.streaming.kinesis.KinesisReceiver.onStart(KinesisReceiver.scala:175)
> at
> org.apache.spark.streaming.receiver.ReceiverSupervisor.startReceiver(ReceiverSupervisor.scala:148)
> at
> org.apache.spark.streaming.receiver.ReceiverSupervisor.start(ReceiverSupervisor.scala:130)
> at
> org.apache.spark.streaming.scheduler.ReceiverTracker$ReceiverTrackerEndpoint$$anonfun$9.apply(ReceiverTracker.scala:542)
> at
> org.apache.spark.streaming.scheduler.ReceiverTracker$ReceiverTrackerEndpoint$$anonfun$9.apply(ReceiverTracker.scala:532)
> at org.apache.spark.SparkContext$$anonfun$37.apply(SparkContext.scala:1984)
> at org.apache.spark.SparkContext$$anonfun$37.apply(SparkContext.scala:1984)
> at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:66)
> at org.apache.spark.scheduler.Task.run(Task.scala:88)
> at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:214)
> at
> java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
> at
> java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
> at java.lang.Thread.run(Thread.java:745)
>
> I even get this when running the Kinesis examples :
> http://spark.apache.org/docs/latest/streaming-kinesis-integration.html with
>
> bin/run-example streaming.KinesisWordCountASL
>
> Am I doing something incorrect?
>
>

-- 
Jean-Baptiste Onofré
jbonofre@apache.org
http://blog.nanthrax.net
Talend - http://www.talend.com

---------------------------------------------------------------------
To unsubscribe, e-mail: user-unsubscribe@spark.apache.org
For additional commands, e-mail: user-help@spark.apache.org