You are viewing a plain text version of this content. The canonical link for it is here.
Posted to user@flink.apache.org by "Kaepke, Marc" <ma...@haw-hamburg.de> on 2017/05/10 14:16:59 UTC

Gelly - generics with custom vertex value

Hi,

a part of my bachelor thesis is an implementation of the Semi-Clustering algorithm [1].
I’m using the Scatter-Gather-Iteration. Each vertex has to know its neighbors and the edge-value between of that. Because Gelly’s vertex doesn’t provide both information, I wrote an CustomVertexValue class.  An object contains a set of edges and a list of another custom class called SemiCluster, which contains a list of vertex and a double value.
Now I’m able to create vertices like Vertex<Double, CustomVertexValue>.

Unfortunately I get an exception if I run my Scatter-Gather-Iteration.

Exception in thread "main" org.apache.flink.runtime.client.JobExecutionException: Job execution failed.
at org.apache.flink.runtime.jobmanager.JobManager$$anonfun$handleMessage$1$$anonfun$applyOrElse$7.apply$mcV$sp(JobManager.scala:900)
at org.apache.flink.runtime.jobmanager.JobManager$$anonfun$handleMessage$1$$anonfun$applyOrElse$7.apply(JobManager.scala:843)
at org.apache.flink.runtime.jobmanager.JobManager$$anonfun$handleMessage$1$$anonfun$applyOrElse$7.apply(JobManager.scala:843)
at scala.concurrent.impl.Future$PromiseCompletingRunnable.liftedTree1$1(Future.scala:24)
at scala.concurrent.impl.Future$PromiseCompletingRunnable.run(Future.scala:24)
at akka.dispatch.TaskInvocation.run(AbstractDispatcher.scala:40)
at akka.dispatch.ForkJoinExecutorConfigurator$AkkaForkJoinTask.exec(AbstractDispatcher.scala:397)
at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
at scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
at scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
at scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)
Caused by: java.lang.RuntimeException: Error obtaining the sorted input: Thread 'SortMerger Reading Thread' terminated due to an exception: null
at org.apache.flink.runtime.operators.sort.UnilateralSortMerger.getIterator(UnilateralSortMerger.java:619)
at org.apache.flink.runtime.operators.BatchTask.getInput(BatchTask.java:1094)
at org.apache.flink.runtime.operators.CoGroupWithSolutionSetSecondDriver.run(CoGroupWithSolutionSetSecondDriver.java:203)
at org.apache.flink.runtime.operators.BatchTask.run(BatchTask.java:490)
at org.apache.flink.runtime.iterative.task.AbstractIterativeTask.run(AbstractIterativeTask.java:146)
at org.apache.flink.runtime.iterative.task.IterationTailTask.run(IterationTailTask.java:107)
at org.apache.flink.runtime.operators.BatchTask.invoke(BatchTask.java:355)
at org.apache.flink.runtime.taskmanager.Task.run(Task.java:655)
at java.lang.Thread.run(Thread.java:745)
Caused by: java.io.IOException: Thread 'SortMerger Reading Thread' terminated due to an exception: null
at org.apache.flink.runtime.operators.sort.UnilateralSortMerger$ThreadBase.run(UnilateralSortMerger.java:799)
Caused by: java.lang.NullPointerException
at java.util.ArrayList$SubList.checkForComodification(ArrayList.java:1230)
at java.util.ArrayList$SubList.size(ArrayList.java:1040)
at java.util.AbstractList.add(AbstractList.java:108)
at com.esotericsoftware.kryo.serializers.CollectionSerializer.read(CollectionSerializer.java:116)
at com.esotericsoftware.kryo.serializers.CollectionSerializer.read(CollectionSerializer.java:22)
at com.esotericsoftware.kryo.Kryo.readClassAndObject(Kryo.java:761)
at org.apache.flink.api.java.typeutils.runtime.kryo.KryoSerializer.deserialize(KryoSerializer.java:232)
at org.apache.flink.api.java.typeutils.runtime.kryo.KryoSerializer.deserialize(KryoSerializer.java:246)
at org.apache.flink.api.java.typeutils.runtime.TupleSerializer.deserialize(TupleSerializer.java:144)
at org.apache.flink.api.java.typeutils.runtime.TupleSerializer.deserialize(TupleSerializer.java:30)
at org.apache.flink.runtime.plugable.ReusingDeserializationDelegate.read(ReusingDeserializationDelegate.java:57)
at org.apache.flink.runtime.io.network.api.serialization.SpillingAdaptiveSpanningRecordDeserializer.getNextRecord(SpillingAdaptiveSpanningRecordDeserializer.java:109)
at org.apache.flink.runtime.io.network.api.reader.AbstractRecordReader.getNextRecord(AbstractRecordReader.java:72)
at org.apache.flink.runtime.io.network.api.reader.MutableRecordReader.next(MutableRecordReader.java:42)
at org.apache.flink.runtime.operators.util.ReaderIterator.next(ReaderIterator.java:59)
at org.apache.flink.runtime.operators.sort.UnilateralSortMerger$ReadingThread.go(UnilateralSortMerger.java:973)
at org.apache.flink.runtime.operators.sort.UnilateralSortMerger$ThreadBase.run(UnilateralSortMerger.java:796)

