You are viewing a plain text version of this content. The canonical link for it is here.
Posted to user@flink.apache.org by Flavio Pompermaier <po...@okkam.it> on 2017/04/19 18:22:02 UTC

UnilateralSortMerger error (again)

Hi to all,
I think I'm again on the weird Exception with the
SpillingAdaptiveSpanningRecordDeserializer...
I'm using Flink 1.2.0 and the error seems to rise when Flink spills to disk
but the Exception thrown is not very helpful. Any idea?

Caused by: java.lang.RuntimeException: Error obtaining the sorted input:
Thread 'SortMerger Reading Thread' terminated due to an exception: null
at
org.apache.flink.runtime.operators.sort.UnilateralSortMerger.getIterator(UnilateralSortMerger.java:619)
at
org.apache.flink.runtime.operators.BatchTask.getInput(BatchTask.java:1094)
at
org.apache.flink.runtime.operators.GroupReduceDriver.prepare(GroupReduceDriver.java:99)
at org.apache.flink.runtime.operators.BatchTask.run(BatchTask.java:460)
... 3 more
Caused by: java.io.IOException: Thread 'SortMerger Reading Thread'
terminated due to an exception: null
at
org.apache.flink.runtime.operators.sort.UnilateralSortMerger$ThreadBase.run(UnilateralSortMerger.java:799)
Caused by: java.io.EOFException
at
org.apache.flink.runtime.util.DataInputDeserializer.readUnsignedByte(DataInputDeserializer.java:306)
at org.apache.flink.types.StringValue.readString(StringValue.java:747)
at
org.apache.flink.api.common.typeutils.base.StringSerializer.deserialize(StringSerializer.java:69)
at
org.apache.flink.api.common.typeutils.base.StringSerializer.deserialize(StringSerializer.java:74)
at
org.apache.flink.api.common.typeutils.base.StringSerializer.deserialize(StringSerializer.java:28)
at
org.apache.flink.api.java.typeutils.runtime.RowSerializer.deserialize(RowSerializer.java:193)
at
org.apache.flink.api.java.typeutils.runtime.RowSerializer.deserialize(RowSerializer.java:36)
at
org.apache.flink.runtime.plugable.ReusingDeserializationDelegate.read(ReusingDeserializationDelegate.java:57)
at
org.apache.flink.runtime.io.network.api.serialization.SpillingAdaptiveSpanningRecordDeserializer.getNextRecord(SpillingAdaptiveSpanningRecordDeserializer.java:144)
at
org.apache.flink.runtime.io.network.api.reader.AbstractRecordReader.getNextRecord(AbstractRecordReader.java:72)
at
org.apache.flink.runtime.io.network.api.reader.MutableRecordReader.next(MutableRecordReader.java:42)
at
org.apache.flink.runtime.operators.util.ReaderIterator.next(ReaderIterator.java:59)
at
org.apache.flink.runtime.operators.sort.UnilateralSortMerger$ReadingThread.go(UnilateralSortMerger.java:1035)
at
org.apache.flink.runtime.operators.sort.UnilateralSortMerger$ThreadBase.run(UnilateralSortMerger.java:796)


Best,
Flavio

Re: UnilateralSortMerger error (again)

Posted by Fabian Hueske <fh...@gmail.com>.
Thank you Kurt!

2017-04-27 17:40 GMT+02:00 Flavio Pompermaier <po...@okkam.it>:

