Posted to user@flink.apache.org by Theodore Vasiloudis <th...@gmail.com> on 2016/01/20 11:24:19 UTC

Unexpected out of bounds error in UnilateralSortMerger

Hello all,

I'm trying to run a job using FlinkML and I'm confused about the source of
an error.

The job reads a libSVM formatted file and trains an SVM classifier on it.

I've tried this with small datasets, and everything works fine.

However, when I run the same job on a large dataset (~11GB uncompressed),
I get the following error:


> java.lang.RuntimeException: Error obtaining the sorted input: Thread
> 'SortMerger spilling thread' terminated due to an exception:
> java.lang.IndexOutOfBoundsException: Index: 14, Size: 2
> Serialization trace:
> indices (org.apache.flink.ml.math.SparseVector)
>         at
> org.apache.flink.runtime.operators.sort.UnilateralSortMerger.getIterator(UnilateralSortMerger.java:619)
>         at
> org.apache.flink.runtime.operators.BatchTask.getInput(BatchTask.java:1089)
>         at
> org.apache.flink.runtime.operators.NoOpDriver.run(NoOpDriver.java:78)
>         at
> org.apache.flink.runtime.operators.BatchTask.run(BatchTask.java:489)
>         at
> org.apache.flink.runtime.operators.BatchTask.invoke(BatchTask.java:354)
>         at org.apache.flink.runtime.taskmanager.Task.run(Task.java:584)
>         at java.lang.Thread.run(Thread.java:745)
> Caused by: java.io.IOException: Thread 'SortMerger spilling thread'
> terminated due to an exception: java.lang.IndexOutOfBoundsException: Index:
> 14, Size: 2
> Serialization trace:
> indices (org.apache.flink.ml.math.SparseVector)
>         at
> org.apache.flink.runtime.operators.sort.UnilateralSortMerger$ThreadBase.run(UnilateralSortMerger.java:800)
> Caused by: com.esotericsoftware.kryo.KryoException:
> java.lang.IndexOutOfBoundsException: Index: 14, Size: 2
> Serialization trace:
> indices (org.apache.flink.ml.math.SparseVector)
>         at
> com.esotericsoftware.kryo.serializers.ObjectField.read(ObjectField.java:125)
>         at
> com.esotericsoftware.kryo.serializers.FieldSerializer.read(FieldSerializer.java:528)
>         at com.esotericsoftware.kryo.Kryo.readClassAndObject(Kryo.java:761)
>         at
> org.apache.flink.api.java.typeutils.runtime.kryo.KryoSerializer.deserialize(KryoSerializer.java:222)
>         at
> org.apache.flink.api.java.typeutils.runtime.kryo.KryoSerializer.deserialize(KryoSerializer.java:236)
>         at
> org.apache.flink.api.java.typeutils.runtime.kryo.KryoSerializer.copy(KryoSerializer.java:246)
>         at
> org.apache.flink.api.java.typeutils.runtime.TupleSerializerBase.copy(TupleSerializerBase.java:73)
>         at
> org.apache.flink.api.java.typeutils.runtime.TupleSerializerBase.copy(TupleSerializerBase.java:73)
>         at
> org.apache.flink.runtime.operators.sort.NormalizedKeySorter.writeToOutput(NormalizedKeySorter.java:499)
>         at
> org.apache.flink.runtime.operators.sort.UnilateralSortMerger$SpillingThread.go(UnilateralSortMerger.java:1344)
>         at
> org.apache.flink.runtime.operators.sort.UnilateralSortMerger$ThreadBase.run(UnilateralSortMerger.java:796)
> Caused by: java.lang.IndexOutOfBoundsException: Index: 14, Size: 2
>         at java.util.ArrayList.rangeCheck(ArrayList.java:653)
>         at java.util.ArrayList.set(ArrayList.java:444)
>         at
> com.esotericsoftware.kryo.util.MapReferenceResolver.setReadObject(MapReferenceResolver.java:38)
>         at com.esotericsoftware.kryo.Kryo.reference(Kryo.java:823)
>         at com.esotericsoftware.kryo.Kryo.readObjectOrNull(Kryo.java:731)
>         at
> com.esotericsoftware.kryo.serializers.ObjectField.read(ObjectField.java:113)
>         ... 10 more



Any idea what might be causing this? I'm running the job in local mode with
one TaskManager (8 slots, ~32GB heap).

All the vectors created by the libSVM loader have the correct size.
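One way to double-check that every vector from the libSVM loader is well formed is to validate the raw lines independently of FlinkML. The Scala sketch below is not FlinkML's readLibSVM. It is a hypothetical stand-alone checker: the names ParsedLine and LibSVMCheck are made up for illustration. It assumes the usual libSVM layout "label index:value ..." with 1-based, ascending indices and non-empty lines. It converts the indices to 0-based form and reports indices that are out of range or not strictly increasing.

```scala
// Hypothetical stand-alone checker, NOT FlinkML's readLibSVM.
// Assumes non-empty lines of the form "label index:value index:value ..."
// with 1-based indices, as in the usual libSVM format.
final case class ParsedLine(label: Double, indices: Array[Int], values: Array[Double])

object LibSVMCheck {
  /** Parses one line, converting 1-based libSVM indices to 0-based ones.
    * A token without ':' makes the pattern below throw a MatchError. */
  def parse(line: String): ParsedLine = {
    val tokens = line.trim.split("\\s+")
    val pairs = tokens.tail.map { token =>
      val Array(index, value) = token.split(":", 2)
      (index.toInt - 1, value.toDouble)
    }
    ParsedLine(tokens.head.toDouble, pairs.map(_._1), pairs.map(_._2))
  }

  /** Lists the problems found for a vector of dimension `size`; empty means OK. */
  def problems(p: ParsedLine, size: Int): List[String] = {
    val outOfRange = p.indices.exists(i => i < 0 || i >= size)
    val unsorted = (1 until p.indices.length).exists(k => p.indices(k - 1) >= p.indices(k))
    List(
      if (outOfRange) Some(s"index outside [0, $size)") else None,
      if (unsorted) Some("indices not strictly increasing") else None
    ).flatten
  }
}
```

Running problems(parse(line), 2000) over a sample of the input (2000 being the feature count mentioned in this thread) is a cheap way to rule out malformed input before suspecting serialization.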

Re: Unexpected out of bounds error in UnilateralSortMerger

Posted by Theodore Vasiloudis <th...@gmail.com>.
And this is the one from running with a CSV input. This time I've verified,
following Till's instructions, that I'm using the correct version of Flink:

 The program finished with the following exception:
>
> org.apache.flink.client.program.ProgramInvocationException: The program
> execution failed: Job execution failed.
>         at
> org.apache.flink.client.program.Client.runBlocking(Client.java:381)
>         at
> org.apache.flink.client.program.Client.runBlocking(Client.java:355)
>         at
> org.apache.flink.client.program.Client.runBlocking(Client.java:315)
>         at
> org.apache.flink.client.program.ContextEnvironment.execute(ContextEnvironment.java:60)
>         at
> org.apache.flink.api.java.ExecutionEnvironment.execute(ExecutionEnvironment.java:803)
>         at
> org.apache.flink.api.scala.ExecutionEnvironment.execute(ExecutionEnvironment.scala:591)
>         at org.apache.flink.api.scala.DataSet.collect(DataSet.scala:544)
>         at fosdem.SVMClassification$.main(SVMClassification.scala:128)
>         at fosdem.SVMClassification.main(SVMClassification.scala)
>         at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
>         at
> sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
>         at
> sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
>         at java.lang.reflect.Method.invoke(Method.java:497)
>         at
> org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:505)
>         at
> org.apache.flink.client.program.PackagedProgram.invokeInteractiveModeForExecution(PackagedProgram.java:403)
>         at
> org.apache.flink.client.program.Client.runBlocking(Client.java:248)
>         at
> org.apache.flink.client.CliFrontend.executeProgramBlocking(CliFrontend.java:796)
>         at org.apache.flink.client.CliFrontend.run(CliFrontend.java:323)
>         at
> org.apache.flink.client.CliFrontend.parseParameters(CliFrontend.java:1112)
>         at org.apache.flink.client.CliFrontend.main(CliFrontend.java:1160)
> Caused by: org.apache.flink.runtime.client.JobExecutionException: Job
> execution failed.
>         at
> org.apache.flink.runtime.jobmanager.JobManager$$anonfun$handleMessage$1$$anonfun$applyOrElse$7.apply$mcV$sp(JobManager.scala:659)
>         at
> org.apache.flink.runtime.jobmanager.JobManager$$anonfun$handleMessage$1$$anonfun$applyOrElse$7.apply(JobManager.scala:605)
>         at
> org.apache.flink.runtime.jobmanager.JobManager$$anonfun$handleMessage$1$$anonfun$applyOrElse$7.apply(JobManager.scala:605)
>         at
> scala.concurrent.impl.Future$PromiseCompletingRunnable.liftedTree1$1(Future.scala:24)
>         at
> scala.concurrent.impl.Future$PromiseCompletingRunnable.run(Future.scala:24)
>         at akka.dispatch.TaskInvocation.run(AbstractDispatcher.scala:41)
>         at
> akka.dispatch.ForkJoinExecutorConfigurator$AkkaForkJoinTask.exec(AbstractDispatcher.scala:401)
>         at
> scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
>         at
> scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.pollAndExecAll(ForkJoinPool.java:1253)
>         at
> scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1346)
>         at
> scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
>         at
> scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)
> Caused by: java.lang.RuntimeException: Error obtaining the sorted input:
> Thread 'SortMerger Reading Thread' terminated due to an exception:
> scala.collection.immutable.Map$EmptyMap$ cannot be cast to
> org.apache.flink.ml.math.Vector
>         at
> org.apache.flink.runtime.operators.sort.UnilateralSortMerger.getIterator(UnilateralSortMerger.java:619)
>         at
> org.apache.flink.runtime.operators.BatchTask.getInput(BatchTask.java:1085)
>         at
> org.apache.flink.runtime.operators.NoOpDriver.run(NoOpDriver.java:78)
>         at
> org.apache.flink.runtime.operators.BatchTask.run(BatchTask.java:486)
>         at
> org.apache.flink.runtime.operators.BatchTask.invoke(BatchTask.java:351)
>         at org.apache.flink.runtime.taskmanager.Task.run(Task.java:567)
>         at java.lang.Thread.run(Thread.java:745)
> Caused by: java.io.IOException: Thread 'SortMerger Reading Thread'
> terminated due to an exception: scala.collection.immutable.Map$EmptyMap$
> cannot be cast to org.apache.flink.ml.math.Vector
>         at
> org.apache.flink.runtime.operators.sort.UnilateralSortMerger$ThreadBase.run(UnilateralSortMerger.java:799)
> Caused by: java.lang.ClassCastException:
> scala.collection.immutable.Map$EmptyMap$ cannot be cast to
> org.apache.flink.ml.math.Vector
>         at
> org.apache.flink.ml.classification.SVM$$anon$25$$anon$11$$anon$1.createInstance(SVM.scala:353)
>         at
> org.apache.flink.ml.classification.SVM$$anon$25$$anon$11$$anon$1.createInstance(SVM.scala:353)
>         at
> org.apache.flink.api.scala.typeutils.CaseClassSerializer.deserialize(CaseClassSerializer.scala:114)
>         at
> org.apache.flink.api.scala.typeutils.CaseClassSerializer.deserialize(CaseClassSerializer.scala:28)
>         at
> org.apache.flink.api.scala.typeutils.CaseClassSerializer.deserialize(CaseClassSerializer.scala:111)
>         at
> org.apache.flink.api.scala.typeutils.CaseClassSerializer.deserialize(CaseClassSerializer.scala:104)
>         at
> org.apache.flink.api.scala.typeutils.CaseClassSerializer.deserialize(CaseClassSerializer.scala:28)
>         at
> org.apache.flink.runtime.plugable.ReusingDeserializationDelegate.read(ReusingDeserializationDelegate.java:57)
>         at
> org.apache.flink.runtime.io.network.api.serialization.SpillingAdaptiveSpanningRecordDeserializer.getNextRecord(SpillingAdaptiveSpanningRecordDeserializer.java:124)
>         at
> org.apache.flink.runtime.io.network.api.reader.AbstractRecordReader.getNextRecord(AbstractRecordReader.java:65)
>         at
> org.apache.flink.runtime.io.network.api.reader.MutableRecordReader.next(MutableRecordReader.java:34)
>         at
> org.apache.flink.runtime.operators.util.ReaderIterator.next(ReaderIterator.java:59)
>         at
> org.apache.flink.runtime.operators.sort.UnilateralSortMerger$ReadingThread.go(UnilateralSortMerger.java:1035)
>         at
> org.apache.flink.runtime.operators.sort.UnilateralSortMerger$ThreadBase.run(UnilateralSortMerger.java:796)
>

On Thu, Jan 21, 2016 at 10:51 AM, Theodore Vasiloudis <
theodoros.vasiloudis@gmail.com> wrote:

> On Wed, Jan 20, 2016 at 9:45 PM, Stephan Ewen <se...@apache.org> wrote:
>
>> Can you post the stack trace again? With the patched branch, the
>> reference mapper (which is where the original exception occurred) should
>> no longer be used.
>>
>> On Wed, Jan 20, 2016 at 7:38 PM, Theodore Vasiloudis <
>> theodoros.vasiloudis@gmail.com> wrote:
>>
>>> Alright, I will try to do that.
>>>
>>> I've tried running the job with a CSV file as input, using DenseVectors
>>> to represent the features, and I still get the same IndexOutOfBounds
>>> error.
>>>
>>> On Wed, Jan 20, 2016 at 6:05 PM, Till Rohrmann <tr...@apache.org>
>>> wrote:
>>>
>>>> You could change the version of Stephan’s branch via
>>>> mvn versions:set -DnewVersion=MyCustomBuildVersion followed by
>>>> mvn versions:commit. After you install the Flink binaries, you can
>>>> reference them in your project by setting the version of your Flink
>>>> dependencies to MyCustomBuildVersion. That way, you can be sure that the
>>>> right dependencies are used.
>>>>
>>>> Alternatively, you could compile an example program with example input
>>>> data which can reproduce the problem. Then I could also take a look at it.
>>>>
>>>> Cheers,
>>>> Till
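The following is a minimal build.sbt sketch of the approach described above. It assumes the custom version string MyCustomBuildVersion from Till's example and a prior mvn clean install of the branch into the local Maven repository. The artifact list is an assumption. Whether the Flink artifact names carry a Scala-version suffix (use %% if they do, % if they don't) depends on the Flink version, so check the directory names under ~/.m2/repository/org/apache/flink before copying this.

```scala
// build.sbt (sketch). Assumes the branch was built with
//   mvn versions:set -DnewVersion=MyCustomBuildVersion
//   mvn versions:commit
//   mvn clean install
// so that the artifacts sit in the local Maven repository (~/.m2).
resolvers += Resolver.mavenLocal

// A version string that exists nowhere else: if resolution succeeds,
// the jars must have come from the local build.
val flinkVersion = "MyCustomBuildVersion"

libraryDependencies ++= Seq(
  // Assumed artifact names; switch %% to % if the names in ~/.m2 have no Scala suffix.
  "org.apache.flink" %% "flink-scala" % flinkVersion,
  "org.apache.flink" %% "flink-ml"    % flinkVersion
)
```

With a unique version string, a resolution failure is itself informative. It means SBT did not find the locally installed build, rather than possibly resolving a 1.0-SNAPSHOT published elsewhere.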
>>>>
>>>> On Wed, Jan 20, 2016 at 5:58 PM, Theodore Vasiloudis <
>>>> theodoros.vasiloudis@gmail.com> wrote:
>>>>
>>>>> OK here's what I tried:
>>>>>
>>>>> * Build Flink (mvn clean install) from the branch you linked (kryo)
>>>>> * Build my uber-jar. I use SBT with 1.0-SNAPSHOT as the Flink version
>>>>> and added the local Maven repo to the resolvers so that it picks up the
>>>>> previously installed version (I hope)
>>>>> * Launch local cluster from newly built Flink, try to run job
>>>>>
>>>>> Still getting the same error.
>>>>>
>>>>> Is there a way to ensure that SBT is picking up the local version of
>>>>> Flink to build the uber-jar?
>>>>> Does it matter in this case, or is it enough that I'm sure the
>>>>> launched Flink instance comes from the branch you linked?
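The question above can also be answered at the JVM level, independently of SBT, by logging at runtime which jar a class was actually loaded from. The sketch below uses only the standard getProtectionDomain and getCodeSource reflection calls. The names WhereFrom and jarOf are hypothetical. Calling jarOf from the job's main method with a Flink class, for example classOf[org.apache.flink.api.scala.ExecutionEnvironment], would show whether the uber-jar or the cluster's lib directory supplied it.

```scala
// Reports where the JVM loaded a class from, e.g. a path inside an
// uber-jar versus Flink's own lib/ directory. Classes from the bootstrap
// class loader (such as java.lang.String) have no code source, hence None.
object WhereFrom {
  def jarOf(cls: Class[_]): Option[String] =
    Option(cls.getProtectionDomain)
      .flatMap(pd => Option(pd.getCodeSource))
      .flatMap(cs => Option(cs.getLocation))
      .map(_.toString)

  def main(args: Array[String]): Unit = {
    // In the real job, pass a Flink class here instead of scala.Option.
    val where = jarOf(classOf[Option[_]]).getOrElse("bootstrap class path")
    println(s"scala.Option loaded from: $where")
  }
}
```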
>>>>>
>>>>>
>>>>> On Wed, Jan 20, 2016 at 4:30 PM, Stephan Ewen <se...@apache.org>
>>>>> wrote:
>>>>>
>>>>>> The bug looks to be in the serialization via Kryo while spilling
>>>>>> windows. Note that Kryo is used here as a fallback serializer, since
>>>>>> SparseVector is not a transparent type to Flink.
>>>>>>
>>>>>> I think there are two possible reasons:
>>>>>>   1) Kryo, or our Kryo setup, has an issue here
>>>>>>   2) Kryo is inconsistently configured. There are multiple Kryo
>>>>>> instances used across the serializers in the sorter. There may be a bug
>>>>>> that causes them not to be initialized in sync.
>>>>>>
>>>>>>
>>>>>> To check this, can you build Flink with this pull request (
>>>>>> https://github.com/apache/flink/pull/1528) or from this branch (
>>>>>> https://github.com/StephanEwen/incubator-flink kryo) and see if that
>>>>>> fixes it?
>>>>>>
>>>>>>
>>>>>> Thanks,
>>>>>> Stephan
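Hypothesis 2 above can be pictured with a toy registry. For registered classes, Kryo writes a numeric registration ID rather than the class name, and IDs are handed out in registration order unless an explicit ID is given. If the writing and the reading instance registered classes in a different order, or registered different sets, the same ID decodes to a different class. The sketch below is a hypothetical illustration. ToyRegistry and RegistrationMismatchDemo are not Kryo or Flink classes, and the JDK classes are stand-ins.

```scala
// Hypothetical toy, NOT Kryo: hands out ids in registration order, the way
// a class registry without explicit ids would.
final class ToyRegistry(registrationOrder: Seq[Class[_]]) {
  private val idByClass: Map[Class[_], Int] = registrationOrder.zipWithIndex.toMap
  private val classById: Map[Int, Class[_]] = idByClass.map(_.swap)

  def idOf(cls: Class[_]): Int = idByClass(cls)
  def lookup(id: Int): Class[_] = classById(id)
}

object RegistrationMismatchDemo {
  // Writer and reader register the same classes, but in a different order.
  val writer = new ToyRegistry(Seq(classOf[Array[Int]], classOf[java.lang.Double]))
  val reader = new ToyRegistry(Seq(classOf[java.lang.Double], classOf[Array[Int]]))

  /** The class the reader would decode for an int[] written by the writer. */
  def decodedForIntArray: Class[_] = reader.lookup(writer.idOf(classOf[Array[Int]]))
}
```

In a real sorter, the analogous effect would be bytes decoded with the wrong serializer, which can surface as exceptions far from the cause. This is only an analogy for the hypothesis, not a diagnosis of this job.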
>>>>>>
>>>>>>
>>>>>>
>>>>>>
>>>>>>
>>>>>> On Wed, Jan 20, 2016 at 3:33 PM, Theodore Vasiloudis <
>>>>>> theodoros.vasiloudis@gmail.com> wrote:
>>>>>>
>>>>>>> I haven't been able to reproduce this with other datasets. However,
>>>>>>> taking a smaller sample from the large dataset I'm using (link to data
>>>>>>> <http://www.csie.ntu.edu.tw/~cjlin/libsvmtools/datasets/binary.html#epsilon>)
>>>>>>> causes the same problem.
>>>>>>>
>>>>>>> I'm wondering if the implementation of readLibSVM is what's wrong
>>>>>>> here. I've tried the new version committed recently by Chiwan, but I
>>>>>>> still get the same error.
>>>>>>>
>>>>>>> I'll see if I can spot a bug in readLibSVM.
>>>>>>>
>>>>>>> On Wed, Jan 20, 2016 at 1:43 PM, Theodore Vasiloudis <
>>>>>>> theodoros.vasiloudis@gmail.com> wrote:
>>>>>>>
>>>>>>>> It's on 0.10.
>>>>>>>>
>>>>>>>> I've tried explicitly registering SparseVector in my job (which is done
>>>>>>>> anyway by registerFlinkMLTypes
>>>>>>>> <https://github.com/apache/flink/blob/e9bf13d8626099a1d6ddb6ebe98c50be848fe79e/flink-libraries/flink-ml/src/main/scala/org/apache/flink/ml/common/FlinkMLTools.scala#L49>,
>>>>>>>> which is called when the SVM predict or evaluate functions are
>>>>>>>> called
>>>>>>>> <https://github.com/apache/flink/blob/e9bf13d8626099a1d6ddb6ebe98c50be848fe79e/flink-libraries/flink-ml/src/main/scala/org/apache/flink/ml/pipeline/Predictor.scala#L58>),
>>>>>>>> but I still get the same error. I will try a couple of different
>>>>>>>> datasets to see whether it's the number of features that is causing
>>>>>>>> this or something else.
>>>>>>>>
>>>>>>>> So far it works fine for a dataset with 8 features, but the large
>>>>>>>> one has 2000 and I get the above error there. I will try large datasets
>>>>>>>> with a few features and small datasets with many features as well.
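For the experiment described above (varying the number of rows and the number of features independently), a small generator of synthetic libSVM data can help. It could also serve as a self-contained reproducer. This is a hypothetical helper, not part of FlinkML. The object name, its parameters, and the choice of Gaussian feature values are all assumptions.

```scala
import java.util.Locale
import scala.util.Random

// Hypothetical generator, not part of FlinkML. Produces `rows` libSVM lines
// with up to `features` features each; every feature is kept independently
// with probability `density`. Indices are 1-based and ascending.
object SyntheticLibSVM {
  def lines(rows: Int, features: Int, density: Double, seed: Long): Iterator[String] = {
    val rnd = new Random(seed)
    Iterator.fill(rows) {
      val label = if (rnd.nextBoolean()) "+1" else "-1"
      val entries = (1 to features)
        .filter(_ => rnd.nextDouble() < density)
        // Locale.ROOT keeps '.' as the decimal separator regardless of system locale.
        .map(i => s"$i:" + "%.4f".formatLocal(Locale.ROOT, rnd.nextGaussian()))
      (label +: entries).mkString(" ")
    }
  }

  def main(args: Array[String]): Unit =
    // e.g. few rows with many features, versus many rows with few features
    lines(rows = 3, features = 2000, density = 0.01, seed = 1L).foreach(println)
}
```

A fixed seed makes each generated file reproducible, so a failing configuration can be shared as just the four parameters.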
>>>>>>>>
>>>>>>>> On Wed, Jan 20, 2016 at 11:39 AM, Stephan Ewen <se...@apache.org>
>>>>>>>> wrote:
>>>>>>>>
>>>>>>>>> Hi!
>>>>>>>>>
>>>>>>>>> Does this error occur in 0.10 or in 1.0-SNAPSHOT?
>>>>>>>>>
>>>>>>>>> It is probably an incorrectly configured Kryo instance (not a
>>>>>>>>> problem of the sorter).
>>>>>>>>> What is strange is that it occurs in the "MapReferenceResolver" -
>>>>>>>>> there should be no reference resolution during serialization /
>>>>>>>>> deserialization.
>>>>>>>>>
>>>>>>>>> Can you try explicitly registering the type SparseVector with the
>>>>>>>>> ExecutionEnvironment and see what happens?
>>>>>>>>>
>>>>>>>>> Stephan
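The following toy model of a reader-side reference table shows how a reference resolver can produce an out-of-range index at all. ToyReferenceTable and ToyReferenceDemo are hypothetical names. The model is loosely based on the ArrayList.set call visible in the trace, not on Kryo's actual code. Ids of newly read objects are expected to arrive in order. If the bytes are misaligned, or were written with a different configuration, a value that was never a reference id (such as 14) can be interpreted as one. Setting slot 14 in a two-element table then fails, much like the "Index: 14, Size: 2" in the original trace.

```scala
import java.util.{ArrayList => JArrayList}

// Hypothetical toy, NOT Kryo's MapReferenceResolver: a reader keeps the
// objects it has read in a list indexed by reference id.
final class ToyReferenceTable {
  private val readObjects = new JArrayList[AnyRef]()

  /** Reserves the next slot and returns its id (0, 1, 2, ...). */
  def nextReadId(): Int = { readObjects.add(null); readObjects.size - 1 }

  /** Fills a slot; java.util.ArrayList.set throws if `id` >= size. */
  def setReadObject(id: Int, obj: AnyRef): Unit = { readObjects.set(id, obj); () }

  def size: Int = readObjects.size
}

object ToyReferenceDemo {
  /** Replays (id, payload) records; Left describes the first failure. */
  def replay(records: Seq[(Int, String)]): Either[String, Int] = {
    val table = new ToyReferenceTable
    try {
      records.foreach { case (id, payload) =>
        if (id == table.size) table.nextReadId() // a new object: reserve its slot
        table.setReadObject(id, payload)          // a bogus id was never reserved
      }
      Right(table.size)
    } catch {
      case _: IndexOutOfBoundsException => Left(s"id out of range for table of size ${table.size}")
    }
  }
}
```

This only illustrates the mechanism behind the exception type. It does not establish which configuration mismatch occurred in the real job.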

Re: Unexpected out of bounds error in UnilateralSortMerger

Posted by Theodore Vasiloudis <th...@gmail.com>.
This is the stack trace from running with the patched branch:

 The program finished with the following exception:
>
> org.apache.flink.client.program.ProgramInvocationException: The program
> execution failed: Job execution failed.
>         at
> org.apache.flink.client.program.Client.runBlocking(Client.java:381)
>         at
> org.apache.flink.client.program.Client.runBlocking(Client.java:355)
>         at
> org.apache.flink.client.program.Client.runBlocking(Client.java:315)
>         at
> org.apache.flink.client.program.ContextEnvironment.execute(ContextEnvironment.java:60)
>         at
> org.apache.flink.api.java.ExecutionEnvironment.execute(ExecutionEnvironment.java:803)
>         at
> org.apache.flink.api.scala.ExecutionEnvironment.execute(ExecutionEnvironment.scala:591)
>         at org.apache.flink.api.scala.DataSet.collect(DataSet.scala:544)
>         at fosdem.SVMClassification$.main(SVMClassification.scala:114)
>         at fosdem.SVMClassification.main(SVMClassification.scala)
>         at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
>         at
> sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
>         at
> sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
>         at java.lang.reflect.Method.invoke(Method.java:497)
>         at
> org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:505)
>         at
> org.apache.flink.client.program.PackagedProgram.invokeInteractiveModeForExecution(PackagedProgram.java:403)
>         at
> org.apache.flink.client.program.Client.runBlocking(Client.java:248)
>         at
> org.apache.flink.client.CliFrontend.executeProgramBlocking(CliFrontend.java:796)
>         at org.apache.flink.client.CliFrontend.run(CliFrontend.java:323)
>         at
> org.apache.flink.client.CliFrontend.parseParameters(CliFrontend.java:1112)
>         at org.apache.flink.client.CliFrontend.main(CliFrontend.java:1160)
> Caused by: org.apache.flink.runtime.client.JobExecutionException: Job
> execution failed.
>         at org.apache.flink.runtime.jobmanager.JobManager$$anonfun$handleMessage$1$$anonfun$applyOrElse$7.apply$mcV$sp(JobManager.scala:659)
>         at org.apache.flink.runtime.jobmanager.JobManager$$anonfun$handleMessage$1$$anonfun$applyOrElse$7.apply(JobManager.scala:605)
>         at org.apache.flink.runtime.jobmanager.JobManager$$anonfun$handleMessage$1$$anonfun$applyOrElse$7.apply(JobManager.scala:605)
>         at scala.concurrent.impl.Future$PromiseCompletingRunnable.liftedTree1$1(Future.scala:24)
>         at scala.concurrent.impl.Future$PromiseCompletingRunnable.run(Future.scala:24)
>         at akka.dispatch.TaskInvocation.run(AbstractDispatcher.scala:41)
>         at akka.dispatch.ForkJoinExecutorConfigurator$AkkaForkJoinTask.exec(AbstractDispatcher.scala:401)
>         at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
>         at scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.pollAndExecAll(ForkJoinPool.java:1253)
>         at scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1346)
>         at scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
>         at scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)
> Caused by: java.lang.RuntimeException: Error obtaining the sorted input: Thread 'SortMerger spilling thread' terminated due to an exception: java.lang.ArrayIndexOutOfBoundsException
> Serialization trace:
> indices (org.apache.flink.ml.math.SparseVector)
>         at org.apache.flink.runtime.operators.sort.UnilateralSortMerger.getIterator(UnilateralSortMerger.java:619)
>         at org.apache.flink.runtime.operators.BatchTask.getInput(BatchTask.java:1085)
>         at org.apache.flink.runtime.operators.NoOpDriver.run(NoOpDriver.java:78)
>         at org.apache.flink.runtime.operators.BatchTask.run(BatchTask.java:486)
>         at org.apache.flink.runtime.operators.BatchTask.invoke(BatchTask.java:351)
>         at org.apache.flink.runtime.taskmanager.Task.run(Task.java:567)
>         at java.lang.Thread.run(Thread.java:745)
> Caused by: java.io.IOException: Thread 'SortMerger spilling thread' terminated due to an exception: java.lang.ArrayIndexOutOfBoundsException
> Serialization trace:
> indices (org.apache.flink.ml.math.SparseVector)
>         at org.apache.flink.runtime.operators.sort.UnilateralSortMerger$ThreadBase.run(UnilateralSortMerger.java:800)
> Caused by: com.esotericsoftware.kryo.KryoException: java.lang.ArrayIndexOutOfBoundsException
> Serialization trace:
> indices (org.apache.flink.ml.math.SparseVector)
>         at com.esotericsoftware.kryo.serializers.ObjectField.write(ObjectField.java:82)
>         at com.esotericsoftware.kryo.serializers.FieldSerializer.write(FieldSerializer.java:495)
>         at com.esotericsoftware.kryo.Kryo.writeClassAndObject(Kryo.java:599)
>         at org.apache.flink.api.java.typeutils.runtime.kryo.KryoSerializer.serialize(KryoSerializer.java:194)
>         at org.apache.flink.api.java.typeutils.runtime.kryo.KryoSerializer.copy(KryoSerializer.java:248)
>         at org.apache.flink.api.java.typeutils.runtime.TupleSerializerBase.copy(TupleSerializerBase.java:75)
>         at org.apache.flink.api.java.typeutils.runtime.TupleSerializerBase.copy(TupleSerializerBase.java:75)
>         at org.apache.flink.runtime.operators.sort.NormalizedKeySorter.writeToOutput(NormalizedKeySorter.java:499)
>         at org.apache.flink.runtime.operators.sort.UnilateralSortMerger$SpillingThread.go(UnilateralSortMerger.java:1344)
>         at org.apache.flink.runtime.operators.sort.UnilateralSortMerger$ThreadBase.run(UnilateralSortMerger.java:796)
> Caused by: java.lang.ArrayIndexOutOfBoundsException
>         at org.apache.flink.core.memory.HeapMemorySegment.put(HeapMemorySegment.java:128)
>         at org.apache.flink.runtime.memory.AbstractPagedOutputView.write(AbstractPagedOutputView.java:195)
>         at org.apache.flink.api.java.typeutils.runtime.DataOutputViewStream.write(DataOutputViewStream.java:39)
>         at com.esotericsoftware.kryo.io.Output.flush(Output.java:163)
>         at com.esotericsoftware.kryo.io.Output.require(Output.java:142)
>         at com.esotericsoftware.kryo.io.Output.writeVarInt(Output.java:266)
>         at com.esotericsoftware.kryo.io.Output.writeInt(Output.java:251)
>         at com.esotericsoftware.kryo.io.Output.writeInts(Output.java:669)
>         at com.esotericsoftware.kryo.serializers.DefaultArraySerializers$IntArraySerializer.write(DefaultArraySerializers.java:63)
>         at com.esotericsoftware.kryo.serializers.DefaultArraySerializers$IntArraySerializer.write(DefaultArraySerializers.java:52)
>         at com.esotericsoftware.kryo.Kryo.writeObjectOrNull(Kryo.java:577)
>         at com.esotericsoftware.kryo.serializers.ObjectField.write(ObjectField.java:68)
>         ... 9 more
>
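Reading this trace next to the original one: both failures happen in the spilling thread while NormalizedKeySorter.writeToOutput copies records through TupleSerializerBase.copy into KryoSerializer.copy. In the original trace the copy fails while deserializing (KryoSerializer.deserialize, then MapReferenceResolver); in this one it fails while serializing (KryoSerializer.serialize, down to HeapMemorySegment.put). Judging from the frames, the stream copy appears to read a record and then write it again. The Java sketch below shows only that shape, using java.io streams and a made-up length-prefixed format for an int[] field; the class and method names are hypothetical, and this is not Flink's or Kryo's code.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.DataInput;
import java.io.DataInputStream;
import java.io.DataOutput;
import java.io.DataOutputStream;
import java.io.IOException;
import java.io.UncheckedIOException;
import java.util.Arrays;

public class RoundTripCopy {

    // Hypothetical length-prefixed encoding of an int[] field such as
    // SparseVector.indices. Kryo's real format (variable-length ints,
    // class ids, reference ids) is different.
    static void writeInts(int[] values, DataOutput out) throws IOException {
        out.writeInt(values.length);
        for (int v : values) {
            out.writeInt(v);
        }
    }

    static int[] readInts(DataInput in) throws IOException {
        int n = in.readInt();
        int[] values = new int[n];
        for (int i = 0; i < n; i++) {
            values[i] = in.readInt();
        }
        return values;
    }

    // Copy one serialized record by deserializing it and serializing it again,
    // so a failure can come from either half.
    static void copy(DataInput source, DataOutput target) throws IOException {
        int[] record = readInts(source); // read half: the original trace failed here
        writeInts(record, target);       // write half: the trace above failed here
    }

    static byte[] encode(int[] values) {
        ByteArrayOutputStream bytes = new ByteArrayOutputStream();
        try {
            writeInts(values, new DataOutputStream(bytes));
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
        return bytes.toByteArray();
    }

    static int[] decode(byte[] bytes) {
        try {
            return readInts(new DataInputStream(new ByteArrayInputStream(bytes)));
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    static byte[] copyBytes(byte[] source) {
        ByteArrayOutputStream target = new ByteArrayOutputStream();
        try {
            copy(new DataInputStream(new ByteArrayInputStream(source)),
                 new DataOutputStream(target));
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
        return target.toByteArray();
    }

    public static void main(String[] args) {
        int[] indices = {3, 14, 1999};
        System.out.println(Arrays.toString(decode(copyBytes(encode(indices)))));
        // Truncated input: the read half fails before anything is written.
        try {
            copyBytes(Arrays.copyOf(encode(indices), 6));
        } catch (UncheckedIOException e) {
            System.out.println("copy failed: " + e.getCause());
        }
    }
}
```

The point of the split is diagnostic: a read-side failure implicates whatever produced or interprets the input bytes, while a write-side failure like the one above implicates the serializer or the output view it writes into.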

On Wed, Jan 20, 2016 at 9:45 PM, Stephan Ewen <se...@apache.org> wrote:

> Can you post the stack trace again? With the patched branch, the reference
> resolver (MapReferenceResolver) should no longer be used, and that is where
> the original exception occurred.
>
> On Wed, Jan 20, 2016 at 7:38 PM, Theodore Vasiloudis <
> theodoros.vasiloudis@gmail.com> wrote:
>
>> Alright, I will try to do that.
>>
>> I've tried running the job with a CSV file as input, using DenseVectors
>> to represent the features, but I still get the same IndexOutOfBounds
>> error.
>>
>> On Wed, Jan 20, 2016 at 6:05 PM, Till Rohrmann <tr...@apache.org>
>> wrote:
>>
>>> You could change the version of Stephan’s branch via mvn versions:set
>>> -DnewVersion=MyCustomBuildVersion and then mvn versions:commit. Then,
>>> after you install the Flink binaries, you can reference them in your
>>> project by setting the version of your Flink dependencies to
>>> MyCustomBuildVersion. That way, you can be sure that the right
>>> dependencies are used.
>>>
>>> Alternatively, you could put together an example program, with example
>>> input data, that reproduces the problem. Then I could also take a look at it.
>>>
>>> Cheers,
>>> Till
>>>
>>> On Wed, Jan 20, 2016 at 5:58 PM, Theodore Vasiloudis <
>>> theodoros.vasiloudis@gmail.com> wrote:
>>>
>>>> OK here's what I tried:
>>>>
>>>> * Build Flink (mvn clean install) from the branch you linked (kryo)
>>>> * Build my uber-jar, I use SBT with 1.0-SNAPSHOT as the Flink version,
>>>> added local maven repo to resolvers so that it picks up the previously
>>>> installed version (I hope)
>>>> * Launch local cluster from newly built Flink, try to run job
>>>>
>>>> Still getting the same error.
>>>>
>>>> Is there a way to ensure that SBT is picking up the local version of
>>>> Flink to build the uber-jar?
>>>> Does it matter in this case, or is it enough that I'm sure the launched
>>>> Flink instance comes from the branch you linked?
>>>>
>>>>
>>>> On Wed, Jan 20, 2016 at 4:30 PM, Stephan Ewen <se...@apache.org> wrote:
>>>>
>>>>> The bug looks to be in the serialization via Kryo while spilling
>>>>> windows. Note that Kryo is used here as a fallback serializer, since
>>>>> SparseVector is not a type that is transparent to Flink.
>>>>>
>>>>> I think there are two possible reasons:
>>>>>   1) Kryo, or our Kryo setup, has an issue here.
>>>>>   2) Kryo is inconsistently configured. There are multiple Kryo
>>>>> instances used across the serializers in the sorter. There may be a bug
>>>>> that leaves them initialized out of sync.
>>>>>
>>>>>
>>>>> To check this, can you build Flink with this pull request
>>>>> (https://github.com/apache/flink/pull/1528) or from the "kryo" branch
>>>>> of https://github.com/StephanEwen/incubator-flink and see if that
>>>>> fixes it?
>>>>>
>>>>>
>>>>> Thanks,
>>>>> Stephan
>>>>>
>>>>>
>>>>>
>>>>>
>>>>>
>>>>> On Wed, Jan 20, 2016 at 3:33 PM, Theodore Vasiloudis <
>>>>> theodoros.vasiloudis@gmail.com> wrote:
>>>>>
>>>>>> I haven't been able to reproduce this with other datasets. However, a
>>>>>> smaller sample taken from the large dataset I'm using (link to data
>>>>>> <http://www.csie.ntu.edu.tw/~cjlin/libsvmtools/datasets/binary.html#epsilon>)
>>>>>> causes the same problem.
>>>>>>
>>>>>> I'm wondering whether the implementation of readLibSVM is what's wrong
>>>>>> here. I've tried the new version committed recently by Chiwan, but I
>>>>>> still get the same error.
>>>>>>
>>>>>> I'll see if I can spot a bug in readLibSVM.
>>>>>>
>>>>>> On Wed, Jan 20, 2016 at 1:43 PM, Theodore Vasiloudis <
>>>>>> theodoros.vasiloudis@gmail.com> wrote:
>>>>>>
>>>>>>> It's on 0.10.
>>>>>>>
>>>>>>> I've tried explicitly registering SparseVector in my job (which is
>>>>>>> done anyway by registerFlinkMLTypes
>>>>>>> <https://github.com/apache/flink/blob/e9bf13d8626099a1d6ddb6ebe98c50be848fe79e/flink-libraries/flink-ml/src/main/scala/org/apache/flink/ml/common/FlinkMLTools.scala#L49>,
>>>>>>> which is called when the SVM predict or evaluate functions are
>>>>>>> called
>>>>>>> <https://github.com/apache/flink/blob/e9bf13d8626099a1d6ddb6ebe98c50be848fe79e/flink-libraries/flink-ml/src/main/scala/org/apache/flink/ml/pipeline/Predictor.scala#L58>),
>>>>>>> but I still get the same error. I will try a couple of different
>>>>>>> datasets to see whether it's the number of features that is causing
>>>>>>> this or something else.
>>>>>>>
>>>>>>> So far it works fine for a dataset with 8 features, but the large
>>>>>>> one has 2000 and I get the above error there. I will try large datasets
>>>>>>> with a few features and small datasets with many features as well.
>>>>>>>
>>>>>>> On Wed, Jan 20, 2016 at 11:39 AM, Stephan Ewen <se...@apache.org>
>>>>>>> wrote:
>>>>>>>
>>>>>>>> Hi!
>>>>>>>>
>>>>>>>> Does this error occur in 0.10 or in 1.0-SNAPSHOT?
>>>>>>>>
>>>>>>>> It is probably an incorrectly configured Kryo instance (not a
>>>>>>>> problem of the sorter).
>>>>>>>> What is strange is that it occurs in the "MapReferenceResolver" -
>>>>>>>> there should be no reference resolution during serialization /
>>>>>>>> deserialization.
>>>>>>>>
>>>>>>>> Can you try what happens when you explicitly register the type
>>>>>>>> SparseVector at the ExecutionEnvironment?
>>>>>>>>
>>>>>>>> Stephan
>>>>>>>>
>>>>>>>>
>>>>>>>> On Wed, Jan 20, 2016 at 11:24 AM, Theodore Vasiloudis <
>>>>>>>> theodoros.vasiloudis@gmail.com> wrote:
>>>>>>>>
>>>>>>>>> Hello all,
>>>>>>>>>
>>>>>>>>> I'm trying to run a job using FlinkML and I'm confused about the
>>>>>>>>> source of an error.
>>>>>>>>>
>>>>>>>>> The job reads a libSVM formatted file and trains an SVM classifier
>>>>>>>>> on it.
>>>>>>>>>
>>>>>>>>> I've tried this with small datasets and everything works out fine.
>>>>>>>>>
>>>>>>>>> When trying to run the same job on a large dataset (~11GB
>>>>>>>>> uncompressed) however, I get the following error:
>>>>>>>>>
>>>>>>>>>
>>>>>>>>>> java.lang.RuntimeException: Error obtaining the sorted input:
>>>>>>>>>> Thread 'SortMerger spilling thread' terminated due to an exception:
>>>>>>>>>> java.lang.IndexOutOfBoundsException: Index: 14, Size: 2
>>>>>>>>>> Serialization trace:
>>>>>>>>>> indices (org.apache.flink.ml.math.SparseVector)
>>>>>>>>>>         at
>>>>>>>>>> org.apache.flink.runtime.operators.sort.UnilateralSortMerger.getIterator(UnilateralSortMerger.java:619)
>>>>>>>>>>         at
>>>>>>>>>> org.apache.flink.runtime.operators.BatchTask.getInput(BatchTask.java:1089)
>>>>>>>>>>         at
>>>>>>>>>> org.apache.flink.runtime.operators.NoOpDriver.run(NoOpDriver.java:78)
>>>>>>>>>>         at
>>>>>>>>>> org.apache.flink.runtime.operators.BatchTask.run(BatchTask.java:489)
>>>>>>>>>>         at
>>>>>>>>>> org.apache.flink.runtime.operators.BatchTask.invoke(BatchTask.java:354)
>>>>>>>>>>         at
>>>>>>>>>> org.apache.flink.runtime.taskmanager.Task.run(Task.java:584)
>>>>>>>>>>         at java.lang.Thread.run(Thread.java:745)
>>>>>>>>>> Caused by: java.io.IOException: Thread 'SortMerger spilling
>>>>>>>>>> thread' terminated due to an exception:
>>>>>>>>>> java.lang.IndexOutOfBoundsException: Index: 14, Size: 2
>>>>>>>>>> Serialization trace:
>>>>>>>>>> indices (org.apache.flink.ml.math.SparseVector)
>>>>>>>>>>         at
>>>>>>>>>> org.apache.flink.runtime.operators.sort.UnilateralSortMerger$ThreadBase.run(UnilateralSortMerger.java:800)
>>>>>>>>>> Caused by: com.esotericsoftware.kryo.KryoException:
>>>>>>>>>> java.lang.IndexOutOfBoundsException: Index: 14, Size: 2
>>>>>>>>>> Serialization trace:
>>>>>>>>>> indices (org.apache.flink.ml.math.SparseVector)
>>>>>>>>>>         at
>>>>>>>>>> com.esotericsoftware.kryo.serializers.ObjectField.read(ObjectField.java:125)
>>>>>>>>>>         at
>>>>>>>>>> com.esotericsoftware.kryo.serializers.FieldSerializer.read(FieldSerializer.java:528)
>>>>>>>>>>         at
>>>>>>>>>> com.esotericsoftware.kryo.Kryo.readClassAndObject(Kryo.java:761)
>>>>>>>>>>         at
>>>>>>>>>> org.apache.flink.api.java.typeutils.runtime.kryo.KryoSerializer.deserialize(KryoSerializer.java:222)
>>>>>>>>>>         at
>>>>>>>>>> org.apache.flink.api.java.typeutils.runtime.kryo.KryoSerializer.deserialize(KryoSerializer.java:236)
>>>>>>>>>>         at
>>>>>>>>>> org.apache.flink.api.java.typeutils.runtime.kryo.KryoSerializer.copy(KryoSerializer.java:246)
>>>>>>>>>>         at
>>>>>>>>>> org.apache.flink.api.java.typeutils.runtime.TupleSerializerBase.copy(TupleSerializerBase.java:73)
>>>>>>>>>>         at
>>>>>>>>>> org.apache.flink.api.java.typeutils.runtime.TupleSerializerBase.copy(TupleSerializerBase.java:73)
>>>>>>>>>>         at
>>>>>>>>>> org.apache.flink.runtime.operators.sort.NormalizedKeySorter.writeToOutput(NormalizedKeySorter.java:499)
>>>>>>>>>>         at
>>>>>>>>>> org.apache.flink.runtime.operators.sort.UnilateralSortMerger$SpillingThread.go(UnilateralSortMerger.java:1344)
>>>>>>>>>>         at
>>>>>>>>>> org.apache.flink.runtime.operators.sort.UnilateralSortMerger$ThreadBase.run(UnilateralSortMerger.java:796)
>>>>>>>>>> Caused by: java.lang.IndexOutOfBoundsException: Index: 14, Size: 2
>>>>>>>>>>         at java.util.ArrayList.rangeCheck(ArrayList.java:653)
>>>>>>>>>>         at java.util.ArrayList.set(ArrayList.java:444)
>>>>>>>>>>         at
>>>>>>>>>> com.esotericsoftware.kryo.util.MapReferenceResolver.setReadObject(MapReferenceResolver.java:38)
>>>>>>>>>>         at com.esotericsoftware.kryo.Kryo.reference(Kryo.java:823)
>>>>>>>>>>         at
>>>>>>>>>> com.esotericsoftware.kryo.Kryo.readObjectOrNull(Kryo.java:731)
>>>>>>>>>>         at
>>>>>>>>>> com.esotericsoftware.kryo.serializers.ObjectField.read(ObjectField.java:113)
>>>>>>>>>>         ... 10 more
>>>>>>>>>
>>>>>>>>>
>>>>>>>>>
>>>>>>>>> Any idea what might be causing this? I'm running the job in local
>>>>>>>>> mode, 1 TM with 8 slots and ~32GB heap size.
>>>>>>>>>
>>>>>>>>> All the vectors created by the libSVM loader have the correct size.
>>>>>>>>>
>>>>>>>>
>>>>>>>>
>>>>>>>
>>>>>>
>>>>>
>>>>
>>>
>>
>
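One way to test the statement that the libSVM loader produces well-formed vectors, independently of Flink, is to validate the input file line by line before it ever reaches readLibSVM. The Java sketch below assumes the usual libSVM layout, a label followed by index:value pairs with 1-based indices in ascending order, and takes the expected dimension as a parameter (2000 for the large dataset mentioned above). The class and method names are hypothetical; this is not FlinkML's readLibSVM.

```java
import java.util.ArrayList;
import java.util.List;

public class LibSvmLineCheck {

    // Checks one libSVM line of the form "<label> <index>:<value> ..." against
    // an expected feature dimension. Returns the problems found; an empty list
    // means the line looks well-formed.
    static List<String> check(String line, int dimension) {
        List<String> problems = new ArrayList<>();
        String[] tokens = line.trim().split("\\s+");
        if (tokens[0].isEmpty()) {
            problems.add("empty line");
            return problems;
        }
        try {
            Double.parseDouble(tokens[0]);
        } catch (NumberFormatException e) {
            problems.add("bad label: " + tokens[0]);
        }
        int previous = 0; // indices are 1-based, so any valid index is > 0
        for (int i = 1; i < tokens.length; i++) {
            String[] pair = tokens[i].split(":", 2);
            if (pair.length != 2) {
                problems.add("malformed feature: " + tokens[i]);
                continue;
            }
            int index;
            try {
                index = Integer.parseInt(pair[0]);
                Double.parseDouble(pair[1]);
            } catch (NumberFormatException e) {
                problems.add("non-numeric feature: " + tokens[i]);
                continue;
            }
            if (index < 1) {
                problems.add("index below 1: " + index);
            } else if (index <= previous) {
                problems.add("index not increasing: " + index);
            }
            if (index > dimension) {
                problems.add("index above dimension: " + index);
            }
            previous = Math.max(previous, index);
        }
        return problems;
    }

    public static void main(String[] args) {
        System.out.println(check("+1 1:0.5 7:-1.25 2000:3", 2000)); // prints []
        // prints [index not increasing: 3, index above dimension: 2001]
        System.out.println(check("-1 5:1 3:2 2001:0.1", 2000));
    }
}
```

Running check over every line of the input (for example via java.nio.file.Files.lines) and printing the first non-empty result would show whether any line could produce an unsorted or out-of-range index. If none does, as the CSV/DenseVector experiment also suggests, the parser is an unlikely culprit.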

Re: Unexpected out of bounds error in UnilateralSortMerger

Posted by Stephan Ewen <se...@apache.org>.
Can you post the stack trace again? With the patched branch, the reference
resolver (MapReferenceResolver) should no longer be used, and that is where
the original exception occurred.
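For intuition about why an exception inside MapReferenceResolver suggests out-of-sync serializer state rather than bad input data: with reference tracking enabled, a reader keeps a table of objects already read, reserves a slot before reading an object, and fills that slot afterwards. The Java sketch below is a deliberately simplified, hypothetical model of such a table, not Kryo's implementation. If a slot id reserved earlier is filled after the table has been cleared and partly refilled (for example because two users of one instance, or two inconsistently configured instances, disagree about where a record starts), ArrayList.set fails with the same kind of message as in the original trace. Whether that is what actually happens in this job is exactly the open question.

```java
import java.util.ArrayList;
import java.util.List;

public class ReferenceTableModel {

    // Simplified, hypothetical reader-side table of objects seen so far;
    // back-references would point at positions in this list.
    private final List<Object> seen = new ArrayList<>();

    // Reserve the next slot for an object that is about to be read.
    int reserve() {
        seen.add(null);
        return seen.size() - 1;
    }

    // Store the object once it has been read.
    void fill(int id, Object value) {
        seen.set(id, value); // IndexOutOfBoundsException if id >= seen.size()
    }

    // Forget all entries, as a reader would between independent records.
    void reset() {
        seen.clear();
    }

    // Replays one out-of-sync sequence and returns the exception message.
    static String simulate() {
        ReferenceTableModel table = new ReferenceTableModel();
        int pending = -1;
        for (int i = 0; i < 15; i++) { // a large record reserves ids 0..14
            pending = table.reserve();
        }
        // Out-of-sync step: the table is reset while id 14 is still pending,
        // and the next record reserves two new slots.
        table.reset();
        table.reserve();
        table.reserve();
        try {
            table.fill(pending, new int[] {1, 2, 3});
            return "no error";
        } catch (IndexOutOfBoundsException e) {
            return e.getMessage();
        }
    }

    public static void main(String[] args) {
        // On Java 8 this prints "Index: 14, Size: 2"; newer JDKs word it differently.
        System.out.println(simulate());
    }
}
```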

Re: Unexpected out of bounds error in UnilateralSortMerger

Posted by Theodore Vasiloudis <th...@gmail.com>.
Alright, I will try to do that.

I've tried running the job with a CSV file as input, using DenseVectors to
represent the features, but I still get the same IndexOutOfBounds error.

Re: Unexpected out of bounds error in UnilateralSortMerger

Posted by Till Rohrmann <tr...@apache.org>.
You could change the version of Stephan’s branch via mvn versions:set
-DnewVersion=MyCustomBuildVersion and then mvn versions:commit. After
you install the Flink binaries, you can reference them in your project by
setting the version of your Flink dependencies to MyCustomBuildVersion.
That way, you can be sure that the right dependencies are used.
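Another way to check which build is actually in use, independent of how SBT resolved the dependencies, is to ask the JVM where a class was loaded from. Below is a minimal sketch that uses only JDK reflection (ProtectionDomain/CodeSource and the manifest's Implementation-Version). The class name JarLocator is hypothetical. In a job you would pass a Flink class, for example KryoSerializer.class for the runtime or SparseVector.class for FlinkML:

```java
import java.net.URL;
import java.security.CodeSource;

/** Hypothetical helper: reports where the JVM loaded a class from. */
final class JarLocator {
    private JarLocator() {}

    /**
     * Returns the jar or directory the class was loaded from, or "<bootstrap>"
     * when no location is known (e.g. JDK classes, which have no CodeSource).
     */
    static String locationOf(Class<?> clazz) {
        CodeSource source = clazz.getProtectionDomain().getCodeSource();
        if (source == null || source.getLocation() == null) {
            return "<bootstrap>";
        }
        URL location = source.getLocation();
        return location.toString();
    }

    /** Implementation-Version from the jar manifest, if the jar declares one. */
    static String versionOf(Class<?> clazz) {
        Package pkg = clazz.getPackage();
        String version = (pkg == null) ? null : pkg.getImplementationVersion();
        return (version == null) ? "<unknown>" : version;
    }
}
```

Calling JarLocator.locationOf(...) from the job's main method reports what the client JVM loaded. Calling it inside a function that runs on the TaskManager (for example in a RichMapFunction's open method) reports what the tasks actually use, and that is the side where the sorter runs.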

Alternatively, you could put together an example program with example input
data that reproduces the problem. Then I could also take a look at it.

Cheers,
Till

On Wed, Jan 20, 2016 at 5:58 PM, Theodore Vasiloudis <
theodoros.vasiloudis@gmail.com> wrote:

> OK here's what I tried:
>
> * Build Flink (mvn clean install) from the branch you linked (kryo)
> * Build my uber-jar with SBT, using 1.0-SNAPSHOT as the Flink version; I
> added the local Maven repo to the resolvers so that it picks up the
> previously installed version (I hope)
> * Launch a local cluster from the newly built Flink and try to run the job
>
> Still getting the same error.
>
> Is there a way to ensure that SBT is picking up the local version of Flink
> to build the uber-jar?
> Does it matter in this case, or is it enough that I'm sure the launched
> Flink instance comes from the branch you linked?

Re: Unexpected out of bounds error in UnilateralSortMerger

Posted by Theodore Vasiloudis <th...@gmail.com>.
OK here's what I tried:

* Build Flink (mvn clean install) from the branch you linked (kryo)
* Build my uber-jar with SBT, using 1.0-SNAPSHOT as the Flink version; I
added the local Maven repo to the resolvers so that it picks up the
previously installed version (I hope)
* Launch a local cluster from the newly built Flink and try to run the job

Still getting the same error.

Is there a way to ensure that SBT is picking up the local version of Flink
to build the uber-jar?
Does it matter in this case, or is it enough that I'm sure the launched
Flink instance comes from the branch you linked?

On Wed, Jan 20, 2016 at 4:30 PM, Stephan Ewen <se...@apache.org> wrote:

> The bug looks to be in the serialization via Kryo while spilling windows.
> Note that Kryo is used here as a fallback serializer, since
> SparseVector is not a transparent type to Flink.
>
> I think there are two possible reasons:
>   1) Kryo, or our Kryo setup, has an issue here.
>   2) Kryo is inconsistently configured. There are multiple Kryo instances
> used across the serializers in the sorter, and there may be a bug that
> causes them not to be initialized in sync.
>
>
> To check this, can you build Flink with this pull request
> (https://github.com/apache/flink/pull/1528) or from the kryo branch of
> https://github.com/StephanEwen/incubator-flink and see if that fixes it?
>
>
> Thanks,
> Stephan

Re: Unexpected out of bounds error in UnilateralSortMerger

Posted by Stephan Ewen <se...@apache.org>.
The bug looks to be in the serialization via Kryo while spilling windows.
Note that Kryo is used here as a fallback serializer, since
SparseVector is not a transparent type to Flink.

I think there are two possible reasons:
  1) Kryo, or our Kryo setup, has an issue here.
  2) Kryo is inconsistently configured. There are multiple Kryo instances
used across the serializers in the sorter, and there may be a bug that
causes them not to be initialized in sync.
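For context on reason 2: Kryo's MapReferenceResolver keeps the objects read so far in an ArrayList. Reading a new object first reserves a slot, because nextReadId appends a placeholder and returns its index. The slot is filled once the object is complete by setReadObject, which is the frame that appears in the trace. Kryo's reset() clears the list between top-level objects. The toy model below is a simplified illustration, not Kryo's code. It shows that if another reader resets and reuses the same resolver while an object is still being read, filling the reserved slot fails with an IndexOutOfBoundsException like the one in the trace. It only suggests why shared or out-of-sync Kryo state would be a plausible explanation; it does not show that this is the actual cause.

```java
import java.util.ArrayList;
import java.util.List;

/**
 * Toy model (not Kryo's implementation) of a reference resolver that, like
 * Kryo's MapReferenceResolver, keeps the objects read so far in an ArrayList.
 */
final class ToyReferenceResolver {
    private final List<Object> readObjects = new ArrayList<>();

    /** Reserves a slot for an object about to be read and returns its id. */
    int nextReadId() {
        int id = readObjects.size();
        readObjects.add(null);
        return id;
    }

    /** Fills the reserved slot once the object has been fully read. */
    void setReadObject(int id, Object object) {
        // Throws IndexOutOfBoundsException if the list shrank in the meantime.
        readObjects.set(id, object);
    }

    /** Clears all state, as is done between top-level objects. */
    void reset() {
        readObjects.clear();
    }

    /**
     * Simulates one read that reserves reservedBeforeClash ids, while another
     * user of the same resolver resets it and reserves otherReads ids before
     * the first read completes. Returns true if filling the last slot failed.
     */
    static boolean interleavedReadFails(int reservedBeforeClash, int otherReads) {
        ToyReferenceResolver shared = new ToyReferenceResolver();
        int id = -1;
        for (int i = 0; i < reservedBeforeClash; i++) {
            id = shared.nextReadId();
        }
        // A second reader uses the same instance: reset, then read its own objects.
        shared.reset();
        for (int i = 0; i < otherReads; i++) {
            shared.nextReadId();
        }
        try {
            shared.setReadObject(id, "indices");
            return false;
        } catch (IndexOutOfBoundsException e) {
            return true;
        }
    }
}
```

With interleavedReadFails(15, 2), the reserved id is 14 and the list size is 2, which matches the numbers in the error. With interleavedReadFails(2, 2), no exception is thrown, and the other reader's slot is silently overwritten instead.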


To check this, can you build Flink with this pull request
(https://github.com/apache/flink/pull/1528) or from the kryo branch of
https://github.com/StephanEwen/incubator-flink and see if that fixes it?


Thanks,
Stephan





On Wed, Jan 20, 2016 at 3:33 PM, Theodore Vasiloudis <
theodoros.vasiloudis@gmail.com> wrote:

> I haven't been able to reproduce this with other datasets. However, taking a
> smaller sample from the large dataset I'm using (link to data
> <http://www.csie.ntu.edu.tw/~cjlin/libsvmtools/datasets/binary.html#epsilon>)
> causes the same problem.
>
> I'm wondering whether the implementation of readLibSVM is what's wrong here.
> I've tried the new version committed recently by Chiwan, but I still get the
> same error.
>
> I'll see if I can spot a bug in readLibSVM.

Re: Unexpected out of bounds error in UnilateralSortMerger

Posted by Theodore Vasiloudis <th...@gmail.com>.
I haven't been able to reproduce this with other datasets. However, taking a
smaller sample from the large dataset I'm using (link to data
<http://www.csie.ntu.edu.tw/~cjlin/libsvmtools/datasets/binary.html#epsilon>)
causes the same problem.

I'm wondering whether the implementation of readLibSVM is what's wrong here.
I've tried the new version committed recently by Chiwan, but I still get the
same error.

I'll see if I can spot a bug in readLibSVM.
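When auditing a libSVM reader, it can help to check each input line for the properties that a SparseVector relies on, which would rule the input in or out as the cause. The sketch below is a hypothetical standalone parser, not FlinkML's readLibSVM. It assumes the common libSVM convention: a label followed by index:value pairs, with 1-based, strictly increasing indices. The parser converts indices to 0-based and rejects lines whose indices are out of order, duplicated, or not below the declared feature count.

```java
import java.util.Arrays;

/** Hypothetical standalone libSVM line parser for sanity-checking input files. */
final class LibSvmLine {
    final double label;
    final int[] indices;    // 0-based, strictly increasing
    final double[] values;

    private LibSvmLine(double label, int[] indices, double[] values) {
        this.label = label;
        this.indices = indices;
        this.values = values;
    }

    /**
     * Parses "label idx:val idx:val ..." with 1-based indices.
     * Throws IllegalArgumentException if an index is not strictly increasing
     * or does not fit into a vector of size numFeatures.
     */
    static LibSvmLine parse(String line, int numFeatures) {
        // Ignore an optional trailing '#' comment (allowed in SVMlight-style files).
        int hash = line.indexOf('#');
        String body = (hash >= 0 ? line.substring(0, hash) : line).trim();
        if (body.isEmpty()) {
            throw new IllegalArgumentException("empty line");
        }
        String[] tokens = body.split("\\s+");
        double label = Double.parseDouble(tokens[0]);
        int[] indices = new int[tokens.length - 1];
        double[] values = new double[tokens.length - 1];
        int previous = -1;
        for (int i = 1; i < tokens.length; i++) {
            String[] pair = tokens[i].split(":", 2);
            if (pair.length != 2) {
                throw new IllegalArgumentException("bad feature token: " + tokens[i]);
            }
            int index = Integer.parseInt(pair[0]) - 1; // libSVM indices are 1-based
            if (index <= previous || index >= numFeatures) {
                throw new IllegalArgumentException("bad index " + (index + 1) + " in: " + line);
            }
            indices[i - 1] = index;
            values[i - 1] = Double.parseDouble(pair[1]);
            previous = index;
        }
        return new LibSvmLine(label, indices, values);
    }

    @Override
    public String toString() {
        return label + " " + Arrays.toString(indices) + " " + Arrays.toString(values);
    }
}
```

You could run it over every line of the file (or of the sample that reproduces the error) with numFeatures = 2000 to flag malformed rows before they reach Flink. If every line passes, a malformed input becomes a less likely cause.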

On Wed, Jan 20, 2016 at 1:43 PM, Theodore Vasiloudis <
theodoros.vasiloudis@gmail.com> wrote:

> It's on 0.10.
>
> I've tried explicitly registering SparseVector in my job (this is done anyway by
> registerFlinkMLTypes
> <https://github.com/apache/flink/blob/e9bf13d8626099a1d6ddb6ebe98c50be848fe79e/flink-libraries/flink-ml/src/main/scala/org/apache/flink/ml/common/FlinkMLTools.scala#L49>,
> which is called when the SVM predict or evaluate functions are called
> <https://github.com/apache/flink/blob/e9bf13d8626099a1d6ddb6ebe98c50be848fe79e/flink-libraries/flink-ml/src/main/scala/org/apache/flink/ml/pipeline/Predictor.scala#L58>),
> but I still get the same error. I will try a couple of different datasets
> to see whether it's the number of features that is causing this or
> something else.
>
> So far it works fine for a dataset with 8 features, but the large one has
> 2000 and I get the above error there. I will try large datasets with a few
> features and small datasets with many features as well.

Re: Unexpected out of bounds error in UnilateralSortMerger

Posted by Theodore Vasiloudis <th...@gmail.com>.
It's on 0.10.

I've tried explicitly registering SparseVector in my job (this is done anyway by
registerFlinkMLTypes
<https://github.com/apache/flink/blob/e9bf13d8626099a1d6ddb6ebe98c50be848fe79e/flink-libraries/flink-ml/src/main/scala/org/apache/flink/ml/common/FlinkMLTools.scala#L49>,
which is called when the SVM predict or evaluate functions are called
<https://github.com/apache/flink/blob/e9bf13d8626099a1d6ddb6ebe98c50be848fe79e/flink-libraries/flink-ml/src/main/scala/org/apache/flink/ml/pipeline/Predictor.scala#L58>),
but I still get the same error. I will try a couple of different datasets
to see whether it's the number of features that is causing this or
something else.

So far it works fine for a dataset with 8 features, but the large one has
2000 and I get the above error there. I will try large datasets with a few
features and small datasets with many features as well.
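For these experiments, a synthetic generator lets you vary the number of rows and the number of features independently. This is a hypothetical sketch, not part of FlinkML. The rows, numFeatures, density and seed parameters are illustrative. Labels are random +1/-1 and the values carry no signal, so the output is only useful for reproducing crashes, not for checking accuracy. The generator writes 1-based, increasing indices, as the libSVM format expects.

```java
import java.io.IOException;
import java.io.Writer;
import java.util.Locale;
import java.util.Random;

/** Hypothetical generator of synthetic libSVM data with a chosen shape. */
final class SyntheticLibSvm {
    private SyntheticLibSvm() {}

    /**
     * Writes rows lines, each with a random +1/-1 label and, for every feature,
     * a value with probability density. Indices are written 1-based and in
     * increasing order.
     */
    static void write(Writer out, int rows, int numFeatures, double density, long seed)
            throws IOException {
        Random random = new Random(seed); // fixed seed => reproducible files
        StringBuilder line = new StringBuilder();
        for (int r = 0; r < rows; r++) {
            line.setLength(0);
            line.append(random.nextBoolean() ? "+1" : "-1");
            for (int f = 0; f < numFeatures; f++) {
                if (random.nextDouble() < density) {
                    line.append(' ').append(f + 1).append(':')
                        .append(String.format(Locale.ROOT, "%.6f", random.nextGaussian()));
                }
            }
            line.append('\n');
            out.write(line.toString());
        }
    }
}
```

For example, write(writer, 10_000_000, 8, 1.0, 1L) gives a large file with few features, and write(writer, 10_000, 2000, 1.0, 1L) gives a small file with many features. A file produced this way could also serve as the example input for a shareable reproducer.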

On Wed, Jan 20, 2016 at 11:39 AM, Stephan Ewen <se...@apache.org> wrote:

> Hi!
>
> Does this error occur in 0.10 or in 1.0-SNAPSHOT?
>
> It is probably an incorrectly configured Kryo instance (not a problem of
> the sorter).
> What is strange is that it occurs in the "MapReferenceResolver" - there
> should be no reference resolution during serialization / deserialization.
>
> Can you try what happens when you explicitly register the type
> SparseVector with the ExecutionEnvironment?
>
> Stephan

Re: Unexpected out of bounds error in UnilateralSortMerger

Posted by Stephan Ewen <se...@apache.org>.
Hi!

Does this error occur in 0.10 or in 1.0-SNAPSHOT?

It is probably an incorrectly configured Kryo instance (not a problem with
the sorter).
What is strange is that it occurs in the "MapReferenceResolver" - there
should be no reference resolution during serialization / deserialization.
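The innermost frames show `MapReferenceResolver.setReadObject` calling `ArrayList.set` with index 14 on a list of size 2. The toy model below is a simplified illustration, not Kryo's source. It assumes a resolver that reserves one list slot per object while that object is being read, fills the slot afterwards, and clears the list on reset. Under that assumption, the same exception shape appears if the list is cleared and refilled between reserving an id and filling it, for example if two readers shared one instance. `ToyReadResolver` and `StaleReferenceDemo` are hypothetical names, and this is not a diagnosis of the job.

```scala
import scala.collection.mutable.ArrayBuffer

// Toy model, NOT Kryo's code: the read side of a list-backed reference resolver.
final class ToyReadResolver {
  private val readObjects = ArrayBuffer.empty[AnyRef]

  // Reserve a slot for an object whose fields are still being read; return its id.
  def nextReadId(): Int = { readObjects += null; readObjects.size - 1 }

  // Fill a previously reserved slot once the object is fully constructed.
  def setReadObject(id: Int, obj: AnyRef): Unit = readObjects(id) = obj

  // Forget all references (in this model, done between top-level objects).
  def reset(): Unit = readObjects.clear()

  def size: Int = readObjects.size
}

object StaleReferenceDemo {
  /** Returns true if filling a stale slot fails with an IndexOutOfBoundsException. */
  def staleIdFails(): Boolean = {
    val shared = new ToyReadResolver
    // Reader A reserves 15 slots while reading a nested record; its pending id is 14.
    val pendingId = (1 to 15).map(_ => shared.nextReadId()).last
    // Reader B uses the same resolver: it resets and reads a record with two objects.
    shared.reset()
    shared.nextReadId(); shared.nextReadId()
    // Reader A resumes and tries to fill slot 14, but only 2 slots exist now.
    try { shared.setReadObject(pendingId, "value"); false }
    catch { case _: IndexOutOfBoundsException => true }
  }

  def main(args: Array[String]): Unit =
    println(s"stale id fails: ${staleIdFails()}")
}
```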

Can you check what happens when you explicitly register the type
SparseVector with the ExecutionEnvironment?

Stephan
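Registering the type could look roughly like the following minimal sketch. It assumes the Flink 0.10 Scala DataSet API, where `ExecutionEnvironment.registerType` registers a class with Flink's serialization stack (and with Kryo when the type ends up being serialized by Kryo). The object name `RegisterSparseVectorSketch` is hypothetical, and the actual FlinkML pipeline (reading the libSVM file, fitting the SVM) is elided.

```scala
import org.apache.flink.api.scala._
import org.apache.flink.ml.math.SparseVector

// Hypothetical entry point; only the type registration is shown.
object RegisterSparseVectorSketch {
  def main(args: Array[String]): Unit = {
    val env = ExecutionEnvironment.getExecutionEnvironment

    // Register SparseVector before building the job, so Kryo assigns it a
    // registration ID instead of writing the class name with each record.
    env.registerType(classOf[SparseVector])

    // Alternative that targets Kryo directly through the ExecutionConfig:
    // env.getConfig.registerKryoType(classOf[SparseVector])

    // ... build and execute the FlinkML job exactly as before.
  }
}
```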