Process finished with exit code 1

———————————

Maybe Gelly isn’t able to handle my CustomVertexValue!?

The ScatterFunctions works fine.


Thanks a lot
Best,
Marc


[1] http://www.dcs.bbk.ac.uk/~dell/teaching/cc/paper/sigmod10/p135-malewicz.pdf chapter 5.4 (page 141)



Re: Gelly - generics with custom vertex value

Posted by "Kaepke, Marc" <ma...@haw-hamburg.de>.
Thanks for the hint.

I focused on it and get a strange behavior.
If I change from EdgeDirection.ALL (what I need) to EdgeDirection.OUT (or .IN), everything seems okey. The sublist operation was still active.

Then I replaced the sublist with the entire list and there was no exception (EdgeDirection.All/IN/OUT worked). In case of .ALL the algorithm ran until „maximumNumberOfIterations“. How can I limit it and how can I set the termination conditions?


Best,
Marc

Am 10.05.2017 um 18:18 schrieb Stephan Ewen <se...@apache.org>>:

Looks like java.util.ArrayList$SubList does not work out of the box with Kryo / Flink.

Try registering a custom serializer for it...

On Wed, May 10, 2017 at 4:16 PM, Kaepke, Marc <ma...@haw-hamburg.de>> wrote:
Hi,

a part of my bachelor thesis is an implementation of the Semi-Clustering algorithm [1].
I’m using the Scatter-Gather-Iteration. Each vertex has to know its neighbors and the edge-value between of that. Because Gelly’s vertex doesn’t provide both information, I wrote an CustomVertexValue class.  An object contains a set of edges and a list of another custom class called SemiCluster, which contains a list of vertex and a double value.
Now I’m able to create vertices like Vertex<Double, CustomVertexValue>.

Unfortunately I get an exception if I run my Scatter-Gather-Iteration.

