Posted to user@flink.apache.org by Biplob Biswas <re...@gmail.com> on 2016/02/13 18:55:51 UTC

Regarding Concurrent Modification Exception

Hi,
We are getting a ConcurrentModificationException. The complete stack trace
is as follows:

org.apache.flink.optimizer.CompilerException: Error translating node 'Data Source "at compute(ArpackSVD.java:367) (org.apache.flink.api.java.io.CollectionInputFormat)" : NONE [[ GlobalProperties [partitioning=RANDOM_PARTITIONED] ]] [[ LocalProperties [ordering=null, grouped=null, unique=null] ]]': java.util.ConcurrentModificationException
Serialization trace:
classes (sun.misc.Launcher$AppClassLoader)
classLoader (akka.actor.ReflectiveDynamicAccess)
_pm (akka.actor.ActorSystemImpl)
actorSystem (org.apache.flink.client.program.Client)
client (org.apache.flink.client.program.ContextEnvironment)
context (org.apache.flink.api.java.operators.MapOperator)
matrix (flink.pca.impl.svd.ArpackSVD)
this$0 (flink.pca.impl.svd.ArpackSVD$ArpackContext)
	at org.apache.flink.optimizer.plantranslate.JobGraphGenerator.preVisit(JobGraphGenerator.java:386)
	at org.apache.flink.optimizer.plantranslate.JobGraphGenerator.preVisit(JobGraphGenerator.java:109)
	at org.apache.flink.optimizer.plan.SourcePlanNode.accept(SourcePlanNode.java:86)
	at org.apache.flink.optimizer.plan.SingleInputPlanNode.accept(SingleInputPlanNode.java:199)
	at org.apache.flink.optimizer.plan.SingleInputPlanNode.accept(SingleInputPlanNode.java:199)
	at org.apache.flink.optimizer.plan.SingleInputPlanNode.accept(SingleInputPlanNode.java:199)
	at org.apache.flink.optimizer.plan.OptimizedPlan.accept(OptimizedPlan.java:128)
	at org.apache.flink.optimizer.plantranslate.JobGraphGenerator.compileJobGraph(JobGraphGenerator.java:188)
	at org.apache.flink.optimizer.plantranslate.JobGraphGenerator.compileJobGraph(JobGraphGenerator.java:166)
	at org.apache.flink.client.program.Client.getJobGraph(Client.java:534)
	at org.apache.flink.client.program.Client.runBlocking(Client.java:347)
	at org.apache.flink.client.program.Client.runBlocking(Client.java:315)
	at org.apache.flink.client.program.ContextEnvironment.execute(ContextEnvironment.java:70)
	at org.apache.flink.api.java.ExecutionEnvironment.execute(ExecutionEnvironment.java:804)
	at org.apache.flink.api.java.DataSet.collect(DataSet.java:410)
	at flink.pca.impl.svd.ArpackSVD.compute(ArpackSVD.java:379)
	at flink.pca.impl.PCA.computeSVD(PCA.java:110)
	at flink.pca.impl.PCA.project(PCA.java:35)
	at flink.pca.impl.PCASystemTest.runFlinkJob(PCASystemTest.java:90)
	at flink.pca.impl.PCASystemTest.main(PCASystemTest.java:61)
	at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
	at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
	at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
	at java.lang.reflect.Method.invoke(Method.java:497)
	at org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:497)
	at org.apache.flink.client.program.PackagedProgram.invokeInteractiveModeForExecution(PackagedProgram.java:395)
	at org.apache.flink.client.program.Client.runBlocking(Client.java:252)
	at org.apache.flink.client.CliFrontend.executeProgramBlocking(CliFrontend.java:676)
	at org.apache.flink.client.CliFrontend.run(CliFrontend.java:326)
	at org.apache.flink.client.CliFrontend.parseParameters(CliFrontend.java:978)
	at org.apache.flink.client.CliFrontend.main(CliFrontend.java:1028)
Caused by: com.esotericsoftware.kryo.KryoException: java.util.ConcurrentModificationException
Serialization trace:
classes (sun.misc.Launcher$AppClassLoader)
classLoader (akka.actor.ReflectiveDynamicAccess)
_pm (akka.actor.ActorSystemImpl)
actorSystem (org.apache.flink.client.program.Client)
client (org.apache.flink.client.program.ContextEnvironment)
context (org.apache.flink.api.java.operators.MapOperator)
matrix (flink.pca.impl.svd.ArpackSVD)
this$0 (flink.pca.impl.svd.ArpackSVD$ArpackContext)
	at com.esotericsoftware.kryo.serializers.ObjectField.write(ObjectField.java:82)
	at com.esotericsoftware.kryo.serializers.FieldSerializer.write(FieldSerializer.java:495)
	at com.esotericsoftware.kryo.Kryo.writeObject(Kryo.java:523)
	at com.esotericsoftware.kryo.serializers.ObjectField.write(ObjectField.java:61)
	at com.esotericsoftware.kryo.serializers.FieldSerializer.write(FieldSerializer.java:495)
	at com.esotericsoftware.kryo.Kryo.writeObject(Kryo.java:523)
	at com.esotericsoftware.kryo.serializers.ObjectField.write(ObjectField.java:61)
	at com.esotericsoftware.kryo.serializers.FieldSerializer.write(FieldSerializer.java:495)
	at com.esotericsoftware.kryo.Kryo.writeObject(Kryo.java:523)
	at com.esotericsoftware.kryo.serializers.ObjectField.write(ObjectField.java:61)
	at com.esotericsoftware.kryo.serializers.FieldSerializer.write(FieldSerializer.java:495)
	at com.esotericsoftware.kryo.Kryo.writeObject(Kryo.java:523)
	at com.esotericsoftware.kryo.serializers.ObjectField.write(ObjectField.java:61)
	at com.esotericsoftware.kryo.serializers.FieldSerializer.write(FieldSerializer.java:495)
	at com.esotericsoftware.kryo.Kryo.writeObject(Kryo.java:523)
	at com.esotericsoftware.kryo.serializers.ObjectField.write(ObjectField.java:61)
	at com.esotericsoftware.kryo.serializers.FieldSerializer.write(FieldSerializer.java:495)
	at com.esotericsoftware.kryo.Kryo.writeObject(Kryo.java:523)
	at com.esotericsoftware.kryo.serializers.ObjectField.write(ObjectField.java:61)
	at com.esotericsoftware.kryo.serializers.FieldSerializer.write(FieldSerializer.java:495)
	at com.esotericsoftware.kryo.Kryo.writeObject(Kryo.java:523)
	at com.esotericsoftware.kryo.serializers.ObjectField.write(ObjectField.java:61)
	at com.esotericsoftware.kryo.serializers.FieldSerializer.write(FieldSerializer.java:495)
	at com.esotericsoftware.kryo.Kryo.writeClassAndObject(Kryo.java:599)
	at org.apache.flink.api.java.typeutils.runtime.kryo.KryoSerializer.serialize(KryoSerializer.java:193)
	at org.apache.flink.api.java.io.CollectionInputFormat.writeObject(CollectionInputFormat.java:88)
	at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
	at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
	at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
	at java.lang.reflect.Method.invoke(Method.java:497)
	at java.io.ObjectStreamClass.invokeWriteObject(ObjectStreamClass.java:1028)
	at java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1496)
	at java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1432)
	at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1178)
	at java.io.ObjectOutputStream.defaultWriteFields(ObjectOutputStream.java:1548)
	at java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1509)
	at java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1432)
	at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1178)
	at java.io.ObjectOutputStream.writeObject(ObjectOutputStream.java:348)
	at org.apache.flink.util.InstantiationUtil.serializeObject(InstantiationUtil.java:307)
	at org.apache.flink.util.InstantiationUtil.writeObjectToConfig(InstantiationUtil.java:259)
	at org.apache.flink.runtime.operators.util.TaskConfig.setStubWrapper(TaskConfig.java:273)
	at org.apache.flink.optimizer.plantranslate.JobGraphGenerator.createDataSourceVertex(JobGraphGenerator.java:890)
	at org.apache.flink.optimizer.plantranslate.JobGraphGenerator.preVisit(JobGraphGenerator.java:286)
	... 30 more
Caused by: java.util.ConcurrentModificationException
	at java.util.Vector$Itr.checkForComodification(Vector.java:1184)
	at java.util.Vector$Itr.next(Vector.java:1137)
	at com.esotericsoftware.kryo.serializers.CollectionSerializer.write(CollectionSerializer.java:74)
	at com.esotericsoftware.kryo.serializers.CollectionSerializer.write(CollectionSerializer.java:22)
	at com.esotericsoftware.kryo.Kryo.writeObject(Kryo.java:523)
	at com.esotericsoftware.kryo.serializers.ObjectField.write(ObjectField.java:61)
	... 73 more



Can anyone explain why this happens, or how to fix it? We searched Google,
but the only related results we found were about problems serializing
broadcast variables. We use Flink bulk iterations, and this variable is
broadcast to both a map and a reduce in the same dataflow.

Thanks & Regards
Biplob Biswas
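
For readers following the root cause: the innermost exception is thrown by the fail-fast iterator of java.util.Vector, which checks on every next() call whether the vector has been structurally modified since the iterator was created. The minimal Java sketch below (class and method names are hypothetical) reproduces that mechanism in a single thread. In the reported trace the iterated Vector is reached through the classes field of the application class loader, so the modification presumably came from another thread loading a class while Kryo was walking that field; the thread itself does not confirm this.

```java
import java.util.ConcurrentModificationException;
import java.util.Iterator;
import java.util.Vector;

/**
 * Sketch of the innermost failure in the trace
 * (java.util.Vector$Itr.checkForComodification): Vector's iterator is
 * fail-fast, so a structural change to the Vector during iteration makes
 * the next call to next() throw ConcurrentModificationException.
 */
public class VectorFailFastDemo {

    /** Returns true if adding to a Vector while iterating over it throws a CME. */
    static boolean triggersCme() {
        // Hypothetical stand-in for a class loader's list of loaded classes.
        Vector<String> classes = new Vector<>();
        classes.add("A");
        classes.add("B");
        Iterator<String> it = classes.iterator();
        try {
            while (it.hasNext()) {
                String name = it.next();
                // Structural modification during iteration. Here the same
                // thread does it; in the reported trace it was presumably
                // another thread loading a class while Kryo iterated.
                if (name.equals("A")) {
                    classes.add("C");
                }
            }
            return false;
        } catch (ConcurrentModificationException e) {
            return true;
        }
    }

    public static void main(String[] args) {
        System.out.println(triggersCme()); // prints "true"
    }
}
```

The same exception can surface whether the modification comes from the iterating thread or from another one; the iterator only detects that the modification count changed, not who changed it.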

Re: Regarding Concurrent Modification Exception

Posted by Maximilian Michels <mx...@apache.org>.
Hi Biplob,

Could you please supply some sample code? Otherwise it is tough to
debug this problem.

Cheers,
Max

On Tue, Feb 16, 2016 at 2:46 PM, Biplob Biswas <re...@gmail.com> wrote:
> Hi,
>
> No, we don't start a Flink job inside another job. The jobs are created in a
> loop, but each job starts only after the previous one has finished and been
> cleaned up. We don't get this exception on our local Flink installation; it
> appears only when we run on the cluster.
>
> Thanks & Regards
> Biplob Biswas
>
> On Mon, Feb 15, 2016 at 12:25 PM, Fabian Hueske <fh...@gmail.com> wrote:
>>
>> Hi,
>>
>> This stack trace looks really suspicious.
>> It includes classes from the submission client (CliFrontend), the optimizer
>> (JobGraphGenerator), and the runtime (KryoSerializer).
>>
>> Is it possible that you are trying to start a new Flink job inside another
>> job? That would not work.
>>
>> Best, Fabian
>
>

Re: Regarding Concurrent Modification Exception

Posted by Biplob Biswas <re...@gmail.com>.
Hi,

No, we don't start a Flink job inside another job. The jobs are created in a
loop, but each job starts only after the previous one has finished and been
cleaned up. We don't get this exception on our local Flink installation; it
appears only when we run on the cluster.

Thanks & Regards
Biplob Biswas


Re: Regarding Concurrent Modification Exception

Posted by Till Rohrmann <tr...@apache.org>.
But isn't that the normal stack trace you see when you submit a job to the
cluster via the CLI and something fails during compilation?

Anyway, it would be helpful to see the program which causes this problem.

Cheers,
Till


Re: Regarding Concurrent Modification Exception

Posted by Fabian Hueske <fh...@gmail.com>.
Hi,

This stack trace looks really suspicious.
It includes classes from the submission client (CliFrontend), the optimizer
(JobGraphGenerator), and the runtime (KryoSerializer).

Is it possible that you are trying to start a new Flink job inside another
job? That would not work.

Best, Fabian
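
Fabian's observation that client classes appear in the trace is consistent with the serialization trace in the first message, read from the bottom up: Kryo started at an ArpackSVD$ArpackContext element of the CollectionInputFormat created at ArpackSVD.java:367, followed its this$0 field to the enclosing ArpackSVD, then the matrix field to a MapOperator, and from there through the ContextEnvironment, the Client and the Akka actor system to the class loader. this$0 is the synthetic field through which an instance of a non-static inner class refers to its enclosing instance. The minimal Java sketch below shows that capture with hypothetical stand-in classes (Svd, Context, StaticContext). Whether declaring ArpackContext static, or keeping the DataSet out of anything the collection elements can reach, resolves this particular job is an assumption; nobody in the thread confirms it.

```java
import java.lang.reflect.Field;

/**
 * Sketch of why client-side objects can end up in a serialized data source:
 * an instance of a non-static inner class holds a synthetic reference to its
 * enclosing instance (the this$0 field in the trace). A field-by-field
 * serializer that follows that reference also serializes the outer object's
 * fields.
 */
public class OuterCaptureDemo {

    /** Hypothetical stand-in for an outer class such as ArpackSVD. */
    static class Svd {
        // Stand-in for the matrix field; in the trace it held a MapOperator,
        // which in turn referenced the execution environment and client.
        final Object matrix = new Object();

        /** Non-static inner class: every instance references its Svd. */
        class Context {
            Object source() {
                // Reading an outer field forces javac to keep the synthetic
                // outer reference (this$<n>, where n depends on nesting depth).
                return matrix;
            }
        }

        /** Static nested class: no hidden reference to an Svd instance. */
        static class StaticContext {
            Object source(Object matrix) {
                return matrix;
            }
        }
    }

    /** Returns true if cls declares a synthetic enclosing-instance field. */
    static boolean capturesOuter(Class<?> cls) {
        for (Field f : cls.getDeclaredFields()) {
            if (f.isSynthetic() && f.getName().startsWith("this$")) {
                return true;
            }
        }
        return false;
    }

    public static void main(String[] args) {
        System.out.println(capturesOuter(Svd.Context.class));       // prints "true"
        System.out.println(capturesOuter(Svd.StaticContext.class)); // prints "false"
    }
}
```

The static nested variant has to receive whatever it needs explicitly, which is the design point: nothing from the enclosing object travels with it unless it is passed in.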