Posted to user@spark.apache.org by Phil Kallos <ph...@gmail.com> on 2015/10/15 07:49:54 UTC
Spark 1.5 Streaming and Kinesis
Hi,
We are trying to migrate our Kinesis streaming applications from Spark 1.4 to
Spark 1.5 to take advantage of the new Kinesis checkpointing improvements in 1.5.
However, after upgrading, we consistently see the following error:
java.lang.ClassCastException: scala.collection.mutable.HashMap cannot be cast to scala.collection.mutable.SynchronizedMap
    at org.apache.spark.streaming.kinesis.KinesisReceiver.onStart(KinesisReceiver.scala:175)
    at org.apache.spark.streaming.receiver.ReceiverSupervisor.startReceiver(ReceiverSupervisor.scala:148)
    at org.apache.spark.streaming.receiver.ReceiverSupervisor.start(ReceiverSupervisor.scala:130)
    at org.apache.spark.streaming.scheduler.ReceiverTracker$ReceiverTrackerEndpoint$$anonfun$9.apply(ReceiverTracker.scala:542)
    at org.apache.spark.streaming.scheduler.ReceiverTracker$ReceiverTrackerEndpoint$$anonfun$9.apply(ReceiverTracker.scala:532)
    at org.apache.spark.SparkContext$$anonfun$37.apply(SparkContext.scala:1984)
    at org.apache.spark.SparkContext$$anonfun$37.apply(SparkContext.scala:1984)
    at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:66)
    at org.apache.spark.scheduler.Task.run(Task.scala:88)
    at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:214)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
    at java.lang.Thread.run(Thread.java:745)
I even get this when running the Kinesis example from
http://spark.apache.org/docs/latest/streaming-kinesis-integration.html with:
bin/run-example streaming.KinesisWordCountASL
Am I doing something incorrect?
Re: Spark 1.5 Streaming and Kinesis
Posted by Jean-Baptiste Onofré <jb...@nanthrax.net>.
To clarify what I meant by "correct": the map declaration itself looks fine to
me, so the ClassCastException is odd ;)
I'm trying to reproduce the issue in order to investigate.
Regards
JB
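JB's reasoning, that the declaration is fine and the exception is therefore surprising, can be checked in isolation. The sketch below is a hypothetical, self-contained Scala program, not Spark code: BlockId and SeqRanges are made-up stand-ins for Spark's StreamBlockId and SequenceNumberRanges. It builds one map declared the same way as the quoted KinesisReceiver field and one plain HashMap, then casts each to SynchronizedMap and calls clear(). It assumes Scala 2.10 to 2.12, since SynchronizedMap is deprecated from 2.11 and removed in 2.13.

```scala
import scala.collection.mutable
import scala.util.{Failure, Success, Try}

object SynchronizedMapCastDemo {
  // Hypothetical stand-ins for Spark's StreamBlockId and SequenceNumberRanges,
  // defined here only so the sketch compiles without Spark on the classpath.
  case class BlockId(id: Long)
  case class SeqRanges(ranges: Seq[String])

  // Same shape as the field quoted from KinesisReceiver (tag v1.5.0):
  // an anonymous subclass of HashMap that also mixes in SynchronizedMap.
  def declaredMap(): mutable.HashMap[BlockId, SeqRanges] =
    new mutable.HashMap[BlockId, SeqRanges]
      with mutable.SynchronizedMap[BlockId, SeqRanges]

  // A plain HashMap, which does not mix in SynchronizedMap.
  def plainMap(): mutable.HashMap[BlockId, SeqRanges] =
    new mutable.HashMap[BlockId, SeqRanges]

  // Returns true if treating the map as a SynchronizedMap throws
  // ClassCastException, the exception type reported from KinesisReceiver.onStart.
  def castFails(m: mutable.HashMap[BlockId, SeqRanges]): Boolean =
    Try {
      val synced = m.asInstanceOf[mutable.SynchronizedMap[BlockId, SeqRanges]]
      synced.clear() // mirrors the clear() call JB points to at line 175
    } match {
      case Failure(_: ClassCastException) => true
      case Success(_)                     => false
      case Failure(other)                 => throw other
    }

  def main(args: Array[String]): Unit = {
    println(castFails(declaredMap())) // prints false: the declared map is a SynchronizedMap
    println(castFails(plainMap()))    // prints true: same exception type as in the report
  }
}
```

The declared map passes the cast, which supports JB's reading of the source: the exception implies that the object held in the field at runtime was a plain HashMap rather than the HashMap-with-SynchronizedMap subclass the source declares. How that happens is not established in this thread.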
On 10/15/2015 08:03 AM, Jean-Baptiste Onofré wrote:
> Hi Phil,
>
> KinesisReceiver is part of the extras. Just a dumb question: did you update
> everything, including the Spark Kinesis extra that contains the KinesisReceiver?
>
> I checked tag v1.5.0, and line 175 of KinesisReceiver is:
>
> blockIdToSeqNumRanges.clear()
>
> where blockIdToSeqNumRanges is declared as:
>
> private val blockIdToSeqNumRanges = new mutable.HashMap[StreamBlockId, SequenceNumberRanges]
>   with mutable.SynchronizedMap[StreamBlockId, SequenceNumberRanges]
>
> So, it doesn't look fully correct to me.
>
> Let me investigate a bit this morning.
>
> Regards
> JB
--
Jean-Baptiste Onofré
jbonofre@apache.org
http://blog.nanthrax.net
Talend - http://www.talend.com
---------------------------------------------------------------------
To unsubscribe, e-mail: user-unsubscribe@spark.apache.org
For additional commands, e-mail: user-help@spark.apache.org
Re: Spark 1.5 Streaming and Kinesis
Posted by Jean-Baptiste Onofré <jb...@nanthrax.net>.
Hi Phil,
KinesisReceiver is part of the extras. Just a dumb question: did you update
everything, including the Spark Kinesis extra that contains the KinesisReceiver?
I checked tag v1.5.0, and line 175 of KinesisReceiver is:
blockIdToSeqNumRanges.clear()
where blockIdToSeqNumRanges is declared as:
private val blockIdToSeqNumRanges = new mutable.HashMap[StreamBlockId, SequenceNumberRanges]
  with mutable.SynchronizedMap[StreamBlockId, SequenceNumberRanges]
So, it doesn't look fully correct to me.
Let me investigate a bit this morning.
Regards
JB
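JB's question is about version alignment: KinesisReceiver ships in the separate spark-streaming-kinesis-asl module rather than in the core Spark jars, so upgrading Spark without also upgrading that module leaves mismatched versions on the classpath. A minimal sketch of an sbt build that pins both modules to one version, assuming the application is built with sbt and deployed against Spark 1.5.0 (the tag referenced in this thread); the "provided" scoping is an assumption about a typical cluster deployment:

```scala
// build.sbt (sketch): keep every Spark module on the same version.
// "1.5.0" is an assumption; use the exact Spark version deployed on the cluster.
val sparkVersion = "1.5.0"

libraryDependencies ++= Seq(
  // Supplied by the cluster's Spark installation at runtime.
  "org.apache.spark" %% "spark-streaming" % sparkVersion % "provided",
  // The Kinesis receiver lives in this separate module and is not part of the
  // main Spark assembly, so it is packaged with the application.
  "org.apache.spark" %% "spark-streaming-kinesis-asl" % sparkVersion
)
```

Deriving both coordinates from a single sparkVersion value means a future upgrade cannot bump one module and forget the other.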