Exception in thread "main" org.apache.flink.runtime.client.JobExecutionException: Job execution failed.
at org.apache.flink.runtime.jobmanager.JobManager$$anonfun$handleMessage$1$$anonfun$applyOrElse$7.apply$mcV$sp(JobManager.scala:900)
at org.apache.flink.runtime.jobmanager.JobManager$$anonfun$handleMessage$1$$anonfun$applyOrElse$7.apply(JobManager.scala:843)
at org.apache.flink.runtime.jobmanager.JobManager$$anonfun$handleMessage$1$$anonfun$applyOrElse$7.apply(JobManager.scala:843)
at scala.concurrent.impl.Future$PromiseCompletingRunnable.liftedTree1$1(Future.scala:24)
at scala.concurrent.impl.Future$PromiseCompletingRunnable.run(Future.scala:24)
at akka.dispatch.TaskInvocation.run(AbstractDispatcher.scala:40)
at akka.dispatch.ForkJoinExecutorConfigurator$AkkaForkJoinTask.exec(AbstractDispatcher.scala:397)
at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
at scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
at scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
at scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)
Caused by: java.lang.RuntimeException: Error obtaining the sorted input: Thread 'SortMerger Reading Thread' terminated due to an exception: null
at org.apache.flink.runtime.operators.sort.UnilateralSortMerger.getIterator(UnilateralSortMerger.java:619)
at org.apache.flink.runtime.operators.BatchTask.getInput(BatchTask.java:1094)
at org.apache.flink.runtime.operators.CoGroupWithSolutionSetSecondDriver.run(CoGroupWithSolutionSetSecondDriver.java:203)
at org.apache.flink.runtime.operators.BatchTask.run(BatchTask.java:490)
at org.apache.flink.runtime.iterative.task.AbstractIterativeTask.run(AbstractIterativeTask.java:146)
at org.apache.flink.runtime.iterative.task.IterationTailTask.run(IterationTailTask.java:107)
at org.apache.flink.runtime.operators.BatchTask.invoke(BatchTask.java:355)
at org.apache.flink.runtime.taskmanager.Task.run(Task.java:655)
at java.lang.Thread.run(Thread.java:745)
Caused by: java.io.IOException: Thread 'SortMerger Reading Thread' terminated due to an exception: null
at org.apache.flink.runtime.operators.sort.UnilateralSortMerger$ThreadBase.run(UnilateralSortMerger.java:799)
Caused by: java.lang.NullPointerException
at java.util.ArrayList$SubList.checkForComodification(ArrayList.java:1230)
at java.util.ArrayList$SubList.size(ArrayList.java:1040)
at java.util.AbstractList.add(AbstractList.java:108)
at com.esotericsoftware.kryo.serializers.CollectionSerializer.read(CollectionSerializer.java:116)
at com.esotericsoftware.kryo.serializers.CollectionSerializer.read(CollectionSerializer.java:22)
at com.esotericsoftware.kryo.Kryo.readClassAndObject(Kryo.java:761)
at org.apache.flink.api.java.typeutils.runtime.kryo.KryoSerializer.deserialize(KryoSerializer.java:232)
at org.apache.flink.api.java.typeutils.runtime.kryo.KryoSerializer.deserialize(KryoSerializer.java:246)
at org.apache.flink.api.java.typeutils.runtime.TupleSerializer.deserialize(TupleSerializer.java:144)
at org.apache.flink.api.java.typeutils.runtime.TupleSerializer.deserialize(TupleSerializer.java:30)
at org.apache.flink.runtime.plugable.ReusingDeserializationDelegate.read(ReusingDeserializationDelegate.java:57)
at org.apache.flink.runtime.io<http://org.apache.flink.runtime.io/>.network.api.serialization.SpillingAdaptiveSpanningRecordDeserializer.getNextRecord(SpillingAdaptiveSpanningRecordDeserializer.java:109)
at org.apache.flink.runtime.io<http://org.apache.flink.runtime.io/>.network.api.reader.AbstractRecordReader.getNextRecord(AbstractRecordReader.java:72)
at org.apache.flink.runtime.io<http://org.apache.flink.runtime.io/>.network.api.reader.MutableRecordReader.next(MutableRecordReader.java:42)
at org.apache.flink.runtime.operators.util.ReaderIterator.next(ReaderIterator.java:59)
at org.apache.flink.runtime.operators.sort.UnilateralSortMerger$ReadingThread.go(UnilateralSortMerger.java:973)
at org.apache.flink.runtime.operators.sort.UnilateralSortMerger$ThreadBase.run(UnilateralSortMerger.java:796)

Process finished with exit code 1

———————————

Maybe Gelly isn’t able to handle my CustomVertexValue!?

The ScatterFunctions works fine.


Thanks a lot
Best,
Marc


[1] http://www.dcs.bbk.ac.uk/~dell/teaching/cc/paper/sigmod10/p135-malewicz.pdf chapter 5.4 (page 141)





Re: Gelly - generics with custom vertex value

Posted by Stephan Ewen <se...@apache.org>.
Looks like java.util.ArrayList$SubList does not work out of the box with
Kryo / Flink.

Try registering a custom serializer for it...

On Wed, May 10, 2017 at 4:16 PM, Kaepke, Marc <ma...@haw-hamburg.de>
wrote:

