Posted to user@flink.apache.org by "Sapei, Ferry Syafei" <Fe...@otto.de> on 2017/03/29 16:00:34 UTC

GroupReduce is interrupted on reading large CSV files

Hello everyone,

I have a Flink batch job which reads four CSV files. The rows in the files are read and grouped together.
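The post does not include the job's code. To make "read and grouped together" concrete, here is a plain-Java stand-in (no Flink). It assumes a hypothetical two-column layout, a string key followed by a decimal amount, and sums the amounts per key. The decimal column is only suggested by the BigDecSerializer frames in the traces; the real schema, grouping key, and reduce function are unknown. In the actual job this step would correspond to a DataSet `groupBy(...)` followed by `reduceGroup(...)`, which the traces show running on sorted input from `UnilateralSortMerger`.

```java
import java.math.BigDecimal;
import java.util.Arrays;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

public class CsvGroupSketch {

    // Hypothetical row layout: "<key>,<amount>", e.g. "A,10.50".
    // Groups the rows of several inputs by key and sums the BigDecimal amounts,
    // i.e. the per-key result a groupBy(...).reduceGroup(...) step would produce.
    static Map<String, BigDecimal> groupAndSum(List<List<String>> files) {
        Map<String, BigDecimal> totals = new TreeMap<>();
        for (List<String> lines : files) {
            for (String line : lines) {
                String[] fields = line.split(",", -1);
                String key = fields[0].trim();
                BigDecimal amount = new BigDecimal(fields[1].trim());
                totals.merge(key, amount, BigDecimal::add);
            }
        }
        return totals;
    }

    public static void main(String[] args) {
        // Four small in-memory "files" standing in for the four CSV inputs.
        List<List<String>> files = Arrays.asList(
                Arrays.asList("A,10.50", "B,1.00"),
                Arrays.asList("A,2.25"),
                Arrays.asList("C,7"),
                Arrays.asList("B,0.10"));
        System.out.println(groupAndSum(files)); // prints {A=12.75, B=1.10, C=7}
    }
}
```

This sketch keeps all per-key totals in an in-memory map. Flink instead sorts the records and spills sorted runs to disk when they exceed its managed memory, which is the code path ("SortMerger spilling thread") where one of the exceptions below occurs.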

When the four CSV files are small enough, the job finishes successfully. However, when the input files are large, the job fails with the exception shown below.

Could somebody please help me to fix this problem?

Best regards,

Ferry

2017-03-29 17:39:19,396 DEBUG org.apache.flink.runtime.operators.sort.NormalizedKeySorter   - Spilling sort buffer without large record handling.

2017-03-29 17:39:19,458 ERROR org.apache.flink.runtime.operators.BatchTask                  - Error in task code:  CHAIN GroupReduce (GroupReduce at readCsvRows(SendungsauskunftAggregatorJob.java:140)) -> Map (Key Extractor) (15/16)
java.lang.Exception: The data preparation for task 'CHAIN GroupReduce (GroupReduce at readCsvRows(SendungsauskunftAggregatorJob.java:140)) -> Map (Key Extractor)' , caused an error: Error obtaining the sorted input: Thread 'SortMerger spilling thread' terminated due to an exception: null
        at org.apache.flink.runtime.operators.BatchTask.run(BatchTask.java:465)
        at org.apache.flink.runtime.operators.BatchTask.invoke(BatchTask.java:355)
        at org.apache.flink.runtime.taskmanager.Task.run(Task.java:655)
        at java.lang.Thread.run(Thread.java:745)
Caused by: java.lang.RuntimeException: Error obtaining the sorted input: Thread 'SortMerger spilling thread' terminated due to an exception: null
        at org.apache.flink.runtime.operators.sort.UnilateralSortMerger.getIterator(UnilateralSortMerger.java:619)
        at org.apache.flink.runtime.operators.BatchTask.getInput(BatchTask.java:1094)
        at org.apache.flink.runtime.operators.GroupReduceDriver.prepare(GroupReduceDriver.java:99)
        at org.apache.flink.runtime.operators.BatchTask.run(BatchTask.java:460)
        ... 3 more
Caused by: java.io.IOException: Thread 'SortMerger spilling thread' terminated due to an exception: null
        at org.apache.flink.runtime.operators.sort.UnilateralSortMerger$ThreadBase.run(UnilateralSortMerger.java:799)
Caused by: java.io.EOFException
        at org.apache.flink.runtime.io.disk.RandomAccessInputView.nextSegment(RandomAccessInputView.java:79)
        at org.apache.flink.runtime.memory.AbstractPagedInputView.advance(AbstractPagedInputView.java:159)
        at org.apache.flink.runtime.memory.AbstractPagedInputView.readLong(AbstractPagedInputView.java:357)
        at org.apache.flink.core.memory.HybridMemorySegment.put(HybridMemorySegment.java:287)
        at org.apache.flink.runtime.memory.AbstractPagedOutputView.write(AbstractPagedOutputView.java:406)
        at org.apache.flink.api.common.typeutils.base.BigIntSerializer.copyBigInteger(BigIntSerializer.java:141)
        at org.apache.flink.api.common.typeutils.base.BigDecSerializer.copy(BigDecSerializer.java:104)
        at org.apache.flink.api.java.typeutils.runtime.RowSerializer.copy(RowSerializer.java:212)
        at org.apache.flink.runtime.operators.sort.NormalizedKeySorter.writeToOutput(NormalizedKeySorter.java:499)
        at org.apache.flink.runtime.operators.sort.UnilateralSortMerger$SpillingThread.go(UnilateralSortMerger.java:1344)
        at org.apache.flink.runtime.operators.sort.UnilateralSortMerger$ThreadBase.run(UnilateralSortMerger.java:796)

2017-03-29 17:39:19,759 DEBUG org.apache.flink.runtime.operators.BatchTask                  - Releasing all broadcast variables.:  CHAIN GroupReduce (GroupReduce at readCsvRows(SendungsauskunftAggregatorJob.java:140)) -> Map (Key Extractor) (15/16)

2017-03-29 17:39:19,660 DEBUG org.apache.flink.runtime.operators.sort.UnilateralSortMerger  - Spilling buffer 0.

2017-03-29 17:39:19,806 DEBUG org.apache.flink.runtime.operators.sort.NormalizedKeySorter   - Spilling sort buffer without large record handling.

2017-03-29 17:39:19,641 DEBUG org.apache.flink.runtime.operators.sort.UnilateralSortMerger  - Closing of sort/merger was interrupted. The reading/sorting/spilling threads may still be working.
java.lang.InterruptedException
        at java.lang.Object.wait(Native Method)
        at java.lang.Thread.join(Thread.java:1249)
        at java.lang.Thread.join(Thread.java:1323)
        at org.apache.flink.runtime.operators.sort.UnilateralSortMerger.close(UnilateralSortMerger.java:480)
        at org.apache.flink.runtime.operators.sort.UnilateralSortMerger$1.handleException(UnilateralSortMerger.java:367)
        at org.apache.flink.runtime.operators.sort.UnilateralSortMerger$1.handleException(UnilateralSortMerger.java:362)
        at org.apache.flink.runtime.operators.sort.UnilateralSortMerger$ThreadBase.internalHandleException(UnilateralSortMerger.java:842)
        at org.apache.flink.runtime.operators.sort.UnilateralSortMerger$ThreadBase.run(UnilateralSortMerger.java:799)

2017-03-29 17:39:19,641 ERROR org.apache.flink.runtime.operators.BatchTask                  - Error in task code:  CHAIN GroupReduce (GroupReduce at readCsvRows(SendungsauskunftAggregatorJob.java:140)) -> Map (Key Extractor) (1/16)
java.lang.Exception: The data preparation for task 'CHAIN GroupReduce (GroupReduce at readCsvRows(SendungsauskunftAggregatorJob.java:140)) -> Map (Key Extractor)' , caused an error: Error obtaining the sorted input: Thread 'SortMerger Reading Thread' terminated due to an exception: null
        at org.apache.flink.runtime.operators.BatchTask.run(BatchTask.java:465)
        at org.apache.flink.runtime.operators.BatchTask.invoke(BatchTask.java:355)
        at org.apache.flink.runtime.taskmanager.Task.run(Task.java:655)
        at java.lang.Thread.run(Thread.java:745)
Caused by: java.lang.RuntimeException: Error obtaining the sorted input: Thread 'SortMerger Reading Thread' terminated due to an exception: null
        at org.apache.flink.runtime.operators.sort.UnilateralSortMerger.getIterator(UnilateralSortMerger.java:619)
        at org.apache.flink.runtime.operators.BatchTask.getInput(BatchTask.java:1094)
        at org.apache.flink.runtime.operators.GroupReduceDriver.prepare(GroupReduceDriver.java:99)
        at org.apache.flink.runtime.operators.BatchTask.run(BatchTask.java:460)
        ... 3 more
Caused by: java.io.IOException: Thread 'SortMerger Reading Thread' terminated due to an exception: null
        at org.apache.flink.runtime.operators.sort.UnilateralSortMerger$ThreadBase.run(UnilateralSortMerger.java:799)
Caused by: java.io.EOFException
        at org.apache.flink.runtime.util.DataInputDeserializer.readInt(DataInputDeserializer.java:179)
        at org.apache.flink.api.common.typeutils.base.BigDecSerializer.readBigDecimal(BigDecSerializer.java:125)
        at org.apache.flink.api.common.typeutils.base.BigDecSerializer.deserialize(BigDecSerializer.java:99)
        at org.apache.flink.api.common.typeutils.base.BigDecSerializer.deserialize(BigDecSerializer.java:31)
        at org.apache.flink.api.java.typeutils.runtime.RowSerializer.deserialize(RowSerializer.java:193)
        at org.apache.flink.api.java.typeutils.runtime.RowSerializer.deserialize(RowSerializer.java:36)
        at org.apache.flink.runtime.plugable.ReusingDeserializationDelegate.read(ReusingDeserializationDelegate.java:57)
        at org.apache.flink.runtime.io.network.api.serialization.SpillingAdaptiveSpanningRecordDeserializer.getNextRecord(SpillingAdaptiveSpanningRecordDeserializer.java:144)
        at org.apache.flink.runtime.io.network.api.reader.AbstractRecordReader.getNextRecord(AbstractRecordReader.java:72)
        at org.apache.flink.runtime.io.network.api.reader.MutableRecordReader.next(MutableRecordReader.java:42)
        at org.apache.flink.runtime.operators.util.ReaderIterator.next(ReaderIterator.java:59)
        at org.apache.flink.runtime.operators.sort.UnilateralSortMerger$ReadingThread.go(UnilateralSortMerger.java:1035)
        at org.apache.flink.runtime.operators.sort.UnilateralSortMerger$ThreadBase.run(UnilateralSortMerger.java:796)

2017-03-29 17:39:19,460 DEBUG org.apache.flink.runtime.operators.sort.UnilateralSortMerger  - Emitting final buffer from reader thread: 1.
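Both failing tasks end in a `java.io.EOFException` inside Flink's decimal handling: `BigDecSerializer.copy` in the spilling thread and `BigDecSerializer.readBigDecimal` in the reading thread. In both cases the serializer tried to read more bytes for a decimal field than were available. The stdlib-only Java sketch below reproduces that failure mode in isolation. It uses a simplified, hypothetical encoding (length-prefixed unscaled bytes followed by an `int` scale), not Flink's actual `BigDecSerializer` format, and it does not explain why the bytes were missing in this job.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.EOFException;
import java.io.IOException;
import java.math.BigDecimal;
import java.math.BigInteger;
import java.util.Arrays;

public class BigDecimalRoundTrip {

    // Hypothetical, simplified layout: [length:int][unscaled bytes][scale:int].
    // This is NOT Flink's wire format; it only shows that a decimal is read
    // as several fields in sequence, each of which needs enough bytes.
    static byte[] encode(BigDecimal value) throws IOException {
        ByteArrayOutputStream bytes = new ByteArrayOutputStream();
        try (DataOutputStream out = new DataOutputStream(bytes)) {
            byte[] unscaled = value.unscaledValue().toByteArray();
            out.writeInt(unscaled.length);
            out.write(unscaled);
            out.writeInt(value.scale());
        }
        return bytes.toByteArray();
    }

    static BigDecimal decode(byte[] data) throws IOException {
        try (DataInputStream in = new DataInputStream(new ByteArrayInputStream(data))) {
            int length = in.readInt();
            byte[] unscaled = new byte[length];
            in.readFully(unscaled);
            int scale = in.readInt(); // throws EOFException if the data ends early
            return new BigDecimal(new BigInteger(unscaled), scale);
        }
    }

    public static void main(String[] args) throws IOException {
        byte[] full = encode(new BigDecimal("12345.67"));
        System.out.println(decode(full)); // prints 12345.67

        // Drop the last two bytes so the trailing scale field is incomplete.
        byte[] truncated = Arrays.copyOf(full, full.length - 2);
        try {
            decode(truncated);
        } catch (EOFException e) {
            System.out.println("EOFException: record ended before the scale field");
        }
    }
}
```

An `EOFException` like this therefore points at a mismatch between what was written and what is being read for the decimal fields, rather than at the CSV parsing itself.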


Re: GroupReduce is interrupted on reading large CSV files

Posted by Kurt Young <yk...@gmail.com>.
Hi,
Can you share your input file to help us debug the problem?


Best,
Kurt