> Great!! Thanks a lot Kurt
>
> On Thu, Apr 27, 2017 at 5:31 PM, Kurt Young <yk...@gmail.com> wrote:
>
>> Hi, i have found the bug: https://issues.apache.org
>> /jira/browse/FLINK-6398, will open a PR soon.
>>
>> Best,
>> Kurt
>>
>> On Thu, Apr 27, 2017 at 8:23 PM, Flavio Pompermaier <pompermaier@okkam.it
>> > wrote:
>>
>>> Thanks a lot Kurt!
>>>
>>> On Thu, Apr 27, 2017 at 2:12 PM, Kurt Young <yk...@gmail.com> wrote:
>>>
>>>> Thanks for the test case, i will take a look at it.
>>>>
>>>> Flavio Pompermaier <po...@okkam.it>于2017年4月27日 周四03:55写道:
>>>>
>>>>> I've created a repository with a unit test to reproduce the error at
>>>>> https://github.com/fpompermaier/flink-batch-bug/blob/mast
>>>>> er/src/test/java/it/okkam/flink/aci/TestDataInputDeserializer.java (probably
>>>>> this error is related also to FLINK-4719).
>>>>>
>>>>> The exception is  thrown only when there are null strings and multiple
>>>>> slots per TM, I don't know whether UnilateralSorterMerger is involved or
>>>>> not (but I think so..).
>>>>> A quick fix for this problem would be very appreciated because it's
>>>>> bloking a production deployment..
>>>>>
>>>>> Thanks in advance to all,
>>>>> Flavio
>>>>>
>>>>> On Wed, Apr 26, 2017 at 4:42 PM, Flavio Pompermaier <
>>>>> pompermaier@okkam.it> wrote:
>>>>>
>>>>>> After digging into the code and test I think that the problem is
>>>>>> almost certainly in the UnilateralSortMerger, there should be a missing
>>>>>> synchronization on some shared object somewhere...Right now I'm trying to
>>>>>> understand if this section of code creates some shared object (like queues)
>>>>>> that are accessed in a bad way when there's spilling to disk:
>>>>>>
>>>>>>                // start the thread that reads the input channels
>>>>>> this.readThread = getReadingThread(exceptionHandler, input,
>>>>>> circularQueues, largeRecordHandler,
>>>>>> parentTask, serializer, ((long) (startSpillingFraction *
>>>>>> sortMemory)));
>>>>>>
>>>>>> // start the thread that sorts the buffers
>>>>>> this.sortThread = getSortingThread(exceptionHandler, circularQueues,
>>>>>> parentTask);
>>>>>>
>>>>>> // start the thread that handles spilling to secondary storage
>>>>>> this.spillThread = getSpillingThread(exceptionHandler,
>>>>>> circularQueues, parentTask,
>>>>>> memoryManager, ioManager, serializerFactory, comparator,
>>>>>> this.sortReadMemory, this.writeMemory,
>>>>>> maxNumFileHandles);
>>>>>> ....
>>>>>> startThreads();
>>>>>>
>>>>>>
>>>>>> The problem is that the unit tests of GroupReduceDriver use Record
>>>>>> and testing Rows in not very straightforward and I'm still trying to
>>>>>> reproduce the problem in a local env..
>>>>>>
>>>>>> On Fri, Apr 21, 2017 at 9:53 PM, Flavio Pompermaier <
>>>>>> pompermaier@okkam.it> wrote:
>>>>>>
>>>>>>> Thanks for the explanation . Is there a way to force this behaviour
>>>>>>> in a local environment (to try to debug the problem)?
>>>>>>>
>>>>>>> On 21 Apr 2017 21:49, "Fabian Hueske" <fh...@gmail.com> wrote:
>>>>>>>
>>>>>>>> Hi Flavio,
>>>>>>>>
>>>>>>>> these files are used for spilling data to disk. In your case sorted
>>>>>>>> runs of records.
>>>>>>>> Later all (up to a fanout threshold) these sorted runs are read and
>>>>>>>> merged to get a completely sorted record stream.
>>>>>>>>
>>>>>>>> 2017-04-21 14:09 GMT+02:00 Flavio Pompermaier <pompermaier@okkam.it
>>>>>>>> >:
>>>>>>>>
>>>>>>>>> The error appears as soon as some taskmanager generates some
>>>>>>>>> inputchannel file.
>>>>>>>>> What are those files used for?
>>>>>>>>>
>>>>>>>>> On Fri, Apr 21, 2017 at 11:53 AM, Flavio Pompermaier <
>>>>>>>>> pompermaier@okkam.it> wrote:
>>>>>>>>>
>>>>>>>>>> In another run of the job I had another Exception. Could it be
>>>>>>>>>> helpful?
>>>>>>>>>>
>>>>>>>>>> Error obtaining the sorted input: Thread 'SortMerger Reading
>>>>>>>>>> Thread' terminated due to an exception: Serializer consumed more bytes than
>>>>>>>>>> the record had. This indicates broken serialization. If you are using
>>>>>>>>>> custom serialization types (Value or Writable), check their serialization
>>>>>>>>>> methods. If you are using a Kryo-serialized type, check the corresponding
>>>>>>>>>> Kryo serializer.
>>>>>>>>>> at org.apache.flink.runtime.operators.BatchTask.run(BatchTask.j
>>>>>>>>>> ava:465)
>>>>>>>>>> at org.apache.flink.runtime.operators.BatchTask.invoke(BatchTas
>>>>>>>>>> k.java:355)
>>>>>>>>>> at org.apache.flink.runtime.taskmanager.Task.run(Task.java:655)
>>>>>>>>>> at java.lang.Thread.run(Thread.java:745)
>>>>>>>>>> Caused by: java.lang.RuntimeException: Error obtaining the sorted
>>>>>>>>>> input: Thread 'SortMerger Reading Thread' terminated due to an exception:
>>>>>>>>>> Serializer consumed more bytes than the record had. This indicates broken
>>>>>>>>>> serialization. If you are using custom serialization types (Value or
>>>>>>>>>> Writable), check their serialization methods. If you are using a
>>>>>>>>>> Kryo-serialized type, check the corresponding Kryo serializer.
>>>>>>>>>> at org.apache.flink.runtime.operators.sort.UnilateralSortMerger
>>>>>>>>>> .getIterator(UnilateralSortMerger.java:619)
>>>>>>>>>> at org.apache.flink.runtime.operators.BatchTask.getInput(BatchT
>>>>>>>>>> ask.java:1094)
>>>>>>>>>> at org.apache.flink.runtime.operators.GroupReduceDriver.prepare
>>>>>>>>>> (GroupReduceDriver.java:99)
>>>>>>>>>> at org.apache.flink.runtime.operators.BatchTask.run(BatchTask.j
>>>>>>>>>> ava:460)
>>>>>>>>>> ... 3 more
>>>>>>>>>> Caused by: java.io.IOException: Thread 'SortMerger Reading
>>>>>>>>>> Thread' terminated due to an exception: Serializer consumed more bytes than
>>>>>>>>>> the record had. This indicates broken serialization. If you are using
>>>>>>>>>> custom serialization types (Value or Writable), check their serialization
>>>>>>>>>> methods. If you are using a Kryo-serialized type, check the corresponding
>>>>>>>>>> Kryo serializer.
>>>>>>>>>> at org.apache.flink.runtime.operators.sort.UnilateralSortMerger
>>>>>>>>>> $ThreadBase.run(UnilateralSortMerger.java:799)
>>>>>>>>>> Caused by: java.io.IOException: Serializer consumed more bytes
>>>>>>>>>> than the record had. This indicates broken serialization. If you are using
>>>>>>>>>> custom serialization types (Value or Writable), check their serialization
>>>>>>>>>> methods. If you are using a Kryo-serialized type, check the corresponding
>>>>>>>>>> Kryo serializer.
>>>>>>>>>> at org.apache.flink.runtime.io.network.api.serialization.Spilli
>>>>>>>>>> ngAdaptiveSpanningRecordDeserializer.getNextRecord(SpillingA
>>>>>>>>>> daptiveSpanningRecordDeserializer.java:123)
>>>>>>>>>> at org.apache.flink.runtime.io.network.api.reader.AbstractRecor
>>>>>>>>>> dReader.getNextRecord(AbstractRecordReader.java:72)
>>>>>>>>>> at org.apache.flink.runtime.io.network.api.reader.MutableRecord
>>>>>>>>>> Reader.next(MutableRecordReader.java:42)
>>>>>>>>>> at org.apache.flink.runtime.operators.util.ReaderIterator.next(
>>>>>>>>>> ReaderIterator.java:59)
>>>>>>>>>> at org.apache.flink.runtime.operators.sort.UnilateralSortMerger
>>>>>>>>>> $ReadingThread.go(UnilateralSortMerger.java:1035)
>>>>>>>>>> at org.apache.flink.runtime.operators.sort.UnilateralSortMerger
>>>>>>>>>> $ThreadBase.run(UnilateralSortMerger.java:796)
>>>>>>>>>> Caused by: java.lang.ArrayIndexOutOfBoundsException: 32768
>>>>>>>>>> at org.apache.flink.core.memory.HeapMemorySegment.get(HeapMemor
>>>>>>>>>> ySegment.java:104)
>>>>>>>>>> at org.apache.flink.runtime.io.network.api.serialization.Spilli
>>>>>>>>>> ngAdaptiveSpanningRecordDeserializer$NonSpanningWrapper.read
>>>>>>>>>> Byte(SpillingAdaptiveSpanningRecordDeserializer.java:226)
>>>>>>>>>> at org.apache.flink.runtime.io.network.api.serialization.Spilli
>>>>>>>>>> ngAdaptiveSpanningRecordDeserializer$NonSpanningWrapper.read
>>>>>>>>>> UnsignedByte(SpillingAdaptiveSpanningRecordDeserializer.java:231)
>>>>>>>>>> at org.apache.flink.types.StringValue.readString(StringValue.ja
>>>>>>>>>> va:770)
>>>>>>>>>> at org.apache.flink.api.common.typeutils.base.StringSerializer.
>>>>>>>>>> deserialize(StringSerializer.java:69)
>>>>>>>>>> at org.apache.flink.api.common.typeutils.base.StringSerializer.
>>>>>>>>>> deserialize(StringSerializer.java:74)
>>>>>>>>>> at org.apache.flink.api.common.typeutils.base.StringSerializer.
>>>>>>>>>> deserialize(StringSerializer.java:28)
>>>>>>>>>> at org.apache.flink.api.java.typeutils.runtime.RowSerializer.de
>>>>>>>>>> serialize(RowSerializer.java:193)
>>>>>>>>>> at org.apache.flink.api.java.typeutils.runtime.RowSerializer.de
>>>>>>>>>> serialize(RowSerializer.java:36)
>>>>>>>>>> at org.apache.flink.runtime.plugable.ReusingDeserializationDele
>>>>>>>>>> gate.read(ReusingDeserializationDelegate.java:57)
>>>>>>>>>> at org.apache.flink.runtime.io.network.api.serialization.Spilli
>>>>>>>>>> ngAdaptiveSpanningRecordDeserializer.getNextRecord(SpillingA
>>>>>>>>>> daptiveSpanningRecordDeserializer.java:109)
>>>>>>>>>> ... 5 more
>>>>>>>>>>
>>>>>>>>>> On Fri, Apr 21, 2017 at 11:43 AM, Stefano Bortoli <
>>>>>>>>>> stefano.bortoli@huawei.com> wrote:
>>>>>>>>>>
>>>>>>>>>>> In fact the old problem was with the KryoSerializer missed
>>>>>>>>>>> initialization on the exception that would trigger the spilling on disk.
>>>>>>>>>>> This would lead to dirty serialization buffer that would eventually break
>>>>>>>>>>> the program. Till worked on it debugging the source code generating the
>>>>>>>>>>> error. Perhaps someone could try the same also this time. If Flavio can
>>>>>>>>>>> make the problem reproducible in a shareable program+data.
>>>>>>>>>>>
>>>>>>>>>>>
>>>>>>>>>>>
>>>>>>>>>>> Stefano
>>>>>>>>>>>
>>>>>>>>>>>
>>>>>>>>>>>
>>>>>>>>>>> *From:* Stephan Ewen [mailto:sewen@apache.org]
>>>>>>>>>>> *Sent:* Friday, April 21, 2017 10:04 AM
>>>>>>>>>>> *To:* user <us...@flink.apache.org>
>>>>>>>>>>> *Subject:* Re: UnilateralSortMerger error (again)
>>>>>>>>>>>
>>>>>>>>>>>
>>>>>>>>>>>
>>>>>>>>>>> In the past, these errors were most often caused by bugs in the
>>>>>>>>>>> serializers, not in the sorter.
>>>>>>>>>>>
>>>>>>>>>>>
>>>>>>>>>>>
>>>>>>>>>>> What types are you using at that point? The Stack Trace reveals
>>>>>>>>>>> ROW and StringValue, any other involved types?
>>>>>>>>>>>
>>>>>>>>>>>
>>>>>>>>>>>
>>>>>>>>>>> On Fri, Apr 21, 2017 at 9:36 AM, Flavio Pompermaier <
>>>>>>>>>>> pompermaier@okkam.it> wrote:
>>>>>>>>>>>
>>>>>>>>>>> As suggested by Fabian I set taskmanager.memory.size = 1024 (to
>>>>>>>>>>> force spilling to disk) and the job failed almost immediately..
>>>>>>>>>>>
>>>>>>>>>>>
>>>>>>>>>>>
>>>>>>>>>>> On Fri, Apr 21, 2017 at 12:33 AM, Flavio Pompermaier <
>>>>>>>>>>> pompermaier@okkam.it> wrote:
>>>>>>>>>>>
>>>>>>>>>>> I debugged a bit the process repeating the job on a sub-slice of
>>>>>>>>>>> the entire data (using the id value to filter data with parquet push down
>>>>>>>>>>> filters) and all slices completed successfully :(
>>>>>>>>>>>
>>>>>>>>>>> So I tried to increase the parallelism (from 1 slot per TM to 4)
>>>>>>>>>>> to see if this was somehow a factor of stress but it didn't cause any error.
>>>>>>>>>>>
>>>>>>>>>>> Then I almost doubled the number of rows to process and finally
>>>>>>>>>>> the error showed up again.
>>>>>>>>>>>
>>>>>>>>>>> It seems somehow related to spilling to disk but I can't really
>>>>>>>>>>> understand what's going on :(
>>>>>>>>>>>
>>>>>>>>>>> This is a summary of my debug attempts:
>>>>>>>>>>>
>>>>>>>>>>>
>>>>>>>>>>>
>>>>>>>>>>> 4 Task managers with 6 GB  and 1 slot each, parallelism = 4
>>>>>>>>>>>
>>>>>>>>>>>
>>>>>>>>>>>
>>>>>>>>>>> id < 10.000.000.000  => 1.857.365 rows => OK
>>>>>>>>>>>
>>>>>>>>>>> id >= 10.000.000.000 && id < 10.010.000.000 => 20.057.714 rows
>>>>>>>>>>> => OK
>>>>>>>>>>>
>>>>>>>>>>> id >= 10.010.000.000 && id < 99.945.000.000       => 20.926.903
>>>>>>>>>>> rows => OK
>>>>>>>>>>>
>>>>>>>>>>> id >= 99.945.000.000 && id < 99.960.000.000       => 23.888.750
>>>>>>>>>>>  rows => OK
>>>>>>>>>>>
>>>>>>>>>>> id >= 99.960.000.000 => 32.936.422 rows => OK
>>>>>>>>>>>
>>>>>>>>>>>
>>>>>>>>>>>
>>>>>>>>>>> 4 TM with 8 GB and 4 slot each, parallelism 16
>>>>>>>>>>>
>>>>>>>>>>>
>>>>>>>>>>> id >= 99.960.000.000 => 32.936.422 rows => OK
>>>>>>>>>>>
>>>>>>>>>>> id >= 99.945.000.000  => 56.825.172 rows => ERROR
>>>>>>>>>>>
>>>>>>>>>>>
>>>>>>>>>>>
>>>>>>>>>>> Any help is appreciated..
>>>>>>>>>>>
>>>>>>>>>>> Best,
>>>>>>>>>>>
>>>>>>>>>>> Flavio
>>>>>>>>>>>
>>>>>>>>>>>
>>>>>>>>>>>
>>>>>>>>>>> On Wed, Apr 19, 2017 at 8:34 PM, Flavio Pompermaier <
>>>>>>>>>>> pompermaier@okkam.it> wrote:
>>>>>>>>>>>
>>>>>>>>>>> I could but only if there's a good probability that it fix the
>>>>>>>>>>> problem...how confident are you about it?
>>>>>>>>>>>
>>>>>>>>>>>
>>>>>>>>>>>
>>>>>>>>>>> On Wed, Apr 19, 2017 at 8:27 PM, Ted Yu <yu...@gmail.com>
>>>>>>>>>>> wrote:
>>>>>>>>>>>
>>>>>>>>>>> Looking at git log of DataInputDeserializer.java , there has
>>>>>>>>>>> been some recent change.
>>>>>>>>>>>
>>>>>>>>>>>
>>>>>>>>>>>
>>>>>>>>>>> If you have time, maybe try with 1.2.1 RC and see if the error
>>>>>>>>>>> is reproducible ?
>>>>>>>>>>>
>>>>>>>>>>>
>>>>>>>>>>>
>>>>>>>>>>> Cheers
>>>>>>>>>>>
>>>>>>>>>>>
>>>>>>>>>>>
>>>>>>>>>>> On Wed, Apr 19, 2017 at 11:22 AM, Flavio Pompermaier <
>>>>>>>>>>> pompermaier@okkam.it> wrote:
>>>>>>>>>>>
>>>>>>>>>>> Hi to all,
>>>>>>>>>>>
>>>>>>>>>>> I think I'm again on the weird Exception with the
>>>>>>>>>>> SpillingAdaptiveSpanningRecordDeserializer...
>>>>>>>>>>>
>>>>>>>>>>> I'm using Flink 1.2.0 and the error seems to rise when Flink
>>>>>>>>>>> spills to disk but the Exception thrown is not very helpful. Any idea?
>>>>>>>>>>>
>>>>>>>>>>>
>>>>>>>>>>>
>>>>>>>>>>> Caused by: java.lang.RuntimeException: Error obtaining the
>>>>>>>>>>> sorted input: Thread 'SortMerger Reading Thread' terminated due to an
>>>>>>>>>>> exception: null
>>>>>>>>>>> at org.apache.flink.runtime.operators.sort.UnilateralSortMerger
>>>>>>>>>>> .getIterator(UnilateralSortMerger.java:619)
>>>>>>>>>>> at org.apache.flink.runtime.operators.BatchTask.getInput(BatchT
>>>>>>>>>>> ask.java:1094)
>>>>>>>>>>> at org.apache.flink.runtime.operators.GroupReduceDriver.prepare
>>>>>>>>>>> (GroupReduceDriver.java:99)
>>>>>>>>>>> at org.apache.flink.runtime.operators.BatchTask.run(BatchTask.j
>>>>>>>>>>> ava:460)
>>>>>>>>>>> ... 3 more
>>>>>>>>>>> Caused by: java.io.IOException: Thread 'SortMerger Reading
>>>>>>>>>>> Thread' terminated due to an exception: null
>>>>>>>>>>> at org.apache.flink.runtime.operators.sort.UnilateralSortMerger
>>>>>>>>>>> $ThreadBase.run(UnilateralSortMerger.java:799)
>>>>>>>>>>> Caused by: java.io.EOFException
>>>>>>>>>>> at org.apache.flink.runtime.util.DataInputDeserializer.readUnsi
>>>>>>>>>>> gnedByte(DataInputDeserializer.java:306)
>>>>>>>>>>> at org.apache.flink.types.StringValue.readString(StringValue.ja
>>>>>>>>>>> va:747)
>>>>>>>>>>> at org.apache.flink.api.common.typeutils.base.StringSerializer.
>>>>>>>>>>> deserialize(StringSerializer.java:69)
>>>>>>>>>>> at org.apache.flink.api.common.typeutils.base.StringSerializer.
>>>>>>>>>>> deserialize(StringSerializer.java:74)
>>>>>>>>>>> at org.apache.flink.api.common.typeutils.base.StringSerializer.
>>>>>>>>>>> deserialize(StringSerializer.java:28)
>>>>>>>>>>> at org.apache.flink.api.java.typeutils.runtime.RowSerializer.de
>>>>>>>>>>> serialize(RowSerializer.java:193)
>>>>>>>>>>> at org.apache.flink.api.java.typeutils.runtime.RowSerializer.de
>>>>>>>>>>> serialize(RowSerializer.java:36)
>>>>>>>>>>> at org.apache.flink.runtime.plugable.ReusingDeserializationDele
>>>>>>>>>>> gate.read(ReusingDeserializationDelegate.java:57)
>>>>>>>>>>> at org.apache.flink.runtime.io.network.api.serialization.Spilli
>>>>>>>>>>> ngAdaptiveSpanningRecordDeserializer.getNextRecord(SpillingA
>>>>>>>>>>> daptiveSpanningRecordDeserializer.java:144)
>>>>>>>>>>> at org.apache.flink.runtime.io.network.api.reader.AbstractRecor
>>>>>>>>>>> dReader.getNextRecord(AbstractRecordReader.java:72)
>>>>>>>>>>> at org.apache.flink.runtime.io.network.api.reader.MutableRecord
>>>>>>>>>>> Reader.next(MutableRecordReader.java:42)
>>>>>>>>>>> at org.apache.flink.runtime.operators.util.ReaderIterator.next(
>>>>>>>>>>> ReaderIterator.java:59)
>>>>>>>>>>> at org.apache.flink.runtime.operators.sort.UnilateralSortMerger
>>>>>>>>>>> $ReadingThread.go(UnilateralSortMerger.java:1035)
>>>>>>>>>>> at org.apache.flink.runtime.operators.sort.UnilateralSortMerger
>>>>>>>>>>> $ThreadBase.run(UnilateralSortMerger.java:796)
>>>>>>>>>>>
>>>>>>>>>>>
>>>>>>>>>>> Best,
>>>>>>>>>>>
>>>>>>>>>>> Flavio
>>>>>>>>>>>
>>>>>>>>>>>
>>>>>>>>>>>
>>>>>>>>>>>
>>>>>>>>>>>
>>>>>>>>>>>
>>>>>>>>>>>
>>>>>>>>>>>
>>>>>>>>>>>
>>>>>>>>>>>
>>>>>>>>>>>
>>>>>>>>>>
>>>>>>>>>>
>>>>>>>>>>
>>>>>>>>>
>>>>>>>>
>>>>>>
>>>>>>
>>>>> --
>>>> Best,
>>>> Kurt
>>>>
>>>
>>>
>>
>

Re: UnilateralSortMerger error (again)

Posted by Flavio Pompermaier <po...@okkam.it>.
Great!! Thanks a lot Kurt

On Thu, Apr 27, 2017 at 5:31 PM, Kurt Young <yk...@gmail.com> wrote:

> Hi, i have found the bug: https://issues.apache.org/jira/browse/FLINK-6398,
> will open a PR soon.
>
> Best,
> Kurt
>
> On Thu, Apr 27, 2017 at 8:23 PM, Flavio Pompermaier <po...@okkam.it>
> wrote:
>
>> Thanks a lot Kurt!
>>
>> On Thu, Apr 27, 2017 at 2:12 PM, Kurt Young <yk...@gmail.com> wrote:
>>
>>> Thanks for the test case, i will take a look at it.
>>>
>>> Flavio Pompermaier <po...@okkam.it>于2017年4月27日 周四03:55写道:
>>>
>>>> I've created a repository with a unit test to reproduce the error at
>>>> https://github.com/fpompermaier/flink-batch-bug/blob/mast
>>>> er/src/test/java/it/okkam/flink/aci/TestDataInputDeserializer.java (probably
>>>> this error is related also to FLINK-4719).
>>>>
>>>> The exception is  thrown only when there are null strings and multiple
>>>> slots per TM, I don't know whether UnilateralSorterMerger is involved or
>>>> not (but I think so..).
>>>> A quick fix for this problem would be very appreciated because it's
>>>> bloking a production deployment..
>>>>
>>>> Thanks in advance to all,
>>>> Flavio
>>>>
>>>> On Wed, Apr 26, 2017 at 4:42 PM, Flavio Pompermaier <
>>>> pompermaier@okkam.it> wrote:
>>>>
>>>>> After digging into the code and test I think that the problem is
>>>>> almost certainly in the UnilateralSortMerger, there should be a missing
>>>>> synchronization on some shared object somewhere...Right now I'm trying to
>>>>> understand if this section of code creates some shared object (like queues)
>>>>> that are accessed in a bad way when there's spilling to disk:
>>>>>
>>>>>                // start the thread that reads the input channels
>>>>> this.readThread = getReadingThread(exceptionHandler, input,
>>>>> circularQueues, largeRecordHandler,
>>>>> parentTask, serializer, ((long) (startSpillingFraction * sortMemory)));
>>>>>
>>>>> // start the thread that sorts the buffers
>>>>> this.sortThread = getSortingThread(exceptionHandler, circularQueues,
>>>>> parentTask);
>>>>>
>>>>> // start the thread that handles spilling to secondary storage
>>>>> this.spillThread = getSpillingThread(exceptionHandler,
>>>>> circularQueues, parentTask,
>>>>> memoryManager, ioManager, serializerFactory, comparator,
>>>>> this.sortReadMemory, this.writeMemory,
>>>>> maxNumFileHandles);
>>>>> ....
>>>>> startThreads();
>>>>>
>>>>>
>>>>> The problem is that the unit tests of GroupReduceDriver use Record and
>>>>> testing Rows in not very straightforward and I'm still trying to reproduce
>>>>> the problem in a local env..
>>>>>
>>>>> On Fri, Apr 21, 2017 at 9:53 PM, Flavio Pompermaier <
>>>>> pompermaier@okkam.it> wrote:
>>>>>
>>>>>> Thanks for the explanation . Is there a way to force this behaviour
>>>>>> in a local environment (to try to debug the problem)?
>>>>>>
>>>>>> On 21 Apr 2017 21:49, "Fabian Hueske" <fh...@gmail.com> wrote:
>>>>>>
>>>>>>> Hi Flavio,
>>>>>>>
>>>>>>> these files are used for spilling data to disk. In your case sorted
>>>>>>> runs of records.
>>>>>>> Later all (up to a fanout threshold) these sorted runs are read and
>>>>>>> merged to get a completely sorted record stream.
>>>>>>>
>>>>>>> 2017-04-21 14:09 GMT+02:00 Flavio Pompermaier <po...@okkam.it>
>>>>>>> :
>>>>>>>
>>>>>>>> The error appears as soon as some taskmanager generates some
>>>>>>>> inputchannel file.
>>>>>>>> What are those files used for?
>>>>>>>>
>>>>>>>> On Fri, Apr 21, 2017 at 11:53 AM, Flavio Pompermaier <
>>>>>>>> pompermaier@okkam.it> wrote:
>>>>>>>>
>>>>>>>>> In another run of the job I had another Exception. Could it be
>>>>>>>>> helpful?
>>>>>>>>>
>>>>>>>>> Error obtaining the sorted input: Thread 'SortMerger Reading
>>>>>>>>> Thread' terminated due to an exception: Serializer consumed more bytes than
>>>>>>>>> the record had. This indicates broken serialization. If you are using
>>>>>>>>> custom serialization types (Value or Writable), check their serialization
>>>>>>>>> methods. If you are using a Kryo-serialized type, check the corresponding
>>>>>>>>> Kryo serializer.
>>>>>>>>> at org.apache.flink.runtime.operators.BatchTask.run(BatchTask.j
>>>>>>>>> ava:465)
>>>>>>>>> at org.apache.flink.runtime.operators.BatchTask.invoke(BatchTas
>>>>>>>>> k.java:355)
>>>>>>>>> at org.apache.flink.runtime.taskmanager.Task.run(Task.java:655)
>>>>>>>>> at java.lang.Thread.run(Thread.java:745)
>>>>>>>>> Caused by: java.lang.RuntimeException: Error obtaining the sorted
>>>>>>>>> input: Thread 'SortMerger Reading Thread' terminated due to an exception:
>>>>>>>>> Serializer consumed more bytes than the record had. This indicates broken
>>>>>>>>> serialization. If you are using custom serialization types (Value or
>>>>>>>>> Writable), check their serialization methods. If you are using a
>>>>>>>>> Kryo-serialized type, check the corresponding Kryo serializer.
>>>>>>>>> at org.apache.flink.runtime.operators.sort.UnilateralSortMerger
>>>>>>>>> .getIterator(UnilateralSortMerger.java:619)
>>>>>>>>> at org.apache.flink.runtime.operators.BatchTask.getInput(BatchT
>>>>>>>>> ask.java:1094)
>>>>>>>>> at org.apache.flink.runtime.operators.GroupReduceDriver.prepare
>>>>>>>>> (GroupReduceDriver.java:99)
>>>>>>>>> at org.apache.flink.runtime.operators.BatchTask.run(BatchTask.j
>>>>>>>>> ava:460)
>>>>>>>>> ... 3 more
>>>>>>>>> Caused by: java.io.IOException: Thread 'SortMerger Reading Thread'
>>>>>>>>> terminated due to an exception: Serializer consumed more bytes than the
>>>>>>>>> record had. This indicates broken serialization. If you are using custom
>>>>>>>>> serialization types (Value or Writable), check their serialization methods.
>>>>>>>>> If you are using a Kryo-serialized type, check the corresponding Kryo
>>>>>>>>> serializer.
>>>>>>>>> at org.apache.flink.runtime.operators.sort.UnilateralSortMerger
>>>>>>>>> $ThreadBase.run(UnilateralSortMerger.java:799)
>>>>>>>>> Caused by: java.io.IOException: Serializer consumed more bytes
>>>>>>>>> than the record had. This indicates broken serialization. If you are using
>>>>>>>>> custom serialization types (Value or Writable), check their serialization
>>>>>>>>> methods. If you are using a Kryo-serialized type, check the corresponding
>>>>>>>>> Kryo serializer.
>>>>>>>>> at org.apache.flink.runtime.io.network.api.serialization.Spilli
>>>>>>>>> ngAdaptiveSpanningRecordDeserializer.getNextRecord(SpillingA
>>>>>>>>> daptiveSpanningRecordDeserializer.java:123)
>>>>>>>>> at org.apache.flink.runtime.io.network.api.reader.AbstractRecor
>>>>>>>>> dReader.getNextRecord(AbstractRecordReader.java:72)
>>>>>>>>> at org.apache.flink.runtime.io.network.api.reader.MutableRecord
>>>>>>>>> Reader.next(MutableRecordReader.java:42)
>>>>>>>>> at org.apache.flink.runtime.operators.util.ReaderIterator.next(
>>>>>>>>> ReaderIterator.java:59)
>>>>>>>>> at org.apache.flink.runtime.operators.sort.UnilateralSortMerger
>>>>>>>>> $ReadingThread.go(UnilateralSortMerger.java:1035)
>>>>>>>>> at org.apache.flink.runtime.operators.sort.UnilateralSortMerger
>>>>>>>>> $ThreadBase.run(UnilateralSortMerger.java:796)
>>>>>>>>> Caused by: java.lang.ArrayIndexOutOfBoundsException: 32768
>>>>>>>>> at org.apache.flink.core.memory.HeapMemorySegment.get(HeapMemor
>>>>>>>>> ySegment.java:104)
>>>>>>>>> at org.apache.flink.runtime.io.network.api.serialization.Spilli
>>>>>>>>> ngAdaptiveSpanningRecordDeserializer$NonSpanningWrapper.read
>>>>>>>>> Byte(SpillingAdaptiveSpanningRecordDeserializer.java:226)
>>>>>>>>> at org.apache.flink.runtime.io.network.api.serialization.Spilli
>>>>>>>>> ngAdaptiveSpanningRecordDeserializer$NonSpanningWrapper.read
>>>>>>>>> UnsignedByte(SpillingAdaptiveSpanningRecordDeserializer.java:231)
>>>>>>>>> at org.apache.flink.types.StringValue.readString(StringValue.ja
>>>>>>>>> va:770)
>>>>>>>>> at org.apache.flink.api.common.typeutils.base.StringSerializer.
>>>>>>>>> deserialize(StringSerializer.java:69)
>>>>>>>>> at org.apache.flink.api.common.typeutils.base.StringSerializer.
>>>>>>>>> deserialize(StringSerializer.java:74)
>>>>>>>>> at org.apache.flink.api.common.typeutils.base.StringSerializer.
>>>>>>>>> deserialize(StringSerializer.java:28)
>>>>>>>>> at org.apache.flink.api.java.typeutils.runtime.RowSerializer.de
>>>>>>>>> serialize(RowSerializer.java:193)
>>>>>>>>> at org.apache.flink.api.java.typeutils.runtime.RowSerializer.de
>>>>>>>>> serialize(RowSerializer.java:36)
>>>>>>>>> at org.apache.flink.runtime.plugable.ReusingDeserializationDele
>>>>>>>>> gate.read(ReusingDeserializationDelegate.java:57)
>>>>>>>>> at org.apache.flink.runtime.io.network.api.serialization.Spilli
>>>>>>>>> ngAdaptiveSpanningRecordDeserializer.getNextRecord(SpillingA
>>>>>>>>> daptiveSpanningRecordDeserializer.java:109)
>>>>>>>>> ... 5 more
>>>>>>>>>
>>>>>>>>> On Fri, Apr 21, 2017 at 11:43 AM, Stefano Bortoli <
>>>>>>>>> stefano.bortoli@huawei.com> wrote:
>>>>>>>>>
>>>>>>>>>> In fact the old problem was with the KryoSerializer missed
>>>>>>>>>> initialization on the exception that would trigger the spilling on disk.
>>>>>>>>>> This would lead to dirty serialization buffer that would eventually break
>>>>>>>>>> the program. Till worked on it debugging the source code generating the
>>>>>>>>>> error. Perhaps someone could try the same also this time. If Flavio can
>>>>>>>>>> make the problem reproducible in a shareable program+data.
>>>>>>>>>>
>>>>>>>>>>
>>>>>>>>>>
>>>>>>>>>> Stefano
>>>>>>>>>>
>>>>>>>>>>
>>>>>>>>>>
>>>>>>>>>> *From:* Stephan Ewen [mailto:sewen@apache.org]
>>>>>>>>>> *Sent:* Friday, April 21, 2017 10:04 AM
>>>>>>>>>> *To:* user <us...@flink.apache.org>
>>>>>>>>>> *Subject:* Re: UnilateralSortMerger error (again)
>>>>>>>>>>
>>>>>>>>>>
>>>>>>>>>>
>>>>>>>>>> In the past, these errors were most often caused by bugs in the
>>>>>>>>>> serializers, not in the sorter.
>>>>>>>>>>
>>>>>>>>>>
>>>>>>>>>>
>>>>>>>>>> What types are you using at that point? The Stack Trace reveals
>>>>>>>>>> ROW and StringValue, any other involved types?
>>>>>>>>>>
>>>>>>>>>>
>>>>>>>>>>
>>>>>>>>>> On Fri, Apr 21, 2017 at 9:36 AM, Flavio Pompermaier <
>>>>>>>>>> pompermaier@okkam.it> wrote:
>>>>>>>>>>
>>>>>>>>>> As suggested by Fabian I set taskmanager.memory.size = 1024 (to
>>>>>>>>>> force spilling to disk) and the job failed almost immediately..
>>>>>>>>>>
>>>>>>>>>>
>>>>>>>>>>
>>>>>>>>>> On Fri, Apr 21, 2017 at 12:33 AM, Flavio Pompermaier <
>>>>>>>>>> pompermaier@okkam.it> wrote:
>>>>>>>>>>
>>>>>>>>>> I debugged a bit the process repeating the job on a sub-slice of
>>>>>>>>>> the entire data (using the id value to filter data with parquet push down
>>>>>>>>>> filters) and all slices completed successfully :(
>>>>>>>>>>
>>>>>>>>>> So I tried to increase the parallelism (from 1 slot per TM to 4)
>>>>>>>>>> to see if this was somehow a factor of stress but it didn't cause any error.
>>>>>>>>>>
>>>>>>>>>> Then I almost doubled the number of rows to process and finally
>>>>>>>>>> the error showed up again.
>>>>>>>>>>
>>>>>>>>>> It seems somehow related to spilling to disk but I can't really
>>>>>>>>>> understand what's going on :(
>>>>>>>>>>
>>>>>>>>>> This is a summary of my debug attempts:
>>>>>>>>>>
>>>>>>>>>>
>>>>>>>>>>
>>>>>>>>>> 4 Task managers with 6 GB  and 1 slot each, parallelism = 4
>>>>>>>>>>
>>>>>>>>>>
>>>>>>>>>>
>>>>>>>>>> id < 10.000.000.000  => 1.857.365 rows => OK
>>>>>>>>>>
>>>>>>>>>> id >= 10.000.000.000 && id < 10.010.000.000 => 20.057.714 rows
>>>>>>>>>> => OK
>>>>>>>>>>
>>>>>>>>>> id >= 10.010.000.000 && id < 99.945.000.000       => 20.926.903
>>>>>>>>>> rows => OK
>>>>>>>>>>
>>>>>>>>>> id >= 99.945.000.000 && id < 99.960.000.000       => 23.888.750
>>>>>>>>>>  rows => OK
>>>>>>>>>>
>>>>>>>>>> id >= 99.960.000.000 => 32.936.422 rows => OK
>>>>>>>>>>
>>>>>>>>>>
>>>>>>>>>>
>>>>>>>>>> 4 TM with 8 GB and 4 slot each, parallelism 16
>>>>>>>>>>
>>>>>>>>>>
>>>>>>>>>> id >= 99.960.000.000 => 32.936.422 rows => OK
>>>>>>>>>>
>>>>>>>>>> id >= 99.945.000.000  => 56.825.172 rows => ERROR
>>>>>>>>>>
>>>>>>>>>>
>>>>>>>>>>
>>>>>>>>>> Any help is appreciated..
>>>>>>>>>>
>>>>>>>>>> Best,
>>>>>>>>>>
>>>>>>>>>> Flavio
>>>>>>>>>>
>>>>>>>>>>
>>>>>>>>>>
>>>>>>>>>> On Wed, Apr 19, 2017 at 8:34 PM, Flavio Pompermaier <
>>>>>>>>>> pompermaier@okkam.it> wrote:
>>>>>>>>>>
>>>>>>>>>> I could but only if there's a good probability that it fix the
>>>>>>>>>> problem...how confident are you about it?
>>>>>>>>>>
>>>>>>>>>>
>>>>>>>>>>
>>>>>>>>>> On Wed, Apr 19, 2017 at 8:27 PM, Ted Yu <yu...@gmail.com>
>>>>>>>>>> wrote:
>>>>>>>>>>
>>>>>>>>>> Looking at git log of DataInputDeserializer.java , there has been
>>>>>>>>>> some recent change.
>>>>>>>>>>
>>>>>>>>>>
>>>>>>>>>>
>>>>>>>>>> If you have time, maybe try with 1.2.1 RC and see if the error is
>>>>>>>>>> reproducible ?
>>>>>>>>>>
>>>>>>>>>>
>>>>>>>>>>
>>>>>>>>>> Cheers
>>>>>>>>>>
>>>>>>>>>>
>>>>>>>>>>
>>>>>>>>>> On Wed, Apr 19, 2017 at 11:22 AM, Flavio Pompermaier <
>>>>>>>>>> pompermaier@okkam.it> wrote:
>>>>>>>>>>
>>>>>>>>>> Hi to all,
>>>>>>>>>>
>>>>>>>>>> I think I'm again on the weird Exception with the
>>>>>>>>>> SpillingAdaptiveSpanningRecordDeserializer...
>>>>>>>>>>
>>>>>>>>>> I'm using Flink 1.2.0 and the error seems to rise when Flink
>>>>>>>>>> spills to disk but the Exception thrown is not very helpful. Any idea?
>>>>>>>>>>
>>>>>>>>>>
>>>>>>>>>>
>>>>>>>>>> Caused by: java.lang.RuntimeException: Error obtaining the sorted
>>>>>>>>>> input: Thread 'SortMerger Reading Thread' terminated due to an exception:
>>>>>>>>>> null
>>>>>>>>>> at org.apache.flink.runtime.operators.sort.UnilateralSortMerger
>>>>>>>>>> .getIterator(UnilateralSortMerger.java:619)
>>>>>>>>>> at org.apache.flink.runtime.operators.BatchTask.getInput(BatchT
>>>>>>>>>> ask.java:1094)
>>>>>>>>>> at org.apache.flink.runtime.operators.GroupReduceDriver.prepare
>>>>>>>>>> (GroupReduceDriver.java:99)
>>>>>>>>>> at org.apache.flink.runtime.operators.BatchTask.run(BatchTask.j
>>>>>>>>>> ava:460)
>>>>>>>>>> ... 3 more
>>>>>>>>>> Caused by: java.io.IOException: Thread 'SortMerger Reading
>>>>>>>>>> Thread' terminated due to an exception: null
>>>>>>>>>> at org.apache.flink.runtime.operators.sort.UnilateralSortMerger
>>>>>>>>>> $ThreadBase.run(UnilateralSortMerger.java:799)
>>>>>>>>>> Caused by: java.io.EOFException
>>>>>>>>>> at org.apache.flink.runtime.util.DataInputDeserializer.readUnsi
>>>>>>>>>> gnedByte(DataInputDeserializer.java:306)
>>>>>>>>>> at org.apache.flink.types.StringValue.readString(StringValue.ja
>>>>>>>>>> va:747)
>>>>>>>>>> at org.apache.flink.api.common.typeutils.base.StringSerializer.
>>>>>>>>>> deserialize(StringSerializer.java:69)
>>>>>>>>>> at org.apache.flink.api.common.typeutils.base.StringSerializer.
>>>>>>>>>> deserialize(StringSerializer.java:74)
>>>>>>>>>> at org.apache.flink.api.common.typeutils.base.StringSerializer.
>>>>>>>>>> deserialize(StringSerializer.java:28)
>>>>>>>>>> at org.apache.flink.api.java.typeutils.runtime.RowSerializer.de
>>>>>>>>>> serialize(RowSerializer.java:193)
>>>>>>>>>> at org.apache.flink.api.java.typeutils.runtime.RowSerializer.de
>>>>>>>>>> serialize(RowSerializer.java:36)
>>>>>>>>>> at org.apache.flink.runtime.plugable.ReusingDeserializationDele
>>>>>>>>>> gate.read(ReusingDeserializationDelegate.java:57)
>>>>>>>>>> at org.apache.flink.runtime.io.network.api.serialization.Spilli
>>>>>>>>>> ngAdaptiveSpanningRecordDeserializer.getNextRecord(SpillingA
>>>>>>>>>> daptiveSpanningRecordDeserializer.java:144)
>>>>>>>>>> at org.apache.flink.runtime.io.network.api.reader.AbstractRecor
>>>>>>>>>> dReader.getNextRecord(AbstractRecordReader.java:72)
>>>>>>>>>> at org.apache.flink.runtime.io.network.api.reader.MutableRecord
>>>>>>>>>> Reader.next(MutableRecordReader.java:42)
>>>>>>>>>> at org.apache.flink.runtime.operators.util.ReaderIterator.next(
>>>>>>>>>> ReaderIterator.java:59)
>>>>>>>>>> at org.apache.flink.runtime.operators.sort.UnilateralSortMerger
>>>>>>>>>> $ReadingThread.go(UnilateralSortMerger.java:1035)
>>>>>>>>>> at org.apache.flink.runtime.operators.sort.UnilateralSortMerger
>>>>>>>>>> $ThreadBase.run(UnilateralSortMerger.java:796)
>>>>>>>>>>
>>>>>>>>>>
>>>>>>>>>> Best,
>>>>>>>>>>
>>>>>>>>>> Flavio
>>>>>>>>>>
>>>>>>>>>>
>>>>>>>>>>
>>>>>>>>>>
>>>>>>>>>>
>>>>>>>>>>
>>>>>>>>>>
>>>>>>>>>>
>>>>>>>>>>
>>>>>>>>>>
>>>>>>>>>>
>>>>>>>>>
>>>>>>>>>
>>>>>>>>>
>>>>>>>>
>>>>>>>
>>>>>
>>>>>
>>>> --
>>> Best,
>>> Kurt
>>>
>>
>>
>

Re: UnilateralSortMerger error (again)

Posted by Kurt Young <yk...@gmail.com>.
Hi, i have found the bug: https://issues.apache.org/jira/browse/FLINK-6398,
will open a PR soon.

Best,
Kurt

On Thu, Apr 27, 2017 at 8:23 PM, Flavio Pompermaier <po...@okkam.it>
wrote:

> Thanks a lot Kurt!
>
> On Thu, Apr 27, 2017 at 2:12 PM, Kurt Young <yk...@gmail.com> wrote:
>
>> Thanks for the test case, i will take a look at it.
>>
>> Flavio Pompermaier <po...@okkam.it>于2017年4月27日 周四03:55写道:
>>
>>> I've created a repository with a unit test to reproduce the error at
>>> https://github.com/fpompermaier/flink-batch-bug/blob/
>>> master/src/test/java/it/okkam/flink/aci/TestDataInputDeserializer.java (probably
>>> this error is related also to FLINK-4719).
>>>
>>> The exception is  thrown only when there are null strings and multiple
>>> slots per TM, I don't know whether UnilateralSorterMerger is involved or
>>> not (but I think so..).
>>> A quick fix for this problem would be very appreciated because it's
>>> bloking a production deployment..
>>>
>>> Thanks in advance to all,
>>> Flavio
>>>
>>> On Wed, Apr 26, 2017 at 4:42 PM, Flavio Pompermaier <
>>> pompermaier@okkam.it> wrote:
>>>
>>>> After digging into the code and test I think that the problem is almost
>>>> certainly in the UnilateralSortMerger, there should be a missing
>>>> synchronization on some shared object somewhere...Right now I'm trying to
>>>> understand if this section of code creates some shared object (like queues)
>>>> that are accessed in a bad way when there's spilling to disk:
>>>>
>>>>                // start the thread that reads the input channels
>>>> this.readThread = getReadingThread(exceptionHandler, input,
>>>> circularQueues, largeRecordHandler,
>>>> parentTask, serializer, ((long) (startSpillingFraction * sortMemory)));
>>>>
>>>> // start the thread that sorts the buffers
>>>> this.sortThread = getSortingThread(exceptionHandler, circularQueues,
>>>> parentTask);
>>>>
>>>> // start the thread that handles spilling to secondary storage
>>>> this.spillThread = getSpillingThread(exceptionHandler, circularQueues,
>>>> parentTask,
>>>> memoryManager, ioManager, serializerFactory, comparator,
>>>> this.sortReadMemory, this.writeMemory,
>>>> maxNumFileHandles);
>>>> ....
>>>> startThreads();
>>>>
>>>>
>>>> The problem is that the unit tests of GroupReduceDriver use Record and
>>>> testing Rows in not very straightforward and I'm still trying to reproduce
>>>> the problem in a local env..
>>>>
>>>> On Fri, Apr 21, 2017 at 9:53 PM, Flavio Pompermaier <
>>>> pompermaier@okkam.it> wrote:
>>>>
>>>>> Thanks for the explanation . Is there a way to force this behaviour in
>>>>> a local environment (to try to debug the problem)?
>>>>>
>>>>> On 21 Apr 2017 21:49, "Fabian Hueske" <fh...@gmail.com> wrote:
>>>>>
>>>>>> Hi Flavio,
>>>>>>
>>>>>> these files are used for spilling data to disk. In your case sorted
>>>>>> runs of records.
>>>>>> Later all (up to a fanout threshold) these sorted runs are read and
>>>>>> merged to get a completely sorted record stream.
>>>>>>
>>>>>> 2017-04-21 14:09 GMT+02:00 Flavio Pompermaier <po...@okkam.it>:
>>>>>>
>>>>>>> The error appears as soon as some taskmanager generates some
>>>>>>> inputchannel file.
>>>>>>> What are those files used for?
>>>>>>>
>>>>>>> On Fri, Apr 21, 2017 at 11:53 AM, Flavio Pompermaier <
>>>>>>> pompermaier@okkam.it> wrote:
>>>>>>>
>>>>>>>> In another run of the job I had another Exception. Could it be
>>>>>>>> helpful?
>>>>>>>>
>>>>>>>> Error obtaining the sorted input: Thread 'SortMerger Reading
>>>>>>>> Thread' terminated due to an exception: Serializer consumed more bytes than
>>>>>>>> the record had. This indicates broken serialization. If you are using
>>>>>>>> custom serialization types (Value or Writable), check their serialization
>>>>>>>> methods. If you are using a Kryo-serialized type, check the corresponding
>>>>>>>> Kryo serializer.
>>>>>>>> at org.apache.flink.runtime.operators.BatchTask.run(BatchTask.
>>>>>>>> java:465)
>>>>>>>> at org.apache.flink.runtime.operators.BatchTask.invoke(BatchTas
>>>>>>>> k.java:355)
>>>>>>>> at org.apache.flink.runtime.taskmanager.Task.run(Task.java:655)
>>>>>>>> at java.lang.Thread.run(Thread.java:745)
>>>>>>>> Caused by: java.lang.RuntimeException: Error obtaining the sorted
>>>>>>>> input: Thread 'SortMerger Reading Thread' terminated due to an exception:
>>>>>>>> Serializer consumed more bytes than the record had. This indicates broken
>>>>>>>> serialization. If you are using custom serialization types (Value or
>>>>>>>> Writable), check their serialization methods. If you are using a
>>>>>>>> Kryo-serialized type, check the corresponding Kryo serializer.
>>>>>>>> at org.apache.flink.runtime.operators.sort.UnilateralSortMerger
>>>>>>>> .getIterator(UnilateralSortMerger.java:619)
>>>>>>>> at org.apache.flink.runtime.operators.BatchTask.getInput(BatchT
>>>>>>>> ask.java:1094)
>>>>>>>> at org.apache.flink.runtime.operators.GroupReduceDriver.prepare
>>>>>>>> (GroupReduceDriver.java:99)
>>>>>>>> at org.apache.flink.runtime.operators.BatchTask.run(BatchTask.
>>>>>>>> java:460)
>>>>>>>> ... 3 more
>>>>>>>> Caused by: java.io.IOException: Thread 'SortMerger Reading Thread'
>>>>>>>> terminated due to an exception: Serializer consumed more bytes than the
>>>>>>>> record had. This indicates broken serialization. If you are using custom
>>>>>>>> serialization types (Value or Writable), check their serialization methods.
>>>>>>>> If you are using a Kryo-serialized type, check the corresponding Kryo
>>>>>>>> serializer.
>>>>>>>> at org.apache.flink.runtime.operators.sort.UnilateralSortMerger
>>>>>>>> $ThreadBase.run(UnilateralSortMerger.java:799)
>>>>>>>> Caused by: java.io.IOException: Serializer consumed more bytes than
>>>>>>>> the record had. This indicates broken serialization. If you are using
>>>>>>>> custom serialization types (Value or Writable), check their serialization
>>>>>>>> methods. If you are using a Kryo-serialized type, check the corresponding
>>>>>>>> Kryo serializer.
>>>>>>>> at org.apache.flink.runtime.io.network.api.serialization.Spilli
>>>>>>>> ngAdaptiveSpanningRecordDeserializer.getNextRecord(SpillingA
>>>>>>>> daptiveSpanningRecordDeserializer.java:123)
>>>>>>>> at org.apache.flink.runtime.io.network.api.reader.AbstractRecor
>>>>>>>> dReader.getNextRecord(AbstractRecordReader.java:72)
>>>>>>>> at org.apache.flink.runtime.io.network.api.reader.MutableRecord
>>>>>>>> Reader.next(MutableRecordReader.java:42)
>>>>>>>> at org.apache.flink.runtime.operators.util.ReaderIterator.next(
>>>>>>>> ReaderIterator.java:59)
>>>>>>>> at org.apache.flink.runtime.operators.sort.UnilateralSortMerger
>>>>>>>> $ReadingThread.go(UnilateralSortMerger.java:1035)
>>>>>>>> at org.apache.flink.runtime.operators.sort.UnilateralSortMerger
>>>>>>>> $ThreadBase.run(UnilateralSortMerger.java:796)
>>>>>>>> Caused by: java.lang.ArrayIndexOutOfBoundsException: 32768
>>>>>>>> at org.apache.flink.core.memory.HeapMemorySegment.get(HeapMemor
>>>>>>>> ySegment.java:104)
>>>>>>>> at org.apache.flink.runtime.io.network.api.serialization.Spilli
>>>>>>>> ngAdaptiveSpanningRecordDeserializer$NonSpanningWrapper.
>>>>>>>> readByte(SpillingAdaptiveSpanningRecordDeserializer.java:226)
>>>>>>>> at org.apache.flink.runtime.io.network.api.serialization.Spilli
>>>>>>>> ngAdaptiveSpanningRecordDeserializer$NonSpanningWrapper.read
>>>>>>>> UnsignedByte(SpillingAdaptiveSpanningRecordDeserializer.java:231)
>>>>>>>> at org.apache.flink.types.StringValue.readString(StringValue.
>>>>>>>> java:770)
>>>>>>>> at org.apache.flink.api.common.typeutils.base.StringSerializer.
>>>>>>>> deserialize(StringSerializer.java:69)
>>>>>>>> at org.apache.flink.api.common.typeutils.base.StringSerializer.
>>>>>>>> deserialize(StringSerializer.java:74)
>>>>>>>> at org.apache.flink.api.common.typeutils.base.StringSerializer.
>>>>>>>> deserialize(StringSerializer.java:28)
>>>>>>>> at org.apache.flink.api.java.typeutils.runtime.RowSerializer.de
>>>>>>>> serialize(RowSerializer.java:193)
>>>>>>>> at org.apache.flink.api.java.typeutils.runtime.RowSerializer.de
>>>>>>>> serialize(RowSerializer.java:36)
>>>>>>>> at org.apache.flink.runtime.plugable.ReusingDeserializationDele
>>>>>>>> gate.read(ReusingDeserializationDelegate.java:57)
>>>>>>>> at org.apache.flink.runtime.io.network.api.serialization.Spilli
>>>>>>>> ngAdaptiveSpanningRecordDeserializer.getNextRecord(SpillingA
>>>>>>>> daptiveSpanningRecordDeserializer.java:109)
>>>>>>>> ... 5 more
>>>>>>>>
>>>>>>>> On Fri, Apr 21, 2017 at 11:43 AM, Stefano Bortoli <
>>>>>>>> stefano.bortoli@huawei.com> wrote:
>>>>>>>>
>>>>>>>>> In fact the old problem was with the KryoSerializer missed
>>>>>>>>> initialization on the exception that would trigger the spilling on disk.
>>>>>>>>> This would lead to dirty serialization buffer that would eventually break
>>>>>>>>> the program. Till worked on it debugging the source code generating the
>>>>>>>>> error. Perhaps someone could try the same also this time. If Flavio can
>>>>>>>>> make the problem reproducible in a shareable program+data.
>>>>>>>>>
>>>>>>>>>
>>>>>>>>>
>>>>>>>>> Stefano
>>>>>>>>>
>>>>>>>>>
>>>>>>>>>
>>>>>>>>> *From:* Stephan Ewen [mailto:sewen@apache.org]
>>>>>>>>> *Sent:* Friday, April 21, 2017 10:04 AM
>>>>>>>>> *To:* user <us...@flink.apache.org>
>>>>>>>>> *Subject:* Re: UnilateralSortMerger error (again)
>>>>>>>>>
>>>>>>>>>
>>>>>>>>>
>>>>>>>>> In the past, these errors were most often caused by bugs in the
>>>>>>>>> serializers, not in the sorter.
>>>>>>>>>
>>>>>>>>>
>>>>>>>>>
>>>>>>>>> What types are you using at that point? The Stack Trace reveals
>>>>>>>>> ROW and StringValue, any other involved types?
>>>>>>>>>
>>>>>>>>>
>>>>>>>>>
>>>>>>>>> On Fri, Apr 21, 2017 at 9:36 AM, Flavio Pompermaier <
>>>>>>>>> pompermaier@okkam.it> wrote:
>>>>>>>>>
>>>>>>>>> As suggested by Fabian I set taskmanager.memory.size = 1024 (to
>>>>>>>>> force spilling to disk) and the job failed almost immediately..
>>>>>>>>>
>>>>>>>>>
>>>>>>>>>
>>>>>>>>> On Fri, Apr 21, 2017 at 12:33 AM, Flavio Pompermaier <
>>>>>>>>> pompermaier@okkam.it> wrote:
>>>>>>>>>
>>>>>>>>> I debugged a bit the process repeating the job on a sub-slice of
>>>>>>>>> the entire data (using the id value to filter data with parquet push down
>>>>>>>>> filters) and all slices completed successfully :(
>>>>>>>>>
>>>>>>>>> So I tried to increase the parallelism (from 1 slot per TM to 4)
>>>>>>>>> to see if this was somehow a factor of stress but it didn't cause any error.
>>>>>>>>>
>>>>>>>>> Then I almost doubled the number of rows to process and finally
>>>>>>>>> the error showed up again.
>>>>>>>>>
>>>>>>>>> It seems somehow related to spilling to disk but I can't really
>>>>>>>>> understand what's going on :(
>>>>>>>>>
>>>>>>>>> This is a summary of my debug attempts:
>>>>>>>>>
>>>>>>>>>
>>>>>>>>>
>>>>>>>>> 4 Task managers with 6 GB  and 1 slot each, parallelism = 4
>>>>>>>>>
>>>>>>>>>
>>>>>>>>>
>>>>>>>>> id < 10.000.000.000  => 1.857.365 rows => OK
>>>>>>>>>
>>>>>>>>> id >= 10.000.000.000 && id < 10.010.000.000 => 20.057.714 rows =>
>>>>>>>>> OK
>>>>>>>>>
>>>>>>>>> id >= 10.010.000.000 && id < 99.945.000.000       => 20.926.903
>>>>>>>>> rows => OK
>>>>>>>>>
>>>>>>>>> id >= 99.945.000.000 && id < 99.960.000.000       => 23.888.750
>>>>>>>>>  rows => OK
>>>>>>>>>
>>>>>>>>> id >= 99.960.000.000 => 32.936.422 rows => OK
>>>>>>>>>
>>>>>>>>>
>>>>>>>>>
>>>>>>>>> 4 TM with 8 GB and 4 slot each, parallelism 16
>>>>>>>>>
>>>>>>>>>
>>>>>>>>> id >= 99.960.000.000 => 32.936.422 rows => OK
>>>>>>>>>
>>>>>>>>> id >= 99.945.000.000  => 56.825.172 rows => ERROR
>>>>>>>>>
>>>>>>>>>
>>>>>>>>>
>>>>>>>>> Any help is appreciated..
>>>>>>>>>
>>>>>>>>> Best,
>>>>>>>>>
>>>>>>>>> Flavio
>>>>>>>>>
>>>>>>>>>
>>>>>>>>>
>>>>>>>>> On Wed, Apr 19, 2017 at 8:34 PM, Flavio Pompermaier <
>>>>>>>>> pompermaier@okkam.it> wrote:
>>>>>>>>>
>>>>>>>>> I could but only if there's a good probability that it fix the
>>>>>>>>> problem...how confident are you about it?
>>>>>>>>>
>>>>>>>>>
>>>>>>>>>
>>>>>>>>> On Wed, Apr 19, 2017 at 8:27 PM, Ted Yu <yu...@gmail.com>
>>>>>>>>> wrote:
>>>>>>>>>
>>>>>>>>> Looking at git log of DataInputDeserializer.java , there has been
>>>>>>>>> some recent change.
>>>>>>>>>
>>>>>>>>>
>>>>>>>>>
>>>>>>>>> If you have time, maybe try with 1.2.1 RC and see if the error is
>>>>>>>>> reproducible ?
>>>>>>>>>
>>>>>>>>>
>>>>>>>>>
>>>>>>>>> Cheers
>>>>>>>>>
>>>>>>>>>
>>>>>>>>>
>>>>>>>>> On Wed, Apr 19, 2017 at 11:22 AM, Flavio Pompermaier <
>>>>>>>>> pompermaier@okkam.it> wrote:
>>>>>>>>>
>>>>>>>>> Hi to all,
>>>>>>>>>
>>>>>>>>> I think I'm again on the weird Exception with the
>>>>>>>>> SpillingAdaptiveSpanningRecordDeserializer...
>>>>>>>>>
>>>>>>>>> I'm using Flink 1.2.0 and the error seems to rise when Flink
>>>>>>>>> spills to disk but the Exception thrown is not very helpful. Any idea?
>>>>>>>>>
>>>>>>>>>
>>>>>>>>>
>>>>>>>>> Caused by: java.lang.RuntimeException: Error obtaining the sorted
>>>>>>>>> input: Thread 'SortMerger Reading Thread' terminated due to an exception:
>>>>>>>>> null
>>>>>>>>> at org.apache.flink.runtime.operators.sort.UnilateralSortMerger
>>>>>>>>> .getIterator(UnilateralSortMerger.java:619)
>>>>>>>>> at org.apache.flink.runtime.operators.BatchTask.getInput(BatchT
>>>>>>>>> ask.java:1094)
>>>>>>>>> at org.apache.flink.runtime.operators.GroupReduceDriver.prepare
>>>>>>>>> (GroupReduceDriver.java:99)
>>>>>>>>> at org.apache.flink.runtime.operators.BatchTask.run(BatchTask.
>>>>>>>>> java:460)
>>>>>>>>> ... 3 more
>>>>>>>>> Caused by: java.io.IOException: Thread 'SortMerger Reading Thread'
>>>>>>>>> terminated due to an exception: null
>>>>>>>>> at org.apache.flink.runtime.operators.sort.UnilateralSortMerger
>>>>>>>>> $ThreadBase.run(UnilateralSortMerger.java:799)
>>>>>>>>> Caused by: java.io.EOFException
>>>>>>>>> at org.apache.flink.runtime.util.DataInputDeserializer.readUnsi
>>>>>>>>> gnedByte(DataInputDeserializer.java:306)
>>>>>>>>> at org.apache.flink.types.StringValue.readString(StringValue.
>>>>>>>>> java:747)
>>>>>>>>> at org.apache.flink.api.common.typeutils.base.StringSerializer.
>>>>>>>>> deserialize(StringSerializer.java:69)
>>>>>>>>> at org.apache.flink.api.common.typeutils.base.StringSerializer.
>>>>>>>>> deserialize(StringSerializer.java:74)
>>>>>>>>> at org.apache.flink.api.common.typeutils.base.StringSerializer.
>>>>>>>>> deserialize(StringSerializer.java:28)
>>>>>>>>> at org.apache.flink.api.java.typeutils.runtime.RowSerializer.de
>>>>>>>>> serialize(RowSerializer.java:193)
>>>>>>>>> at org.apache.flink.api.java.typeutils.runtime.RowSerializer.de
>>>>>>>>> serialize(RowSerializer.java:36)
>>>>>>>>> at org.apache.flink.runtime.plugable.ReusingDeserializationDele
>>>>>>>>> gate.read(ReusingDeserializationDelegate.java:57)
>>>>>>>>> at org.apache.flink.runtime.io.network.api.serialization.Spilli
>>>>>>>>> ngAdaptiveSpanningRecordDeserializer.getNextRecord(SpillingA
>>>>>>>>> daptiveSpanningRecordDeserializer.java:144)
>>>>>>>>> at org.apache.flink.runtime.io.network.api.reader.AbstractRecor
>>>>>>>>> dReader.getNextRecord(AbstractRecordReader.java:72)
>>>>>>>>> at org.apache.flink.runtime.io.network.api.reader.MutableRecord
>>>>>>>>> Reader.next(MutableRecordReader.java:42)
>>>>>>>>> at org.apache.flink.runtime.operators.util.ReaderIterator.next(
>>>>>>>>> ReaderIterator.java:59)
>>>>>>>>> at org.apache.flink.runtime.operators.sort.UnilateralSortMerger
>>>>>>>>> $ReadingThread.go(UnilateralSortMerger.java:1035)
>>>>>>>>> at org.apache.flink.runtime.operators.sort.UnilateralSortMerger
>>>>>>>>> $ThreadBase.run(UnilateralSortMerger.java:796)
>>>>>>>>>
>>>>>>>>>
>>>>>>>>> Best,
>>>>>>>>>
>>>>>>>>> Flavio
>>>>>>>>>
>>>>>>>>>
>>>>>>>>>
>>>>>>>>>
>>>>>>>>>
>>>>>>>>>
>>>>>>>>>
>>>>>>>>>
>>>>>>>>>
>>>>>>>>>
>>>>>>>>>
>>>>>>>>
>>>>>>>>
>>>>>>>>
>>>>>>>
>>>>>>
>>>>
>>>>
>>> --
>> Best,
>> Kurt
>>
>
>

Re: UnilateralSortMerger error (again)

Posted by Flavio Pompermaier <po...@okkam.it>.
Thanks a lot Kurt!

On Thu, Apr 27, 2017 at 2:12 PM, Kurt Young <yk...@gmail.com> wrote:

> Thanks for the test case, i will take a look at it.
>
> Flavio Pompermaier <po...@okkam.it>于2017年4月27日 周四03:55写道:
>
>> I've created a repository with a unit test to reproduce the error at
>> https://github.com/fpompermaier/flink-batch-bug/
>> blob/master/src/test/java/it/okkam/flink/aci/
>> TestDataInputDeserializer.java (probably this error is related also to
>> FLINK-4719).
>>
>> The exception is  thrown only when there are null strings and multiple
>> slots per TM, I don't know whether UnilateralSorterMerger is involved or
>> not (but I think so..).
>> A quick fix for this problem would be very appreciated because it's
>> bloking a production deployment..
>>
>> Thanks in advance to all,
>> Flavio
>>
>> On Wed, Apr 26, 2017 at 4:42 PM, Flavio Pompermaier <pompermaier@okkam.it
>> > wrote:
>>
>>> After digging into the code and test I think that the problem is almost
>>> certainly in the UnilateralSortMerger, there should be a missing
>>> synchronization on some shared object somewhere...Right now I'm trying to
>>> understand if this section of code creates some shared object (like queues)
>>> that are accessed in a bad way when there's spilling to disk:
>>>
>>>                // start the thread that reads the input channels
>>> this.readThread = getReadingThread(exceptionHandler, input,
>>> circularQueues, largeRecordHandler,
>>> parentTask, serializer, ((long) (startSpillingFraction * sortMemory)));
>>>
>>> // start the thread that sorts the buffers
>>> this.sortThread = getSortingThread(exceptionHandler, circularQueues,
>>> parentTask);
>>>
>>> // start the thread that handles spilling to secondary storage
>>> this.spillThread = getSpillingThread(exceptionHandler, circularQueues,
>>> parentTask,
>>> memoryManager, ioManager, serializerFactory, comparator,
>>> this.sortReadMemory, this.writeMemory,
>>> maxNumFileHandles);
>>> ....
>>> startThreads();
>>>
>>>
>>> The problem is that the unit tests of GroupReduceDriver use Record and
>>> testing Rows in not very straightforward and I'm still trying to reproduce
>>> the problem in a local env..
>>>
>>> On Fri, Apr 21, 2017 at 9:53 PM, Flavio Pompermaier <
>>> pompermaier@okkam.it> wrote:
>>>
>>>> Thanks for the explanation . Is there a way to force this behaviour in
>>>> a local environment (to try to debug the problem)?
>>>>
>>>> On 21 Apr 2017 21:49, "Fabian Hueske" <fh...@gmail.com> wrote:
>>>>
>>>>> Hi Flavio,
>>>>>
>>>>> these files are used for spilling data to disk. In your case sorted
>>>>> runs of records.
>>>>> Later all (up to a fanout threshold) these sorted runs are read and
>>>>> merged to get a completely sorted record stream.
>>>>>
>>>>> 2017-04-21 14:09 GMT+02:00 Flavio Pompermaier <po...@okkam.it>:
>>>>>
>>>>>> The error appears as soon as some taskmanager generates some
>>>>>> inputchannel file.
>>>>>> What are those files used for?
>>>>>>
>>>>>> On Fri, Apr 21, 2017 at 11:53 AM, Flavio Pompermaier <
>>>>>> pompermaier@okkam.it> wrote:
>>>>>>
>>>>>>> In another run of the job I had another Exception. Could it be
>>>>>>> helpful?
>>>>>>>
>>>>>>> Error obtaining the sorted input: Thread 'SortMerger Reading Thread'
>>>>>>> terminated due to an exception: Serializer consumed more bytes than the
>>>>>>> record had. This indicates broken serialization. If you are using custom
>>>>>>> serialization types (Value or Writable), check their serialization methods.
>>>>>>> If you are using a Kryo-serialized type, check the corresponding Kryo
>>>>>>> serializer.
>>>>>>> at org.apache.flink.runtime.operators.BatchTask.run(
>>>>>>> BatchTask.java:465)
>>>>>>> at org.apache.flink.runtime.operators.BatchTask.invoke(
>>>>>>> BatchTask.java:355)
>>>>>>> at org.apache.flink.runtime.taskmanager.Task.run(Task.java:655)
>>>>>>> at java.lang.Thread.run(Thread.java:745)
>>>>>>> Caused by: java.lang.RuntimeException: Error obtaining the sorted
>>>>>>> input: Thread 'SortMerger Reading Thread' terminated due to an exception:
>>>>>>> Serializer consumed more bytes than the record had. This indicates broken
>>>>>>> serialization. If you are using custom serialization types (Value or
>>>>>>> Writable), check their serialization methods. If you are using a
>>>>>>> Kryo-serialized type, check the corresponding Kryo serializer.
>>>>>>> at org.apache.flink.runtime.operators.sort.UnilateralSortMerger.
>>>>>>> getIterator(UnilateralSortMerger.java:619)
>>>>>>> at org.apache.flink.runtime.operators.BatchTask.getInput(
>>>>>>> BatchTask.java:1094)
>>>>>>> at org.apache.flink.runtime.operators.GroupReduceDriver.
>>>>>>> prepare(GroupReduceDriver.java:99)
>>>>>>> at org.apache.flink.runtime.operators.BatchTask.run(
>>>>>>> BatchTask.java:460)
>>>>>>> ... 3 more
>>>>>>> Caused by: java.io.IOException: Thread 'SortMerger Reading Thread'
>>>>>>> terminated due to an exception: Serializer consumed more bytes than the
>>>>>>> record had. This indicates broken serialization. If you are using custom
>>>>>>> serialization types (Value or Writable), check their serialization methods.
>>>>>>> If you are using a Kryo-serialized type, check the corresponding Kryo
>>>>>>> serializer.
>>>>>>> at org.apache.flink.runtime.operators.sort.UnilateralSortMerger$
>>>>>>> ThreadBase.run(UnilateralSortMerger.java:799)
>>>>>>> Caused by: java.io.IOException: Serializer consumed more bytes than
>>>>>>> the record had. This indicates broken serialization. If you are using
>>>>>>> custom serialization types (Value or Writable), check their serialization
>>>>>>> methods. If you are using a Kryo-serialized type, check the corresponding
>>>>>>> Kryo serializer.
>>>>>>> at org.apache.flink.runtime.io.network.api.serialization.
>>>>>>> SpillingAdaptiveSpanningRecordDeserializer.getNextRecord(
>>>>>>> SpillingAdaptiveSpanningRecordDeserializer.java:123)
>>>>>>> at org.apache.flink.runtime.io.network.api.reader.
>>>>>>> AbstractRecordReader.getNextRecord(AbstractRecordReader.java:72)
>>>>>>> at org.apache.flink.runtime.io.network.api.reader.
>>>>>>> MutableRecordReader.next(MutableRecordReader.java:42)
>>>>>>> at org.apache.flink.runtime.operators.util.ReaderIterator.
>>>>>>> next(ReaderIterator.java:59)
>>>>>>> at org.apache.flink.runtime.operators.sort.UnilateralSortMerger$
>>>>>>> ReadingThread.go(UnilateralSortMerger.java:1035)
>>>>>>> at org.apache.flink.runtime.operators.sort.UnilateralSortMerger$
>>>>>>> ThreadBase.run(UnilateralSortMerger.java:796)
>>>>>>> Caused by: java.lang.ArrayIndexOutOfBoundsException: 32768
>>>>>>> at org.apache.flink.core.memory.HeapMemorySegment.get(
>>>>>>> HeapMemorySegment.java:104)
>>>>>>> at org.apache.flink.runtime.io.network.api.serialization.
>>>>>>> SpillingAdaptiveSpanningRecordDeserializer$
>>>>>>> NonSpanningWrapper.readByte(SpillingAdaptiveSpanningRecord
>>>>>>> Deserializer.java:226)
>>>>>>> at org.apache.flink.runtime.io.network.api.serialization.
>>>>>>> SpillingAdaptiveSpanningRecordDeserializer$NonSpanningWrapper.
>>>>>>> readUnsignedByte(SpillingAdaptiveSpanningRecord
>>>>>>> Deserializer.java:231)
>>>>>>> at org.apache.flink.types.StringValue.readString(
>>>>>>> StringValue.java:770)
>>>>>>> at org.apache.flink.api.common.typeutils.base.
>>>>>>> StringSerializer.deserialize(StringSerializer.java:69)
>>>>>>> at org.apache.flink.api.common.typeutils.base.
>>>>>>> StringSerializer.deserialize(StringSerializer.java:74)
>>>>>>> at org.apache.flink.api.common.typeutils.base.
>>>>>>> StringSerializer.deserialize(StringSerializer.java:28)
>>>>>>> at org.apache.flink.api.java.typeutils.runtime.RowSerializer.de
>>>>>>> serialize(RowSerializer.java:193)
>>>>>>> at org.apache.flink.api.java.typeutils.runtime.RowSerializer.de
>>>>>>> serialize(RowSerializer.java:36)
>>>>>>> at org.apache.flink.runtime.plugable.ReusingDeserializationDelegate
>>>>>>> .read(ReusingDeserializationDelegate.java:57)
>>>>>>> at org.apache.flink.runtime.io.network.api.serialization.
>>>>>>> SpillingAdaptiveSpanningRecordDeserializer.getNextRecord(
>>>>>>> SpillingAdaptiveSpanningRecordDeserializer.java:109)
>>>>>>> ... 5 more
>>>>>>>
>>>>>>> On Fri, Apr 21, 2017 at 11:43 AM, Stefano Bortoli <
>>>>>>> stefano.bortoli@huawei.com> wrote:
>>>>>>>
>>>>>>>> In fact the old problem was with the KryoSerializer missed
>>>>>>>> initialization on the exception that would trigger the spilling on disk.
>>>>>>>> This would lead to dirty serialization buffer that would eventually break
>>>>>>>> the program. Till worked on it debugging the source code generating the
>>>>>>>> error. Perhaps someone could try the same also this time. If Flavio can
>>>>>>>> make the problem reproducible in a shareable program+data.
>>>>>>>>
>>>>>>>>
>>>>>>>>
>>>>>>>> Stefano
>>>>>>>>
>>>>>>>>
>>>>>>>>
>>>>>>>> *From:* Stephan Ewen [mailto:sewen@apache.org]
>>>>>>>> *Sent:* Friday, April 21, 2017 10:04 AM
>>>>>>>> *To:* user <us...@flink.apache.org>
>>>>>>>> *Subject:* Re: UnilateralSortMerger error (again)
>>>>>>>>
>>>>>>>>
>>>>>>>>
>>>>>>>> In the past, these errors were most often caused by bugs in the
>>>>>>>> serializers, not in the sorter.
>>>>>>>>
>>>>>>>>
>>>>>>>>
>>>>>>>> What types are you using at that point? The Stack Trace reveals ROW
>>>>>>>> and StringValue, any other involved types?
>>>>>>>>
>>>>>>>>
>>>>>>>>
>>>>>>>> On Fri, Apr 21, 2017 at 9:36 AM, Flavio Pompermaier <
>>>>>>>> pompermaier@okkam.it> wrote:
>>>>>>>>
>>>>>>>> As suggested by Fabian I set taskmanager.memory.size = 1024 (to
>>>>>>>> force spilling to disk) and the job failed almost immediately..
>>>>>>>>
>>>>>>>>
>>>>>>>>
>>>>>>>> On Fri, Apr 21, 2017 at 12:33 AM, Flavio Pompermaier <
>>>>>>>> pompermaier@okkam.it> wrote:
>>>>>>>>
>>>>>>>> I debugged a bit the process repeating the job on a sub-slice of
>>>>>>>> the entire data (using the id value to filter data with parquet push down
>>>>>>>> filters) and all slices completed successfully :(
>>>>>>>>
>>>>>>>> So I tried to increase the parallelism (from 1 slot per TM to 4) to
>>>>>>>> see if this was somehow a factor of stress but it didn't cause any error.
>>>>>>>>
>>>>>>>> Then I almost doubled the number of rows to process and finally the
>>>>>>>> error showed up again.
>>>>>>>>
>>>>>>>> It seems somehow related to spilling to disk but I can't really
>>>>>>>> understand what's going on :(
>>>>>>>>
>>>>>>>> This is a summary of my debug attempts:
>>>>>>>>
>>>>>>>>
>>>>>>>>
>>>>>>>> 4 Task managers with 6 GB  and 1 slot each, parallelism = 4
>>>>>>>>
>>>>>>>>
>>>>>>>>
>>>>>>>> id < 10.000.000.000  => 1.857.365 rows => OK
>>>>>>>>
>>>>>>>> id >= 10.000.000.000 && id < 10.010.000.000 => 20.057.714 rows =>
>>>>>>>> OK
>>>>>>>>
>>>>>>>> id >= 10.010.000.000 && id < 99.945.000.000       => 20.926.903
>>>>>>>> rows => OK
>>>>>>>>
>>>>>>>> id >= 99.945.000.000 && id < 99.960.000.000       => 23.888.750
>>>>>>>>  rows => OK
>>>>>>>>
>>>>>>>> id >= 99.960.000.000 => 32.936.422 rows => OK
>>>>>>>>
>>>>>>>>
>>>>>>>>
>>>>>>>> 4 TM with 8 GB and 4 slot each, parallelism 16
>>>>>>>>
>>>>>>>>
>>>>>>>> id >= 99.960.000.000 => 32.936.422 rows => OK
>>>>>>>>
>>>>>>>> id >= 99.945.000.000  => 56.825.172 rows => ERROR
>>>>>>>>
>>>>>>>>
>>>>>>>>
>>>>>>>> Any help is appreciated..
>>>>>>>>
>>>>>>>> Best,
>>>>>>>>
>>>>>>>> Flavio
>>>>>>>>
>>>>>>>>
>>>>>>>>
>>>>>>>> On Wed, Apr 19, 2017 at 8:34 PM, Flavio Pompermaier <
>>>>>>>> pompermaier@okkam.it> wrote:
>>>>>>>>
>>>>>>>> I could but only if there's a good probability that it fix the
>>>>>>>> problem...how confident are you about it?
>>>>>>>>
>>>>>>>>
>>>>>>>>
>>>>>>>> On Wed, Apr 19, 2017 at 8:27 PM, Ted Yu <yu...@gmail.com>
>>>>>>>> wrote:
>>>>>>>>
>>>>>>>> Looking at git log of DataInputDeserializer.java , there has been
>>>>>>>> some recent change.
>>>>>>>>
>>>>>>>>
>>>>>>>>
>>>>>>>> If you have time, maybe try with 1.2.1 RC and see if the error is
>>>>>>>> reproducible ?
>>>>>>>>
>>>>>>>>
>>>>>>>>
>>>>>>>> Cheers
>>>>>>>>
>>>>>>>>
>>>>>>>>
>>>>>>>> On Wed, Apr 19, 2017 at 11:22 AM, Flavio Pompermaier <
>>>>>>>> pompermaier@okkam.it> wrote:
>>>>>>>>
>>>>>>>> Hi to all,
>>>>>>>>
>>>>>>>> I think I'm again on the weird Exception with the
>>>>>>>> SpillingAdaptiveSpanningRecordDeserializer...
>>>>>>>>
>>>>>>>> I'm using Flink 1.2.0 and the error seems to rise when Flink spills
>>>>>>>> to disk but the Exception thrown is not very helpful. Any idea?
>>>>>>>>
>>>>>>>>
>>>>>>>>
>>>>>>>> Caused by: java.lang.RuntimeException: Error obtaining the sorted
>>>>>>>> input: Thread 'SortMerger Reading Thread' terminated due to an exception:
>>>>>>>> null
>>>>>>>> at org.apache.flink.runtime.operators.sort.UnilateralSortMerger.
>>>>>>>> getIterator(UnilateralSortMerger.java:619)
>>>>>>>> at org.apache.flink.runtime.operators.BatchTask.getInput(
>>>>>>>> BatchTask.java:1094)
>>>>>>>> at org.apache.flink.runtime.operators.GroupReduceDriver.
>>>>>>>> prepare(GroupReduceDriver.java:99)
>>>>>>>> at org.apache.flink.runtime.operators.BatchTask.run(
>>>>>>>> BatchTask.java:460)
>>>>>>>> ... 3 more
>>>>>>>> Caused by: java.io.IOException: Thread 'SortMerger Reading Thread'
>>>>>>>> terminated due to an exception: null
>>>>>>>> at org.apache.flink.runtime.operators.sort.UnilateralSortMerger$
>>>>>>>> ThreadBase.run(UnilateralSortMerger.java:799)
>>>>>>>> Caused by: java.io.EOFException
>>>>>>>> at org.apache.flink.runtime.util.DataInputDeserializer.
>>>>>>>> readUnsignedByte(DataInputDeserializer.java:306)
>>>>>>>> at org.apache.flink.types.StringValue.readString(
>>>>>>>> StringValue.java:747)
>>>>>>>> at org.apache.flink.api.common.typeutils.base.
>>>>>>>> StringSerializer.deserialize(StringSerializer.java:69)
>>>>>>>> at org.apache.flink.api.common.typeutils.base.
>>>>>>>> StringSerializer.deserialize(StringSerializer.java:74)
>>>>>>>> at org.apache.flink.api.common.typeutils.base.
>>>>>>>> StringSerializer.deserialize(StringSerializer.java:28)
>>>>>>>> at org.apache.flink.api.java.typeutils.runtime.RowSerializer.de
>>>>>>>> serialize(RowSerializer.java:193)
>>>>>>>> at org.apache.flink.api.java.typeutils.runtime.RowSerializer.de
>>>>>>>> serialize(RowSerializer.java:36)
>>>>>>>> at org.apache.flink.runtime.plugable.ReusingDeserializationDelegate
>>>>>>>> .read(ReusingDeserializationDelegate.java:57)
>>>>>>>> at org.apache.flink.runtime.io.network.api.serialization.
>>>>>>>> SpillingAdaptiveSpanningRecordDeserializer.getNextRecord(
>>>>>>>> SpillingAdaptiveSpanningRecordDeserializer.java:144)
>>>>>>>> at org.apache.flink.runtime.io.network.api.reader.
>>>>>>>> AbstractRecordReader.getNextRecord(AbstractRecordReader.java:72)
>>>>>>>> at org.apache.flink.runtime.io.network.api.reader.
>>>>>>>> MutableRecordReader.next(MutableRecordReader.java:42)
>>>>>>>> at org.apache.flink.runtime.operators.util.ReaderIterator.
>>>>>>>> next(ReaderIterator.java:59)
>>>>>>>> at org.apache.flink.runtime.operators.sort.UnilateralSortMerger$
>>>>>>>> ReadingThread.go(UnilateralSortMerger.java:1035)
>>>>>>>> at org.apache.flink.runtime.operators.sort.UnilateralSortMerger$
>>>>>>>> ThreadBase.run(UnilateralSortMerger.java:796)
>>>>>>>>
>>>>>>>>
>>>>>>>> Best,
>>>>>>>>
>>>>>>>> Flavio
>>>>>>>>
>>>>>>>>
>>>>>>>>
>>>>>>>>
>>>>>>>>
>>>>>>>>
>>>>>>>>
>>>>>>>>
>>>>>>>>
>>>>>>>>
>>>>>>>>
>>>>>>>
>>>>>>>
>>>>>>>
>>>>>>
>>>>>
>>>
>>>
>> --
> Best,
> Kurt
>

Re: UnilateralSortMerger error (again)

Posted by Kurt Young <yk...@gmail.com>.
Thanks for the test case, i will take a look at it.

Flavio Pompermaier <po...@okkam.it>于2017年4月27日 周四03:55写道:

> I've created a repository with a unit test to reproduce the error at
> https://github.com/fpompermaier/flink-batch-bug/blob/master/src/test/java/it/okkam/flink/aci/TestDataInputDeserializer.java (probably
> this error is related also to FLINK-4719).
>
> The exception is  thrown only when there are null strings and multiple
> slots per TM, I don't know whether UnilateralSorterMerger is involved or
> not (but I think so..).
> A quick fix for this problem would be very appreciated because it's
> bloking a production deployment..
>
> Thanks in advance to all,
> Flavio
>
> On Wed, Apr 26, 2017 at 4:42 PM, Flavio Pompermaier <po...@okkam.it>
> wrote:
>
>> After digging into the code and test I think that the problem is almost
>> certainly in the UnilateralSortMerger, there should be a missing
>> synchronization on some shared object somewhere...Right now I'm trying to
>> understand if this section of code creates some shared object (like queues)
>> that are accessed in a bad way when there's spilling to disk:
>>
>>                // start the thread that reads the input channels
>> this.readThread = getReadingThread(exceptionHandler, input,
>> circularQueues, largeRecordHandler,
>> parentTask, serializer, ((long) (startSpillingFraction * sortMemory)));
>>
>> // start the thread that sorts the buffers
>> this.sortThread = getSortingThread(exceptionHandler, circularQueues,
>> parentTask);
>>
>> // start the thread that handles spilling to secondary storage
>> this.spillThread = getSpillingThread(exceptionHandler, circularQueues,
>> parentTask,
>> memoryManager, ioManager, serializerFactory, comparator,
>> this.sortReadMemory, this.writeMemory,
>> maxNumFileHandles);
>> ....
>> startThreads();
>>
>>
>> The problem is that the unit tests of GroupReduceDriver use Record and
>> testing Rows in not very straightforward and I'm still trying to reproduce
>> the problem in a local env..
>>
>> On Fri, Apr 21, 2017 at 9:53 PM, Flavio Pompermaier <pompermaier@okkam.it
>> > wrote:
>>
>>> Thanks for the explanation . Is there a way to force this behaviour in a
>>> local environment (to try to debug the problem)?
>>>
>>> On 21 Apr 2017 21:49, "Fabian Hueske" <fh...@gmail.com> wrote:
>>>
>>>> Hi Flavio,
>>>>
>>>> these files are used for spilling data to disk. In your case sorted
>>>> runs of records.
>>>> Later all (up to a fanout threshold) these sorted runs are read and
>>>> merged to get a completely sorted record stream.
>>>>
>>>> 2017-04-21 14:09 GMT+02:00 Flavio Pompermaier <po...@okkam.it>:
>>>>
>>>>> The error appears as soon as some taskmanager generates some
>>>>> inputchannel file.
>>>>> What are those files used for?
>>>>>
>>>>> On Fri, Apr 21, 2017 at 11:53 AM, Flavio Pompermaier <
>>>>> pompermaier@okkam.it> wrote:
>>>>>
>>>>>> In another run of the job I had another Exception. Could it be
>>>>>> helpful?
>>>>>>
>>>>>> Error obtaining the sorted input: Thread 'SortMerger Reading Thread'
>>>>>> terminated due to an exception: Serializer consumed more bytes than the
>>>>>> record had. This indicates broken serialization. If you are using custom
>>>>>> serialization types (Value or Writable), check their serialization methods.
>>>>>> If you are using a Kryo-serialized type, check the corresponding Kryo
>>>>>> serializer.
>>>>>> at
>>>>>> org.apache.flink.runtime.operators.BatchTask.run(BatchTask.java:465)
>>>>>> at
>>>>>> org.apache.flink.runtime.operators.BatchTask.invoke(BatchTask.java:355)
>>>>>> at org.apache.flink.runtime.taskmanager.Task.run(Task.java:655)
>>>>>> at java.lang.Thread.run(Thread.java:745)
>>>>>> Caused by: java.lang.RuntimeException: Error obtaining the sorted
>>>>>> input: Thread 'SortMerger Reading Thread' terminated due to an exception:
>>>>>> Serializer consumed more bytes than the record had. This indicates broken
>>>>>> serialization. If you are using custom serialization types (Value or
>>>>>> Writable), check their serialization methods. If you are using a
>>>>>> Kryo-serialized type, check the corresponding Kryo serializer.
>>>>>> at
>>>>>> org.apache.flink.runtime.operators.sort.UnilateralSortMerger.getIterator(UnilateralSortMerger.java:619)
>>>>>> at
>>>>>> org.apache.flink.runtime.operators.BatchTask.getInput(BatchTask.java:1094)
>>>>>> at
>>>>>> org.apache.flink.runtime.operators.GroupReduceDriver.prepare(GroupReduceDriver.java:99)
>>>>>> at
>>>>>> org.apache.flink.runtime.operators.BatchTask.run(BatchTask.java:460)
>>>>>> ... 3 more
>>>>>> Caused by: java.io.IOException: Thread 'SortMerger Reading Thread'
>>>>>> terminated due to an exception: Serializer consumed more bytes than the
>>>>>> record had. This indicates broken serialization. If you are using custom
>>>>>> serialization types (Value or Writable), check their serialization methods.
>>>>>> If you are using a Kryo-serialized type, check the corresponding Kryo
>>>>>> serializer.
>>>>>> at
>>>>>> org.apache.flink.runtime.operators.sort.UnilateralSortMerger$ThreadBase.run(UnilateralSortMerger.java:799)
>>>>>> Caused by: java.io.IOException: Serializer consumed more bytes than
>>>>>> the record had. This indicates broken serialization. If you are using
>>>>>> custom serialization types (Value or Writable), check their serialization
>>>>>> methods. If you are using a Kryo-serialized type, check the corresponding
>>>>>> Kryo serializer.
>>>>>> at org.apache.flink.runtime.io
>>>>>> .network.api.serialization.SpillingAdaptiveSpanningRecordDeserializer.getNextRecord(SpillingAdaptiveSpanningRecordDeserializer.java:123)
>>>>>> at org.apache.flink.runtime.io
>>>>>> .network.api.reader.AbstractRecordReader.getNextRecord(AbstractRecordReader.java:72)
>>>>>> at org.apache.flink.runtime.io
>>>>>> .network.api.reader.MutableRecordReader.next(MutableRecordReader.java:42)
>>>>>> at
>>>>>> org.apache.flink.runtime.operators.util.ReaderIterator.next(ReaderIterator.java:59)
>>>>>> at
>>>>>> org.apache.flink.runtime.operators.sort.UnilateralSortMerger$ReadingThread.go(UnilateralSortMerger.java:1035)
>>>>>> at
>>>>>> org.apache.flink.runtime.operators.sort.UnilateralSortMerger$ThreadBase.run(UnilateralSortMerger.java:796)
>>>>>> Caused by: java.lang.ArrayIndexOutOfBoundsException: 32768
>>>>>> at
>>>>>> org.apache.flink.core.memory.HeapMemorySegment.get(HeapMemorySegment.java:104)
>>>>>> at org.apache.flink.runtime.io
>>>>>> .network.api.serialization.SpillingAdaptiveSpanningRecordDeserializer$NonSpanningWrapper.readByte(SpillingAdaptiveSpanningRecordDeserializer.java:226)
>>>>>> at org.apache.flink.runtime.io
>>>>>> .network.api.serialization.SpillingAdaptiveSpanningRecordDeserializer$NonSpanningWrapper.readUnsignedByte(SpillingAdaptiveSpanningRecordDeserializer.java:231)
>>>>>> at org.apache.flink.types.StringValue.readString(StringValue.java:770)
>>>>>> at
>>>>>> org.apache.flink.api.common.typeutils.base.StringSerializer.deserialize(StringSerializer.java:69)
>>>>>> at
>>>>>> org.apache.flink.api.common.typeutils.base.StringSerializer.deserialize(StringSerializer.java:74)
>>>>>> at
>>>>>> org.apache.flink.api.common.typeutils.base.StringSerializer.deserialize(StringSerializer.java:28)
>>>>>> at org.apache.flink.api.java.typeutils.runtime.RowSerializer.de
>>>>>> serialize(RowSerializer.java:193)
>>>>>> at org.apache.flink.api.java.typeutils.runtime.RowSerializer.de
>>>>>> serialize(RowSerializer.java:36)
>>>>>> at
>>>>>> org.apache.flink.runtime.plugable.ReusingDeserializationDelegate.read(ReusingDeserializationDelegate.java:57)
>>>>>> at org.apache.flink.runtime.io
>>>>>> .network.api.serialization.SpillingAdaptiveSpanningRecordDeserializer.getNextRecord(SpillingAdaptiveSpanningRecordDeserializer.java:109)
>>>>>> ... 5 more
>>>>>>
>>>>>> On Fri, Apr 21, 2017 at 11:43 AM, Stefano Bortoli <
>>>>>> stefano.bortoli@huawei.com> wrote:
>>>>>>
>>>>>>> In fact the old problem was with the KryoSerializer missed
>>>>>>> initialization on the exception that would trigger the spilling on disk.
>>>>>>> This would lead to dirty serialization buffer that would eventually break
>>>>>>> the program. Till worked on it debugging the source code generating the
>>>>>>> error. Perhaps someone could try the same also this time. If Flavio can
>>>>>>> make the problem reproducible in a shareable program+data.
>>>>>>>
>>>>>>>
>>>>>>>
>>>>>>> Stefano
>>>>>>>
>>>>>>>
>>>>>>>
>>>>>>> *From:* Stephan Ewen [mailto:sewen@apache.org]
>>>>>>> *Sent:* Friday, April 21, 2017 10:04 AM
>>>>>>> *To:* user <us...@flink.apache.org>
>>>>>>> *Subject:* Re: UnilateralSortMerger error (again)
>>>>>>>
>>>>>>>
>>>>>>>
>>>>>>> In the past, these errors were most often caused by bugs in the
>>>>>>> serializers, not in the sorter.
>>>>>>>
>>>>>>>
>>>>>>>
>>>>>>> What types are you using at that point? The Stack Trace reveals ROW
>>>>>>> and StringValue, any other involved types?
>>>>>>>
>>>>>>>
>>>>>>>
>>>>>>> On Fri, Apr 21, 2017 at 9:36 AM, Flavio Pompermaier <
>>>>>>> pompermaier@okkam.it> wrote:
>>>>>>>
>>>>>>> As suggested by Fabian I set taskmanager.memory.size = 1024 (to
>>>>>>> force spilling to disk) and the job failed almost immediately..
>>>>>>>
>>>>>>>
>>>>>>>
>>>>>>> On Fri, Apr 21, 2017 at 12:33 AM, Flavio Pompermaier <
>>>>>>> pompermaier@okkam.it> wrote:
>>>>>>>
>>>>>>> I debugged a bit the process repeating the job on a sub-slice of the
>>>>>>> entire data (using the id value to filter data with parquet push down
>>>>>>> filters) and all slices completed successfully :(
>>>>>>>
>>>>>>> So I tried to increase the parallelism (from 1 slot per TM to 4) to
>>>>>>> see if this was somehow a factor of stress but it didn't cause any error.
>>>>>>>
>>>>>>> Then I almost doubled the number of rows to process and finally the
>>>>>>> error showed up again.
>>>>>>>
>>>>>>> It seems somehow related to spilling to disk but I can't really
>>>>>>> understand what's going on :(
>>>>>>>
>>>>>>> This is a summary of my debug attempts:
>>>>>>>
>>>>>>>
>>>>>>>
>>>>>>> 4 Task managers with 6 GB  and 1 slot each, parallelism = 4
>>>>>>>
>>>>>>>
>>>>>>>
>>>>>>> id < 10.000.000.000  => 1.857.365 rows => OK
>>>>>>>
>>>>>>> id >= 10.000.000.000 && id < 10.010.000.000 => 20.057.714 rows => OK
>>>>>>>
>>>>>>> id >= 10.010.000.000 && id < 99.945.000.000       => 20.926.903 rows
>>>>>>> => OK
>>>>>>>
>>>>>>> id >= 99.945.000.000 && id < 99.960.000.000       => 23.888.750
>>>>>>>  rows => OK
>>>>>>>
>>>>>>> id >= 99.960.000.000 => 32.936.422 rows => OK
>>>>>>>
>>>>>>>
>>>>>>>
>>>>>>> 4 TM with 8 GB and 4 slot each, parallelism 16
>>>>>>>
>>>>>>>
>>>>>>> id >= 99.960.000.000 => 32.936.422 rows => OK
>>>>>>>
>>>>>>> id >= 99.945.000.000  => 56.825.172 rows => ERROR
>>>>>>>
>>>>>>>
>>>>>>>
>>>>>>> Any help is appreciated..
>>>>>>>
>>>>>>> Best,
>>>>>>>
>>>>>>> Flavio
>>>>>>>
>>>>>>>
>>>>>>>
>>>>>>> On Wed, Apr 19, 2017 at 8:34 PM, Flavio Pompermaier <
>>>>>>> pompermaier@okkam.it> wrote:
>>>>>>>
>>>>>>> I could but only if there's a good probability that it fix the
>>>>>>> problem...how confident are you about it?
>>>>>>>
>>>>>>>
>>>>>>>
>>>>>>> On Wed, Apr 19, 2017 at 8:27 PM, Ted Yu <yu...@gmail.com> wrote:
>>>>>>>
>>>>>>> Looking at git log of DataInputDeserializer.java , there has been
>>>>>>> some recent change.
>>>>>>>
>>>>>>>
>>>>>>>
>>>>>>> If you have time, maybe try with 1.2.1 RC and see if the error is
>>>>>>> reproducible ?
>>>>>>>
>>>>>>>
>>>>>>>
>>>>>>> Cheers
>>>>>>>
>>>>>>>
>>>>>>>
>>>>>>> On Wed, Apr 19, 2017 at 11:22 AM, Flavio Pompermaier <
>>>>>>> pompermaier@okkam.it> wrote:
>>>>>>>
>>>>>>> Hi to all,
>>>>>>>
>>>>>>> I think I'm again on the weird Exception with the
>>>>>>> SpillingAdaptiveSpanningRecordDeserializer...
>>>>>>>
>>>>>>> I'm using Flink 1.2.0 and the error seems to rise when Flink spills
>>>>>>> to disk but the Exception thrown is not very helpful. Any idea?
>>>>>>>
>>>>>>>
>>>>>>>
>>>>>>> Caused by: java.lang.RuntimeException: Error obtaining the sorted
>>>>>>> input: Thread 'SortMerger Reading Thread' terminated due to an exception:
>>>>>>> null
>>>>>>> at
>>>>>>> org.apache.flink.runtime.operators.sort.UnilateralSortMerger.getIterator(UnilateralSortMerger.java:619)
>>>>>>> at
>>>>>>> org.apache.flink.runtime.operators.BatchTask.getInput(BatchTask.java:1094)
>>>>>>> at
>>>>>>> org.apache.flink.runtime.operators.GroupReduceDriver.prepare(GroupReduceDriver.java:99)
>>>>>>> at
>>>>>>> org.apache.flink.runtime.operators.BatchTask.run(BatchTask.java:460)
>>>>>>> ... 3 more
>>>>>>> Caused by: java.io.IOException: Thread 'SortMerger Reading Thread'
>>>>>>> terminated due to an exception: null
>>>>>>> at
>>>>>>> org.apache.flink.runtime.operators.sort.UnilateralSortMerger$ThreadBase.run(UnilateralSortMerger.java:799)
>>>>>>> Caused by: java.io.EOFException
>>>>>>> at
>>>>>>> org.apache.flink.runtime.util.DataInputDeserializer.readUnsignedByte(DataInputDeserializer.java:306)
>>>>>>> at
>>>>>>> org.apache.flink.types.StringValue.readString(StringValue.java:747)
>>>>>>> at
>>>>>>> org.apache.flink.api.common.typeutils.base.StringSerializer.deserialize(StringSerializer.java:69)
>>>>>>> at
>>>>>>> org.apache.flink.api.common.typeutils.base.StringSerializer.deserialize(StringSerializer.java:74)
>>>>>>> at
>>>>>>> org.apache.flink.api.common.typeutils.base.StringSerializer.deserialize(StringSerializer.java:28)
>>>>>>> at org.apache.flink.api.java.typeutils.runtime.RowSerializer.de
>>>>>>> serialize(RowSerializer.java:193)
>>>>>>> at org.apache.flink.api.java.typeutils.runtime.RowSerializer.de
>>>>>>> serialize(RowSerializer.java:36)
>>>>>>> at
>>>>>>> org.apache.flink.runtime.plugable.ReusingDeserializationDelegate.read(ReusingDeserializationDelegate.java:57)
>>>>>>> at org.apache.flink.runtime.io
>>>>>>> .network.api.serialization.SpillingAdaptiveSpanningRecordDeserializer.getNextRecord(SpillingAdaptiveSpanningRecordDeserializer.java:144)
>>>>>>> at org.apache.flink.runtime.io
>>>>>>> .network.api.reader.AbstractRecordReader.getNextRecord(AbstractRecordReader.java:72)
>>>>>>> at org.apache.flink.runtime.io
>>>>>>> .network.api.reader.MutableRecordReader.next(MutableRecordReader.java:42)
>>>>>>> at
>>>>>>> org.apache.flink.runtime.operators.util.ReaderIterator.next(ReaderIterator.java:59)
>>>>>>> at
>>>>>>> org.apache.flink.runtime.operators.sort.UnilateralSortMerger$ReadingThread.go(UnilateralSortMerger.java:1035)
>>>>>>> at
>>>>>>> org.apache.flink.runtime.operators.sort.UnilateralSortMerger$ThreadBase.run(UnilateralSortMerger.java:796)
>>>>>>>
>>>>>>>
>>>>>>> Best,
>>>>>>>
>>>>>>> Flavio
>>>>>>>
>>>>>>>
>>>>>>>
>>>>>>>
>>>>>>>
>>>>>>>
>>>>>>>
>>>>>>>
>>>>>>>
>>>>>>>
>>>>>>>
>>>>>>
>>>>>>
>>>>>>
>>>>>
>>>>
>>
>>
> --
Best,
Kurt

Re: UnilateralSortMerger error (again)

Posted by Flavio Pompermaier <po...@okkam.it>.
I've created a repository with a unit test to reproduce the error at
https://github.com/fpompermaier/flink-batch-bug/
blob/master/src/test/java/it/okkam/flink/aci/TestDataInputDeserializer.java
(probably
this error is related also to FLINK-4719).

The exception is  thrown only when there are null strings and multiple
slots per TM, I don't know whether UnilateralSorterMerger is involved or
not (but I think so..).
A quick fix for this problem would be very appreciated because it's bloking
a production deployment..

Thanks in advance to all,
Flavio

On Wed, Apr 26, 2017 at 4:42 PM, Flavio Pompermaier <po...@okkam.it>
wrote:

> After digging into the code and test I think that the problem is almost
> certainly in the UnilateralSortMerger, there should be a missing
> synchronization on some shared object somewhere...Right now I'm trying to
> understand if this section of code creates some shared object (like queues)
> that are accessed in a bad way when there's spilling to disk:
>
>                // start the thread that reads the input channels
> this.readThread = getReadingThread(exceptionHandler, input,
> circularQueues, largeRecordHandler,
> parentTask, serializer, ((long) (startSpillingFraction * sortMemory)));
>
> // start the thread that sorts the buffers
> this.sortThread = getSortingThread(exceptionHandler, circularQueues,
> parentTask);
>
> // start the thread that handles spilling to secondary storage
> this.spillThread = getSpillingThread(exceptionHandler, circularQueues,
> parentTask,
> memoryManager, ioManager, serializerFactory, comparator,
> this.sortReadMemory, this.writeMemory,
> maxNumFileHandles);
> ....
> startThreads();
>
>
> The problem is that the unit tests of GroupReduceDriver use Record and
> testing Rows in not very straightforward and I'm still trying to reproduce
> the problem in a local env..
>
> On Fri, Apr 21, 2017 at 9:53 PM, Flavio Pompermaier <po...@okkam.it>
> wrote:
>
>> Thanks for the explanation . Is there a way to force this behaviour in a
>> local environment (to try to debug the problem)?
>>
>> On 21 Apr 2017 21:49, "Fabian Hueske" <fh...@gmail.com> wrote:
>>
>>> Hi Flavio,
>>>
>>> these files are used for spilling data to disk. In your case sorted runs
>>> of records.
>>> Later all (up to a fanout threshold) these sorted runs are read and
>>> merged to get a completely sorted record stream.
>>>
>>> 2017-04-21 14:09 GMT+02:00 Flavio Pompermaier <po...@okkam.it>:
>>>
>>>> The error appears as soon as some taskmanager generates some
>>>> inputchannel file.
>>>> What are those files used for?
>>>>
>>>> On Fri, Apr 21, 2017 at 11:53 AM, Flavio Pompermaier <
>>>> pompermaier@okkam.it> wrote:
>>>>
>>>>> In another run of the job I had another Exception. Could it be helpful?
>>>>>
>>>>> Error obtaining the sorted input: Thread 'SortMerger Reading Thread'
>>>>> terminated due to an exception: Serializer consumed more bytes than the
>>>>> record had. This indicates broken serialization. If you are using custom
>>>>> serialization types (Value or Writable), check their serialization methods.
>>>>> If you are using a Kryo-serialized type, check the corresponding Kryo
>>>>> serializer.
>>>>> at org.apache.flink.runtime.operators.BatchTask.run(BatchTask.j
>>>>> ava:465)
>>>>> at org.apache.flink.runtime.operators.BatchTask.invoke(BatchTas
>>>>> k.java:355)
>>>>> at org.apache.flink.runtime.taskmanager.Task.run(Task.java:655)
>>>>> at java.lang.Thread.run(Thread.java:745)
>>>>> Caused by: java.lang.RuntimeException: Error obtaining the sorted
>>>>> input: Thread 'SortMerger Reading Thread' terminated due to an exception:
>>>>> Serializer consumed more bytes than the record had. This indicates broken
>>>>> serialization. If you are using custom serialization types (Value or
>>>>> Writable), check their serialization methods. If you are using a
>>>>> Kryo-serialized type, check the corresponding Kryo serializer.
>>>>> at org.apache.flink.runtime.operators.sort.UnilateralSortMerger
>>>>> .getIterator(UnilateralSortMerger.java:619)
>>>>> at org.apache.flink.runtime.operators.BatchTask.getInput(BatchT
>>>>> ask.java:1094)
>>>>> at org.apache.flink.runtime.operators.GroupReduceDriver.prepare
>>>>> (GroupReduceDriver.java:99)
>>>>> at org.apache.flink.runtime.operators.BatchTask.run(BatchTask.j
>>>>> ava:460)
>>>>> ... 3 more
>>>>> Caused by: java.io.IOException: Thread 'SortMerger Reading Thread'
>>>>> terminated due to an exception: Serializer consumed more bytes than the
>>>>> record had. This indicates broken serialization. If you are using custom
>>>>> serialization types (Value or Writable), check their serialization methods.
>>>>> If you are using a Kryo-serialized type, check the corresponding Kryo
>>>>> serializer.
>>>>> at org.apache.flink.runtime.operators.sort.UnilateralSortMerger
>>>>> $ThreadBase.run(UnilateralSortMerger.java:799)
>>>>> Caused by: java.io.IOException: Serializer consumed more bytes than
>>>>> the record had. This indicates broken serialization. If you are using
>>>>> custom serialization types (Value or Writable), check their serialization
>>>>> methods. If you are using a Kryo-serialized type, check the corresponding
>>>>> Kryo serializer.
>>>>> at org.apache.flink.runtime.io.network.api.serialization.Spilli
>>>>> ngAdaptiveSpanningRecordDeserializer.getNextRecord(SpillingA
>>>>> daptiveSpanningRecordDeserializer.java:123)
>>>>> at org.apache.flink.runtime.io.network.api.reader.AbstractRecor
>>>>> dReader.getNextRecord(AbstractRecordReader.java:72)
>>>>> at org.apache.flink.runtime.io.network.api.reader.MutableRecord
>>>>> Reader.next(MutableRecordReader.java:42)
>>>>> at org.apache.flink.runtime.operators.util.ReaderIterator.next(
>>>>> ReaderIterator.java:59)
>>>>> at org.apache.flink.runtime.operators.sort.UnilateralSortMerger
>>>>> $ReadingThread.go(UnilateralSortMerger.java:1035)
>>>>> at org.apache.flink.runtime.operators.sort.UnilateralSortMerger
>>>>> $ThreadBase.run(UnilateralSortMerger.java:796)
>>>>> Caused by: java.lang.ArrayIndexOutOfBoundsException: 32768
>>>>> at org.apache.flink.core.memory.HeapMemorySegment.get(HeapMemor
>>>>> ySegment.java:104)
>>>>> at org.apache.flink.runtime.io.network.api.serialization.Spilli
>>>>> ngAdaptiveSpanningRecordDeserializer$NonSpanningWrapper.read
>>>>> Byte(SpillingAdaptiveSpanningRecordDeserializer.java:226)
>>>>> at org.apache.flink.runtime.io.network.api.serialization.Spilli
>>>>> ngAdaptiveSpanningRecordDeserializer$NonSpanningWrapper.read
>>>>> UnsignedByte(SpillingAdaptiveSpanningRecordDeserializer.java:231)
>>>>> at org.apache.flink.types.StringValue.readString(StringValue.java:770)
>>>>> at org.apache.flink.api.common.typeutils.base.StringSerializer.
>>>>> deserialize(StringSerializer.java:69)
>>>>> at org.apache.flink.api.common.typeutils.base.StringSerializer.
>>>>> deserialize(StringSerializer.java:74)
>>>>> at org.apache.flink.api.common.typeutils.base.StringSerializer.
>>>>> deserialize(StringSerializer.java:28)
>>>>> at org.apache.flink.api.java.typeutils.runtime.RowSerializer.de
>>>>> serialize(RowSerializer.java:193)
>>>>> at org.apache.flink.api.java.typeutils.runtime.RowSerializer.de
>>>>> serialize(RowSerializer.java:36)
>>>>> at org.apache.flink.runtime.plugable.ReusingDeserializationDele
>>>>> gate.read(ReusingDeserializationDelegate.java:57)
>>>>> at org.apache.flink.runtime.io.network.api.serialization.Spilli
>>>>> ngAdaptiveSpanningRecordDeserializer.getNextRecord(SpillingA
>>>>> daptiveSpanningRecordDeserializer.java:109)
>>>>> ... 5 more
>>>>>
>>>>> On Fri, Apr 21, 2017 at 11:43 AM, Stefano Bortoli <
>>>>> stefano.bortoli@huawei.com> wrote:
>>>>>
>>>>>> In fact the old problem was with the KryoSerializer missed
>>>>>> initialization on the exception that would trigger the spilling on disk.
>>>>>> This would lead to dirty serialization buffer that would eventually break
>>>>>> the program. Till worked on it debugging the source code generating the
>>>>>> error. Perhaps someone could try the same also this time. If Flavio can
>>>>>> make the problem reproducible in a shareable program+data.
>>>>>>
>>>>>>
>>>>>>
>>>>>> Stefano
>>>>>>
>>>>>>
>>>>>>
>>>>>> *From:* Stephan Ewen [mailto:sewen@apache.org]
>>>>>> *Sent:* Friday, April 21, 2017 10:04 AM
>>>>>> *To:* user <us...@flink.apache.org>
>>>>>> *Subject:* Re: UnilateralSortMerger error (again)
>>>>>>
>>>>>>
>>>>>>
>>>>>> In the past, these errors were most often caused by bugs in the
>>>>>> serializers, not in the sorter.
>>>>>>
>>>>>>
>>>>>>
>>>>>> What types are you using at that point? The Stack Trace reveals ROW
>>>>>> and StringValue, any other involved types?
>>>>>>
>>>>>>
>>>>>>
>>>>>> On Fri, Apr 21, 2017 at 9:36 AM, Flavio Pompermaier <
>>>>>> pompermaier@okkam.it> wrote:
>>>>>>
>>>>>> As suggested by Fabian I set taskmanager.memory.size = 1024 (to force
>>>>>> spilling to disk) and the job failed almost immediately..
>>>>>>
>>>>>>
>>>>>>
>>>>>> On Fri, Apr 21, 2017 at 12:33 AM, Flavio Pompermaier <
>>>>>> pompermaier@okkam.it> wrote:
>>>>>>
>>>>>> I debugged a bit the process repeating the job on a sub-slice of the
>>>>>> entire data (using the id value to filter data with parquet push down
>>>>>> filters) and all slices completed successfully :(
>>>>>>
>>>>>> So I tried to increase the parallelism (from 1 slot per TM to 4) to
>>>>>> see if this was somehow a factor of stress but it didn't cause any error.
>>>>>>
>>>>>> Then I almost doubled the number of rows to process and finally the
>>>>>> error showed up again.
>>>>>>
>>>>>> It seems somehow related to spilling to disk but I can't really
>>>>>> understand what's going on :(
>>>>>>
>>>>>> This is a summary of my debug attempts:
>>>>>>
>>>>>>
>>>>>>
>>>>>> 4 Task managers with 6 GB  and 1 slot each, parallelism = 4
>>>>>>
>>>>>>
>>>>>>
>>>>>> id < 10.000.000.000  => 1.857.365 rows => OK
>>>>>>
>>>>>> id >= 10.000.000.000 && id < 10.010.000.000 => 20.057.714 rows => OK
>>>>>>
>>>>>> id >= 10.010.000.000 && id < 99.945.000.000       => 20.926.903 rows
>>>>>> => OK
>>>>>>
>>>>>> id >= 99.945.000.000 && id < 99.960.000.000       => 23.888.750  rows
>>>>>> => OK
>>>>>>
>>>>>> id >= 99.960.000.000 => 32.936.422 rows => OK
>>>>>>
>>>>>>
>>>>>>
>>>>>> 4 TM with 8 GB and 4 slot each, parallelism 16
>>>>>>
>>>>>>
>>>>>> id >= 99.960.000.000 => 32.936.422 rows => OK
>>>>>>
>>>>>> id >= 99.945.000.000  => 56.825.172 rows => ERROR
>>>>>>
>>>>>>
>>>>>>
>>>>>> Any help is appreciated..
>>>>>>
>>>>>> Best,
>>>>>>
>>>>>> Flavio
>>>>>>
>>>>>>
>>>>>>
>>>>>> On Wed, Apr 19, 2017 at 8:34 PM, Flavio Pompermaier <
>>>>>> pompermaier@okkam.it> wrote:
>>>>>>
>>>>>> I could but only if there's a good probability that it fix the
>>>>>> problem...how confident are you about it?
>>>>>>
>>>>>>
>>>>>>
>>>>>> On Wed, Apr 19, 2017 at 8:27 PM, Ted Yu <yu...@gmail.com> wrote:
>>>>>>
>>>>>> Looking at git log of DataInputDeserializer.java , there has been
>>>>>> some recent change.
>>>>>>
>>>>>>
>>>>>>
>>>>>> If you have time, maybe try with 1.2.1 RC and see if the error is
>>>>>> reproducible ?
>>>>>>
>>>>>>
>>>>>>
>>>>>> Cheers
>>>>>>
>>>>>>
>>>>>>
>>>>>> On Wed, Apr 19, 2017 at 11:22 AM, Flavio Pompermaier <
>>>>>> pompermaier@okkam.it> wrote:
>>>>>>
>>>>>> Hi to all,
>>>>>>
>>>>>> I think I'm again on the weird Exception with the
>>>>>> SpillingAdaptiveSpanningRecordDeserializer...
>>>>>>
>>>>>> I'm using Flink 1.2.0 and the error seems to rise when Flink spills
>>>>>> to disk but the Exception thrown is not very helpful. Any idea?
>>>>>>
>>>>>>
>>>>>>
>>>>>> Caused by: java.lang.RuntimeException: Error obtaining the sorted
>>>>>> input: Thread 'SortMerger Reading Thread' terminated due to an exception:
>>>>>> null
>>>>>> at org.apache.flink.runtime.operators.sort.UnilateralSortMerger
>>>>>> .getIterator(UnilateralSortMerger.java:619)
>>>>>> at org.apache.flink.runtime.operators.BatchTask.getInput(BatchT
>>>>>> ask.java:1094)
>>>>>> at org.apache.flink.runtime.operators.GroupReduceDriver.prepare
>>>>>> (GroupReduceDriver.java:99)
>>>>>> at org.apache.flink.runtime.operators.BatchTask.run(BatchTask.j
>>>>>> ava:460)
>>>>>> ... 3 more
>>>>>> Caused by: java.io.IOException: Thread 'SortMerger Reading Thread'
>>>>>> terminated due to an exception: null
>>>>>> at org.apache.flink.runtime.operators.sort.UnilateralSortMerger
>>>>>> $ThreadBase.run(UnilateralSortMerger.java:799)
>>>>>> Caused by: java.io.EOFException
>>>>>> at org.apache.flink.runtime.util.DataInputDeserializer.readUnsi
>>>>>> gnedByte(DataInputDeserializer.java:306)
>>>>>> at org.apache.flink.types.StringValue.readString(StringValue.ja
>>>>>> va:747)
>>>>>> at org.apache.flink.api.common.typeutils.base.StringSerializer.
>>>>>> deserialize(StringSerializer.java:69)
>>>>>> at org.apache.flink.api.common.typeutils.base.StringSerializer.
>>>>>> deserialize(StringSerializer.java:74)
>>>>>> at org.apache.flink.api.common.typeutils.base.StringSerializer.
>>>>>> deserialize(StringSerializer.java:28)
>>>>>> at org.apache.flink.api.java.typeutils.runtime.RowSerializer.de
>>>>>> serialize(RowSerializer.java:193)
>>>>>> at org.apache.flink.api.java.typeutils.runtime.RowSerializer.de
>>>>>> serialize(RowSerializer.java:36)
>>>>>> at org.apache.flink.runtime.plugable.ReusingDeserializationDele
>>>>>> gate.read(ReusingDeserializationDelegate.java:57)
>>>>>> at org.apache.flink.runtime.io.network.api.serialization.Spilli
>>>>>> ngAdaptiveSpanningRecordDeserializer.getNextRecord(SpillingA
>>>>>> daptiveSpanningRecordDeserializer.java:144)
>>>>>> at org.apache.flink.runtime.io.network.api.reader.AbstractRecor
>>>>>> dReader.getNextRecord(AbstractRecordReader.java:72)
>>>>>> at org.apache.flink.runtime.io.network.api.reader.MutableRecord
>>>>>> Reader.next(MutableRecordReader.java:42)
>>>>>> at org.apache.flink.runtime.operators.util.ReaderIterator.next(
>>>>>> ReaderIterator.java:59)
>>>>>> at org.apache.flink.runtime.operators.sort.UnilateralSortMerger
>>>>>> $ReadingThread.go(UnilateralSortMerger.java:1035)
>>>>>> at org.apache.flink.runtime.operators.sort.UnilateralSortMerger
>>>>>> $ThreadBase.run(UnilateralSortMerger.java:796)
>>>>>>
>>>>>>
>>>>>> Best,
>>>>>>
>>>>>> Flavio
>>>>>>
>>>>>>
>>>>>>
>>>>>>
>>>>>>
>>>>>>
>>>>>>
>>>>>>
>>>>>>
>>>>>>
>>>>>>
>>>>>
>>>>>
>>>>>
>>>>
>>>
>
>

Re: UnilateralSortMerger error (again)

Posted by Flavio Pompermaier <po...@okkam.it>.
After digging into the code and test I think that the problem is almost
certainly in the UnilateralSortMerger, there should be a missing
synchronization on some shared object somewhere...Right now I'm trying to
understand if this section of code creates some shared object (like queues)
that are accessed in a bad way when there's spilling to disk:

               // start the thread that reads the input channels
this.readThread = getReadingThread(exceptionHandler, input, circularQueues,
largeRecordHandler,
parentTask, serializer, ((long) (startSpillingFraction * sortMemory)));

// start the thread that sorts the buffers
this.sortThread = getSortingThread(exceptionHandler, circularQueues,
parentTask);

// start the thread that handles spilling to secondary storage
this.spillThread = getSpillingThread(exceptionHandler, circularQueues,
parentTask,
memoryManager, ioManager, serializerFactory, comparator,
this.sortReadMemory, this.writeMemory,
maxNumFileHandles);
....
startThreads();


The problem is that the unit tests of GroupReduceDriver use Record and
testing Rows in not very straightforward and I'm still trying to reproduce
the problem in a local env..

On Fri, Apr 21, 2017 at 9:53 PM, Flavio Pompermaier <po...@okkam.it>
wrote:

> Thanks for the explanation . Is there a way to force this behaviour in a
> local environment (to try to debug the problem)?
>
> On 21 Apr 2017 21:49, "Fabian Hueske" <fh...@gmail.com> wrote:
>
>> Hi Flavio,
>>
>> these files are used for spilling data to disk. In your case sorted runs
>> of records.
>> Later all (up to a fanout threshold) these sorted runs are read and
>> merged to get a completely sorted record stream.
>>
>> 2017-04-21 14:09 GMT+02:00 Flavio Pompermaier <po...@okkam.it>:
>>
>>> The error appears as soon as some taskmanager generates some
>>> inputchannel file.
>>> What are those files used for?
>>>
>>> On Fri, Apr 21, 2017 at 11:53 AM, Flavio Pompermaier <
>>> pompermaier@okkam.it> wrote:
>>>
>>>> In another run of the job I had another Exception. Could it be helpful?
>>>>
>>>> Error obtaining the sorted input: Thread 'SortMerger Reading Thread'
>>>> terminated due to an exception: Serializer consumed more bytes than the
>>>> record had. This indicates broken serialization. If you are using custom
>>>> serialization types (Value or Writable), check their serialization methods.
>>>> If you are using a Kryo-serialized type, check the corresponding Kryo
>>>> serializer.
>>>> at org.apache.flink.runtime.operators.BatchTask.run(BatchTask.java:465)
>>>> at org.apache.flink.runtime.operators.BatchTask.invoke(BatchTas
>>>> k.java:355)
>>>> at org.apache.flink.runtime.taskmanager.Task.run(Task.java:655)
>>>> at java.lang.Thread.run(Thread.java:745)
>>>> Caused by: java.lang.RuntimeException: Error obtaining the sorted
>>>> input: Thread 'SortMerger Reading Thread' terminated due to an exception:
>>>> Serializer consumed more bytes than the record had. This indicates broken
>>>> serialization. If you are using custom serialization types (Value or
>>>> Writable), check their serialization methods. If you are using a
>>>> Kryo-serialized type, check the corresponding Kryo serializer.
>>>> at org.apache.flink.runtime.operators.sort.UnilateralSortMerger
>>>> .getIterator(UnilateralSortMerger.java:619)
>>>> at org.apache.flink.runtime.operators.BatchTask.getInput(BatchT
>>>> ask.java:1094)
>>>> at org.apache.flink.runtime.operators.GroupReduceDriver.prepare
>>>> (GroupReduceDriver.java:99)
>>>> at org.apache.flink.runtime.operators.BatchTask.run(BatchTask.java:460)
>>>> ... 3 more
>>>> Caused by: java.io.IOException: Thread 'SortMerger Reading Thread'
>>>> terminated due to an exception: Serializer consumed more bytes than the
>>>> record had. This indicates broken serialization. If you are using custom
>>>> serialization types (Value or Writable), check their serialization methods.
>>>> If you are using a Kryo-serialized type, check the corresponding Kryo
>>>> serializer.
>>>> at org.apache.flink.runtime.operators.sort.UnilateralSortMerger
>>>> $ThreadBase.run(UnilateralSortMerger.java:799)
>>>> Caused by: java.io.IOException: Serializer consumed more bytes than the
>>>> record had. This indicates broken serialization. If you are using custom
>>>> serialization types (Value or Writable), check their serialization methods.
>>>> If you are using a Kryo-serialized type, check the corresponding Kryo
>>>> serializer.
>>>> at org.apache.flink.runtime.io.network.api.serialization.Spilli
>>>> ngAdaptiveSpanningRecordDeserializer.getNextRecord(SpillingA
>>>> daptiveSpanningRecordDeserializer.java:123)
>>>> at org.apache.flink.runtime.io.network.api.reader.AbstractRecor
>>>> dReader.getNextRecord(AbstractRecordReader.java:72)
>>>> at org.apache.flink.runtime.io.network.api.reader.MutableRecord
>>>> Reader.next(MutableRecordReader.java:42)
>>>> at org.apache.flink.runtime.operators.util.ReaderIterator.next(
>>>> ReaderIterator.java:59)
>>>> at org.apache.flink.runtime.operators.sort.UnilateralSortMerger
>>>> $ReadingThread.go(UnilateralSortMerger.java:1035)
>>>> at org.apache.flink.runtime.operators.sort.UnilateralSortMerger
>>>> $ThreadBase.run(UnilateralSortMerger.java:796)
>>>> Caused by: java.lang.ArrayIndexOutOfBoundsException: 32768
>>>> at org.apache.flink.core.memory.HeapMemorySegment.get(HeapMemor
>>>> ySegment.java:104)
>>>> at org.apache.flink.runtime.io.network.api.serialization.Spilli
>>>> ngAdaptiveSpanningRecordDeserializer$NonSpanningWrapper.read
>>>> Byte(SpillingAdaptiveSpanningRecordDeserializer.java:226)
>>>> at org.apache.flink.runtime.io.network.api.serialization.Spilli
>>>> ngAdaptiveSpanningRecordDeserializer$NonSpanningWrapper.read
>>>> UnsignedByte(SpillingAdaptiveSpanningRecordDeserializer.java:231)
>>>> at org.apache.flink.types.StringValue.readString(StringValue.java:770)
>>>> at org.apache.flink.api.common.typeutils.base.StringSerializer.
>>>> deserialize(StringSerializer.java:69)
>>>> at org.apache.flink.api.common.typeutils.base.StringSerializer.
>>>> deserialize(StringSerializer.java:74)
>>>> at org.apache.flink.api.common.typeutils.base.StringSerializer.
>>>> deserialize(StringSerializer.java:28)
>>>> at org.apache.flink.api.java.typeutils.runtime.RowSerializer.de
>>>> serialize(RowSerializer.java:193)
>>>> at org.apache.flink.api.java.typeutils.runtime.RowSerializer.de
>>>> serialize(RowSerializer.java:36)
>>>> at org.apache.flink.runtime.plugable.ReusingDeserializationDele
>>>> gate.read(ReusingDeserializationDelegate.java:57)
>>>> at org.apache.flink.runtime.io.network.api.serialization.Spilli
>>>> ngAdaptiveSpanningRecordDeserializer.getNextRecord(SpillingA
>>>> daptiveSpanningRecordDeserializer.java:109)
>>>> ... 5 more
>>>>
>>>> On Fri, Apr 21, 2017 at 11:43 AM, Stefano Bortoli <
>>>> stefano.bortoli@huawei.com> wrote:
>>>>
>>>>> In fact the old problem was with the KryoSerializer missed
>>>>> initialization on the exception that would trigger the spilling on disk.
>>>>> This would lead to dirty serialization buffer that would eventually break
>>>>> the program. Till worked on it debugging the source code generating the
>>>>> error. Perhaps someone could try the same also this time. If Flavio can
>>>>> make the problem reproducible in a shareable program+data.
>>>>>
>>>>>
>>>>>
>>>>> Stefano
>>>>>
>>>>>
>>>>>
>>>>> *From:* Stephan Ewen [mailto:sewen@apache.org]
>>>>> *Sent:* Friday, April 21, 2017 10:04 AM
>>>>> *To:* user <us...@flink.apache.org>
>>>>> *Subject:* Re: UnilateralSortMerger error (again)
>>>>>
>>>>>
>>>>>
>>>>> In the past, these errors were most often caused by bugs in the
>>>>> serializers, not in the sorter.
>>>>>
>>>>>
>>>>>
>>>>> What types are you using at that point? The Stack Trace reveals ROW
>>>>> and StringValue, any other involved types?
>>>>>
>>>>>
>>>>>
>>>>> On Fri, Apr 21, 2017 at 9:36 AM, Flavio Pompermaier <
>>>>> pompermaier@okkam.it> wrote:
>>>>>
>>>>> As suggested by Fabian I set taskmanager.memory.size = 1024 (to force
>>>>> spilling to disk) and the job failed almost immediately..
>>>>>
>>>>>
>>>>>
>>>>> On Fri, Apr 21, 2017 at 12:33 AM, Flavio Pompermaier <
>>>>> pompermaier@okkam.it> wrote:
>>>>>
>>>>> I debugged a bit the process repeating the job on a sub-slice of the
>>>>> entire data (using the id value to filter data with parquet push down
>>>>> filters) and all slices completed successfully :(
>>>>>
>>>>> So I tried to increase the parallelism (from 1 slot per TM to 4) to
>>>>> see if this was somehow a factor of stress but it didn't cause any error.
>>>>>
>>>>> Then I almost doubled the number of rows to process and finally the
>>>>> error showed up again.
>>>>>
>>>>> It seems somehow related to spilling to disk but I can't really
>>>>> understand what's going on :(
>>>>>
>>>>> This is a summary of my debug attempts:
>>>>>
>>>>>
>>>>>
>>>>> 4 Task managers with 6 GB  and 1 slot each, parallelism = 4
>>>>>
>>>>>
>>>>>
>>>>> id < 10.000.000.000  => 1.857.365 rows => OK
>>>>>
>>>>> id >= 10.000.000.000 && id < 10.010.000.000 => 20.057.714 rows => OK
>>>>>
>>>>> id >= 10.010.000.000 && id < 99.945.000.000       => 20.926.903 rows
>>>>> => OK
>>>>>
>>>>> id >= 99.945.000.000 && id < 99.960.000.000       => 23.888.750  rows
>>>>> => OK
>>>>>
>>>>> id >= 99.960.000.000 => 32.936.422 rows => OK
>>>>>
>>>>>
>>>>>
>>>>> 4 TM with 8 GB and 4 slot each, parallelism 16
>>>>>
>>>>>
>>>>> id >= 99.960.000.000 => 32.936.422 rows => OK
>>>>>
>>>>> id >= 99.945.000.000  => 56.825.172 rows => ERROR
>>>>>
>>>>>
>>>>>
>>>>> Any help is appreciated..
>>>>>
>>>>> Best,
>>>>>
>>>>> Flavio
>>>>>
>>>>>
>>>>>
>>>>> On Wed, Apr 19, 2017 at 8:34 PM, Flavio Pompermaier <
>>>>> pompermaier@okkam.it> wrote:
>>>>>
>>>>> I could but only if there's a good probability that it fix the
>>>>> problem...how confident are you about it?
>>>>>
>>>>>
>>>>>
>>>>> On Wed, Apr 19, 2017 at 8:27 PM, Ted Yu <yu...@gmail.com> wrote:
>>>>>
>>>>> Looking at git log of DataInputDeserializer.java , there has been some
>>>>> recent change.
>>>>>
>>>>>
>>>>>
>>>>> If you have time, maybe try with 1.2.1 RC and see if the error is
>>>>> reproducible ?
>>>>>
>>>>>
>>>>>
>>>>> Cheers
>>>>>
>>>>>
>>>>>
>>>>> On Wed, Apr 19, 2017 at 11:22 AM, Flavio Pompermaier <
>>>>> pompermaier@okkam.it> wrote:
>>>>>
>>>>> Hi to all,
>>>>>
>>>>> I think I'm again on the weird Exception with the
>>>>> SpillingAdaptiveSpanningRecordDeserializer...
>>>>>
>>>>> I'm using Flink 1.2.0 and the error seems to rise when Flink spills to
>>>>> disk but the Exception thrown is not very helpful. Any idea?
>>>>>
>>>>>
>>>>>
>>>>> Caused by: java.lang.RuntimeException: Error obtaining the sorted
>>>>> input: Thread 'SortMerger Reading Thread' terminated due to an exception:
>>>>> null
>>>>> at org.apache.flink.runtime.operators.sort.UnilateralSortMerger
>>>>> .getIterator(UnilateralSortMerger.java:619)
>>>>> at org.apache.flink.runtime.operators.BatchTask.getInput(BatchT
>>>>> ask.java:1094)
>>>>> at org.apache.flink.runtime.operators.GroupReduceDriver.prepare
>>>>> (GroupReduceDriver.java:99)
>>>>> at org.apache.flink.runtime.operators.BatchTask.run(BatchTask.j
>>>>> ava:460)
>>>>> ... 3 more
>>>>> Caused by: java.io.IOException: Thread 'SortMerger Reading Thread'
>>>>> terminated due to an exception: null
>>>>> at org.apache.flink.runtime.operators.sort.UnilateralSortMerger
>>>>> $ThreadBase.run(UnilateralSortMerger.java:799)
>>>>> Caused by: java.io.EOFException
>>>>> at org.apache.flink.runtime.util.DataInputDeserializer.readUnsi
>>>>> gnedByte(DataInputDeserializer.java:306)
>>>>> at org.apache.flink.types.StringValue.readString(StringValue.java:747)
>>>>> at org.apache.flink.api.common.typeutils.base.StringSerializer.
>>>>> deserialize(StringSerializer.java:69)
>>>>> at org.apache.flink.api.common.typeutils.base.StringSerializer.
>>>>> deserialize(StringSerializer.java:74)
>>>>> at org.apache.flink.api.common.typeutils.base.StringSerializer.
>>>>> deserialize(StringSerializer.java:28)
>>>>> at org.apache.flink.api.java.typeutils.runtime.RowSerializer.de
>>>>> serialize(RowSerializer.java:193)
>>>>> at org.apache.flink.api.java.typeutils.runtime.RowSerializer.de
>>>>> serialize(RowSerializer.java:36)
>>>>> at org.apache.flink.runtime.plugable.ReusingDeserializationDele
>>>>> gate.read(ReusingDeserializationDelegate.java:57)
>>>>> at org.apache.flink.runtime.io.network.api.serialization.Spilli
>>>>> ngAdaptiveSpanningRecordDeserializer.getNextRecord(SpillingA
>>>>> daptiveSpanningRecordDeserializer.java:144)
>>>>> at org.apache.flink.runtime.io.network.api.reader.AbstractRecor
>>>>> dReader.getNextRecord(AbstractRecordReader.java:72)
>>>>> at org.apache.flink.runtime.io.network.api.reader.MutableRecord
>>>>> Reader.next(MutableRecordReader.java:42)
>>>>> at org.apache.flink.runtime.operators.util.ReaderIterator.next(
>>>>> ReaderIterator.java:59)
>>>>> at org.apache.flink.runtime.operators.sort.UnilateralSortMerger
>>>>> $ReadingThread.go(UnilateralSortMerger.java:1035)
>>>>> at org.apache.flink.runtime.operators.sort.UnilateralSortMerger
>>>>> $ThreadBase.run(UnilateralSortMerger.java:796)
>>>>>
>>>>>
>>>>> Best,
>>>>>
>>>>> Flavio
>>>>>
>>>>>
>>>>>
>>>>>
>>>>>
>>>>>
>>>>>
>>>>>
>>>>>
>>>>>
>>>>>
>>>>
>>>>
>>>>
>>>
>>

Re: UnilateralSortMerger error (again)

Posted by Flavio Pompermaier <po...@okkam.it>.
Thanks for the explanation . Is there a way to force this behaviour in a
local environment (to try to debug the problem)?

On 21 Apr 2017 21:49, "Fabian Hueske" <fh...@gmail.com> wrote:

> Hi Flavio,
>
> these files are used for spilling data to disk. In your case sorted runs
> of records.
> Later all (up to a fanout threshold) these sorted runs are read and merged
> to get a completely sorted record stream.
>
> 2017-04-21 14:09 GMT+02:00 Flavio Pompermaier <po...@okkam.it>:
>
>> The error appears as soon as some taskmanager generates some inputchannel
>> file.
>> What are those files used for?
>>
>> On Fri, Apr 21, 2017 at 11:53 AM, Flavio Pompermaier <
>> pompermaier@okkam.it> wrote:
>>
>>> In another run of the job I had another Exception. Could it be helpful?
>>>
>>> Error obtaining the sorted input: Thread 'SortMerger Reading Thread'
>>> terminated due to an exception: Serializer consumed more bytes than the
>>> record had. This indicates broken serialization. If you are using custom
>>> serialization types (Value or Writable), check their serialization methods.
>>> If you are using a Kryo-serialized type, check the corresponding Kryo
>>> serializer.
>>> at org.apache.flink.runtime.operators.BatchTask.run(BatchTask.java:465)
>>> at org.apache.flink.runtime.operators.BatchTask.invoke(BatchTas
>>> k.java:355)
>>> at org.apache.flink.runtime.taskmanager.Task.run(Task.java:655)
>>> at java.lang.Thread.run(Thread.java:745)
>>> Caused by: java.lang.RuntimeException: Error obtaining the sorted input:
>>> Thread 'SortMerger Reading Thread' terminated due to an exception:
>>> Serializer consumed more bytes than the record had. This indicates broken
>>> serialization. If you are using custom serialization types (Value or
>>> Writable), check their serialization methods. If you are using a
>>> Kryo-serialized type, check the corresponding Kryo serializer.
>>> at org.apache.flink.runtime.operators.sort.UnilateralSortMerger
>>> .getIterator(UnilateralSortMerger.java:619)
>>> at org.apache.flink.runtime.operators.BatchTask.getInput(BatchT
>>> ask.java:1094)
>>> at org.apache.flink.runtime.operators.GroupReduceDriver.prepare
>>> (GroupReduceDriver.java:99)
>>> at org.apache.flink.runtime.operators.BatchTask.run(BatchTask.java:460)
>>> ... 3 more
>>> Caused by: java.io.IOException: Thread 'SortMerger Reading Thread'
>>> terminated due to an exception: Serializer consumed more bytes than the
>>> record had. This indicates broken serialization. If you are using custom
>>> serialization types (Value or Writable), check their serialization methods.
>>> If you are using a Kryo-serialized type, check the corresponding Kryo
>>> serializer.
>>> at org.apache.flink.runtime.operators.sort.UnilateralSortMerger
>>> $ThreadBase.run(UnilateralSortMerger.java:799)
>>> Caused by: java.io.IOException: Serializer consumed more bytes than the
>>> record had. This indicates broken serialization. If you are using custom
>>> serialization types (Value or Writable), check their serialization methods.
>>> If you are using a Kryo-serialized type, check the corresponding Kryo
>>> serializer.
>>> at org.apache.flink.runtime.io.network.api.serialization.Spilli
>>> ngAdaptiveSpanningRecordDeserializer.getNextRecord(SpillingA
>>> daptiveSpanningRecordDeserializer.java:123)
>>> at org.apache.flink.runtime.io.network.api.reader.AbstractRecor
>>> dReader.getNextRecord(AbstractRecordReader.java:72)
>>> at org.apache.flink.runtime.io.network.api.reader.MutableRecord
>>> Reader.next(MutableRecordReader.java:42)
>>> at org.apache.flink.runtime.operators.util.ReaderIterator.next(
>>> ReaderIterator.java:59)
>>> at org.apache.flink.runtime.operators.sort.UnilateralSortMerger
>>> $ReadingThread.go(UnilateralSortMerger.java:1035)
>>> at org.apache.flink.runtime.operators.sort.UnilateralSortMerger
>>> $ThreadBase.run(UnilateralSortMerger.java:796)
>>> Caused by: java.lang.ArrayIndexOutOfBoundsException: 32768
>>> at org.apache.flink.core.memory.HeapMemorySegment.get(HeapMemor
>>> ySegment.java:104)
>>> at org.apache.flink.runtime.io.network.api.serialization.Spilli
>>> ngAdaptiveSpanningRecordDeserializer$NonSpanningWrapper.read
>>> Byte(SpillingAdaptiveSpanningRecordDeserializer.java:226)
>>> at org.apache.flink.runtime.io.network.api.serialization.Spilli
>>> ngAdaptiveSpanningRecordDeserializer$NonSpanningWrapper.read
>>> UnsignedByte(SpillingAdaptiveSpanningRecordDeserializer.java:231)
>>> at org.apache.flink.types.StringValue.readString(StringValue.java:770)
>>> at org.apache.flink.api.common.typeutils.base.StringSerializer.
>>> deserialize(StringSerializer.java:69)
>>> at org.apache.flink.api.common.typeutils.base.StringSerializer.
>>> deserialize(StringSerializer.java:74)
>>> at org.apache.flink.api.common.typeutils.base.StringSerializer.
>>> deserialize(StringSerializer.java:28)
>>> at org.apache.flink.api.java.typeutils.runtime.RowSerializer.de
>>> serialize(RowSerializer.java:193)
>>> at org.apache.flink.api.java.typeutils.runtime.RowSerializer.de
>>> serialize(RowSerializer.java:36)
>>> at org.apache.flink.runtime.plugable.ReusingDeserializationDele
>>> gate.read(ReusingDeserializationDelegate.java:57)
>>> at org.apache.flink.runtime.io.network.api.serialization.Spilli
>>> ngAdaptiveSpanningRecordDeserializer.getNextRecord(SpillingA
>>> daptiveSpanningRecordDeserializer.java:109)
>>> ... 5 more
>>>
>>> On Fri, Apr 21, 2017 at 11:43 AM, Stefano Bortoli <
>>> stefano.bortoli@huawei.com> wrote:
>>>
>>>> In fact the old problem was with the KryoSerializer missed
>>>> initialization on the exception that would trigger the spilling on disk.
>>>> This would lead to dirty serialization buffer that would eventually break
>>>> the program. Till worked on it debugging the source code generating the
>>>> error. Perhaps someone could try the same also this time. If Flavio can
>>>> make the problem reproducible in a shareable program+data.
>>>>
>>>>
>>>>
>>>> Stefano
>>>>
>>>>
>>>>
>>>> *From:* Stephan Ewen [mailto:sewen@apache.org]
>>>> *Sent:* Friday, April 21, 2017 10:04 AM
>>>> *To:* user <us...@flink.apache.org>
>>>> *Subject:* Re: UnilateralSortMerger error (again)
>>>>
>>>>
>>>>
>>>> In the past, these errors were most often caused by bugs in the
>>>> serializers, not in the sorter.
>>>>
>>>>
>>>>
>>>> What types are you using at that point? The Stack Trace reveals ROW and
>>>> StringValue, any other involved types?
>>>>
>>>>
>>>>
>>>> On Fri, Apr 21, 2017 at 9:36 AM, Flavio Pompermaier <
>>>> pompermaier@okkam.it> wrote:
>>>>
>>>> As suggested by Fabian I set taskmanager.memory.size = 1024 (to force
>>>> spilling to disk) and the job failed almost immediately..
>>>>
>>>>
>>>>
>>>> On Fri, Apr 21, 2017 at 12:33 AM, Flavio Pompermaier <
>>>> pompermaier@okkam.it> wrote:
>>>>
>>>> I debugged a bit the process repeating the job on a sub-slice of the
>>>> entire data (using the id value to filter data with parquet push down
>>>> filters) and all slices completed successfully :(
>>>>
>>>> So I tried to increase the parallelism (from 1 slot per TM to 4) to see
>>>> if this was somehow a factor of stress but it didn't cause any error.
>>>>
>>>> Then I almost doubled the number of rows to process and finally the
>>>> error showed up again.
>>>>
>>>> It seems somehow related to spilling to disk but I can't really
>>>> understand what's going on :(
>>>>
>>>> This is a summary of my debug attempts:
>>>>
>>>>
>>>>
>>>> 4 Task managers with 6 GB  and 1 slot each, parallelism = 4
>>>>
>>>>
>>>>
>>>> id < 10.000.000.000  => 1.857.365 rows => OK
>>>>
>>>> id >= 10.000.000.000 && id < 10.010.000.000 => 20.057.714 rows => OK
>>>>
>>>> id >= 10.010.000.000 && id < 99.945.000.000       => 20.926.903 rows =>
>>>> OK
>>>>
>>>> id >= 99.945.000.000 && id < 99.960.000.000       => 23.888.750  rows
>>>> => OK
>>>>
>>>> id >= 99.960.000.000 => 32.936.422 rows => OK
>>>>
>>>>
>>>>
>>>> 4 TM with 8 GB and 4 slot each, parallelism 16
>>>>
>>>>
>>>> id >= 99.960.000.000 => 32.936.422 rows => OK
>>>>
>>>> id >= 99.945.000.000  => 56.825.172 rows => ERROR
>>>>
>>>>
>>>>
>>>> Any help is appreciated..
>>>>
>>>> Best,
>>>>
>>>> Flavio
>>>>
>>>>
>>>>
>>>> On Wed, Apr 19, 2017 at 8:34 PM, Flavio Pompermaier <
>>>> pompermaier@okkam.it> wrote:
>>>>
>>>> I could but only if there's a good probability that it fix the
>>>> problem...how confident are you about it?
>>>>
>>>>
>>>>
>>>> On Wed, Apr 19, 2017 at 8:27 PM, Ted Yu <yu...@gmail.com> wrote:
>>>>
>>>> Looking at git log of DataInputDeserializer.java , there has been some
>>>> recent change.
>>>>
>>>>
>>>>
>>>> If you have time, maybe try with 1.2.1 RC and see if the error is
>>>> reproducible ?
>>>>
>>>>
>>>>
>>>> Cheers
>>>>
>>>>
>>>>
>>>> On Wed, Apr 19, 2017 at 11:22 AM, Flavio Pompermaier <
>>>> pompermaier@okkam.it> wrote:
>>>>
>>>> Hi to all,
>>>>
>>>> I think I'm again on the weird Exception with the
>>>> SpillingAdaptiveSpanningRecordDeserializer...
>>>>
>>>> I'm using Flink 1.2.0 and the error seems to rise when Flink spills to
>>>> disk but the Exception thrown is not very helpful. Any idea?
>>>>
>>>>
>>>>
>>>> Caused by: java.lang.RuntimeException: Error obtaining the sorted
>>>> input: Thread 'SortMerger Reading Thread' terminated due to an exception:
>>>> null
>>>> at org.apache.flink.runtime.operators.sort.UnilateralSortMerger
>>>> .getIterator(UnilateralSortMerger.java:619)
>>>> at org.apache.flink.runtime.operators.BatchTask.getInput(BatchT
>>>> ask.java:1094)
>>>> at org.apache.flink.runtime.operators.GroupReduceDriver.prepare
>>>> (GroupReduceDriver.java:99)
>>>> at org.apache.flink.runtime.operators.BatchTask.run(BatchTask.java:460)
>>>> ... 3 more
>>>> Caused by: java.io.IOException: Thread 'SortMerger Reading Thread'
>>>> terminated due to an exception: null
>>>> at org.apache.flink.runtime.operators.sort.UnilateralSortMerger
>>>> $ThreadBase.run(UnilateralSortMerger.java:799)
>>>> Caused by: java.io.EOFException
>>>> at org.apache.flink.runtime.util.DataInputDeserializer.readUnsi
>>>> gnedByte(DataInputDeserializer.java:306)
>>>> at org.apache.flink.types.StringValue.readString(StringValue.java:747)
>>>> at org.apache.flink.api.common.typeutils.base.StringSerializer.
>>>> deserialize(StringSerializer.java:69)
>>>> at org.apache.flink.api.common.typeutils.base.StringSerializer.
>>>> deserialize(StringSerializer.java:74)
>>>> at org.apache.flink.api.common.typeutils.base.StringSerializer.
>>>> deserialize(StringSerializer.java:28)
>>>> at org.apache.flink.api.java.typeutils.runtime.RowSerializer.de
>>>> serialize(RowSerializer.java:193)
>>>> at org.apache.flink.api.java.typeutils.runtime.RowSerializer.de
>>>> serialize(RowSerializer.java:36)
>>>> at org.apache.flink.runtime.plugable.ReusingDeserializationDele
>>>> gate.read(ReusingDeserializationDelegate.java:57)
>>>> at org.apache.flink.runtime.io.network.api.serialization.Spilli
>>>> ngAdaptiveSpanningRecordDeserializer.getNextRecord(SpillingA
>>>> daptiveSpanningRecordDeserializer.java:144)
>>>> at org.apache.flink.runtime.io.network.api.reader.AbstractRecor
>>>> dReader.getNextRecord(AbstractRecordReader.java:72)
>>>> at org.apache.flink.runtime.io.network.api.reader.MutableRecord
>>>> Reader.next(MutableRecordReader.java:42)
>>>> at org.apache.flink.runtime.operators.util.ReaderIterator.next(
>>>> ReaderIterator.java:59)
>>>> at org.apache.flink.runtime.operators.sort.UnilateralSortMerger
>>>> $ReadingThread.go(UnilateralSortMerger.java:1035)
>>>> at org.apache.flink.runtime.operators.sort.UnilateralSortMerger
>>>> $ThreadBase.run(UnilateralSortMerger.java:796)
>>>>
>>>>
>>>> Best,
>>>>
>>>> Flavio
>>>>
>>>>
>>>>
>>>>
>>>>
>>>>
>>>>
>>>>
>>>>
>>>>
>>>>
>>>
>>>
>>>
>>
>

Re: UnilateralSortMerger error (again)

Posted by Fabian Hueske <fh...@gmail.com>.
Hi Flavio,

these files are used for spilling data to disk. In your case sorted runs of
records.
Later all (up to a fanout threshold) these sorted runs are read and merged
to get a completely sorted record stream.

2017-04-21 14:09 GMT+02:00 Flavio Pompermaier <po...@okkam.it>:

> The error appears as soon as some taskmanager generates some inputchannel
> file.
> What are those files used for?
>
> On Fri, Apr 21, 2017 at 11:53 AM, Flavio Pompermaier <pompermaier@okkam.it
> > wrote:
>
>> In another run of the job I had another Exception. Could it be helpful?
>>
>> Error obtaining the sorted input: Thread 'SortMerger Reading Thread'
>> terminated due to an exception: Serializer consumed more bytes than the
>> record had. This indicates broken serialization. If you are using custom
>> serialization types (Value or Writable), check their serialization methods.
>> If you are using a Kryo-serialized type, check the corresponding Kryo
>> serializer.
>> at org.apache.flink.runtime.operators.BatchTask.run(BatchTask.java:465)
>> at org.apache.flink.runtime.operators.BatchTask.invoke(BatchTas
>> k.java:355)
>> at org.apache.flink.runtime.taskmanager.Task.run(Task.java:655)
>> at java.lang.Thread.run(Thread.java:745)
>> Caused by: java.lang.RuntimeException: Error obtaining the sorted input:
>> Thread 'SortMerger Reading Thread' terminated due to an exception:
>> Serializer consumed more bytes than the record had. This indicates broken
>> serialization. If you are using custom serialization types (Value or
>> Writable), check their serialization methods. If you are using a
>> Kryo-serialized type, check the corresponding Kryo serializer.
>> at org.apache.flink.runtime.operators.sort.UnilateralSortMerger
>> .getIterator(UnilateralSortMerger.java:619)
>> at org.apache.flink.runtime.operators.BatchTask.getInput(BatchT
>> ask.java:1094)
>> at org.apache.flink.runtime.operators.GroupReduceDriver.prepare
>> (GroupReduceDriver.java:99)
>> at org.apache.flink.runtime.operators.BatchTask.run(BatchTask.java:460)
>> ... 3 more
>> Caused by: java.io.IOException: Thread 'SortMerger Reading Thread'
>> terminated due to an exception: Serializer consumed more bytes than the
>> record had. This indicates broken serialization. If you are using custom
>> serialization types (Value or Writable), check their serialization methods.
>> If you are using a Kryo-serialized type, check the corresponding Kryo
>> serializer.
>> at org.apache.flink.runtime.operators.sort.UnilateralSortMerger
>> $ThreadBase.run(UnilateralSortMerger.java:799)
>> Caused by: java.io.IOException: Serializer consumed more bytes than the
>> record had. This indicates broken serialization. If you are using custom
>> serialization types (Value or Writable), check their serialization methods.
>> If you are using a Kryo-serialized type, check the corresponding Kryo
>> serializer.
>> at org.apache.flink.runtime.io.network.api.serialization.Spilli
>> ngAdaptiveSpanningRecordDeserializer.getNextRecord(SpillingA
>> daptiveSpanningRecordDeserializer.java:123)
>> at org.apache.flink.runtime.io.network.api.reader.AbstractRecor
>> dReader.getNextRecord(AbstractRecordReader.java:72)
>> at org.apache.flink.runtime.io.network.api.reader.MutableRecord
>> Reader.next(MutableRecordReader.java:42)
>> at org.apache.flink.runtime.operators.util.ReaderIterator.next(
>> ReaderIterator.java:59)
>> at org.apache.flink.runtime.operators.sort.UnilateralSortMerger
>> $ReadingThread.go(UnilateralSortMerger.java:1035)
>> at org.apache.flink.runtime.operators.sort.UnilateralSortMerger
>> $ThreadBase.run(UnilateralSortMerger.java:796)
>> Caused by: java.lang.ArrayIndexOutOfBoundsException: 32768
>> at org.apache.flink.core.memory.HeapMemorySegment.get(HeapMemor
>> ySegment.java:104)
>> at org.apache.flink.runtime.io.network.api.serialization.Spilli
>> ngAdaptiveSpanningRecordDeserializer$NonSpanningWrapper.
>> readByte(SpillingAdaptiveSpanningRecordDeserializer.java:226)
>> at org.apache.flink.runtime.io.network.api.serialization.Spilli
>> ngAdaptiveSpanningRecordDeserializer$NonSpanningWrapper.read
>> UnsignedByte(SpillingAdaptiveSpanningRecordDeserializer.java:231)
>> at org.apache.flink.types.StringValue.readString(StringValue.java:770)
>> at org.apache.flink.api.common.typeutils.base.StringSerializer.
>> deserialize(StringSerializer.java:69)
>> at org.apache.flink.api.common.typeutils.base.StringSerializer.
>> deserialize(StringSerializer.java:74)
>> at org.apache.flink.api.common.typeutils.base.StringSerializer.
>> deserialize(StringSerializer.java:28)
>> at org.apache.flink.api.java.typeutils.runtime.RowSerializer.
>> deserialize(RowSerializer.java:193)
>> at org.apache.flink.api.java.typeutils.runtime.RowSerializer.
>> deserialize(RowSerializer.java:36)
>> at org.apache.flink.runtime.plugable.ReusingDeserializationDele
>> gate.read(ReusingDeserializationDelegate.java:57)
>> at org.apache.flink.runtime.io.network.api.serialization.Spilli
>> ngAdaptiveSpanningRecordDeserializer.getNextRecord(SpillingA
>> daptiveSpanningRecordDeserializer.java:109)
>> ... 5 more
>>
>> On Fri, Apr 21, 2017 at 11:43 AM, Stefano Bortoli <
>> stefano.bortoli@huawei.com> wrote:
>>
>>> In fact the old problem was with the KryoSerializer missed
>>> initialization on the exception that would trigger the spilling on disk.
>>> This would lead to dirty serialization buffer that would eventually break
>>> the program. Till worked on it debugging the source code generating the
>>> error. Perhaps someone could try the same also this time. If Flavio can
>>> make the problem reproducible in a shareable program+data.
>>>
>>>
>>>
>>> Stefano
>>>
>>>
>>>
>>> *From:* Stephan Ewen [mailto:sewen@apache.org]
>>> *Sent:* Friday, April 21, 2017 10:04 AM
>>> *To:* user <us...@flink.apache.org>
>>> *Subject:* Re: UnilateralSortMerger error (again)
>>>
>>>
>>>
>>> In the past, these errors were most often caused by bugs in the
>>> serializers, not in the sorter.
>>>
>>>
>>>
>>> What types are you using at that point? The Stack Trace reveals ROW and
>>> StringValue, any other involved types?
>>>
>>>
>>>
>>> On Fri, Apr 21, 2017 at 9:36 AM, Flavio Pompermaier <
>>> pompermaier@okkam.it> wrote:
>>>
>>> As suggested by Fabian I set taskmanager.memory.size = 1024 (to force
>>> spilling to disk) and the job failed almost immediately..
>>>
>>>
>>>
>>> On Fri, Apr 21, 2017 at 12:33 AM, Flavio Pompermaier <
>>> pompermaier@okkam.it> wrote:
>>>
>>> I debugged a bit the process repeating the job on a sub-slice of the
>>> entire data (using the id value to filter data with parquet push down
>>> filters) and all slices completed successfully :(
>>>
>>> So I tried to increase the parallelism (from 1 slot per TM to 4) to see
>>> if this was somehow a factor of stress but it didn't cause any error.
>>>
>>> Then I almost doubled the number of rows to process and finally the
>>> error showed up again.
>>>
>>> It seems somehow related to spilling to disk but I can't really
>>> understand what's going on :(
>>>
>>> This is a summary of my debug attempts:
>>>
>>>
>>>
>>> 4 Task managers with 6 GB  and 1 slot each, parallelism = 4
>>>
>>>
>>>
>>> id < 10.000.000.000  => 1.857.365 rows => OK
>>>
>>> id >= 10.000.000.000 && id < 10.010.000.000 => 20.057.714 rows => OK
>>>
>>> id >= 10.010.000.000 && id < 99.945.000.000       => 20.926.903 rows =>
>>> OK
>>>
>>> id >= 99.945.000.000 && id < 99.960.000.000       => 23.888.750  rows =>
>>> OK
>>>
>>> id >= 99.960.000.000 => 32.936.422 rows => OK
>>>
>>>
>>>
>>> 4 TM with 8 GB and 4 slot each, parallelism 16
>>>
>>>
>>> id >= 99.960.000.000 => 32.936.422 rows => OK
>>>
>>> id >= 99.945.000.000  => 56.825.172 rows => ERROR
>>>
>>>
>>>
>>> Any help is appreciated..
>>>
>>> Best,
>>>
>>> Flavio
>>>
>>>
>>>
>>> On Wed, Apr 19, 2017 at 8:34 PM, Flavio Pompermaier <
>>> pompermaier@okkam.it> wrote:
>>>
>>> I could but only if there's a good probability that it fix the
>>> problem...how confident are you about it?
>>>
>>>
>>>
>>> On Wed, Apr 19, 2017 at 8:27 PM, Ted Yu <yu...@gmail.com> wrote:
>>>
>>> Looking at git log of DataInputDeserializer.java , there has been some
>>> recent change.
>>>
>>>
>>>
>>> If you have time, maybe try with 1.2.1 RC and see if the error is
>>> reproducible ?
>>>
>>>
>>>
>>> Cheers
>>>
>>>
>>>
>>> On Wed, Apr 19, 2017 at 11:22 AM, Flavio Pompermaier <
>>> pompermaier@okkam.it> wrote:
>>>
>>> Hi to all,
>>>
>>> I think I'm again on the weird Exception with the
>>> SpillingAdaptiveSpanningRecordDeserializer...
>>>
>>> I'm using Flink 1.2.0 and the error seems to rise when Flink spills to
>>> disk but the Exception thrown is not very helpful. Any idea?
>>>
>>>
>>>
>>> Caused by: java.lang.RuntimeException: Error obtaining the sorted input:
>>> Thread 'SortMerger Reading Thread' terminated due to an exception: null
>>> at org.apache.flink.runtime.operators.sort.UnilateralSortMerger
>>> .getIterator(UnilateralSortMerger.java:619)
>>> at org.apache.flink.runtime.operators.BatchTask.getInput(BatchT
>>> ask.java:1094)
>>> at org.apache.flink.runtime.operators.GroupReduceDriver.prepare
>>> (GroupReduceDriver.java:99)
>>> at org.apache.flink.runtime.operators.BatchTask.run(BatchTask.java:460)
>>> ... 3 more
>>> Caused by: java.io.IOException: Thread 'SortMerger Reading Thread'
>>> terminated due to an exception: null
>>> at org.apache.flink.runtime.operators.sort.UnilateralSortMerger
>>> $ThreadBase.run(UnilateralSortMerger.java:799)
>>> Caused by: java.io.EOFException
>>> at org.apache.flink.runtime.util.DataInputDeserializer.readUnsi
>>> gnedByte(DataInputDeserializer.java:306)
>>> at org.apache.flink.types.StringValue.readString(StringValue.java:747)
>>> at org.apache.flink.api.common.typeutils.base.StringSerializer.
>>> deserialize(StringSerializer.java:69)
>>> at org.apache.flink.api.common.typeutils.base.StringSerializer.
>>> deserialize(StringSerializer.java:74)
>>> at org.apache.flink.api.common.typeutils.base.StringSerializer.
>>> deserialize(StringSerializer.java:28)
>>> at org.apache.flink.api.java.typeutils.runtime.RowSerializer.de
>>> serialize(RowSerializer.java:193)
>>> at org.apache.flink.api.java.typeutils.runtime.RowSerializer.de
>>> serialize(RowSerializer.java:36)
>>> at org.apache.flink.runtime.plugable.ReusingDeserializationDele
>>> gate.read(ReusingDeserializationDelegate.java:57)
>>> at org.apache.flink.runtime.io.network.api.serialization.Spilli
>>> ngAdaptiveSpanningRecordDeserializer.getNextRecord(SpillingA
>>> daptiveSpanningRecordDeserializer.java:144)
>>> at org.apache.flink.runtime.io.network.api.reader.AbstractRecor
>>> dReader.getNextRecord(AbstractRecordReader.java:72)
>>> at org.apache.flink.runtime.io.network.api.reader.MutableRecord
>>> Reader.next(MutableRecordReader.java:42)
>>> at org.apache.flink.runtime.operators.util.ReaderIterator.next(
>>> ReaderIterator.java:59)
>>> at org.apache.flink.runtime.operators.sort.UnilateralSortMerger
>>> $ReadingThread.go(UnilateralSortMerger.java:1035)
>>> at org.apache.flink.runtime.operators.sort.UnilateralSortMerger
>>> $ThreadBase.run(UnilateralSortMerger.java:796)
>>>
>>>
>>> Best,
>>>
>>> Flavio
>>>
>>>
>>>
>>>
>>>
>>>
>>>
>>>
>>>
>>>
>>>
>>
>>
>>
>

Re: UnilateralSortMerger error (again)

Posted by Flavio Pompermaier <po...@okkam.it>.
The error appears as soon as some taskmanager generates some inputchannel
file.
What are those files used for?

On Fri, Apr 21, 2017 at 11:53 AM, Flavio Pompermaier <po...@okkam.it>
wrote:

> In another run of the job I had another Exception. Could it be helpful?
>
> Error obtaining the sorted input: Thread 'SortMerger Reading Thread'
> terminated due to an exception: Serializer consumed more bytes than the
> record had. This indicates broken serialization. If you are using custom
> serialization types (Value or Writable), check their serialization methods.
> If you are using a Kryo-serialized type, check the corresponding Kryo
> serializer.
> at org.apache.flink.runtime.operators.BatchTask.run(BatchTask.java:465)
> at org.apache.flink.runtime.operators.BatchTask.invoke(BatchTask.java:355)
> at org.apache.flink.runtime.taskmanager.Task.run(Task.java:655)
> at java.lang.Thread.run(Thread.java:745)
> Caused by: java.lang.RuntimeException: Error obtaining the sorted input:
> Thread 'SortMerger Reading Thread' terminated due to an exception:
> Serializer consumed more bytes than the record had. This indicates broken
> serialization. If you are using custom serialization types (Value or
> Writable), check their serialization methods. If you are using a
> Kryo-serialized type, check the corresponding Kryo serializer.
> at org.apache.flink.runtime.operators.sort.UnilateralSortMerger.
> getIterator(UnilateralSortMerger.java:619)
> at org.apache.flink.runtime.operators.BatchTask.getInput(
> BatchTask.java:1094)
> at org.apache.flink.runtime.operators.GroupReduceDriver.
> prepare(GroupReduceDriver.java:99)
> at org.apache.flink.runtime.operators.BatchTask.run(BatchTask.java:460)
> ... 3 more
> Caused by: java.io.IOException: Thread 'SortMerger Reading Thread'
> terminated due to an exception: Serializer consumed more bytes than the
> record had. This indicates broken serialization. If you are using custom
> serialization types (Value or Writable), check their serialization methods.
> If you are using a Kryo-serialized type, check the corresponding Kryo
> serializer.
> at org.apache.flink.runtime.operators.sort.UnilateralSortMerger$
> ThreadBase.run(UnilateralSortMerger.java:799)
> Caused by: java.io.IOException: Serializer consumed more bytes than the
> record had. This indicates broken serialization. If you are using custom
> serialization types (Value or Writable), check their serialization methods.
> If you are using a Kryo-serialized type, check the corresponding Kryo
> serializer.
> at org.apache.flink.runtime.io.network.api.serialization.
> SpillingAdaptiveSpanningRecordDeserializer.getNextRecord(
> SpillingAdaptiveSpanningRecordDeserializer.java:123)
> at org.apache.flink.runtime.io.network.api.reader.AbstractRecordReader.
> getNextRecord(AbstractRecordReader.java:72)
> at org.apache.flink.runtime.io.network.api.reader.
> MutableRecordReader.next(MutableRecordReader.java:42)
> at org.apache.flink.runtime.operators.util.ReaderIterator.
> next(ReaderIterator.java:59)
> at org.apache.flink.runtime.operators.sort.UnilateralSortMerger$
> ReadingThread.go(UnilateralSortMerger.java:1035)
> at org.apache.flink.runtime.operators.sort.UnilateralSortMerger$
> ThreadBase.run(UnilateralSortMerger.java:796)
> Caused by: java.lang.ArrayIndexOutOfBoundsException: 32768
> at org.apache.flink.core.memory.HeapMemorySegment.get(
> HeapMemorySegment.java:104)
> at org.apache.flink.runtime.io.network.api.serialization.
> SpillingAdaptiveSpanningRecordDeserializer$NonSpanningWrapper.readByte(
> SpillingAdaptiveSpanningRecordDeserializer.java:226)
> at org.apache.flink.runtime.io.network.api.serialization.
> SpillingAdaptiveSpanningRecordDeserializer$NonSpanningWrapper.
> readUnsignedByte(SpillingAdaptiveSpanningRecordDeserializer.java:231)
> at org.apache.flink.types.StringValue.readString(StringValue.java:770)
> at org.apache.flink.api.common.typeutils.base.
> StringSerializer.deserialize(StringSerializer.java:69)
> at org.apache.flink.api.common.typeutils.base.
> StringSerializer.deserialize(StringSerializer.java:74)
> at org.apache.flink.api.common.typeutils.base.
> StringSerializer.deserialize(StringSerializer.java:28)
> at org.apache.flink.api.java.typeutils.runtime.RowSerializer.deserialize(
> RowSerializer.java:193)
> at org.apache.flink.api.java.typeutils.runtime.RowSerializer.deserialize(
> RowSerializer.java:36)
> at org.apache.flink.runtime.plugable.ReusingDeserializationDelegate.read(
> ReusingDeserializationDelegate.java:57)
> at org.apache.flink.runtime.io.network.api.serialization.
> SpillingAdaptiveSpanningRecordDeserializer.getNextRecord(
> SpillingAdaptiveSpanningRecordDeserializer.java:109)
> ... 5 more
>
> On Fri, Apr 21, 2017 at 11:43 AM, Stefano Bortoli <
> stefano.bortoli@huawei.com> wrote:
>
>> In fact the old problem was with the KryoSerializer missed initialization
>> on the exception that would trigger the spilling on disk. This would lead
>> to dirty serialization buffer that would eventually break the program. Till
>> worked on it debugging the source code generating the error. Perhaps
>> someone could try the same also this time. If Flavio can make the problem
>> reproducible in a shareable program+data.
>>
>>
>>
>> Stefano
>>
>>
>>
>> *From:* Stephan Ewen [mailto:sewen@apache.org]
>> *Sent:* Friday, April 21, 2017 10:04 AM
>> *To:* user <us...@flink.apache.org>
>> *Subject:* Re: UnilateralSortMerger error (again)
>>
>>
>>
>> In the past, these errors were most often caused by bugs in the
>> serializers, not in the sorter.
>>
>>
>>
>> What types are you using at that point? The Stack Trace reveals ROW and
>> StringValue, any other involved types?
>>
>>
>>
>> On Fri, Apr 21, 2017 at 9:36 AM, Flavio Pompermaier <po...@okkam.it>
>> wrote:
>>
>> As suggested by Fabian I set taskmanager.memory.size = 1024 (to force
>> spilling to disk) and the job failed almost immediately..
>>
>>
>>
>> On Fri, Apr 21, 2017 at 12:33 AM, Flavio Pompermaier <
>> pompermaier@okkam.it> wrote:
>>
>> I debugged a bit the process repeating the job on a sub-slice of the
>> entire data (using the id value to filter data with parquet push down
>> filters) and all slices completed successfully :(
>>
>> So I tried to increase the parallelism (from 1 slot per TM to 4) to see
>> if this was somehow a factor of stress but it didn't cause any error.
>>
>> Then I almost doubled the number of rows to process and finally the error
>> showed up again.
>>
>> It seems somehow related to spilling to disk but I can't really
>> understand what's going on :(
>>
>> This is a summary of my debug attempts:
>>
>>
>>
>> 4 Task managers with 6 GB  and 1 slot each, parallelism = 4
>>
>>
>>
>> id < 10.000.000.000  => 1.857.365 rows => OK
>>
>> id >= 10.000.000.000 && id < 10.010.000.000 => 20.057.714 rows => OK
>>
>> id >= 10.010.000.000 && id < 99.945.000.000       => 20.926.903 rows => OK
>>
>> id >= 99.945.000.000 && id < 99.960.000.000       => 23.888.750  rows =>
>> OK
>>
>> id >= 99.960.000.000 => 32.936.422 rows => OK
>>
>>
>>
>> 4 TM with 8 GB and 4 slot each, parallelism 16
>>
>>
>> id >= 99.960.000.000 => 32.936.422 rows => OK
>>
>> id >= 99.945.000.000  => 56.825.172 rows => ERROR
>>
>>
>>
>> Any help is appreciated..
>>
>> Best,
>>
>> Flavio
>>
>>
>>
>> On Wed, Apr 19, 2017 at 8:34 PM, Flavio Pompermaier <po...@okkam.it>
>> wrote:
>>
>> I could but only if there's a good probability that it fix the
>> problem...how confident are you about it?
>>
>>
>>
>> On Wed, Apr 19, 2017 at 8:27 PM, Ted Yu <yu...@gmail.com> wrote:
>>
>> Looking at git log of DataInputDeserializer.java , there has been some
>> recent change.
>>
>>
>>
>> If you have time, maybe try with 1.2.1 RC and see if the error is
>> reproducible ?
>>
>>
>>
>> Cheers
>>
>>
>>
>> On Wed, Apr 19, 2017 at 11:22 AM, Flavio Pompermaier <
>> pompermaier@okkam.it> wrote:
>>
>> Hi to all,
>>
>> I think I'm again on the weird Exception with the
>> SpillingAdaptiveSpanningRecordDeserializer...
>>
>> I'm using Flink 1.2.0 and the error seems to rise when Flink spills to
>> disk but the Exception thrown is not very helpful. Any idea?
>>
>>
>>
>> Caused by: java.lang.RuntimeException: Error obtaining the sorted input:
>> Thread 'SortMerger Reading Thread' terminated due to an exception: null
>> at org.apache.flink.runtime.operators.sort.UnilateralSortMerger
>> .getIterator(UnilateralSortMerger.java:619)
>> at org.apache.flink.runtime.operators.BatchTask.getInput(BatchT
>> ask.java:1094)
>> at org.apache.flink.runtime.operators.GroupReduceDriver.prepare
>> (GroupReduceDriver.java:99)
>> at org.apache.flink.runtime.operators.BatchTask.run(BatchTask.java:460)
>> ... 3 more
>> Caused by: java.io.IOException: Thread 'SortMerger Reading Thread'
>> terminated due to an exception: null
>> at org.apache.flink.runtime.operators.sort.UnilateralSortMerger
>> $ThreadBase.run(UnilateralSortMerger.java:799)
>> Caused by: java.io.EOFException
>> at org.apache.flink.runtime.util.DataInputDeserializer.readUnsi
>> gnedByte(DataInputDeserializer.java:306)
>> at org.apache.flink.types.StringValue.readString(StringValue.java:747)
>> at org.apache.flink.api.common.typeutils.base.StringSerializer.
>> deserialize(StringSerializer.java:69)
>> at org.apache.flink.api.common.typeutils.base.StringSerializer.
>> deserialize(StringSerializer.java:74)
>> at org.apache.flink.api.common.typeutils.base.StringSerializer.
>> deserialize(StringSerializer.java:28)
>> at org.apache.flink.api.java.typeutils.runtime.RowSerializer.de
>> serialize(RowSerializer.java:193)
>> at org.apache.flink.api.java.typeutils.runtime.RowSerializer.de
>> serialize(RowSerializer.java:36)
>> at org.apache.flink.runtime.plugable.ReusingDeserializationDele
>> gate.read(ReusingDeserializationDelegate.java:57)
>> at org.apache.flink.runtime.io.network.api.serialization.Spilli
>> ngAdaptiveSpanningRecordDeserializer.getNextRecord(SpillingA
>> daptiveSpanningRecordDeserializer.java:144)
>> at org.apache.flink.runtime.io.network.api.reader.AbstractRecor
>> dReader.getNextRecord(AbstractRecordReader.java:72)
>> at org.apache.flink.runtime.io.network.api.reader.MutableRecord
>> Reader.next(MutableRecordReader.java:42)
>> at org.apache.flink.runtime.operators.util.ReaderIterator.next(
>> ReaderIterator.java:59)
>> at org.apache.flink.runtime.operators.sort.UnilateralSortMerger
>> $ReadingThread.go(UnilateralSortMerger.java:1035)
>> at org.apache.flink.runtime.operators.sort.UnilateralSortMerger
>> $ThreadBase.run(UnilateralSortMerger.java:796)
>>
>>
>> Best,
>>
>> Flavio
>>
>>
>>
>>
>>
>>
>>
>>
>>
>>
>>
>
>
>

Re: UnilateralSortMerger error (again)

Posted by Flavio Pompermaier <po...@okkam.it>.
In another run of the job I had another Exception. Could it be helpful?

Error obtaining the sorted input: Thread 'SortMerger Reading Thread'
terminated due to an exception: Serializer consumed more bytes than the
record had. This indicates broken serialization. If you are using custom
serialization types (Value or Writable), check their serialization methods.
If you are using a Kryo-serialized type, check the corresponding Kryo
serializer.
at org.apache.flink.runtime.operators.BatchTask.run(BatchTask.java:465)
at org.apache.flink.runtime.operators.BatchTask.invoke(BatchTask.java:355)
at org.apache.flink.runtime.taskmanager.Task.run(Task.java:655)
at java.lang.Thread.run(Thread.java:745)
Caused by: java.lang.RuntimeException: Error obtaining the sorted input:
Thread 'SortMerger Reading Thread' terminated due to an exception:
Serializer consumed more bytes than the record had. This indicates broken
serialization. If you are using custom serialization types (Value or
Writable), check their serialization methods. If you are using a
Kryo-serialized type, check the corresponding Kryo serializer.
at
org.apache.flink.runtime.operators.sort.UnilateralSortMerger.getIterator(UnilateralSortMerger.java:619)
at
org.apache.flink.runtime.operators.BatchTask.getInput(BatchTask.java:1094)
at
org.apache.flink.runtime.operators.GroupReduceDriver.prepare(GroupReduceDriver.java:99)
at org.apache.flink.runtime.operators.BatchTask.run(BatchTask.java:460)
... 3 more
Caused by: java.io.IOException: Thread 'SortMerger Reading Thread'
terminated due to an exception: Serializer consumed more bytes than the
record had. This indicates broken serialization. If you are using custom
serialization types (Value or Writable), check their serialization methods.
If you are using a Kryo-serialized type, check the corresponding Kryo
serializer.
at
org.apache.flink.runtime.operators.sort.UnilateralSortMerger$ThreadBase.run(UnilateralSortMerger.java:799)
Caused by: java.io.IOException: Serializer consumed more bytes than the
record had. This indicates broken serialization. If you are using custom
serialization types (Value or Writable), check their serialization methods.
If you are using a Kryo-serialized type, check the corresponding Kryo
serializer.
at
org.apache.flink.runtime.io.network.api.serialization.SpillingAdaptiveSpanningRecordDeserializer.getNextRecord(SpillingAdaptiveSpanningRecordDeserializer.java:123)
at
org.apache.flink.runtime.io.network.api.reader.AbstractRecordReader.getNextRecord(AbstractRecordReader.java:72)
at
org.apache.flink.runtime.io.network.api.reader.MutableRecordReader.next(MutableRecordReader.java:42)
at
org.apache.flink.runtime.operators.util.ReaderIterator.next(ReaderIterator.java:59)
at
org.apache.flink.runtime.operators.sort.UnilateralSortMerger$ReadingThread.go(UnilateralSortMerger.java:1035)
at
org.apache.flink.runtime.operators.sort.UnilateralSortMerger$ThreadBase.run(UnilateralSortMerger.java:796)
Caused by: java.lang.ArrayIndexOutOfBoundsException: 32768
at
org.apache.flink.core.memory.HeapMemorySegment.get(HeapMemorySegment.java:104)
at
org.apache.flink.runtime.io.network.api.serialization.SpillingAdaptiveSpanningRecordDeserializer$NonSpanningWrapper.readByte(SpillingAdaptiveSpanningRecordDeserializer.java:226)
at
org.apache.flink.runtime.io.network.api.serialization.SpillingAdaptiveSpanningRecordDeserializer$NonSpanningWrapper.readUnsignedByte(SpillingAdaptiveSpanningRecordDeserializer.java:231)
at org.apache.flink.types.StringValue.readString(StringValue.java:770)
at
org.apache.flink.api.common.typeutils.base.StringSerializer.deserialize(StringSerializer.java:69)
at
org.apache.flink.api.common.typeutils.base.StringSerializer.deserialize(StringSerializer.java:74)
at
org.apache.flink.api.common.typeutils.base.StringSerializer.deserialize(StringSerializer.java:28)
at
org.apache.flink.api.java.typeutils.runtime.RowSerializer.deserialize(RowSerializer.java:193)
at
org.apache.flink.api.java.typeutils.runtime.RowSerializer.deserialize(RowSerializer.java:36)
at
org.apache.flink.runtime.plugable.ReusingDeserializationDelegate.read(ReusingDeserializationDelegate.java:57)
at
org.apache.flink.runtime.io.network.api.serialization.SpillingAdaptiveSpanningRecordDeserializer.getNextRecord(SpillingAdaptiveSpanningRecordDeserializer.java:109)
... 5 more

On Fri, Apr 21, 2017 at 11:43 AM, Stefano Bortoli <
stefano.bortoli@huawei.com> wrote:

> In fact the old problem was with the KryoSerializer missed initialization
> on the exception that would trigger the spilling on disk. This would lead
> to dirty serialization buffer that would eventually break the program. Till
> worked on it debugging the source code generating the error. Perhaps
> someone could try the same also this time. If Flavio can make the problem
> reproducible in a shareable program+data.
>
>
>
> Stefano
>
>
>
> *From:* Stephan Ewen [mailto:sewen@apache.org]
> *Sent:* Friday, April 21, 2017 10:04 AM
> *To:* user <us...@flink.apache.org>
> *Subject:* Re: UnilateralSortMerger error (again)
>
>
>
> In the past, these errors were most often caused by bugs in the
> serializers, not in the sorter.
>
>
>
> What types are you using at that point? The Stack Trace reveals ROW and
> StringValue, any other involved types?
>
>
>
> On Fri, Apr 21, 2017 at 9:36 AM, Flavio Pompermaier <po...@okkam.it>
> wrote:
>
> As suggested by Fabian I set taskmanager.memory.size = 1024 (to force
> spilling to disk) and the job failed almost immediately..
>
>
>
> On Fri, Apr 21, 2017 at 12:33 AM, Flavio Pompermaier <po...@okkam.it>
> wrote:
>
> I debugged a bit the process repeating the job on a sub-slice of the
> entire data (using the id value to filter data with parquet push down
> filters) and all slices completed successfully :(
>
> So I tried to increase the parallelism (from 1 slot per TM to 4) to see if
> this was somehow a factor of stress but it didn't cause any error.
>
> Then I almost doubled the number of rows to process and finally the error
> showed up again.
>
> It seems somehow related to spilling to disk but I can't really understand
> what's going on :(
>
> This is a summary of my debug attempts:
>
>
>
> 4 Task managers with 6 GB  and 1 slot each, parallelism = 4
>
>
>
> id < 10.000.000.000  => 1.857.365 rows => OK
>
> id >= 10.000.000.000 && id < 10.010.000.000 => 20.057.714 rows => OK
>
> id >= 10.010.000.000 && id < 99.945.000.000       => 20.926.903 rows => OK
>
> id >= 99.945.000.000 && id < 99.960.000.000       => 23.888.750  rows => OK
>
> id >= 99.960.000.000 => 32.936.422 rows => OK
>
>
>
> 4 TM with 8 GB and 4 slot each, parallelism 16
>
>
> id >= 99.960.000.000 => 32.936.422 rows => OK
>
> id >= 99.945.000.000  => 56.825.172 rows => ERROR
>
>
>
> Any help is appreciated..
>
> Best,
>
> Flavio
>
>
>
> On Wed, Apr 19, 2017 at 8:34 PM, Flavio Pompermaier <po...@okkam.it>
> wrote:
>
> I could but only if there's a good probability that it fix the
> problem...how confident are you about it?
>
>
>
> On Wed, Apr 19, 2017 at 8:27 PM, Ted Yu <yu...@gmail.com> wrote:
>
> Looking at git log of DataInputDeserializer.java , there has been some
> recent change.
>
>
>
> If you have time, maybe try with 1.2.1 RC and see if the error is
> reproducible ?
>
>
>
> Cheers
>
>
>
> On Wed, Apr 19, 2017 at 11:22 AM, Flavio Pompermaier <po...@okkam.it>
> wrote:
>
> Hi to all,
>
> I think I'm again on the weird Exception with the
> SpillingAdaptiveSpanningRecordDeserializer...
>
> I'm using Flink 1.2.0 and the error seems to rise when Flink spills to
> disk but the Exception thrown is not very helpful. Any idea?
>
>
>
> Caused by: java.lang.RuntimeException: Error obtaining the sorted input:
> Thread 'SortMerger Reading Thread' terminated due to an exception: null
> at org.apache.flink.runtime.operators.sort.UnilateralSortMerger.
> getIterator(UnilateralSortMerger.java:619)
> at org.apache.flink.runtime.operators.BatchTask.getInput(
> BatchTask.java:1094)
> at org.apache.flink.runtime.operators.GroupReduceDriver.
> prepare(GroupReduceDriver.java:99)
> at org.apache.flink.runtime.operators.BatchTask.run(BatchTask.java:460)
> ... 3 more
> Caused by: java.io.IOException: Thread 'SortMerger Reading Thread'
> terminated due to an exception: null
> at org.apache.flink.runtime.operators.sort.UnilateralSortMerger$
> ThreadBase.run(UnilateralSortMerger.java:799)
> Caused by: java.io.EOFException
> at org.apache.flink.runtime.util.DataInputDeserializer.readUnsignedByte(
> DataInputDeserializer.java:306)
> at org.apache.flink.types.StringValue.readString(StringValue.java:747)
> at org.apache.flink.api.common.typeutils.base.
> StringSerializer.deserialize(StringSerializer.java:69)
> at org.apache.flink.api.common.typeutils.base.
> StringSerializer.deserialize(StringSerializer.java:74)
> at org.apache.flink.api.common.typeutils.base.
> StringSerializer.deserialize(StringSerializer.java:28)
> at org.apache.flink.api.java.typeutils.runtime.RowSerializer.de
> serialize(RowSerializer.java:193)
> at org.apache.flink.api.java.typeutils.runtime.RowSerializer.de
> serialize(RowSerializer.java:36)
> at org.apache.flink.runtime.plugable.ReusingDeserializationDelegate.read(
> ReusingDeserializationDelegate.java:57)
> at org.apache.flink.runtime.io.network.api.serialization.
> SpillingAdaptiveSpanningRecordDeserializer.getNextRecord(
> SpillingAdaptiveSpanningRecordDeserializer.java:144)
> at org.apache.flink.runtime.io.network.api.reader.AbstractRecordReader.
> getNextRecord(AbstractRecordReader.java:72)
> at org.apache.flink.runtime.io.network.api.reader.
> MutableRecordReader.next(MutableRecordReader.java:42)
> at org.apache.flink.runtime.operators.util.ReaderIterator.
> next(ReaderIterator.java:59)
> at org.apache.flink.runtime.operators.sort.UnilateralSortMerger$
> ReadingThread.go(UnilateralSortMerger.java:1035)
> at org.apache.flink.runtime.operators.sort.UnilateralSortMerger$
> ThreadBase.run(UnilateralSortMerger.java:796)
>
>
> Best,
>
> Flavio
>
>
>
>
>
>
>
>
>
>
>

RE: UnilateralSortMerger error (again)

Posted by Stefano Bortoli <st...@huawei.com>.
In fact the old problem was with the KryoSerializer missed initialization on the exception that would trigger the spilling on disk. This would lead to dirty serialization buffer that would eventually break the program. Till worked on it debugging the source code generating the error. Perhaps someone could try the same also this time. If Flavio can make the problem reproducible in a shareable program+data.

Stefano

From: Stephan Ewen [mailto:sewen@apache.org]
Sent: Friday, April 21, 2017 10:04 AM
To: user <us...@flink.apache.org>
Subject: Re: UnilateralSortMerger error (again)

In the past, these errors were most often caused by bugs in the serializers, not in the sorter.

What types are you using at that point? The Stack Trace reveals ROW and StringValue, any other involved types?

On Fri, Apr 21, 2017 at 9:36 AM, Flavio Pompermaier <po...@okkam.it>> wrote:
As suggested by Fabian I set taskmanager.memory.size = 1024 (to force spilling to disk) and the job failed almost immediately..

On Fri, Apr 21, 2017 at 12:33 AM, Flavio Pompermaier <po...@okkam.it>> wrote:
I debugged a bit the process repeating the job on a sub-slice of the entire data (using the id value to filter data with parquet push down filters) and all slices completed successfully :(
So I tried to increase the parallelism (from 1 slot per TM to 4) to see if this was somehow a factor of stress but it didn't cause any error.
Then I almost doubled the number of rows to process and finally the error showed up again.
It seems somehow related to spilling to disk but I can't really understand what's going on :(
This is a summary of my debug attempts:

4 Task managers with 6 GB  and 1 slot each, parallelism = 4

id < 10.000.000.000  => 1.857.365 rows => OK
id >= 10.000.000.000 && id < 10.010.000.000 => 20.057.714 rows => OK
id >= 10.010.000.000 && id < 99.945.000.000       => 20.926.903 rows => OK
id >= 99.945.000.000 && id < 99.960.000.000       => 23.888.750  rows => OK
id >= 99.960.000.000 => 32.936.422 rows => OK

4 TM with 8 GB and 4 slot each, parallelism 16

id >= 99.960.000.000 => 32.936.422 rows => OK
id >= 99.945.000.000  => 56.825.172 rows => ERROR

Any help is appreciated..
Best,
Flavio

On Wed, Apr 19, 2017 at 8:34 PM, Flavio Pompermaier <po...@okkam.it>> wrote:
I could but only if there's a good probability that it fix the problem...how confident are you about it?

On Wed, Apr 19, 2017 at 8:27 PM, Ted Yu <yu...@gmail.com>> wrote:
Looking at git log of DataInputDeserializer.java , there has been some recent change.

If you have time, maybe try with 1.2.1 RC and see if the error is reproducible ?

Cheers

On Wed, Apr 19, 2017 at 11:22 AM, Flavio Pompermaier <po...@okkam.it>> wrote:
Hi to all,
I think I'm again on the weird Exception with the SpillingAdaptiveSpanningRecordDeserializer...
I'm using Flink 1.2.0 and the error seems to rise when Flink spills to disk but the Exception thrown is not very helpful. Any idea?

Caused by: java.lang.RuntimeException: Error obtaining the sorted input: Thread 'SortMerger Reading Thread' terminated due to an exception: null
at org.apache.flink.runtime.operators.sort.UnilateralSortMerger.getIterator(UnilateralSortMerger.java:619)
at org.apache.flink.runtime.operators.BatchTask.getInput(BatchTask.java:1094)
at org.apache.flink.runtime.operators.GroupReduceDriver.prepare(GroupReduceDriver.java:99)
at org.apache.flink.runtime.operators.BatchTask.run(BatchTask.java:460)
... 3 more
Caused by: java.io.IOException: Thread 'SortMerger Reading Thread' terminated due to an exception: null
at org.apache.flink.runtime.operators.sort.UnilateralSortMerger$ThreadBase.run(UnilateralSortMerger.java:799)
Caused by: java.io.EOFException
at org.apache.flink.runtime.util.DataInputDeserializer.readUnsignedByte(DataInputDeserializer.java:306)
at org.apache.flink.types.StringValue.readString(StringValue.java:747)
at org.apache.flink.api.common.typeutils.base.StringSerializer.deserialize(StringSerializer.java:69)
at org.apache.flink.api.common.typeutils.base.StringSerializer.deserialize(StringSerializer.java:74)
at org.apache.flink.api.common.typeutils.base.StringSerializer.deserialize(StringSerializer.java:28)
at org.apache.flink.api.java.typeutils.runtime.RowSerializer.de<http://utils.runtime.RowSerializer.de>serialize(RowSerializer.java:193)
at org.apache.flink.api.java.typeutils.runtime.RowSerializer.de<http://utils.runtime.RowSerializer.de>serialize(RowSerializer.java:36)
at org.apache.flink.runtime.plugable.ReusingDeserializationDelegate.read(ReusingDeserializationDelegate.java:57)
at org.apache.flink.runtime.io<http://org.apache.flink.runtime.io>.network.api.serialization.SpillingAdaptiveSpanningRecordDeserializer.getNextRecord(SpillingAdaptiveSpanningRecordDeserializer.java:144)
at org.apache.flink.runtime.io<http://org.apache.flink.runtime.io>.network.api.reader.AbstractRecordReader.getNextRecord(AbstractRecordReader.java:72)
at org.apache.flink.runtime.io<http://org.apache.flink.runtime.io>.network.api.reader.MutableRecordReader.next(MutableRecordReader.java:42)
at org.apache.flink.runtime.operators.util.ReaderIterator.next(ReaderIterator.java:59)
at org.apache.flink.runtime.operators.sort.UnilateralSortMerger$ReadingThread.go(UnilateralSortMerger.java:1035)
at org.apache.flink.runtime.operators.sort.UnilateralSortMerger$ThreadBase.run(UnilateralSortMerger.java:796)


Best,
Flavio






Re: UnilateralSortMerger error (again)

Posted by Flavio Pompermaier <po...@okkam.it>.
The types I read are:

[String, String, String, String, String, String, String, String, String,
Boolean, Long, Long, Long, Integer, Integer, Long, String, String, Long,
Long, String, Long, String, String, String, String, String, String, String,
String, String, String, String, String, String, String, String]

On Fri, Apr 21, 2017 at 10:03 AM, Stephan Ewen <se...@apache.org> wrote:

> In the past, these errors were most often caused by bugs in the
> serializers, not in the sorter.
>
> What types are you using at that point? The Stack Trace reveals ROW and
> StringValue, any other involved types?
>
> On Fri, Apr 21, 2017 at 9:36 AM, Flavio Pompermaier <po...@okkam.it>
> wrote:
>
>> As suggested by Fabian I set taskmanager.memory.size = 1024 (to force
>> spilling to disk) and the job failed almost immediately..
>>
>> On Fri, Apr 21, 2017 at 12:33 AM, Flavio Pompermaier <
>> pompermaier@okkam.it> wrote:
>>
>>> I debugged a bit the process repeating the job on a sub-slice of the
>>> entire data (using the id value to filter data with parquet push down
>>> filters) and all slices completed successfully :(
>>> So I tried to increase the parallelism (from 1 slot per TM to 4) to see
>>> if this was somehow a factor of stress but it didn't cause any error.
>>> Then I almost doubled the number of rows to process and finally the
>>> error showed up again.
>>> It seems somehow related to spilling to disk but I can't really
>>> understand what's going on :(
>>> This is a summary of my debug attempts:
>>>
>>> 4 Task managers with 6 GB  and 1 slot each, parallelism = 4
>>>
>>> id < 10.000.000.000  => 1.857.365 rows => OK
>>> id >= 10.000.000.000 && id < 10.010.000.000 => 20.057.714 rows => OK
>>> id >= 10.010.000.000 && id < 99.945.000.000       => 20.926.903 rows =>
>>> OK
>>> id >= 99.945.000.000 && id < 99.960.000.000       => 23.888.750  rows =>
>>> OK
>>> id >= 99.960.000.000 => 32.936.422 rows => OK
>>>
>>> 4 TM with 8 GB and 4 slot each, parallelism 16
>>>
>>> id >= 99.960.000.000 => 32.936.422 rows => OK
>>> id >= 99.945.000.000  => 56.825.172 rows => ERROR
>>>
>>> Any help is appreciated..
>>> Best,
>>> Flavio
>>>
>>> On Wed, Apr 19, 2017 at 8:34 PM, Flavio Pompermaier <
>>> pompermaier@okkam.it> wrote:
>>>
>>>> I could but only if there's a good probability that it fix the
>>>> problem...how confident are you about it?
>>>>
>>>> On Wed, Apr 19, 2017 at 8:27 PM, Ted Yu <yu...@gmail.com> wrote:
>>>>
>>>>> Looking at git log of DataInputDeserializer.java , there has been some
>>>>> recent change.
>>>>>
>>>>> If you have time, maybe try with 1.2.1 RC and see if the error is
>>>>> reproducible ?
>>>>>
>>>>> Cheers
>>>>>
>>>>> On Wed, Apr 19, 2017 at 11:22 AM, Flavio Pompermaier <
>>>>> pompermaier@okkam.it> wrote:
>>>>>
>>>>>> Hi to all,
>>>>>> I think I'm again on the weird Exception with the
>>>>>> SpillingAdaptiveSpanningRecordDeserializer...
>>>>>> I'm using Flink 1.2.0 and the error seems to rise when Flink spills
>>>>>> to disk but the Exception thrown is not very helpful. Any idea?
>>>>>>
>>>>>> Caused by: java.lang.RuntimeException: Error obtaining the sorted
>>>>>> input: Thread 'SortMerger Reading Thread' terminated due to an exception:
>>>>>> null
>>>>>> at org.apache.flink.runtime.operators.sort.UnilateralSortMerger
>>>>>> .getIterator(UnilateralSortMerger.java:619)
>>>>>> at org.apache.flink.runtime.operators.BatchTask.getInput(BatchT
>>>>>> ask.java:1094)
>>>>>> at org.apache.flink.runtime.operators.GroupReduceDriver.prepare
>>>>>> (GroupReduceDriver.java:99)
>>>>>> at org.apache.flink.runtime.operators.BatchTask.run(BatchTask.j
>>>>>> ava:460)
>>>>>> ... 3 more
>>>>>> Caused by: java.io.IOException: Thread 'SortMerger Reading Thread'
>>>>>> terminated due to an exception: null
>>>>>> at org.apache.flink.runtime.operators.sort.UnilateralSortMerger
>>>>>> $ThreadBase.run(UnilateralSortMerger.java:799)
>>>>>> Caused by: java.io.EOFException
>>>>>> at org.apache.flink.runtime.util.DataInputDeserializer.readUnsi
>>>>>> gnedByte(DataInputDeserializer.java:306)
>>>>>> at org.apache.flink.types.StringValue.readString(StringValue.ja
>>>>>> va:747)
>>>>>> at org.apache.flink.api.common.typeutils.base.StringSerializer.
>>>>>> deserialize(StringSerializer.java:69)
>>>>>> at org.apache.flink.api.common.typeutils.base.StringSerializer.
>>>>>> deserialize(StringSerializer.java:74)
>>>>>> at org.apache.flink.api.common.typeutils.base.StringSerializer.
>>>>>> deserialize(StringSerializer.java:28)
>>>>>> at org.apache.flink.api.java.typeutils.runtime.RowSerializer.de
>>>>>> serialize(RowSerializer.java:193)
>>>>>> at org.apache.flink.api.java.typeutils.runtime.RowSerializer.de
>>>>>> serialize(RowSerializer.java:36)
>>>>>> at org.apache.flink.runtime.plugable.ReusingDeserializationDele
>>>>>> gate.read(ReusingDeserializationDelegate.java:57)
>>>>>> at org.apache.flink.runtime.io.network.api.serialization.Spilli
>>>>>> ngAdaptiveSpanningRecordDeserializer.getNextRecord(SpillingA
>>>>>> daptiveSpanningRecordDeserializer.java:144)
>>>>>> at org.apache.flink.runtime.io.network.api.reader.AbstractRecor
>>>>>> dReader.getNextRecord(AbstractRecordReader.java:72)
>>>>>> at org.apache.flink.runtime.io.network.api.reader.MutableRecord
>>>>>> Reader.next(MutableRecordReader.java:42)
>>>>>> at org.apache.flink.runtime.operators.util.ReaderIterator.next(
>>>>>> ReaderIterator.java:59)
>>>>>> at org.apache.flink.runtime.operators.sort.UnilateralSortMerger
>>>>>> $ReadingThread.go(UnilateralSortMerger.java:1035)
>>>>>> at org.apache.flink.runtime.operators.sort.UnilateralSortMerger
>>>>>> $ThreadBase.run(UnilateralSortMerger.java:796)
>>>>>>
>>>>>>
>>>>>> Best,
>>>>>> Flavio
>>>>>>
>>>>>
>>>>>
>>>>
>>>>
>>>
>>
>>
>

Re: UnilateralSortMerger error (again)

Posted by Stephan Ewen <se...@apache.org>.
In the past, these errors were most often caused by bugs in the
serializers, not in the sorter.

What types are you using at that point? The Stack Trace reveals ROW and
StringValue, any other involved types?

On Fri, Apr 21, 2017 at 9:36 AM, Flavio Pompermaier <po...@okkam.it>
wrote:

> As suggested by Fabian I set taskmanager.memory.size = 1024 (to force
> spilling to disk) and the job failed almost immediately..
>
> On Fri, Apr 21, 2017 at 12:33 AM, Flavio Pompermaier <pompermaier@okkam.it
> > wrote:
>
>> I debugged a bit the process repeating the job on a sub-slice of the
>> entire data (using the id value to filter data with parquet push down
>> filters) and all slices completed successfully :(
>> So I tried to increase the parallelism (from 1 slot per TM to 4) to see
>> if this was somehow a factor of stress but it didn't cause any error.
>> Then I almost doubled the number of rows to process and finally the error
>> showed up again.
>> It seems somehow related to spilling to disk but I can't really
>> understand what's going on :(
>> This is a summary of my debug attempts:
>>
>> 4 Task managers with 6 GB  and 1 slot each, parallelism = 4
>>
>> id < 10.000.000.000  => 1.857.365 rows => OK
>> id >= 10.000.000.000 && id < 10.010.000.000 => 20.057.714 rows => OK
>> id >= 10.010.000.000 && id < 99.945.000.000       => 20.926.903 rows =>
>> OK
>> id >= 99.945.000.000 && id < 99.960.000.000       => 23.888.750  rows =>
>> OK
>> id >= 99.960.000.000 => 32.936.422 rows => OK
>>
>> 4 TM with 8 GB and 4 slot each, parallelism 16
>>
>> id >= 99.960.000.000 => 32.936.422 rows => OK
>> id >= 99.945.000.000  => 56.825.172 rows => ERROR
>>
>> Any help is appreciated..
>> Best,
>> Flavio
>>
>> On Wed, Apr 19, 2017 at 8:34 PM, Flavio Pompermaier <pompermaier@okkam.it
>> > wrote:
>>
>>> I could but only if there's a good probability that it fix the
>>> problem...how confident are you about it?
>>>
>>> On Wed, Apr 19, 2017 at 8:27 PM, Ted Yu <yu...@gmail.com> wrote:
>>>
>>>> Looking at git log of DataInputDeserializer.java , there has been some
>>>> recent change.
>>>>
>>>> If you have time, maybe try with 1.2.1 RC and see if the error is
>>>> reproducible ?
>>>>
>>>> Cheers
>>>>
>>>> On Wed, Apr 19, 2017 at 11:22 AM, Flavio Pompermaier <
>>>> pompermaier@okkam.it> wrote:
>>>>
>>>>> Hi to all,
>>>>> I think I'm again on the weird Exception with the
>>>>> SpillingAdaptiveSpanningRecordDeserializer...
>>>>> I'm using Flink 1.2.0 and the error seems to rise when Flink spills to
>>>>> disk but the Exception thrown is not very helpful. Any idea?
>>>>>
>>>>> Caused by: java.lang.RuntimeException: Error obtaining the sorted
>>>>> input: Thread 'SortMerger Reading Thread' terminated due to an exception:
>>>>> null
>>>>> at org.apache.flink.runtime.operators.sort.UnilateralSortMerger
>>>>> .getIterator(UnilateralSortMerger.java:619)
>>>>> at org.apache.flink.runtime.operators.BatchTask.getInput(BatchT
>>>>> ask.java:1094)
>>>>> at org.apache.flink.runtime.operators.GroupReduceDriver.prepare
>>>>> (GroupReduceDriver.java:99)
>>>>> at org.apache.flink.runtime.operators.BatchTask.run(BatchTask.j
>>>>> ava:460)
>>>>> ... 3 more
>>>>> Caused by: java.io.IOException: Thread 'SortMerger Reading Thread'
>>>>> terminated due to an exception: null
>>>>> at org.apache.flink.runtime.operators.sort.UnilateralSortMerger
>>>>> $ThreadBase.run(UnilateralSortMerger.java:799)
>>>>> Caused by: java.io.EOFException
>>>>> at org.apache.flink.runtime.util.DataInputDeserializer.readUnsi
>>>>> gnedByte(DataInputDeserializer.java:306)
>>>>> at org.apache.flink.types.StringValue.readString(StringValue.java:747)
>>>>> at org.apache.flink.api.common.typeutils.base.StringSerializer.
>>>>> deserialize(StringSerializer.java:69)
>>>>> at org.apache.flink.api.common.typeutils.base.StringSerializer.
>>>>> deserialize(StringSerializer.java:74)
>>>>> at org.apache.flink.api.common.typeutils.base.StringSerializer.
>>>>> deserialize(StringSerializer.java:28)
>>>>> at org.apache.flink.api.java.typeutils.runtime.RowSerializer.de
>>>>> serialize(RowSerializer.java:193)
>>>>> at org.apache.flink.api.java.typeutils.runtime.RowSerializer.de
>>>>> serialize(RowSerializer.java:36)
>>>>> at org.apache.flink.runtime.plugable.ReusingDeserializationDele
>>>>> gate.read(ReusingDeserializationDelegate.java:57)
>>>>> at org.apache.flink.runtime.io.network.api.serialization.Spilli
>>>>> ngAdaptiveSpanningRecordDeserializer.getNextRecord(SpillingA
>>>>> daptiveSpanningRecordDeserializer.java:144)
>>>>> at org.apache.flink.runtime.io.network.api.reader.AbstractRecor
>>>>> dReader.getNextRecord(AbstractRecordReader.java:72)
>>>>> at org.apache.flink.runtime.io.network.api.reader.MutableRecord
>>>>> Reader.next(MutableRecordReader.java:42)
>>>>> at org.apache.flink.runtime.operators.util.ReaderIterator.next(
>>>>> ReaderIterator.java:59)
>>>>> at org.apache.flink.runtime.operators.sort.UnilateralSortMerger
>>>>> $ReadingThread.go(UnilateralSortMerger.java:1035)
>>>>> at org.apache.flink.runtime.operators.sort.UnilateralSortMerger
>>>>> $ThreadBase.run(UnilateralSortMerger.java:796)
>>>>>
>>>>>
>>>>> Best,
>>>>> Flavio
>>>>>
>>>>
>>>>
>>>
>>>
>>
>
>

Re: UnilateralSortMerger error (again)

Posted by Flavio Pompermaier <po...@okkam.it>.
As suggested by Fabian I set taskmanager.memory.size = 1024 (to force
spilling to disk) and the job failed almost immediately..

On Fri, Apr 21, 2017 at 12:33 AM, Flavio Pompermaier <po...@okkam.it>
wrote:

> I debugged a bit the process repeating the job on a sub-slice of the
> entire data (using the id value to filter data with parquet push down
> filters) and all slices completed successfully :(
> So I tried to increase the parallelism (from 1 slot per TM to 4) to see if
> this was somehow a factor of stress but it didn't cause any error.
> Then I almost doubled the number of rows to process and finally the error
> showed up again.
> It seems somehow related to spilling to disk but I can't really understand
> what's going on :(
> This is a summary of my debug attempts:
>
> 4 Task managers with 6 GB  and 1 slot each, parallelism = 4
>
> id < 10.000.000.000  => 1.857.365 rows => OK
> id >= 10.000.000.000 && id < 10.010.000.000 => 20.057.714 rows => OK
> id >= 10.010.000.000 && id < 99.945.000.000       => 20.926.903 rows => OK
> id >= 99.945.000.000 && id < 99.960.000.000       => 23.888.750  rows =>
> OK
> id >= 99.960.000.000 => 32.936.422 rows => OK
>
> 4 TM with 8 GB and 4 slot each, parallelism 16
>
> id >= 99.960.000.000 => 32.936.422 rows => OK
> id >= 99.945.000.000  => 56.825.172 rows => ERROR
>
> Any help is appreciated..
> Best,
> Flavio
>
> On Wed, Apr 19, 2017 at 8:34 PM, Flavio Pompermaier <po...@okkam.it>
> wrote:
>
>> I could but only if there's a good probability that it fix the
>> problem...how confident are you about it?
>>
>> On Wed, Apr 19, 2017 at 8:27 PM, Ted Yu <yu...@gmail.com> wrote:
>>
>>> Looking at git log of DataInputDeserializer.java , there has been some
>>> recent change.
>>>
>>> If you have time, maybe try with 1.2.1 RC and see if the error is
>>> reproducible ?
>>>
>>> Cheers
>>>
>>> On Wed, Apr 19, 2017 at 11:22 AM, Flavio Pompermaier <
>>> pompermaier@okkam.it> wrote:
>>>
>>>> Hi to all,
>>>> I think I'm again on the weird Exception with the
>>>> SpillingAdaptiveSpanningRecordDeserializer...
>>>> I'm using Flink 1.2.0 and the error seems to rise when Flink spills to
>>>> disk but the Exception thrown is not very helpful. Any idea?
>>>>
>>>> Caused by: java.lang.RuntimeException: Error obtaining the sorted
>>>> input: Thread 'SortMerger Reading Thread' terminated due to an exception:
>>>> null
>>>> at org.apache.flink.runtime.operators.sort.UnilateralSortMerger
>>>> .getIterator(UnilateralSortMerger.java:619)
>>>> at org.apache.flink.runtime.operators.BatchTask.getInput(BatchT
>>>> ask.java:1094)
>>>> at org.apache.flink.runtime.operators.GroupReduceDriver.prepare
>>>> (GroupReduceDriver.java:99)
>>>> at org.apache.flink.runtime.operators.BatchTask.run(BatchTask.java:460)
>>>> ... 3 more
>>>> Caused by: java.io.IOException: Thread 'SortMerger Reading Thread'
>>>> terminated due to an exception: null
>>>> at org.apache.flink.runtime.operators.sort.UnilateralSortMerger
>>>> $ThreadBase.run(UnilateralSortMerger.java:799)
>>>> Caused by: java.io.EOFException
>>>> at org.apache.flink.runtime.util.DataInputDeserializer.readUnsi
>>>> gnedByte(DataInputDeserializer.java:306)
>>>> at org.apache.flink.types.StringValue.readString(StringValue.java:747)
>>>> at org.apache.flink.api.common.typeutils.base.StringSerializer.
>>>> deserialize(StringSerializer.java:69)
>>>> at org.apache.flink.api.common.typeutils.base.StringSerializer.
>>>> deserialize(StringSerializer.java:74)
>>>> at org.apache.flink.api.common.typeutils.base.StringSerializer.
>>>> deserialize(StringSerializer.java:28)
>>>> at org.apache.flink.api.java.typeutils.runtime.RowSerializer.de
>>>> serialize(RowSerializer.java:193)
>>>> at org.apache.flink.api.java.typeutils.runtime.RowSerializer.de
>>>> serialize(RowSerializer.java:36)
>>>> at org.apache.flink.runtime.plugable.ReusingDeserializationDele
>>>> gate.read(ReusingDeserializationDelegate.java:57)
>>>> at org.apache.flink.runtime.io.network.api.serialization.Spilli
>>>> ngAdaptiveSpanningRecordDeserializer.getNextRecord(SpillingA
>>>> daptiveSpanningRecordDeserializer.java:144)
>>>> at org.apache.flink.runtime.io.network.api.reader.AbstractRecor
>>>> dReader.getNextRecord(AbstractRecordReader.java:72)
>>>> at org.apache.flink.runtime.io.network.api.reader.MutableRecord
>>>> Reader.next(MutableRecordReader.java:42)
>>>> at org.apache.flink.runtime.operators.util.ReaderIterator.next(
>>>> ReaderIterator.java:59)
>>>> at org.apache.flink.runtime.operators.sort.UnilateralSortMerger
>>>> $ReadingThread.go(UnilateralSortMerger.java:1035)
>>>> at org.apache.flink.runtime.operators.sort.UnilateralSortMerger
>>>> $ThreadBase.run(UnilateralSortMerger.java:796)
>>>>
>>>>
>>>> Best,
>>>> Flavio
>>>>
>>>
>>>
>>
>>
>

Re: UnilateralSortMerger error (again)

Posted by Flavio Pompermaier <po...@okkam.it>.
I debugged a bit the process repeating the job on a sub-slice of the entire
data (using the id value to filter data with parquet push down filters) and
all slices completed successfully :(
So I tried to increase the parallelism (from 1 slot per TM to 4) to see if
this was somehow a factor of stress but it didn't cause any error.
Then I almost doubled the number of rows to process and finally the error
showed up again.
It seems somehow related to spilling to disk but I can't really understand
what's going on :(
This is a summary of my debug attempts:

4 Task managers with 6 GB  and 1 slot each, parallelism = 4

id < 10.000.000.000  => 1.857.365 rows => OK
id >= 10.000.000.000 && id < 10.010.000.000 => 20.057.714 rows => OK
id >= 10.010.000.000 && id < 99.945.000.000       => 20.926.903 rows => OK
id >= 99.945.000.000 && id < 99.960.000.000       => 23.888.750  rows => OK
id >= 99.960.000.000 => 32.936.422 rows => OK

4 TM with 8 GB and 4 slot each, parallelism 16

id >= 99.960.000.000 => 32.936.422 rows => OK
id >= 99.945.000.000  => 56.825.172 rows => ERROR

Any help is appreciated..
Best,
Flavio

On Wed, Apr 19, 2017 at 8:34 PM, Flavio Pompermaier <po...@okkam.it>
wrote:

> I could but only if there's a good probability that it fix the
> problem...how confident are you about it?
>
> On Wed, Apr 19, 2017 at 8:27 PM, Ted Yu <yu...@gmail.com> wrote:
>
>> Looking at git log of DataInputDeserializer.java , there has been some
>> recent change.
>>
>> If you have time, maybe try with 1.2.1 RC and see if the error is
>> reproducible ?
>>
>> Cheers
>>
>> On Wed, Apr 19, 2017 at 11:22 AM, Flavio Pompermaier <
>> pompermaier@okkam.it> wrote:
>>
>>> Hi to all,
>>> I think I'm again on the weird Exception with the
>>> SpillingAdaptiveSpanningRecordDeserializer...
>>> I'm using Flink 1.2.0 and the error seems to rise when Flink spills to
>>> disk but the Exception thrown is not very helpful. Any idea?
>>>
>>> Caused by: java.lang.RuntimeException: Error obtaining the sorted input:
>>> Thread 'SortMerger Reading Thread' terminated due to an exception: null
>>> at org.apache.flink.runtime.operators.sort.UnilateralSortMerger
>>> .getIterator(UnilateralSortMerger.java:619)
>>> at org.apache.flink.runtime.operators.BatchTask.getInput(BatchT
>>> ask.java:1094)
>>> at org.apache.flink.runtime.operators.GroupReduceDriver.prepare
>>> (GroupReduceDriver.java:99)
>>> at org.apache.flink.runtime.operators.BatchTask.run(BatchTask.java:460)
>>> ... 3 more
>>> Caused by: java.io.IOException: Thread 'SortMerger Reading Thread'
>>> terminated due to an exception: null
>>> at org.apache.flink.runtime.operators.sort.UnilateralSortMerger
>>> $ThreadBase.run(UnilateralSortMerger.java:799)
>>> Caused by: java.io.EOFException
>>> at org.apache.flink.runtime.util.DataInputDeserializer.readUnsi
>>> gnedByte(DataInputDeserializer.java:306)
>>> at org.apache.flink.types.StringValue.readString(StringValue.java:747)
>>> at org.apache.flink.api.common.typeutils.base.StringSerializer.
>>> deserialize(StringSerializer.java:69)
>>> at org.apache.flink.api.common.typeutils.base.StringSerializer.
>>> deserialize(StringSerializer.java:74)
>>> at org.apache.flink.api.common.typeutils.base.StringSerializer.
>>> deserialize(StringSerializer.java:28)
>>> at org.apache.flink.api.java.typeutils.runtime.RowSerializer.de
>>> serialize(RowSerializer.java:193)
>>> at org.apache.flink.api.java.typeutils.runtime.RowSerializer.de
>>> serialize(RowSerializer.java:36)
>>> at org.apache.flink.runtime.plugable.ReusingDeserializationDele
>>> gate.read(ReusingDeserializationDelegate.java:57)
>>> at org.apache.flink.runtime.io.network.api.serialization.Spilli
>>> ngAdaptiveSpanningRecordDeserializer.getNextRecord(SpillingA
>>> daptiveSpanningRecordDeserializer.java:144)
>>> at org.apache.flink.runtime.io.network.api.reader.AbstractRecor
>>> dReader.getNextRecord(AbstractRecordReader.java:72)
>>> at org.apache.flink.runtime.io.network.api.reader.MutableRecord
>>> Reader.next(MutableRecordReader.java:42)
>>> at org.apache.flink.runtime.operators.util.ReaderIterator.next(
>>> ReaderIterator.java:59)
>>> at org.apache.flink.runtime.operators.sort.UnilateralSortMerger
>>> $ReadingThread.go(UnilateralSortMerger.java:1035)
>>> at org.apache.flink.runtime.operators.sort.UnilateralSortMerger
>>> $ThreadBase.run(UnilateralSortMerger.java:796)
>>>
>>>
>>> Best,
>>> Flavio
>>>
>>
>>
>
>

Re: UnilateralSortMerger error (again)

Posted by Flavio Pompermaier <po...@okkam.it>.
I could but only if there's a good probability that it fix the
problem...how confident are you about it?

On Wed, Apr 19, 2017 at 8:27 PM, Ted Yu <yu...@gmail.com> wrote:

> Looking at git log of DataInputDeserializer.java , there has been some
> recent change.
>
> If you have time, maybe try with 1.2.1 RC and see if the error is
> reproducible ?
>
> Cheers
>
> On Wed, Apr 19, 2017 at 11:22 AM, Flavio Pompermaier <pompermaier@okkam.it
> > wrote:
>
>> Hi to all,
>> I think I'm again on the weird Exception with the
>> SpillingAdaptiveSpanningRecordDeserializer...
>> I'm using Flink 1.2.0 and the error seems to rise when Flink spills to
>> disk but the Exception thrown is not very helpful. Any idea?
>>
>> Caused by: java.lang.RuntimeException: Error obtaining the sorted input:
>> Thread 'SortMerger Reading Thread' terminated due to an exception: null
>> at org.apache.flink.runtime.operators.sort.UnilateralSortMerger
>> .getIterator(UnilateralSortMerger.java:619)
>> at org.apache.flink.runtime.operators.BatchTask.getInput(BatchT
>> ask.java:1094)
>> at org.apache.flink.runtime.operators.GroupReduceDriver.prepare
>> (GroupReduceDriver.java:99)
>> at org.apache.flink.runtime.operators.BatchTask.run(BatchTask.java:460)
>> ... 3 more
>> Caused by: java.io.IOException: Thread 'SortMerger Reading Thread'
>> terminated due to an exception: null
>> at org.apache.flink.runtime.operators.sort.UnilateralSortMerger
>> $ThreadBase.run(UnilateralSortMerger.java:799)
>> Caused by: java.io.EOFException
>> at org.apache.flink.runtime.util.DataInputDeserializer.readUnsi
>> gnedByte(DataInputDeserializer.java:306)
>> at org.apache.flink.types.StringValue.readString(StringValue.java:747)
>> at org.apache.flink.api.common.typeutils.base.StringSerializer.
>> deserialize(StringSerializer.java:69)
>> at org.apache.flink.api.common.typeutils.base.StringSerializer.
>> deserialize(StringSerializer.java:74)
>> at org.apache.flink.api.common.typeutils.base.StringSerializer.
>> deserialize(StringSerializer.java:28)
>> at org.apache.flink.api.java.typeutils.runtime.RowSerializer.
>> deserialize(RowSerializer.java:193)
>> at org.apache.flink.api.java.typeutils.runtime.RowSerializer.
>> deserialize(RowSerializer.java:36)
>> at org.apache.flink.runtime.plugable.ReusingDeserializationDele
>> gate.read(ReusingDeserializationDelegate.java:57)
>> at org.apache.flink.runtime.io.network.api.serialization.Spilli
>> ngAdaptiveSpanningRecordDeserializer.getNextRecord(SpillingA
>> daptiveSpanningRecordDeserializer.java:144)
>> at org.apache.flink.runtime.io.network.api.reader.AbstractRecor
>> dReader.getNextRecord(AbstractRecordReader.java:72)
>> at org.apache.flink.runtime.io.network.api.reader.MutableRecord
>> Reader.next(MutableRecordReader.java:42)
>> at org.apache.flink.runtime.operators.util.ReaderIterator.next(
>> ReaderIterator.java:59)
>> at org.apache.flink.runtime.operators.sort.UnilateralSortMerger
>> $ReadingThread.go(UnilateralSortMerger.java:1035)
>> at org.apache.flink.runtime.operators.sort.UnilateralSortMerger
>> $ThreadBase.run(UnilateralSortMerger.java:796)
>>
>>
>> Best,
>> Flavio
>>
>
>

Re: UnilateralSortMerger error (again)

Posted by Ted Yu <yu...@gmail.com>.
Looking at git log of DataInputDeserializer.java , there has been some
recent change.

If you have time, maybe try with 1.2.1 RC and see if the error is
reproducible ?

Cheers

On Wed, Apr 19, 2017 at 11:22 AM, Flavio Pompermaier <po...@okkam.it>
wrote:

> Hi to all,
> I think I'm again on the weird Exception with the
> SpillingAdaptiveSpanningRecordDeserializer...
> I'm using Flink 1.2.0 and the error seems to rise when Flink spills to
> disk but the Exception thrown is not very helpful. Any idea?
>
> Caused by: java.lang.RuntimeException: Error obtaining the sorted input:
> Thread 'SortMerger Reading Thread' terminated due to an exception: null
> at org.apache.flink.runtime.operators.sort.UnilateralSortMerger.
> getIterator(UnilateralSortMerger.java:619)
> at org.apache.flink.runtime.operators.BatchTask.getInput(
> BatchTask.java:1094)
> at org.apache.flink.runtime.operators.GroupReduceDriver.
> prepare(GroupReduceDriver.java:99)
> at org.apache.flink.runtime.operators.BatchTask.run(BatchTask.java:460)
> ... 3 more
> Caused by: java.io.IOException: Thread 'SortMerger Reading Thread'
> terminated due to an exception: null
> at org.apache.flink.runtime.operators.sort.UnilateralSortMerger$
> ThreadBase.run(UnilateralSortMerger.java:799)
> Caused by: java.io.EOFException
> at org.apache.flink.runtime.util.DataInputDeserializer.readUnsignedByte(
> DataInputDeserializer.java:306)
> at org.apache.flink.types.StringValue.readString(StringValue.java:747)
> at org.apache.flink.api.common.typeutils.base.
> StringSerializer.deserialize(StringSerializer.java:69)
> at org.apache.flink.api.common.typeutils.base.
> StringSerializer.deserialize(StringSerializer.java:74)
> at org.apache.flink.api.common.typeutils.base.
> StringSerializer.deserialize(StringSerializer.java:28)
> at org.apache.flink.api.java.typeutils.runtime.RowSerializer.deserialize(
> RowSerializer.java:193)
> at org.apache.flink.api.java.typeutils.runtime.RowSerializer.deserialize(
> RowSerializer.java:36)
> at org.apache.flink.runtime.plugable.ReusingDeserializationDelegate.read(
> ReusingDeserializationDelegate.java:57)
> at org.apache.flink.runtime.io.network.api.serialization.
> SpillingAdaptiveSpanningRecordDeserializer.getNextRecord(
> SpillingAdaptiveSpanningRecordDeserializer.java:144)
> at org.apache.flink.runtime.io.network.api.reader.AbstractRecordReader.
> getNextRecord(AbstractRecordReader.java:72)
> at org.apache.flink.runtime.io.network.api.reader.
> MutableRecordReader.next(MutableRecordReader.java:42)
> at org.apache.flink.runtime.operators.util.ReaderIterator.
> next(ReaderIterator.java:59)
> at org.apache.flink.runtime.operators.sort.UnilateralSortMerger$
> ReadingThread.go(UnilateralSortMerger.java:1035)
> at org.apache.flink.runtime.operators.sort.UnilateralSortMerger$
> ThreadBase.run(UnilateralSortMerger.java:796)
>
>
> Best,
> Flavio
>