> Hi,
>
> a part of my bachelor thesis is an implementation of the Semi-Clustering
> algorithm [1].
> I’m using the Scatter-Gather-Iteration. Each vertex has to know its
> neighbors and the edge-value between of that. Because Gelly’s vertex
> doesn’t provide both information, I wrote an CustomVertexValue class.  An
> object contains a set of edges and a list of another custom class called
> SemiCluster, which contains a list of vertex and a double value.
> Now I’m able to create vertices like Vertex<Double, CustomVertexValue>.
>
> Unfortunately I get an exception if I run my Scatter-Gather-Iteration.
>
> Exception in thread "main" org.apache.flink.runtime.client.JobExecutionException:
> Job execution failed.
> at org.apache.flink.runtime.jobmanager.JobManager$$
> anonfun$handleMessage$1$$anonfun$applyOrElse$7.apply$
> mcV$sp(JobManager.scala:900)
> at org.apache.flink.runtime.jobmanager.JobManager$$
> anonfun$handleMessage$1$$anonfun$applyOrElse$7.apply(JobManager.scala:843)
> at org.apache.flink.runtime.jobmanager.JobManager$$
> anonfun$handleMessage$1$$anonfun$applyOrElse$7.apply(JobManager.scala:843)
> at scala.concurrent.impl.Future$PromiseCompletingRunnable.
> liftedTree1$1(Future.scala:24)
> at scala.concurrent.impl.Future$PromiseCompletingRunnable.run(
> Future.scala:24)
> at akka.dispatch.TaskInvocation.run(AbstractDispatcher.scala:40)
> at akka.dispatch.ForkJoinExecutorConfigurator$AkkaForkJoinTask.exec(
> AbstractDispatcher.scala:397)
> at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
> at scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.
> runTask(ForkJoinPool.java:1339)
> at scala.concurrent.forkjoin.ForkJoinPool.runWorker(
> ForkJoinPool.java:1979)
> at scala.concurrent.forkjoin.ForkJoinWorkerThread.run(
> ForkJoinWorkerThread.java:107)
> Caused by: java.lang.RuntimeException: Error obtaining the sorted input:
> Thread 'SortMerger Reading Thread' terminated due to an exception: null
> at org.apache.flink.runtime.operators.sort.UnilateralSortMerger.
> getIterator(UnilateralSortMerger.java:619)
> at org.apache.flink.runtime.operators.BatchTask.getInput(
> BatchTask.java:1094)
> at org.apache.flink.runtime.operators.CoGroupWithSolutionSetSecondDr
> iver.run(CoGroupWithSolutionSetSecondDriver.java:203)
> at org.apache.flink.runtime.operators.BatchTask.run(BatchTask.java:490)
> at org.apache.flink.runtime.iterative.task.AbstractIterativeTask.run(
> AbstractIterativeTask.java:146)
> at org.apache.flink.runtime.iterative.task.IterationTailTask.run(
> IterationTailTask.java:107)
> at org.apache.flink.runtime.operators.BatchTask.invoke(BatchTask.java:355)
> at org.apache.flink.runtime.taskmanager.Task.run(Task.java:655)
> at java.lang.Thread.run(Thread.java:745)
> Caused by: java.io.IOException: Thread 'SortMerger Reading Thread'
> terminated due to an exception: null
> at org.apache.flink.runtime.operators.sort.UnilateralSortMerger$
> ThreadBase.run(UnilateralSortMerger.java:799)
> Caused by: java.lang.NullPointerException
> at java.util.ArrayList$SubList.checkForComodification(ArrayList.java:1230)
> at java.util.ArrayList$SubList.size(ArrayList.java:1040)
> at java.util.AbstractList.add(AbstractList.java:108)
> at com.esotericsoftware.kryo.serializers.CollectionSerializer.read(
> CollectionSerializer.java:116)
> at com.esotericsoftware.kryo.serializers.CollectionSerializer.read(
> CollectionSerializer.java:22)
> at com.esotericsoftware.kryo.Kryo.readClassAndObject(Kryo.java:761)
> at org.apache.flink.api.java.typeutils.runtime.kryo.
> KryoSerializer.deserialize(KryoSerializer.java:232)
> at org.apache.flink.api.java.typeutils.runtime.kryo.
> KryoSerializer.deserialize(KryoSerializer.java:246)
> at org.apache.flink.api.java.typeutils.runtime.
> TupleSerializer.deserialize(TupleSerializer.java:144)
> at org.apache.flink.api.java.typeutils.runtime.
> TupleSerializer.deserialize(TupleSerializer.java:30)
> at org.apache.flink.runtime.plugable.ReusingDeserializationDelegate.read(
> ReusingDeserializationDelegate.java:57)
> at org.apache.flink.runtime.io.network.api.serialization.
> SpillingAdaptiveSpanningRecordDeserializer.getNextRecord(
> SpillingAdaptiveSpanningRecordDeserializer.java:109)
> at org.apache.flink.runtime.io.network.api.reader.AbstractRecordReader.
> getNextRecord(AbstractRecordReader.java:72)
> at org.apache.flink.runtime.io.network.api.reader.
> MutableRecordReader.next(MutableRecordReader.java:42)
> at org.apache.flink.runtime.operators.util.ReaderIterator.
> next(ReaderIterator.java:59)
> at org.apache.flink.runtime.operators.sort.UnilateralSortMerger$
> ReadingThread.go(UnilateralSortMerger.java:973)
> at org.apache.flink.runtime.operators.sort.UnilateralSortMerger$
> ThreadBase.run(UnilateralSortMerger.java:796)
>
> Process finished with exit code 1
>
> ———————————
>
> Maybe Gelly isn’t able to handle my CustomVertexValue!?
>
> The ScatterFunctions works fine.
>
>
> Thanks a lot
> Best,
> Marc
>
>
> [1] http://www.dcs.bbk.ac.uk/~dell/teaching/cc/paper/
> sigmod10/p135-malewicz.pdf chapter 5.4 (page 141)
>
>
>