Posted to user@spark.apache.org by "Shenghua(Daniel) Wan" <wa...@gmail.com> on 2015/08/18 23:55:59 UTC

broadcast variable of Kafka producer throws ConcurrentModificationException

Hi,
Has anyone seen java.util.ConcurrentModificationException when using
broadcast variables?
I encountered this exception when wrapping a Kafka producer in a broadcast
variable in the Spark Streaming driver.

Here is what I did:

KafkaProducer<String, String> producer =
    new KafkaProducer<String, String>(properties);
final Broadcast<KafkaProducer<String, String>> bCastProducer =
    streamingContext.sparkContext().broadcast(producer);

Then, within a closure passed to foreachRDD, I tried to get the
wrapped producer back:

KafkaProducer<String, String> p = bCastProducer.value();

After rebuilding and rerunning, I got the following stack trace:

Exception in thread "main" com.esotericsoftware.kryo.KryoException: java.util.ConcurrentModificationException
Serialization trace:
classes (sun.misc.Launcher$AppClassLoader)
classloader (java.security.ProtectionDomain)
context (java.security.AccessControlContext)
acc (org.apache.spark.util.MutableURLClassLoader)
contextClassLoader (org.apache.kafka.common.utils.KafkaThread)
ioThread (org.apache.kafka.clients.producer.KafkaProducer)
producer ("my driver")
	at com.esotericsoftware.kryo.serializers.FieldSerializer$ObjectField.write(FieldSerializer.java:585)
	at com.esotericsoftware.kryo.serializers.FieldSerializer.write(FieldSerializer.java:213)
	at com.esotericsoftware.kryo.Kryo.writeObject(Kryo.java:501)
	at com.esotericsoftware.kryo.serializers.FieldSerializer$ObjectField.write(FieldSerializer.java:564)
	at com.esotericsoftware.kryo.serializers.FieldSerializer.write(FieldSerializer.java:213)
	at com.esotericsoftware.kryo.Kryo.writeClassAndObject(Kryo.java:568)
	at com.esotericsoftware.kryo.serializers.DefaultArraySerializers$ObjectArraySerializer.write(DefaultArraySerializers.java:318)
	at com.esotericsoftware.kryo.serializers.DefaultArraySerializers$ObjectArraySerializer.write(DefaultArraySerializers.java:293)
	at com.esotericsoftware.kryo.Kryo.writeObject(Kryo.java:501)
	at com.esotericsoftware.kryo.serializers.FieldSerializer$ObjectField.write(FieldSerializer.java:564)
	at com.esotericsoftware.kryo.serializers.FieldSerializer.write(FieldSerializer.java:213)
	at com.esotericsoftware.kryo.Kryo.writeObjectOrNull(Kryo.java:549)
	at com.esotericsoftware.kryo.serializers.FieldSerializer$ObjectField.write(FieldSerializer.java:570)
	at com.esotericsoftware.kryo.serializers.FieldSerializer.write(FieldSerializer.java:213)
	at com.esotericsoftware.kryo.Kryo.writeObject(Kryo.java:501)
	at com.esotericsoftware.kryo.serializers.FieldSerializer$ObjectField.write(FieldSerializer.java:564)
	at com.esotericsoftware.kryo.serializers.FieldSerializer.write(FieldSerializer.java:213)
	at com.esotericsoftware.kryo.Kryo.writeObject(Kryo.java:501)
	at com.esotericsoftware.kryo.serializers.FieldSerializer$ObjectField.write(FieldSerializer.java:564)
	at com.esotericsoftware.kryo.serializers.FieldSerializer.write(FieldSerializer.java:213)
	at com.esotericsoftware.kryo.Kryo.writeObject(Kryo.java:501)
	at com.esotericsoftware.kryo.serializers.FieldSerializer$ObjectField.write(FieldSerializer.java:564)
	at com.esotericsoftware.kryo.serializers.FieldSerializer.write(FieldSerializer.java:213)
	at com.esotericsoftware.kryo.Kryo.writeClassAndObject(Kryo.java:568)
	at org.apache.spark.serializer.KryoSerializationStream.writeObject(KryoSerializer.scala:148)
	at org.apache.spark.broadcast.TorrentBroadcast$.blockifyObject(TorrentBroadcast.scala:203)
	at org.apache.spark.broadcast.TorrentBroadcast.writeBlocks(TorrentBroadcast.scala:102)
	at org.apache.spark.broadcast.TorrentBroadcast.<init>(TorrentBroadcast.scala:85)
	at org.apache.spark.broadcast.TorrentBroadcastFactory.newBroadcast(TorrentBroadcastFactory.scala:34)
	at org.apache.spark.broadcast.BroadcastManager.newBroadcast(BroadcastManager.scala:62)
	at org.apache.spark.SparkContext.broadcast(SparkContext.scala:1289)
	at org.apache.spark.api.java.JavaSparkContext.broadcast(JavaSparkContext.scala:648)
	at "my driver"
	at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
	at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57)
	at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
	at java.lang.reflect.Method.invoke(Method.java:606)
	at org.apache.spark.deploy.SparkSubmit$.org$apache$spark$deploy$SparkSubmit$$runMain(SparkSubmit.scala:664)
	at org.apache.spark.deploy.SparkSubmit$.doRunMain$1(SparkSubmit.scala:169)
	at org.apache.spark.deploy.SparkSubmit$.submit(SparkSubmit.scala:192)
	at org.apache.spark.deploy.SparkSubmit$.main(SparkSubmit.scala:111)
	at org.apache.spark.deploy.SparkSubmit.main(SparkSubmit.scala)
Caused by: java.util.ConcurrentModificationException
	at java.util.Vector$Itr.checkForComodification(Vector.java:1156)
	at java.util.Vector$Itr.next(Vector.java:1133)
	at com.esotericsoftware.kryo.serializers.CollectionSerializer.write(CollectionSerializer.java:67)
	at com.esotericsoftware.kryo.serializers.CollectionSerializer.write(CollectionSerializer.java:18)
	at com.esotericsoftware.kryo.Kryo.writeObject(Kryo.java:501)
	at com.esotericsoftware.kryo.serializers.FieldSerializer$ObjectField.write(FieldSerializer.java:564)
	... 41 more

Thanks.

-- 

Regards,
Shenghua (Daniel) Wan
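A minimal, stdlib-only sketch of the underlying problem, assuming plain Java serialization (Spark's default `JavaSerializer`; the trace above comes from Kryo, which fails differently but for the same reason). `ClientWithIoThread` is a hypothetical stand-in for `KafkaProducer`: like the producer's `ioThread` field in the serialization trace, it holds a `java.lang.Thread`, which does not implement `Serializable`. A `Properties` object holding only configuration serializes without trouble.

```java
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.NotSerializableException;
import java.io.ObjectOutputStream;
import java.io.Serializable;
import java.util.Properties;

public class BroadcastSerializationDemo {

    // Hypothetical stand-in for KafkaProducer: it owns a background I/O thread,
    // like the "ioThread" field in the serialization trace.
    static class ClientWithIoThread implements Serializable {
        private final Thread ioThread = new Thread(() -> { }, "io-thread");
    }

    // Java-serializes a value (roughly what a broadcast does with Spark's
    // JavaSerializer) and reports the outcome instead of throwing.
    static String trySerialize(Object value) {
        try (ObjectOutputStream out = new ObjectOutputStream(new ByteArrayOutputStream())) {
            out.writeObject(value);
            return "ok";
        } catch (NotSerializableException e) {
            return "NotSerializableException: " + e.getMessage();
        } catch (IOException e) {
            return "IOException: " + e;
        }
    }

    public static void main(String[] args) {
        Properties config = new Properties();
        config.setProperty("bootstrap.servers", "localhost:9092"); // example value only
        System.out.println(trySerialize(config));                   // plain data: serializes
        System.out.println(trySerialize(new ClientWithIoThread())); // live thread: fails
    }
}
```

Running `main` should print `ok` followed by `NotSerializableException: java.lang.Thread`: declaring the outer class `Serializable` does not help when one of its fields is a thread.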

Re: broadcast variable of Kafka producer throws ConcurrentModificationException

Posted by Marcin Kuthan <ma...@gmail.com>.
I'm glad that I could help :)
On 19 Aug 2015 8:52 AM, "Shenghua(Daniel) Wan" <wa...@gmail.com>
wrote:

> +1
>
> I wish I had read this blog earlier. I am using Java and have just
> implemented a singleton producer per executor/JVM during the day.
> Yes, I did see that NotSerializableException when I was debugging the code
> ...
>
> Thanks for sharing.
>
> On Tue, Aug 18, 2015 at 10:59 PM, Tathagata Das <td...@databricks.com>
> wrote:
>
>> It's a cool blog post! Tweeted it!
>> Broadcasting the configuration necessary for lazily instantiating the
>> producer is a good idea.
>>
>> Nitpick: The first code example has an extra `}` ;)
>>
>> On Tue, Aug 18, 2015 at 10:49 PM, Marcin Kuthan <ma...@gmail.com>
>> wrote:
>>
>>> Since the Kafka producer is thread-safe, you don't need any pool at
>>> all. Just share a single producer on every executor. Please look at my blog
>>> post for more details. http://allegro.tech/spark-kafka-integration.html
>>> On 19 Aug 2015 2:00 AM, "Shenghua(Daniel) Wan" <wa...@gmail.com>
>>> wrote:
>>>
>>>> All of you are right.
>>>>
>>>> I was trying to create too many producers. My idea was to create a
>>>> pool (for now the pool contains only one producer) shared by all the
>>>> executors.
>>>> After I realized it was related to serialization (though I did not find
>>>> clear clues in the source code indicating that the broadcast type
>>>> parameter must implement Serializable), I followed the Spark Cassandra
>>>> Connector design and created a singleton Kafka producer pool. I have not
>>>> seen the exception since.
>>>>
>>>> Thanks for all your comments.
>>>>
>>>>
>>>> On Tue, Aug 18, 2015 at 4:28 PM, Tathagata Das <td...@databricks.com>
>>>> wrote:
>>>>
>>>>> Why are you even trying to broadcast a producer? A broadcast variable
>>>>> is some immutable piece of serializable DATA that can be used for
>>>>> processing on the executors. A Kafka producer is neither DATA nor
>>>>> immutable, and definitely not serializable.
>>>>> The right way to do this is to create the producer in the executors.
>>>>> Please see the discussion in the programming guide
>>>>>
>>>>> http://spark.apache.org/docs/latest/streaming-programming-guide.html#output-operations-on-dstreams
>>>>>
>>>>> On Tue, Aug 18, 2015 at 3:08 PM, Cody Koeninger <co...@koeninger.org>
>>>>> wrote:
>>>>>
>>>>>> I wouldn't expect a Kafka producer to be serializable at all... among
>>>>>> other things, it has a background thread.
>>>>>>
>>>> --
>>>>
>>>> Regards,
>>>> Shenghua (Daniel) Wan
>>>>
>>>
>>
>
>
> --
>
> Regards,
> Shenghua (Daniel) Wan
>
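Tathagata's suggestion, creating the producer on the executors, is usually written as `rdd.foreachPartition(...)` inside `foreachRDD`, following the output-operations section of the programming guide linked above. Below is a stdlib-only sketch of just the per-partition body, so that it runs without Spark or Kafka on the classpath; the `Producer` interface and the `factory` parameter are hypothetical stand-ins for `KafkaProducer<String, String>` and its constructor. The point is that only the serializable `Properties` travel with the closure, while the producer is built and closed inside the task.

```java
import java.util.Iterator;
import java.util.Properties;
import java.util.function.Function;

public class PerPartitionProducerSketch {

    // Hypothetical minimal stand-in for KafkaProducer<String, String>.
    public interface Producer extends AutoCloseable {
        void send(String topic, String value);

        @Override
        void close(); // narrowed: no checked exception
    }

    // What the body of rdd.foreachPartition(...) would do on an executor:
    // build the producer from plain config, send the partition's records,
    // then close it. The producer itself is never serialized.
    public static int writePartition(Iterator<String> records,
                                     Properties config,
                                     Function<Properties, Producer> factory,
                                     String topic) {
        int sent = 0;
        try (Producer producer = factory.apply(config)) {
            while (records.hasNext()) {
                producer.send(topic, records.next());
                sent++;
            }
        }
        return sent;
    }
}
```

This opens and closes one producer per partition per batch, which is simple but may be expensive; the later replies in this thread reuse one long-lived producer per executor JVM instead.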

Re: broadcast variable of Kafka producer throws ConcurrentModificationException

Posted by "Shenghua(Daniel) Wan" <wa...@gmail.com>.
+1

I wish I had read this blog earlier. I am using Java and have just
implemented a singleton producer per executor/JVM during the day.
Yes, I did see that NotSerializableException when I was debugging the code
...

Thanks for sharing.
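A per-JVM singleton of the kind described here might look like the following lazily initialized holder, using double-checked locking on a `volatile` field. It is a sketch under assumptions: `Producer` is a hypothetical stand-in for `KafkaProducer<String, String>`, and the first caller's configuration wins. Every task running in the same executor JVM that calls `get(config)` reuses the same instance.

```java
import java.util.Properties;
import java.util.concurrent.atomic.AtomicInteger;

public final class ProducerSingleton {

    // Hypothetical stand-in for KafkaProducer<String, String>.
    public static final class Producer {
        final Properties config;
        Producer(Properties config) { this.config = config; }
    }

    // Counts constructions, only so the example can show a single instance.
    static final AtomicInteger CREATED = new AtomicInteger();

    // volatile is required for double-checked locking to be safe.
    private static volatile Producer instance;

    private ProducerSingleton() { }

    // Returns the JVM-wide producer, creating it on first use. The config of
    // the first call is used; later calls ignore their argument.
    public static Producer get(Properties config) {
        Producer p = instance;
        if (p == null) {
            synchronized (ProducerSingleton.class) {
                p = instance;
                if (p == null) {
                    p = new Producer(config);
                    CREATED.incrementAndGet();
                    instance = p;
                }
            }
        }
        return p;
    }
}
```

A real version would probably also close the producer when the executor shuts down, for example from a JVM shutdown hook.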

-- 

Regards,
Shenghua (Daniel) Wan

Re: broadcast variable of Kafka producer throws ConcurrentModificationException

Posted by Tathagata Das <td...@databricks.com>.
It's a cool blog post! Tweeted it!
Broadcasting the configuration necessary for lazily instantiating the
producer is a good idea.

Nitpick: The first code example has an extra `}` ;)
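Broadcasting only the configuration and building the producer lazily on the executor could be sketched in Java as a small serializable wrapper. `LazyProducerSink` and its nested `Producer` are hypothetical names (the latter a stand-in for a real `KafkaProducer`). The wrapper keeps only `Properties` in serialized form and marks the producer `transient`, so each deserialized copy creates its own producer on first use. The round trip through `ObjectOutputStream`/`ObjectInputStream` below plays the part of the broadcast.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.io.Serializable;
import java.util.Properties;

public class LazyProducerSink implements Serializable {
    private static final long serialVersionUID = 1L;

    // Hypothetical stand-in for KafkaProducer<String, String>; NOT serializable.
    public static final class Producer {
        final Properties config;
        Producer(Properties config) { this.config = config; }
    }

    private final Properties config;     // plain data: safe to broadcast
    private transient Producer producer; // skipped by serialization

    public LazyProducerSink(Properties config) { this.config = config; }

    // Creates the producer on first use in whichever JVM this copy lives in.
    public synchronized Producer producer() {
        if (producer == null) {
            producer = new Producer(config);
        }
        return producer;
    }

    boolean isInitialized() { return producer != null; }

    // Serializes and deserializes the sink, roughly what a broadcast does.
    static LazyProducerSink roundTrip(LazyProducerSink sink) {
        try {
            ByteArrayOutputStream bytes = new ByteArrayOutputStream();
            try (ObjectOutputStream out = new ObjectOutputStream(bytes)) {
                out.writeObject(sink);
            }
            try (ObjectInputStream in =
                     new ObjectInputStream(new ByteArrayInputStream(bytes.toByteArray()))) {
                return (LazyProducerSink) in.readObject();
            }
        } catch (IOException | ClassNotFoundException e) {
            throw new IllegalStateException("round trip failed", e);
        }
    }
}
```

On the driver, the wrapper would then be broadcast instead of the producer, e.g. `sparkContext.broadcast(new LazyProducerSink(properties))`, with tasks calling `bSink.value().producer()`. Kryo's `FieldSerializer` should also skip `transient` fields by default, but that is worth confirming for the Kryo version in use.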

Re: broadcast variable of Kafka producer throws ConcurrentModificationException

Posted by Marcin Kuthan <ma...@gmail.com>.
Since the Kafka producer is thread-safe, you don't need any pool at all.
Just share a single producer on every executor. Please look at my blog post
for more details. http://allegro.tech/spark-kafka-integration.html
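The `KafkaProducer` Javadoc states that the producer is thread-safe and that sharing one instance across threads is generally faster than having several, which is what makes a pool unnecessary. Below is a stdlib-only sketch of several concurrent tasks in one executor JVM sharing a single lazily created instance; `Producer` is a hypothetical thread-safe stand-in, and the fixed thread pool only imitates executor cores.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Queue;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.atomic.AtomicInteger;

public class SharedProducerDemo {

    // Hypothetical thread-safe stand-in for KafkaProducer<String, String>.
    static final class Producer {
        final Queue<String> sent = new ConcurrentLinkedQueue<>();
        void send(String value) { sent.add(value); }
    }

    static final AtomicInteger CREATED = new AtomicInteger();

    // Initialization-on-demand holder: the JVM creates INSTANCE once, lazily
    // and thread-safely, the first time Holder is accessed. No pool involved.
    private static final class Holder {
        static final Producer INSTANCE = create();
    }

    private static Producer create() {
        CREATED.incrementAndGet();
        return new Producer();
    }

    static Producer producer() { return Holder.INSTANCE; }

    // Simulates `tasks` concurrent tasks in one executor, each sending
    // `perTask` records through the shared producer; returns the total sent.
    static int runTasks(int tasks, int perTask) {
        ExecutorService cores = Executors.newFixedThreadPool(4);
        try {
            List<Future<?>> futures = new ArrayList<>();
            for (int t = 0; t < tasks; t++) {
                final int task = t;
                futures.add(cores.submit(() -> {
                    for (int i = 0; i < perTask; i++) {
                        producer().send("task" + task + "-record" + i);
                    }
                }));
            }
            for (Future<?> f : futures) {
                f.get(); // wait for the task; rethrows its failure, if any
            }
        } catch (InterruptedException | ExecutionException e) {
            throw new IllegalStateException(e);
        } finally {
            cores.shutdown();
        }
        return producer().sent.size();
    }
}
```

The holder idiom gives lazy, thread-safe creation without explicit locking; it fits when the configuration is known statically, whereas a parameterized `get(config)` needs explicit synchronization.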
19 sie 2015 2:00 AM "Shenghua(Daniel) Wan" <wa...@gmail.com>
napisał(a):

> All of you are right.
>
> I was trying to create too many producers. My idea was to create a
> pool(for now the pool contains only one producer) shared by all the
> executors.
> After I realized it was related to the serializable issues (though I did
> not find clear clues in the source code to indicate the broacast template
> type parameter must be implement serializable),  I followed spark cassandra
> connector design and created a singleton of Kafka producer pools. There is
> not exception noticed.
>
> Thanks for all your comments.
>
> --
>
> Regards,
> Shenghua (Daniel) Wan
>

Re: broadcast variable of Kafka producer throws ConcurrentModificationException

Posted by "Shenghua(Daniel) Wan" <wa...@gmail.com>.
All of you are right.

I was trying to create too many producers. My idea was to create a pool (for
now the pool contains only one producer) shared by all the executors.
After I realized it was a serialization issue (though I did not find any
clear indication in the source code that the broadcast type parameter must
implement Serializable), I followed the Spark Cassandra Connector design and
created a singleton Kafka producer pool. I have not seen the exception since.
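The fix described above can be sketched as a per-JVM cache keyed by producer configuration. Equal configurations share one instance, and a different configuration (for example, another cluster) gets its own. This follows the connector's general idea only loosely and is not its actual code. ProducerCache and ProducerCacheDemo are hypothetical names, and a plain Object stands in for KafkaProducer so the sketch needs only the JDK. The maps used as keys should not be mutated after they are inserted.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.function.Consumer;
import java.util.function.Function;

// Hypothetical per-JVM cache: one shared instance per distinct configuration.
final class ProducerCache<C, P> {
    private final ConcurrentHashMap<C, P> cache = new ConcurrentHashMap<>();
    private final Function<C, P> factory;

    ProducerCache(Function<C, P> factory) {
        this.factory = factory;
    }

    // computeIfAbsent applies the factory at most once per key, atomically.
    P get(C config) {
        return cache.computeIfAbsent(config, factory);
    }

    int size() {
        return cache.size();
    }

    // Close every cached instance; intended to run once, e.g. at JVM shutdown.
    void closeAll(Consumer<P> closer) {
        cache.values().forEach(closer);
        cache.clear();
    }
}

class ProducerCacheDemo {
    public static void main(String[] args) {
        // new Object() stands in for building a KafkaProducer from the config.
        ProducerCache<Map<String, String>, Object> producers =
            new ProducerCache<>(config -> new Object());

        Map<String, String> clusterA = new HashMap<>();
        clusterA.put("bootstrap.servers", "broker-a:9092"); // hypothetical host
        Map<String, String> clusterB = new HashMap<>();
        clusterB.put("bootstrap.servers", "broker-b:9092"); // hypothetical host

        Object p1 = producers.get(clusterA);
        Object p2 = producers.get(new HashMap<>(clusterA)); // equal config: same instance
        Object p3 = producers.get(clusterB);
        System.out.println((p1 == p2) + " " + (p1 == p3) + " " + producers.size());
        // prints "true false 2"
    }
}
```

In a real job the cache would live in a static field, and closeAll(KafkaProducer::close) could be registered through Runtime.getRuntime().addShutdownHook(...) so buffered records are flushed when the executor exits.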

Thanks for all your comments.

-- 

Regards,
Shenghua (Daniel) Wan

Re: broadcast variable of Kafka producer throws ConcurrentModificationException

Posted by Tathagata Das <td...@databricks.com>.
Why are you even trying to broadcast a producer? A broadcast variable is
an immutable piece of serializable DATA that can be used for processing
on the executors. A Kafka producer is neither DATA nor immutable, and
definitely not serializable.
The right way to do this is to create the producer in the executors. Please
see the discussion in the programming guide:
http://spark.apache.org/docs/latest/streaming-programming-guide.html#output-operations-on-dstreams
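The "create the producer in the executors" pattern can be sketched without a cluster. The sketch broadcasts only serializable configuration: a Properties object is round-tripped through Java serialization to mimic shipping it. It then builds one producer per partition and reuses it for every record in that partition. FakeProducer, roundTrip and foreachPartition are hypothetical stand-ins. The comment shows roughly what the Spark Java API version might look like; bConf and topic in that comment are also hypothetical.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.Properties;

class PerPartitionSketch {
    // Hypothetical stand-in for KafkaProducer<String, String>: owns resources,
    // is never serialized, and is only ever created on the "executor" side.
    static final class FakeProducer implements AutoCloseable {
        final List<String> sent = new ArrayList<>();
        boolean closed = false;

        FakeProducer(Properties config) {
            // A real producer would read e.g. config.getProperty("bootstrap.servers").
        }

        void send(String value) { sent.add(value); }

        @Override
        public void close() { closed = true; }
    }

    // Mimics broadcasting: serialize on the "driver", deserialize on the "executor".
    static Properties roundTrip(Properties props) {
        try {
            ByteArrayOutputStream bytes = new ByteArrayOutputStream();
            try (ObjectOutputStream out = new ObjectOutputStream(bytes)) {
                out.writeObject(props);
            }
            try (ObjectInputStream in =
                     new ObjectInputStream(new ByteArrayInputStream(bytes.toByteArray()))) {
                return (Properties) in.readObject();
            }
        } catch (IOException | ClassNotFoundException e) {
            throw new IllegalStateException(e);
        }
    }

    // Mimics rdd.foreachPartition(...): one producer per partition, closed afterwards.
    // Roughly, with the Spark Java API (bConf: Broadcast<Properties>, topic: String):
    //   rdd.foreachPartition(records -> {
    //       KafkaProducer<String, String> p = new KafkaProducer<>(bConf.value());
    //       records.forEachRemaining(v -> p.send(new ProducerRecord<>(topic, v)));
    //       p.close();
    //   });
    static List<FakeProducer> foreachPartition(List<List<String>> partitions, Properties config) {
        List<FakeProducer> used = new ArrayList<>();
        for (List<String> partition : partitions) {
            try (FakeProducer producer = new FakeProducer(config)) {
                for (String record : partition) {
                    producer.send(record);
                }
                used.add(producer);
            }
        }
        return used;
    }

    public static void main(String[] args) {
        Properties driverConf = new Properties();
        driverConf.setProperty("bootstrap.servers", "broker1:9092"); // hypothetical host
        Properties executorConf = roundTrip(driverConf);

        List<List<String>> partitions = Arrays.asList(
            Arrays.asList("a", "b", "c"),
            Arrays.asList("d", "e"));
        List<FakeProducer> used = foreachPartition(partitions, executorConf);
        System.out.println("producers: " + used.size()); // prints "producers: 2"
    }
}
```

Creating one producer per partition is the simple version. Combining it with a per-JVM shared producer avoids paying the connection cost once per partition per batch.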

Re: broadcast variable of Kafka producer throws ConcurrentModificationException

Posted by Cody Koeninger <co...@koeninger.org>.
I wouldn't expect a Kafka producer to be serializable at all... among other
things, it has a background thread.
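The following sketch shows this point in isolation. It uses plain Java serialization, not the Kryo serializer Spark used in the reported trace, on a hypothetical class (ClientWithIoThread) that holds a background I/O thread, as KafkaProducer does. Java serialization rejects the java.lang.Thread field outright. Kryo's FieldSerializer does not require Serializable, which is why the broadcast type parameter gave no hint. Instead, Kryo followed the thread's contextClassLoader into the application class loader, as the serialization trace in the original message shows, and failed while iterating a Vector that was being modified concurrently.

```java
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.NotSerializableException;
import java.io.ObjectOutputStream;
import java.io.Serializable;

class ThreadFieldSerialization {
    // Hypothetical stand-in for a client that owns a background I/O thread.
    // Declaring Serializable does not help: the Thread field itself is not.
    static final class ClientWithIoThread implements Serializable {
        private final Thread ioThread = new Thread(() -> { }, "io-thread");
    }

    // Returns "ok" on success, otherwise a short description of the failure.
    static String trySerialize(Object o) {
        try (ObjectOutputStream out = new ObjectOutputStream(new ByteArrayOutputStream())) {
            out.writeObject(o);
            return "ok";
        } catch (NotSerializableException e) {
            return "NotSerializableException: " + e.getMessage();
        } catch (IOException e) {
            return "IOException: " + e;
        }
    }

    public static void main(String[] args) {
        System.out.println(trySerialize(new ClientWithIoThread()));
        // prints "NotSerializableException: java.lang.Thread"
        System.out.println(trySerialize("plain data")); // prints "ok"
    }
}
```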
