Posted to user@flink.apache.org by "Sapei, Ferry Syafei" <Fe...@otto.de> on 2017/03/29 16:00:34 UTC
GroupReduce is interrupted on reading large CSV files
Hello everyone,
I have a Flink batch job that reads four CSV files. The rows in the files are read and grouped together.
When the four CSV files are small enough, the job finishes successfully. However, when the input files are large, the job fails with the exceptions shown below.
Could somebody please help me to fix this problem?
Best regards,
Ferry
2017-03-29 17:39:19,396 DEBUG org.apache.flink.runtime.operators.sort.NormalizedKeySorter - Spilling sort buffer without large record handling.
2017-03-29 17:39:19,458 ERROR org.apache.flink.runtime.operators.BatchTask - Error in task code: CHAIN GroupReduce (GroupReduce at readCsvRows(SendungsauskunftAggregatorJob.java:140)) -> Map (Key Extractor) (15/16)
java.lang.Exception: The data preparation for task 'CHAIN GroupReduce (GroupReduce at readCsvRows(SendungsauskunftAggregatorJob.java:140)) -> Map (Key Extractor)' , caused an error: Error obtaining the sorted input: Thread 'SortMerger spilling thread' terminated due to an exception: null
    at org.apache.flink.runtime.operators.BatchTask.run(BatchTask.java:465)
    at org.apache.flink.runtime.operators.BatchTask.invoke(BatchTask.java:355)
    at org.apache.flink.runtime.taskmanager.Task.run(Task.java:655)
    at java.lang.Thread.run(Thread.java:745)
Caused by: java.lang.RuntimeException: Error obtaining the sorted input: Thread 'SortMerger spilling thread' terminated due to an exception: null
    at org.apache.flink.runtime.operators.sort.UnilateralSortMerger.getIterator(UnilateralSortMerger.java:619)
    at org.apache.flink.runtime.operators.BatchTask.getInput(BatchTask.java:1094)
    at org.apache.flink.runtime.operators.GroupReduceDriver.prepare(GroupReduceDriver.java:99)
    at org.apache.flink.runtime.operators.BatchTask.run(BatchTask.java:460)
    ... 3 more
Caused by: java.io.IOException: Thread 'SortMerger spilling thread' terminated due to an exception: null
    at org.apache.flink.runtime.operators.sort.UnilateralSortMerger$ThreadBase.run(UnilateralSortMerger.java:799)
Caused by: java.io.EOFException
    at org.apache.flink.runtime.io.disk.RandomAccessInputView.nextSegment(RandomAccessInputView.java:79)
    at org.apache.flink.runtime.memory.AbstractPagedInputView.advance(AbstractPagedInputView.java:159)
    at org.apache.flink.runtime.memory.AbstractPagedInputView.readLong(AbstractPagedInputView.java:357)
    at org.apache.flink.core.memory.HybridMemorySegment.put(HybridMemorySegment.java:287)
    at org.apache.flink.runtime.memory.AbstractPagedOutputView.write(AbstractPagedOutputView.java:406)
    at org.apache.flink.api.common.typeutils.base.BigIntSerializer.copyBigInteger(BigIntSerializer.java:141)
    at org.apache.flink.api.common.typeutils.base.BigDecSerializer.copy(BigDecSerializer.java:104)
    at org.apache.flink.api.java.typeutils.runtime.RowSerializer.copy(RowSerializer.java:212)
    at org.apache.flink.runtime.operators.sort.NormalizedKeySorter.writeToOutput(NormalizedKeySorter.java:499)
    at org.apache.flink.runtime.operators.sort.UnilateralSortMerger$SpillingThread.go(UnilateralSortMerger.java:1344)
    at org.apache.flink.runtime.operators.sort.UnilateralSortMerger$ThreadBase.run(UnilateralSortMerger.java:796)
2017-03-29 17:39:19,759 DEBUG org.apache.flink.runtime.operators.BatchTask - Releasing all broadcast variables.: CHAIN GroupReduce (GroupReduce at readCsvRows(SendungsauskunftAggregatorJob.java:140)) -> Map (Key Extractor) (15/16)
2017-03-29 17:39:19,660 DEBUG org.apache.flink.runtime.operators.sort.UnilateralSortMerger - Spilling buffer 0.
2017-03-29 17:39:19,806 DEBUG org.apache.flink.runtime.operators.sort.NormalizedKeySorter - Spilling sort buffer without large record handling.
2017-03-29 17:39:19,641 DEBUG org.apache.flink.runtime.operators.sort.UnilateralSortMerger - Closing of sort/merger was interrupted. The reading/sorting/spilling threads may still be working.
java.lang.InterruptedException
    at java.lang.Object.wait(Native Method)
    at java.lang.Thread.join(Thread.java:1249)
    at java.lang.Thread.join(Thread.java:1323)
    at org.apache.flink.runtime.operators.sort.UnilateralSortMerger.close(UnilateralSortMerger.java:480)
    at org.apache.flink.runtime.operators.sort.UnilateralSortMerger$1.handleException(UnilateralSortMerger.java:367)
    at org.apache.flink.runtime.operators.sort.UnilateralSortMerger$1.handleException(UnilateralSortMerger.java:362)
    at org.apache.flink.runtime.operators.sort.UnilateralSortMerger$ThreadBase.internalHandleException(UnilateralSortMerger.java:842)
    at org.apache.flink.runtime.operators.sort.UnilateralSortMerger$ThreadBase.run(UnilateralSortMerger.java:799)
2017-03-29 17:39:19,641 ERROR org.apache.flink.runtime.operators.BatchTask - Error in task code: CHAIN GroupReduce (GroupReduce at readCsvRows(SendungsauskunftAggregatorJob.java:140)) -> Map (Key Extractor) (1/16)
java.lang.Exception: The data preparation for task 'CHAIN GroupReduce (GroupReduce at readCsvRows(SendungsauskunftAggregatorJob.java:140)) -> Map (Key Extractor)' , caused an error: Error obtaining the sorted input: Thread 'SortMerger Reading Thread' terminated due to an exception: null
    at org.apache.flink.runtime.operators.BatchTask.run(BatchTask.java:465)
    at org.apache.flink.runtime.operators.BatchTask.invoke(BatchTask.java:355)
    at org.apache.flink.runtime.taskmanager.Task.run(Task.java:655)
    at java.lang.Thread.run(Thread.java:745)
Caused by: java.lang.RuntimeException: Error obtaining the sorted input: Thread 'SortMerger Reading Thread' terminated due to an exception: null
    at org.apache.flink.runtime.operators.sort.UnilateralSortMerger.getIterator(UnilateralSortMerger.java:619)
    at org.apache.flink.runtime.operators.BatchTask.getInput(BatchTask.java:1094)
    at org.apache.flink.runtime.operators.GroupReduceDriver.prepare(GroupReduceDriver.java:99)
    at org.apache.flink.runtime.operators.BatchTask.run(BatchTask.java:460)
    ... 3 more
Caused by: java.io.IOException: Thread 'SortMerger Reading Thread' terminated due to an exception: null
    at org.apache.flink.runtime.operators.sort.UnilateralSortMerger$ThreadBase.run(UnilateralSortMerger.java:799)
Caused by: java.io.EOFException
    at org.apache.flink.runtime.util.DataInputDeserializer.readInt(DataInputDeserializer.java:179)
    at org.apache.flink.api.common.typeutils.base.BigDecSerializer.readBigDecimal(BigDecSerializer.java:125)
    at org.apache.flink.api.common.typeutils.base.BigDecSerializer.deserialize(BigDecSerializer.java:99)
    at org.apache.flink.api.common.typeutils.base.BigDecSerializer.deserialize(BigDecSerializer.java:31)
    at org.apache.flink.api.java.typeutils.runtime.RowSerializer.deserialize(RowSerializer.java:193)
    at org.apache.flink.api.java.typeutils.runtime.RowSerializer.deserialize(RowSerializer.java:36)
    at org.apache.flink.runtime.plugable.ReusingDeserializationDelegate.read(ReusingDeserializationDelegate.java:57)
    at org.apache.flink.runtime.io.network.api.serialization.SpillingAdaptiveSpanningRecordDeserializer.getNextRecord(SpillingAdaptiveSpanningRecordDeserializer.java:144)
    at org.apache.flink.runtime.io.network.api.reader.AbstractRecordReader.getNextRecord(AbstractRecordReader.java:72)
    at org.apache.flink.runtime.io.network.api.reader.MutableRecordReader.next(MutableRecordReader.java:42)
    at org.apache.flink.runtime.operators.util.ReaderIterator.next(ReaderIterator.java:59)
    at org.apache.flink.runtime.operators.sort.UnilateralSortMerger$ReadingThread.go(UnilateralSortMerger.java:1035)
    at org.apache.flink.runtime.operators.sort.UnilateralSortMerger$ThreadBase.run(UnilateralSortMerger.java:796)
2017-03-29 17:39:19,460 DEBUG org.apache.flink.runtime.operators.sort.UnilateralSortMerger - Emitting final buffer from reader thread: 1.
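For readers of the traces above: both root causes are a java.io.EOFException raised while a BigDecimal field of a Row was being read or copied, which in general means a reader expected more bytes than were available for the record. The following is a minimal stdlib-only sketch of that failure mode. It is not Flink's serializer; the class name TruncatedRecordDemo, its helper methods, and the byte layout (length, unscaled bytes, scale) are assumptions made purely for illustration, and the sketch does not identify the cause of the failure in this job.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.EOFException;
import java.io.IOException;
import java.io.UncheckedIOException;
import java.math.BigDecimal;
import java.math.BigInteger;
import java.util.Arrays;

// Hypothetical demo class; not part of Flink.
public class TruncatedRecordDemo {

    // Assumed layout for illustration: [int length][unscaled bytes][int scale].
    public static byte[] write(BigDecimal value) {
        ByteArrayOutputStream bytes = new ByteArrayOutputStream();
        try (DataOutputStream out = new DataOutputStream(bytes)) {
            byte[] unscaled = value.unscaledValue().toByteArray();
            out.writeInt(unscaled.length);
            out.write(unscaled);
            out.writeInt(value.scale());
        } catch (IOException e) {
            // Not expected for an in-memory stream.
            throw new UncheckedIOException(e);
        }
        return bytes.toByteArray();
    }

    // Reads one value back; throws EOFException if the data ends mid-record.
    public static BigDecimal read(byte[] data) throws IOException {
        try (DataInputStream in = new DataInputStream(new ByteArrayInputStream(data))) {
            int length = in.readInt();
            byte[] unscaled = new byte[length];
            in.readFully(unscaled);
            int scale = in.readInt();
            return new BigDecimal(new BigInteger(unscaled), scale);
        }
    }

    // Returns true only if reading the data ends in an EOFException.
    public static boolean failsWithEof(byte[] data) {
        try {
            read(data);
            return false;
        } catch (EOFException e) {
            return true;
        } catch (IOException e) {
            return false;
        }
    }

    public static void main(String[] args) throws IOException {
        byte[] complete = write(new BigDecimal("12345.678"));
        // Drop the last two bytes so the trailing scale field is incomplete.
        byte[] truncated = Arrays.copyOf(complete, complete.length - 2);

        System.out.println(read(complete));          // prints 12345.678
        System.out.println(failsWithEof(truncated)); // prints true
    }
}
```

The complete record round-trips, while the truncated one fails in readInt(), much like the DataInputDeserializer.readInt frame in the second trace. Whether the bytes in the actual job are missing because of the input data, the serializer, or the spilling path cannot be told from this sketch.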
Re: GroupReduce is interrupted on reading large CSV files
Posted by Kurt Young <yk...@gmail.com>.
Hi,
Can you share your input file to help us debug the problem?
Best,
Kurt