Posted to dev@flink.apache.org by Nam-Luc Tran <na...@euranova.eu> on 2015/02/11 15:02:17 UTC

kryoException : Buffer underflow

Hello,

I came across an error for which I am unable to retrace the exact cause.
Starting from the flink-java-examples module, I have extended the KMeans example
to a case where points have 25 coordinates. It follows the exact same
structure and transformations as the original example, only with points
having 25 coordinates instead of 2.

When creating the centroids dataset within the code as follows, the job
iterates and executes well:

Centroid25 cent1 = new Centroid25(ThreadLocalRandom.current().nextInt(0, 1000),
    -10.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,-0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,10.0);
Centroid25 cent2 = new Centroid25(ThreadLocalRandom.current().nextInt(0, 1000),
    -1.0,-1.0,1.0,1.0,-1.0,1.0,1.0,1.0,1.0,1.0,1.0,-1.0,1.0,1.0,1.0,1.0,1.0,1.0,1.0,1.0,1.0,1.0,1.0,1.0,1.0);
DataSet<Centroid25> centroids = env.fromCollection(Arrays.asList(cent1, cent2));

When reading from a CSV file containing the following:
-10.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,-0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,10.0
-1.0,-1.0,1.0,1.0,-1.0,1.0,1.0,1.0,1.0,1.0,1.0,-1.0,1.0,1.0,1.0,1.0,1.0,1.0,1.0,1.0,1.0,1.0,1.0,1.0,1.0

with the following code:

DataSet<Centroid25> centroids = env
        .readCsvFile("file:///home/nltran/res3.csv")
        .fieldDelimiter(",")
        .includeFields("1111111111111111111111111")
        .types(Double.class, Double.class, Double.class, Double.class, Double.class,
               Double.class, Double.class, Double.class, Double.class, Double.class,
               Double.class, Double.class, Double.class, Double.class, Double.class,
               Double.class, Double.class, Double.class, Double.class, Double.class,
               Double.class, Double.class, Double.class, Double.class, Double.class)
        .map(p -> {
            return new Centroid25(ThreadLocalRandom.current().nextInt(0, 1000),
                    p.f0, p.f1, p.f2, p.f3, p.f4, p.f5, p.f6, p.f7, p.f8, p.f9, p.f10, p.f11, p.f12,
                    p.f13, p.f14, p.f15, p.f16, p.f17, p.f18, p.f19, p.f20, p.f21, p.f22, p.f23, p.f24);
        })
        .returns("eu.euranova.flink.Centroid25");


I hit the following exception:

02/11/2015 14:58:27	PartialSolution (BulkIteration (Bulk Iteration))(1/1) switched to FAILED
com.esotericsoftware.kryo.KryoException: Buffer underflow
    at org.apache.flink.api.java.typeutils.runtime.NoFetchingInput.require(NoFetchingInput.java:76)
    at com.esotericsoftware.kryo.io.Input.readVarInt(Input.java:355)
    at com.esotericsoftware.kryo.util.DefaultClassResolver.readClass(DefaultClassResolver.java:109)
    at com.esotericsoftware.kryo.Kryo.readClass(Kryo.java:641)
    at com.esotericsoftware.kryo.Kryo.readClassAndObject(Kryo.java:752)
    at org.apache.flink.api.java.typeutils.runtime.KryoSerializer.deserialize(KryoSerializer.java:205)
    at org.apache.flink.api.java.typeutils.runtime.KryoSerializer.deserialize(KryoSerializer.java:210)
    at org.apache.flink.runtime.io.disk.InputViewIterator.next(InputViewIterator.java:43)
    at org.apache.flink.runtime.operators.NoOpDriver.run(NoOpDriver.java:91)
    at org.apache.flink.runtime.operators.RegularPactTask.run(RegularPactTask.java:496)
    at org.apache.flink.runtime.iterative.task.AbstractIterativePactTask.run(AbstractIterativePactTask.java:138)
    at org.apache.flink.runtime.iterative.task.IterationHeadPactTask.run(IterationHeadPactTask.java:324)
    at org.apache.flink.runtime.operators.RegularPactTask.invoke(RegularPactTask.java:360)
    at org.apache.flink.runtime.execution.RuntimeEnvironment.run(RuntimeEnvironment.java:204)
    at java.lang.Thread.run(Thread.java:745)

02/11/2015 14:58:27	Job execution switched to status FAILING.
02/11/2015 14:58:27	CHAIN Map (Map at main(DoTheKMeans.java:64)) -> Map (Map at main(DoTheKMeans.java:65))(1/1) switched to CANCELING
02/11/2015 14:58:27	Combine (Reduce at main(DoTheKMeans.java:68))(1/1) switched to CANCELING
02/11/2015 14:58:27	CHAIN Reduce(Reduce at main(DoTheKMeans.java:68)) -> Map (Map at main(DoTheKMeans.java:71))(1/1) switched to CANCELING
02/11/2015 14:58:27	DataSink(Print to System.out)(1/1) switched to CANCELED
02/11/2015 14:58:27	Sync(BulkIteration (Bulk Iteration))(1/1) switched to CANCELING
02/11/2015 14:58:27	Sync(BulkIteration (Bulk Iteration))(1/1) switched to CANCELED
02/11/2015 14:58:27	CHAIN Map (Map at main(DoTheKMeans.java:64)) -> Map (Map at main(DoTheKMeans.java:65))(1/1) switched to CANCELED
02/11/2015 14:58:27	Combine (Reduce at main(DoTheKMeans.java:68))(1/1) switched to CANCELED
02/11/2015 14:58:27	CHAIN Reduce(Reduce at main(DoTheKMeans.java:68)) -> Map (Map at main(DoTheKMeans.java:71))(1/1) switched to CANCELED
02/11/2015 14:58:27	Job execution switched to status FAILED.
Exception in thread "main" org.apache.flink.runtime.client.JobExecutionException:
com.esotericsoftware.kryo.KryoException: Buffer underflow
    at org.apache.flink.api.java.typeutils.runtime.NoFetchingInput.require(NoFetchingInput.java:76)
    at com.esotericsoftware.kryo.io.Input.readVarInt(Input.java:355)
    at com.esotericsoftware.kryo.util.DefaultClassResolver.readClass(DefaultClassResolver.java:109)
    at com.esotericsoftware.kryo.Kryo.readClass(Kryo.java:641)
    at com.esotericsoftware.kryo.Kryo.readClassAndObject(Kryo.java:752)
    at org.apache.flink.api.java.typeutils.runtime.KryoSerializer.deserialize(KryoSerializer.java:205)
    at org.apache.flink.api.java.typeutils.runtime.KryoSerializer.deserialize(KryoSerializer.java:210)
    at org.apache.flink.runtime.io.disk.InputViewIterator.next(InputViewIterator.java:43)
    at org.apache.flink.runtime.operators.NoOpDriver.run(NoOpDriver.java:91)
    at org.apache.flink.runtime.operators.RegularPactTask.run(RegularPactTask.java:496)
    at org.apache.flink.runtime.iterative.task.AbstractIterativePactTask.run(AbstractIterativePactTask.java:138)
    at org.apache.flink.runtime.iterative.task.IterationHeadPactTask.run(IterationHeadPactTask.java:324)
    at org.apache.flink.runtime.operators.RegularPactTask.invoke(RegularPactTask.java:360)
    at org.apache.flink.runtime.execution.RuntimeEnvironment.run(RuntimeEnvironment.java:204)
    at java.lang.Thread.run(Thread.java:745)

    at org.apache.flink.runtime.client.JobClientListener$$anonfun$receiveWithLogMessages$2.applyOrElse(JobClient.scala:88)
    at scala.runtime.AbstractPartialFunction$mcVL$sp.apply$mcVL$sp(AbstractPartialFunction.scala:33)
    at scala.runtime.AbstractPartialFunction$mcVL$sp.apply(AbstractPartialFunction.scala:33)
    at scala.runtime.AbstractPartialFunction$mcVL$sp.apply(AbstractPartialFunction.scala:25)
    at org.apache.flink.runtime.ActorLogMessages$$anon$1.apply(ActorLogMessages.scala:37)
    at org.apache.flink.runtime.ActorLogMessages$$anon$1.apply(ActorLogMessages.scala:30)
    at scala.PartialFunction$class.applyOrElse(PartialFunction.scala:118)
    at org.apache.flink.runtime.ActorLogMessages$$anon$1.applyOrElse(ActorLogMessages.scala:30)
    at akka.actor.Actor$class.aroundReceive(Actor.scala:465)
    at org.apache.flink.runtime.client.JobClientListener.aroundReceive(JobClient.scala:74)
    at akka.actor.ActorCell.receiveMessage(ActorCell.scala:516)
    at akka.actor.ActorCell.invoke(ActorCell.scala:487)
    at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:254)
    at akka.dispatch.Mailbox.run(Mailbox.scala:221)
    at akka.dispatch.Mailbox.exec(Mailbox.scala:231)
    at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
    at scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
    at scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
    at scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)

The Centroid25 data is exactly the same in both cases. Could you help me
figure out what is going wrong?

Thanks and best regards,

Tran Nam-Luc



--
View this message in context: http://apache-flink-incubator-mailing-list-archive.1008284.n3.nabble.com/kryoException-Buffer-underflow-tp3760.html
Sent from the Apache Flink (Incubator) Mailing List archive at Nabble.com.

Re: kryoException : Buffer underflow

Posted by Stephan Ewen <se...@apache.org>.
Hi Tran Nam-Luc!

That is a problem we will look into.

In the meantime, can you try to modify your class so that it is a "Flink
POJO"? Then we will serialize it ourselves, without involving Kryo. To do
that, make sure that:
 - The class is public
 - It has a public no-argument constructor
 - All fields are either public, or have public getters and setters
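
For illustration only, a minimal sketch of what such a class could look like.
The id field and the coordinate array are assumptions on my side, since I do
not know the actual shape of your Centroid25:

package eu.euranova.flink;

// Sketch of a Centroid25 written as a Flink POJO:
// public class, public no-argument constructor, public fields.
public class Centroid25 {

    public int id;
    public double[] coordinates = new double[25];

    // Required public no-argument constructor.
    public Centroid25() {
    }

    // Convenience constructor matching the call sites in your program:
    // new Centroid25(id, c0, c1, ..., c24)
    public Centroid25(int id, double... coordinates) {
        this.id = id;
        this.coordinates = coordinates.clone();
    }
}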

Here are some minor pointers for the program (a combined sketch follows the
list):
 - If you include all CSV fields, you do not need the
".includeFields("1111111111111111111111111")" call. The "includeFields"
function is only necessary if you want to skip over some fields.
 - If the lambda map function returns a simple class without generic
parameters, you do not need the 'returns("eu.euranova.flink.Centroid25")'
call. It should work even without it.
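
Putting both pointers together, the CSV part of the program could be trimmed
down to roughly the following. This is a sketch only, keeping your file path
and the 25 Double fields, and relying on the lambda's return type being
extracted without the explicit "returns" hint:

DataSet<Centroid25> centroids = env
        .readCsvFile("file:///home/nltran/res3.csv")
        .fieldDelimiter(",")
        .types(Double.class, Double.class, Double.class, Double.class, Double.class,
               Double.class, Double.class, Double.class, Double.class, Double.class,
               Double.class, Double.class, Double.class, Double.class, Double.class,
               Double.class, Double.class, Double.class, Double.class, Double.class,
               Double.class, Double.class, Double.class, Double.class, Double.class)
        .map(p -> new Centroid25(ThreadLocalRandom.current().nextInt(0, 1000),
                p.f0, p.f1, p.f2, p.f3, p.f4, p.f5, p.f6, p.f7, p.f8, p.f9, p.f10, p.f11, p.f12,
                p.f13, p.f14, p.f15, p.f16, p.f17, p.f18, p.f19, p.f20, p.f21, p.f22, p.f23, p.f24));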

Greetings,
Stephan



