Posted to user@flink.apache.org by Flavio Baronti <f....@list-group.com> on 2015/05/04 14:09:30 UTC

Crash on DataSet.collect()

Hello,

I'm testing the new DataSet.collect() method on version 0.9-milestone-1, but
I obtain the following error on cluster execution (no problem with local
execution), which also causes the job manager to crash:

14:05:41,145 INFO  org.apache.flink.runtime.executiongraph.ExecutionGraph - Deploying CHAIN Cross(Cross at main(Test01.java:53)) -> Map (Map at main(Test01.java:54)) -> FlatMap (FlatMap at collect(DataSet.java:413)) (1/1) (attempt #0) to india3
14:05:41,211 INFO  org.apache.flink.runtime.executiongraph.ExecutionGraph - Deploying DataSink (org.apache.flink.api.java.io.DiscardingOutputFormat@4386f16) (1/1) (attempt #0) to india3
14:05:41,269 INFO  org.apache.flink.runtime.jobmanager.JobManager - Status of job 254ba2f06f7a9c4d454ca7288dae4092 (Flink Java Job at Mon May 04 14:05:39 CEST 2015) changed to FINISHED .
14:05:41,284 ERROR akka.actor.OneForOneStrategy - java.io.StreamCorruptedException: invalid type code: 00
org.apache.commons.lang3.SerializationException: java.io.StreamCorruptedException: invalid type code: 00
        at org.apache.commons.lang3.SerializationUtils.deserialize(SerializationUtils.java:232)
        at org.apache.commons.lang3.SerializationUtils.deserialize(SerializationUtils.java:268)
        at org.apache.flink.api.common.accumulators.ListAccumulator.getLocalValue(ListAccumulator.java:51)
        at org.apache.flink.api.common.accumulators.ListAccumulator.getLocalValue(ListAccumulator.java:35)
        at org.apache.flink.runtime.jobmanager.accumulators.AccumulatorManager.getJobAccumulatorResults(AccumulatorManager.java:77)
        at org.apache.flink.runtime.jobmanager.JobManager$$anonfun$receiveWithLogMessages$1.applyOrElse(JobManager.scala:300)
        at scala.runtime.AbstractPartialFunction$mcVL$sp.apply$mcVL$sp(AbstractPartialFunction.scala:33)
        at scala.runtime.AbstractPartialFunction$mcVL$sp.apply(AbstractPartialFunction.scala:33)
        at scala.runtime.AbstractPartialFunction$mcVL$sp.apply(AbstractPartialFunction.scala:25)
        at org.apache.flink.runtime.ActorLogMessages$$anon$1.apply(ActorLogMessages.scala:37)
        at org.apache.flink.runtime.ActorLogMessages$$anon$1.apply(ActorLogMessages.scala:30)
        at scala.PartialFunction$class.applyOrElse(PartialFunction.scala:118)
        at org.apache.flink.runtime.ActorLogMessages$$anon$1.applyOrElse(ActorLogMessages.scala:30)
        at akka.actor.Actor$class.aroundReceive(Actor.scala:465)
        at org.apache.flink.runtime.jobmanager.JobManager.aroundReceive(JobManager.scala:91)
        at akka.actor.ActorCell.receiveMessage(ActorCell.scala:516)
        at akka.actor.ActorCell.invoke(ActorCell.scala:487)
        at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:254)
        at akka.dispatch.Mailbox.run(Mailbox.scala:221)
        at akka.dispatch.Mailbox.exec(Mailbox.scala:231)
        at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
        at scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.pollAndExecAll(ForkJoinPool.java:1253)
        at scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1346)
        at scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
        at scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)
Caused by: java.io.StreamCorruptedException: invalid type code: 00
        at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1379)
        at java.io.ObjectInputStream.skipCustomData(ObjectInputStream.java:1959)
        at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1921)
        at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1801)
        at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1351)
        at java.io.ObjectInputStream.skipCustomData(ObjectInputStream.java:1959)
        at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1921)
        at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1801)
        at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1351)
        at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:1993)
        at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1918)
        at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1801)
        at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1351)
        at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:1993)
        at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1918)
        at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1801)
        at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1351)
        at java.io.ObjectInputStream.readObject(ObjectInputStream.java:371)
        at org.apache.commons.lang3.SerializationUtils.deserialize(SerializationUtils.java:224)
        ... 24 more
14:05:41,290 INFO  org.apache.flink.runtime.jobmanager.JobManager - Stopping JobManager akka://flink/user/jobmanager#-828467473.
14:05:41,297 ERROR org.apache.flink.runtime.jobmanager.JobManager - Actor akka://flink/user/jobmanager#-828467473 terminated, stopping process...

Is this a known issue? Am I doing something wrong?

Thanks
Flavio



RE: Crash on DataSet.collect()

Posted by Flavio Baronti <f....@list-group.com>.
Hi Stephan,

I confirm that I was using custom types in collect(), and that the bug is not present in the current master.

Thanks
Flavio

From: ewenstephan@gmail.com [mailto:ewenstephan@gmail.com] On Behalf Of Stephan Ewen
Sent: Monday, May 04, 2015 2:33 PM
To: user@flink.apache.org
Subject: Re: Crash on DataSet.collect()

 

Hi Flavio!

This issue is known and has been fixed already. It occurs when you use custom types in collect, because it uses the wrong classloader/serializer to transfer them.

The current master should not have this issue any more.

Greetings,
Stephan



Re: Crash on DataSet.collect()

Posted by Stephan Ewen <se...@apache.org>.
Hi Flavio!

This issue is known and has been fixed already. It occurs when you use
custom types in collect, because it uses the wrong classloader/serializer
to transfer them.

The current master should not have this issue any more.

Greetings,
Stephan
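
For readers unfamiliar with the classloader point above, here is a minimal, self-contained sketch of the general technique of deserializing with an explicit class loader, so that user-defined ("custom") types can be resolved by a process that did not load them itself. It is not the fix that went into Flink master, which this thread does not show. The names ClassLoaderAwareDeserialization, ClassLoaderObjectInputStream and Point are hypothetical and exist only for illustration; only standard java.io classes are used.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.InputStream;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.io.ObjectStreamClass;
import java.io.Serializable;
import java.io.UncheckedIOException;

// Hypothetical illustration; none of these names come from Flink.
final class ClassLoaderAwareDeserialization {

    /** ObjectInputStream that resolves classes through a caller-supplied class loader. */
    static final class ClassLoaderObjectInputStream extends ObjectInputStream {
        private final ClassLoader classLoader;

        ClassLoaderObjectInputStream(InputStream in, ClassLoader classLoader) throws IOException {
            super(in);
            this.classLoader = classLoader;
        }

        @Override
        protected Class<?> resolveClass(ObjectStreamClass desc)
                throws IOException, ClassNotFoundException {
            try {
                // Try the supplied loader first (for example, the loader of the user's job jar).
                return Class.forName(desc.getName(), false, classLoader);
            } catch (ClassNotFoundException e) {
                // Fall back to the default lookup, which also handles primitive type names.
                return super.resolveClass(desc);
            }
        }
    }

    /** Stand-in for a user-defined type returned from a job. */
    static final class Point implements Serializable {
        private static final long serialVersionUID = 1L;
        final int x;
        final int y;

        Point(int x, int y) {
            this.x = x;
            this.y = y;
        }
    }

    /** Serializes a value with plain Java serialization. */
    static byte[] serialize(Serializable value) {
        ByteArrayOutputStream buffer = new ByteArrayOutputStream();
        try (ObjectOutputStream out = new ObjectOutputStream(buffer)) {
            out.writeObject(value);
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
        return buffer.toByteArray();
    }

    /** Deserializes bytes, resolving classes through the given class loader. */
    static Object deserialize(byte[] bytes, ClassLoader classLoader) {
        try (ObjectInputStream in =
                new ClassLoaderObjectInputStream(new ByteArrayInputStream(bytes), classLoader)) {
            return in.readObject();
        } catch (IOException | ClassNotFoundException e) {
            throw new IllegalStateException("Could not deserialize value", e);
        }
    }

    public static void main(String[] args) {
        // In a real cluster the loader would be the one that loaded the user's jar;
        // here the loader of Point itself stands in for it.
        ClassLoader userLoader = Point.class.getClassLoader();
        Point restored = (Point) deserialize(serialize(new Point(3, 4)), userLoader);
        System.out.println(restored.x + "," + restored.y);
    }
}
```

In a single JVM every loader can see Point, so the round trip succeeds either way. The override only matters when the deserializing process (the JobManager in the trace above) does not have the user classes on its own classpath.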

