Posted to user@flink.apache.org by "LINZ, Arnaud" <AL...@bouyguestelecom.fr> on 2016/01/30 10:43:36 UTC

Left join with unbalanced dataset

Hello,

I have a very big dataset A to left join with a dataset B that is half its size. That is to say, half of A’s records will be matched with one record of B, and the other half with null values.

I used a CoGroup for that, but my batch job fails because YARN kills the container due to memory problems.

I guess that’s because one worker gets half of dataset A (the unmatched records), and that’s too much for a single JVM.

Am I right in my diagnosis? Is there a better way to left join unbalanced datasets?

Best regards,

Arnaud
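
A minimal sketch of the left-join semantics described in the message above, written in plain Java rather than against the Flink API. It only illustrates what "matched with one record of B, or with null" means. The class name, the method name, the String[] record layout and the sample keys are all hypothetical, and it assumes B holds at most one record per key. It says nothing about how Flink distributes or spills the data.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

public class LeftJoinSketch {

    // Left outer join of A with B on the first field of each A record.
    // Every A record is emitted exactly once: with the matching B value,
    // or with null when B holds no record for that key.
    // Assumption: B has at most one record per key, so it fits in a Map.
    public static List<String> leftJoin(List<String[]> a, Map<String, String> b) {
        List<String> out = new ArrayList<>();
        for (String[] record : a) {
            String key = record[0];
            String match = b.get(key); // null when the key is absent from B
            out.add(key + "," + record[1] + "," + match);
        }
        return out;
    }

    public static void main(String[] args) {
        // Hypothetical sample data: k1 has a match in B, k2 does not.
        List<String[]> a = Arrays.asList(
                new String[] {"k1", "a1"},
                new String[] {"k2", "a2"});
        Map<String, String> b = new HashMap<>();
        b.put("k1", "b1");

        for (String row : leftJoin(a, b)) {
            System.out.println(row);
        }
    }
}
```

Running main prints one line per record of A. The unmatched key k2 appears with "null" in the third field.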



Re: Left join with unbalanced dataset

Posted by Fabian Hueske <fh...@gmail.com>.
Glad to hear that!
We will release Flink 0.10.2 (based on the release-0.10 branch) soon.

Best, Fabian

2016-02-03 14:49 GMT+01:00 LINZ, Arnaud <AL...@bouyguestelecom.fr>:

> Hi,
>
> Yes, I’m always a bit reluctant to install a snapshot version “for
> everyone”, and I was hoping it would suffice…
>
> However, I’ve just recompiled everything and ran it with a real 0.10.1
> snapshot, and everything worked at an astounding speed with a reasonable
> amount of memory.
>
> Thanks for the great work and the help, as always,
>
> Arnaud
>
>
>
> *From:* Fabian Hueske [mailto:fhueske@gmail.com]
> *Sent:* Wednesday, February 3, 2016 10:51
>
> *To:* user@flink.apache.org
> *Subject:* Re: Left join with unbalanced dataset
>
>
>
> Hi Arnaud,
>
> in a previous mail you said:
> "Note that I did not rebuild & reinstall flink, I just used a
> 0.10-SNAPSHOT compiled jar submitted as a batch job using the "0.10.0"
> flink installation"
>
> This will not fix the Netty version error. You need to install a new Flink
> version, or submit the Flink job to YARN with a new Flink version, to make
> sure that the correct Netty version is used.
>
> Best, Fabian
>
>
>
> 2016-02-03 10:44 GMT+01:00 Stephan Ewen <se...@apache.org>:
>
> Hi!
>
>
>
> I think the closed channel is actually an effect of the process kill.
> Before the exception, you can see "15:22:47,592 ERROR
> org.apache.flink.yarn.YarnTaskManagerRunner - RECEIVED SIGNAL 15: SIGTERM"
> in the log, which means that UNIX is killing the process.
>
> I assume that the first thing that happens is that UNIX closes the open
> file handles, while the JVM shutdown hooks are still in progress. Hence the
> exception.
>
>
>
> So, the root cause is still the YARN memory killer.
>
>
>
> The log comes from release version 0.10.0.
>
> The Netty fix came into Flink after version 0.10.1 - so it is currently
> only in 0.10-SNAPSHOT (and will be in 0.10.2 in a couple of days).
>
>
>
> Greetings,
>
> Stephan
>
>
>
>
>
> On Wed, Feb 3, 2016 at 10:11 AM, LINZ, Arnaud <AL...@bouyguestelecom.fr>
> wrote:
>
> Hi,
>
>
>
> I see nothing wrong in the log of the killed container (in fact, it’s
> strange that it fails with an I/O channel closure before it is killed by
> YARN), but I’ll post new logs with memory debug as a web download within
> the day.
>
>
>
> In the meantime, here is a log extract:
>
>
>
> Container: container_e11_1453202008841_2868_01_000018 on
> h1r1dn06.bpa.bouyguestelecom.fr_45454
>
>
> ================================================================================================
>
>
>
> …
>
> 15:04:01,234 INFO
> org.apache.flink.yarn.YarnTaskManagerRunner                   -
> --------------------------------------------------------------------------------
>
> 15:04:01,236 INFO
> org.apache.flink.yarn.YarnTaskManagerRunner                   -  Starting
> YARN TaskManager (Version: 0.10.0, Rev:ab2cca4, Date:10.11.2015 @ 13:50:14
> UTC)
>
> 15:04:01,236 INFO
> org.apache.flink.yarn.YarnTaskManagerRunner                   -  Current
> user: datcrypt
>
> 15:04:01,236 INFO
> org.apache.flink.yarn.YarnTaskManagerRunner                   -  JVM: Java
> HotSpot(TM) 64-Bit Server VM - Oracle Corporation - 1.7/24.45-b08
>
> 15:04:01,236 INFO
> org.apache.flink.yarn.YarnTaskManagerRunner                   -  Maximum
> heap size: 6900 MiBytes
>
> 15:04:01,236 INFO  org.apache.flink.yarn.YarnTaskManagerRunner
>       -  JAVA_HOME: /usr/java/default
>
> 15:04:01,238 INFO
> org.apache.flink.yarn.YarnTaskManagerRunner                   -  Hadoop
> version: 2.6.0
>
> 15:04:01,238 INFO
> org.apache.flink.yarn.YarnTaskManagerRunner                   -  JVM
> Options:
>
> 15:04:01,238 INFO
> org.apache.flink.yarn.YarnTaskManagerRunner                   -
> -Xms7200m
>
> 15:04:01,238 INFO
> org.apache.flink.yarn.YarnTaskManagerRunner                   -
> -Xmx7200m
>
> 15:04:01,238 INFO
> org.apache.flink.yarn.YarnTaskManagerRunner                   -
> -XX:MaxDirectMemorySize=7200m
>
> 15:04:01,238 INFO
> org.apache.flink.yarn.YarnTaskManagerRunner                   -
> -Dlog.file=/data/2/hadoop/yarn/log/application_1453202008841_2868/container_e11_1453202008841_2868_01_000018/taskmanager.log
>
> 15:04:01,238 INFO
> org.apache.flink.yarn.YarnTaskManagerRunner                   -
> -Dlogback.configurationFile=file:logback.xml
>
> 15:04:01,238 INFO
> org.apache.flink.yarn.YarnTaskManagerRunner                   -
> -Dlog4j.configuration=file:log4j.properties
>
> 15:04:01,238 INFO
> org.apache.flink.yarn.YarnTaskManagerRunner                   -  Program
> Arguments:
>
> 15:04:01,238 INFO
> org.apache.flink.yarn.YarnTaskManagerRunner                   -
> --configDir
>
> 15:04:01,238 INFO
> org.apache.flink.yarn.YarnTaskManagerRunner                   -     .
>
> 15:04:01,238 INFO
> org.apache.flink.yarn.YarnTaskManagerRunner                   -
> --streamingMode
>
> 15:04:01,238 INFO
> org.apache.flink.yarn.YarnTaskManagerRunner                   -     batch
>
> 15:04:01,238 INFO
>  org.apache.flink.yarn.YarnTaskManagerRunner                   -
> --------------------------------------------------------------------------------
>
> …
>
> 15:04:02,215 INFO
> org.apache.flink.runtime.taskmanager.TaskManager              - Starting
> TaskManager actor
>
> 15:04:02,224 INFO
> org.apache.flink.runtime.io.network.netty.NettyConfig         - NettyConfig
> [server address: bt1shlhr/172.21.125.16, server port: 47002, memory
> segment size (bytes): 32768, transport type: NIO, number of server threads:
> 0 (use Netty's default), number of client threads: 0 (use Netty's default),
> server connect backlog: 0 (use Netty's default), client connect timeout
> (sec): 120, send/receive buffer size (bytes): 0 (use Netty's default)]
>
> 15:04:02,226 INFO
> org.apache.flink.runtime.taskmanager.TaskManager              - Messages
> between TaskManager and JobManager have a max timeout of 100000 milliseconds
>
> …
>
>
>
> 15:04:02,970 INFO
> org.apache.flink.runtime.io.network.buffer.NetworkBufferPool  - Allocated
> 1024 MB for network buffer pool (number of memory segments: 32768, bytes
> per segment: 32768).
>
> 15:04:03,527 INFO
> org.apache.flink.runtime.taskmanager.TaskManager              - Using 0.7
> of the currently free heap space for Flink managed heap memory (4099 MB).
>
> 15:04:06,250 INFO
> org.apache.flink.runtime.io.disk.iomanager.IOManager          - I/O manager
> uses directory
> /data/1/hadoop/yarn/local/usercache/datcrypt/appcache/application_1453202008841_2868/flink-io-5cc3aa50-6723-460e-a722-800dd908e9e8
> for spill files.
>
> …
>
> 15:04:06,429 INFO
> org.apache.flink.yarn.YarnTaskManager                         - TaskManager
> data connection information: h1r1dn06.bpa.bouyguestelecom.fr
> (dataPort=47002)
>
> 15:04:06,430 INFO
> org.apache.flink.yarn.YarnTaskManager                         - TaskManager
> has 2 task slot(s).
>
> 15:04:06,431 INFO
> org.apache.flink.yarn.YarnTaskManager                         - Memory
> usage stats: [HEAP: 5186/6900/6900 MB, NON HEAP: 25/50/130 MB
> (used/committed/max)]
>
> 15:04:06,438 INFO
> org.apache.flink.yarn.YarnTaskManager                         - Trying to
> register at JobManager akka.tcp://
> flink@172.21.125.31:36518/user/jobmanager (attempt 1, timeout: 500
> milliseconds)
>
> 15:04:06,591 INFO
> org.apache.flink.yarn.YarnTaskManager                         - Successful
> registration at JobManager (akka.tcp://
> flink@172.21.125.31:36518/user/jobmanager), starting network stack and
> library cache.
>
> …
>
>
>
> 15:17:22,191 INFO
> org.apache.flink.yarn.YarnTaskManager                         -
> Unregistering task and sending final execution state FINISHED to JobManager
> for task DataSink (Hive Output to
> DEV_VOY_KBR_TRV.EX_GOS_TP_STAGES_TRANSPORT_MODE)
> (c9dc588ceb209d98fd08b5144a59adfc)
>
> 15:17:22,196 INFO
> org.apache.flink.runtime.taskmanager.Task                     - DataSink
> (Hive Output to DEV_VOY_KBR_TRV.EX_GOS_TP_STAGES_TRANSPORT_MODE) (76/95)
> switched to FINISHED
>
> 15:17:22,197 INFO
> org.apache.flink.runtime.taskmanager.Task                     - Freeing
> task resources for DataSink (Hive Output to
> DEV_VOY_KBR_TRV.EX_GOS_TP_STAGES_TRANSPORT_MODE) (76/95)
>
> 15:17:22,197 INFO
> org.apache.flink.yarn.YarnTaskManager                         -
> Unregistering task and sending final execution state FINISHED to JobManager
> for task DataSink (Hive Output to
> DEV_VOY_KBR_TRV.EX_GOS_TP_STAGES_TRANSPORT_MODE)
> (0c1c027e2ca5111e3e54c98b6d7265d7)
>
> 15:22:47,592 ERROR
> org.apache.flink.yarn.YarnTaskManagerRunner                   - RECEIVED
> SIGNAL 15: SIGTERM
>
> 15:22:47,608 ERROR
> org.apache.flink.runtime.operators.BatchTask                  - Error in
> task code:  CHAIN GroupReduce (GroupReduce at
> process(TransfoStage2StageOnTaz.java:106)) -> Map (Map at
> writeExternalTable(HiveHCatDAO.java:206)) (89/95)
>
> java.lang.Exception: The data preparation for task 'CHAIN GroupReduce
> (GroupReduce at process(TransfoStage2StageOnTaz.java:106)) -> Map (Map at
> writeExternalTable(HiveHCatDAO.java:206))' , caused an error: Error
> obtaining the sorted input: Thread 'SortMerger spilling thread' terminated
> due to an exception: java.io.IOException: I/O channel already closed. Could
> not fulfill:
> org.apache.flink.runtime.io.disk.iomanager.SegmentWriteRequest@29864ea1
>
>         at
> org.apache.flink.runtime.operators.BatchTask.run(BatchTask.java:465)
>
>         at
> org.apache.flink.runtime.operators.BatchTask.invoke(BatchTask.java:354)
>
>         at org.apache.flink.runtime.taskmanager.Task.run(Task.java:584)
>
>         at java.lang.Thread.run(Thread.java:744)
>
> Caused by: java.lang.RuntimeException: Error obtaining the sorted input:
> Thread 'SortMerger spilling thread' terminated due to an exception:
> java.io.IOException: I/O channel already closed. Could not fulfill:
> org.apache.flink.runtime.io.disk.iomanager.SegmentWriteRequest@29864ea1
>
>         at
> org.apache.flink.runtime.operators.sort.UnilateralSortMerger.getIterator(UnilateralSortMerger.java:619)
>
>         at
> org.apache.flink.runtime.operators.BatchTask.getInput(BatchTask.java:1089)
>
>         at
> org.apache.flink.runtime.operators.GroupReduceDriver.prepare(GroupReduceDriver.java:94)
>
>         at
> org.apache.flink.runtime.operators.BatchTask.run(BatchTask.java:459)
>
>         ... 3 more
>
> Caused by: java.io.IOException: Thread 'SortMerger spilling thread'
> terminated due to an exception: java.io.IOException: I/O channel already
> closed. Could not fulfill:
> org.apache.flink.runtime.io.disk.iomanager.SegmentWriteRequest@29864ea1
>
>         at
> org.apache.flink.runtime.operators.sort.UnilateralSortMerger$ThreadBase.run(UnilateralSortMerger.java:800)
>
> Caused by: com.esotericsoftware.kryo.KryoException: java.io.IOException:
> I/O channel already closed. Could not fulfill:
> org.apache.flink.runtime.io.disk.iomanager.SegmentWriteRequest@29864ea1
>
>         at com.esotericsoftware.kryo.io.Output.flush(Output.java:165)
>
>         at
> org.apache.flink.api.java.typeutils.runtime.kryo.KryoSerializer.serialize(KryoSerializer.java:194)
>
>         at
> org.apache.flink.api.java.typeutils.runtime.kryo.KryoSerializer.copy(KryoSerializer.java:247)
>
>         at
> org.apache.flink.api.java.typeutils.runtime.TupleSerializerBase.copy(TupleSerializerBase.java:73)
>
>         at
> org.apache.flink.api.java.typeutils.runtime.TupleSerializerBase.copy(TupleSerializerBase.java:73)
>
>         at
> org.apache.flink.api.java.typeutils.runtime.TupleSerializerBase.copy(TupleSerializerBase.java:73)
>
>         at
> org.apache.flink.runtime.operators.sort.NormalizedKeySorter.writeToOutput(NormalizedKeySorter.java:499)
>
>         at
> org.apache.flink.runtime.operators.sort.UnilateralSortMerger$SpillingThread.go(UnilateralSortMerger.java:1344)
>
>         at
> org.apache.flink.runtime.operators.sort.UnilateralSortMerger$ThreadBase.run(UnilateralSortMerger.java:796)
>
> Caused by: java.io.IOException: I/O channel already closed. Could not
> fulfill:
> org.apache.flink.runtime.io.disk.iomanager.SegmentWriteRequest@29864ea1
>
>         at
> org.apache.flink.runtime.io.disk.iomanager.AsynchronousFileIOChannel.addRequest(AsynchronousFileIOChannel.java:249)
>
>         at
> org.apache.flink.runtime.io.disk.iomanager.AsynchronousBlockWriterWithCallback.writeBlock(AsynchronousBlockWriterWithCallback.java:54)
>
>         at
> org.apache.flink.runtime.io.disk.iomanager.AsynchronousBlockWriterWithCallback.writeBlock(AsynchronousBlockWriterWithCallback.java:29)
>
>         at
> org.apache.flink.runtime.io.disk.iomanager.ChannelWriterOutputView.writeSegment(ChannelWriterOutputView.java:217)
>
>         at
> org.apache.flink.runtime.io.disk.iomanager.ChannelWriterOutputView.nextSegment(ChannelWriterOutputView.java:203)
>
>         at
> org.apache.flink.runtime.memory.AbstractPagedOutputView.advance(AbstractPagedOutputView.java:140)
>
>         at
> org.apache.flink.runtime.memory.AbstractPagedOutputView.write(AbstractPagedOutputView.java:201)
>
>         at
> org.apache.flink.api.java.typeutils.runtime.DataOutputViewStream.write(DataOutputViewStream.java:39)
>
>         at com.esotericsoftware.kryo.io.Output.flush(Output.java:163)
>
>         ... 8 more
>
> 15:22:47,608 ERROR
> org.apache.flink.runtime.operators.BatchTask                  - Error in
> task code:  CHAIN GroupReduce (GroupReduce at
> process(TransfoStage2StageOnTaz.java:106)) -> Map (Map at
> writeExternalTable(HiveHCatDAO.java:206)) (87/95)
>
> java.lang.Exception: The data preparation for task 'CHAIN GroupReduce
> (GroupReduce at process(TransfoStage2StageOnTaz.java:106)) -> Map (Map at
> writeExternalTable(HiveHCatDAO.java:206))' , caused an error: Error
> obtaining the sorted input: Thread 'SortMerger spilling thread' terminated
> due to an exception: java.io.IOException: I/O channel already closed. Could
> not fulfill:
> org.apache.flink.runtime.io.disk.iomanager.SegmentWriteRequest@4e81c03c
>
>         at
> org.apache.flink.runtime.operators.BatchTask.run(BatchTask.java:465)
>
>         at
> org.apache.flink.runtime.operators.BatchTask.invoke(BatchTask.java:354)
>
>         at org.apache.flink.runtime.taskmanager.Task.run(Task.java:584)
>
>         at java.lang.Thread.run(Thread.java:744)
>
> Caused by: java.lang.RuntimeException: Error obtaining the sorted input:
> Thread 'SortMerger spilling thread' terminated due to an exception:
> java.io.IOException: I/O channel already closed. Could not fulfill:
> org.apache.flink.runtime.io.disk.iomanager.SegmentWriteRequest@4e81c03c
>
>         at
> org.apache.flink.runtime.operators.sort.UnilateralSortMerger.getIterator(UnilateralSortMerger.java:619)
>
>         at
> org.apache.flink.runtime.operators.BatchTask.getInput(BatchTask.java:1089)
>
>         at
> org.apache.flink.runtime.operators.GroupReduceDriver.prepare(GroupReduceDriver.java:94)
>
>         at
> org.apache.flink.runtime.operators.BatchTask.run(BatchTask.java:459)
>
>         ... 3 more
>
> Caused by: java.io.IOException: Thread 'SortMerger spilling thread'
> terminated due to an exception: java.io.IOException: I/O channel already
> closed. Could not fulfill:
> org.apache.flink.runtime.io.disk.iomanager.SegmentWriteRequest@4e81c03c
>
>         at
> org.apache.flink.runtime.operators.sort.UnilateralSortMerger$ThreadBase.run(UnilateralSortMerger.java:800)
>
> Caused by: com.esotericsoftware.kryo.KryoException: java.io.IOException:
> I/O channel already closed. Could not fulfill:
> org.apache.flink.runtime.io.disk.iomanager.SegmentWriteRequest@4e81c03c
>
>         at com.esotericsoftware.kryo.io.Output.flush(Output.java:165)
>
>         at
> org.apache.flink.api.java.typeutils.runtime.kryo.KryoSerializer.serialize(KryoSerializer.java:194)
>
>         at
> org.apache.flink.api.java.typeutils.runtime.kryo.KryoSerializer.copy(KryoSerializer.java:247)
>
>         at
> org.apache.flink.api.java.typeutils.runtime.TupleSerializerBase.copy(TupleSerializerBase.java:73)
>
>         at
> org.apache.flink.api.java.typeutils.runtime.TupleSerializerBase.copy(TupleSerializerBase.java:73)
>
>         at
> org.apache.flink.api.java.typeutils.runtime.TupleSerializerBase.copy(TupleSerializerBase.java:73)
>
>         at
> org.apache.flink.runtime.operators.sort.NormalizedKeySorter.writeToOutput(NormalizedKeySorter.java:499)
>
>         at
> org.apache.flink.runtime.operators.sort.UnilateralSortMerger$SpillingThread.go(UnilateralSortMerger.java:1344)
>
>         at
> org.apache.flink.runtime.operators.sort.UnilateralSortMerger$ThreadBase.run(UnilateralSortMerger.java:796)
>
> Caused by: java.io.IOException: I/O channel already closed. Could not
> fulfill:
> org.apache.flink.runtime.io.disk.iomanager.SegmentWriteRequest@4e81c03c
>
>         at
> org.apache.flink.runtime.io.disk.iomanager.AsynchronousFileIOChannel.addRequest(AsynchronousFileIOChannel.java:249)
>
>         at
> org.apache.flink.runtime.io.disk.iomanager.AsynchronousBlockWriterWithCallback.writeBlock(AsynchronousBlockWriterWithCallback.java:54)
>
>         at
> org.apache.flink.runtime.io.disk.iomanager.AsynchronousBlockWriterWithCallback.writeBlock(AsynchronousBlockWriterWithCallback.java:29)
>
>         at
> org.apache.flink.runtime.io.disk.iomanager.ChannelWriterOutputView.writeSegment(ChannelWriterOutputView.java:217)
>
>         at
> org.apache.flink.runtime.io.disk.iomanager.ChannelWriterOutputView.nextSegment(ChannelWriterOutputView.java:203)
>
>         at
> org.apache.flink.runtime.memory.AbstractPagedOutputView.advance(AbstractPagedOutputView.java:140)
>
>         at
> org.apache.flink.runtime.memory.AbstractPagedOutputView.write(AbstractPagedOutputView.java:201)
>
>         at
> org.apache.flink.api.java.typeutils.runtime.DataOutputViewStream.write(DataOutputViewStream.java:39)
>
>         at com.esotericsoftware.kryo.io.Output.flush(Output.java:163)
>
>         ... 8 more
>
> 15:22:47,617 INFO
> org.apache.flink.runtime.taskmanager.Task                     - CHAIN
> GroupReduce (GroupReduce at process(TransfoStage2StageOnTaz.java:106)) ->
> Map (Map at writeExternalTable(HiveHCatDAO.java:206)) (89/95) switched to
> FAILED with exception.
>
> java.lang.Exception: The data preparation for task 'CHAIN GroupReduce
> (GroupReduce at process(TransfoStage2StageOnTaz.java:106)) -> Map (Map at
> writeExternalTable(HiveHCatDAO.java:206))' , caused an error: Error
> obtaining the sorted input: Thread 'SortMerger spilling thread' terminated
> due to an exception: java.io.IOException: I/O channel already closed. Could
> not fulfill:
> org.apache.flink.runtime.io.disk.iomanager.SegmentWriteRequest@29864ea1
>
>         at
> org.apache.flink.runtime.operators.BatchTask.run(BatchTask.java:465)
>
>         at
> org.apache.flink.runtime.operators.BatchTask.invoke(BatchTask.java:354)
>
>         at org.apache.flink.runtime.taskmanager.Task.run(Task.java:584)
>
>         at java.lang.Thread.run(Thread.java:744)
>
> Caused by: java.lang.RuntimeException: Error obtaining the sorted input:
> Thread 'SortMerger spilling thread' terminated due to an exception:
> java.io.IOException: I/O channel already closed. Could not fulfill:
> org.apache.flink.runtime.io.disk.iomanager.SegmentWriteRequest@29864ea1
>
>         at
> org.apache.flink.runtime.operators.sort.UnilateralSortMerger.getIterator(UnilateralSortMerger.java:619)
>
>         at
> org.apache.flink.runtime.operators.BatchTask.getInput(BatchTask.java:1089)
>
>         at
> org.apache.flink.runtime.operators.GroupReduceDriver.prepare(GroupReduceDriver.java:94)
>
>         at
> org.apache.flink.runtime.operators.BatchTask.run(BatchTask.java:459)
>
>         ... 3 more
>
> Caused by: java.io.IOException: Thread 'SortMerger spilling thread'
> terminated due to an exception: java.io.IOException: I/O channel already
> closed. Could not fulfill:
> org.apache.flink.runtime.io.disk.iomanager.SegmentWriteRequest@29864ea1
>
>         at
> org.apache.flink.runtime.operators.sort.UnilateralSortMerger$ThreadBase.run(UnilateralSortMerger.java:800)
>
> Caused by: com.esotericsoftware.kryo.KryoException: java.io.IOException:
> I/O channel already closed. Could not fulfill:
> org.apache.flink.runtime.io.disk.iomanager.SegmentWriteRequest@29864ea1
>
>         at com.esotericsoftware.kryo.io.Output.flush(Output.java:165)
>
>         at
> org.apache.flink.api.java.typeutils.runtime.kryo.KryoSerializer.serialize(KryoSerializer.java:194)
>
>         at
> org.apache.flink.api.java.typeutils.runtime.kryo.KryoSerializer.copy(KryoSerializer.java:247)
>
>         at
> org.apache.flink.api.java.typeutils.runtime.TupleSerializerBase.copy(TupleSerializerBase.java:73)
>
>         at
> org.apache.flink.api.java.typeutils.runtime.TupleSerializerBase.copy(TupleSerializerBase.java:73)
>
>         at
> org.apache.flink.api.java.typeutils.runtime.TupleSerializerBase.copy(TupleSerializerBase.java:73)
>
>         at
> org.apache.flink.runtime.operators.sort.NormalizedKeySorter.writeToOutput(NormalizedKeySorter.java:499)
>
>         at
> org.apache.flink.runtime.operators.sort.UnilateralSortMerger$SpillingThread.go(UnilateralSortMerger.java:1344)
>
>         at
> org.apache.flink.runtime.operators.sort.UnilateralSortMerger$ThreadBase.run(UnilateralSortMerger.java:796)
>
> Caused by: java.io.IOException: I/O channel already closed. Could not
> fulfill:
> org.apache.flink.runtime.io.disk.iomanager.SegmentWriteRequest@29864ea1
>
>         at
> org.apache.flink.runtime.io.disk.iomanager.AsynchronousFileIOChannel.addRequest(AsynchronousFileIOChannel.java:249)
>
>         at
> org.apache.flink.runtime.io.disk.iomanager.AsynchronousBlockWriterWithCallback.writeBlock(AsynchronousBlockWriterWithCallback.java:54)
>
>         at
> org.apache.flink.runtime.io.disk.iomanager.AsynchronousBlockWriterWithCallback.writeBlock(AsynchronousBlockWriterWithCallback.java:29)
>
>         at
> org.apache.flink.runtime.io.disk.iomanager.ChannelWriterOutputView.writeSegment(ChannelWriterOutputView.java:217)
>
>         at
> org.apache.flink.runtime.io.disk.iomanager.ChannelWriterOutputView.nextSegment(ChannelWriterOutputView.java:203)
>
>         at
> org.apache.flink.runtime.memory.AbstractPagedOutputView.advance(AbstractPagedOutputView.java:140)
>
>         at
> org.apache.flink.runtime.memory.AbstractPagedOutputView.write(AbstractPagedOutputView.java:201)
>
>         at
> org.apache.flink.api.java.typeutils.runtime.DataOutputViewStream.write(DataOutputViewStream.java:39)
>
>         at com.esotericsoftware.kryo.io.Output.flush(Output.java:163)
>
>         ... 8 more
>
> 15:22:47,619 INFO
> org.apache.flink.runtime.taskmanager.Task                     - CHAIN
> GroupReduce (GroupReduce at process(TransfoStage2StageOnTaz.java:106)) ->
> Map (Map at writeExternalTable(HiveHCatDAO.java:206)) (87/95) switched to
> FAILED with exception.
>
> java.lang.Exception: The data preparation for task 'CHAIN GroupReduce
> (GroupReduce at process(TransfoStage2StageOnTaz.java:106)) -> Map (Map at
> writeExternalTable(HiveHCatDAO.java:206))' , caused an error: Error
> obtaining the sorted input: Thread 'SortMerger spilling thread' terminated
> due to an exception: java.io.IOException: I/O channel already closed. Could
> not fulfill:
> org.apache.flink.runtime.io.disk.iomanager.SegmentWriteRequest@4e81c03c
>
>         at
> org.apache.flink.runtime.operators.BatchTask.run(BatchTask.java:465)
>
>         at
> org.apache.flink.runtime.operators.BatchTask.invoke(BatchTask.java:354)
>
>         at org.apache.flink.runtime.taskmanager.Task.run(Task.java:584)
>
>         at java.lang.Thread.run(Thread.java:744)
>
> Caused by: java.lang.RuntimeException: Error obtaining the sorted input:
> Thread 'SortMerger spilling thread' terminated due to an exception:
> java.io.IOException: I/O channel already closed. Could not fulfill:
> org.apache.flink.runtime.io.disk.iomanager.SegmentWriteRequest@4e81c03c
>
>         at
> org.apache.flink.runtime.operators.sort.UnilateralSortMerger.getIterator(UnilateralSortMerger.java:619)
>
>         at
> org.apache.flink.runtime.operators.BatchTask.getInput(BatchTask.java:1089)
>
>         at
> org.apache.flink.runtime.operators.GroupReduceDriver.prepare(GroupReduceDriver.java:94)
>
>         at
> org.apache.flink.runtime.operators.BatchTask.run(BatchTask.java:459)
>
>         ... 3 more
>
> Caused by: java.io.IOException: Thread 'SortMerger spilling thread'
> terminated due to an exception: java.io.IOException: I/O channel already
> closed. Could not fulfill:
> org.apache.flink.runtime.io.disk.iomanager.SegmentWriteRequest@4e81c03c
>
>         at
> org.apache.flink.runtime.operators.sort.UnilateralSortMerger$ThreadBase.run(UnilateralSortMerger.java:800)
>
> Caused by: com.esotericsoftware.kryo.KryoException: java.io.IOException:
> I/O channel already closed. Could not fulfill:
> org.apache.flink.runtime.io.disk.iomanager.SegmentWriteRequest@4e81c03c
>
>         at com.esotericsoftware.kryo.io.Output.flush(Output.java:165)
>
>         at
> org.apache.flink.api.java.typeutils.runtime.kryo.KryoSerializer.serialize(KryoSerializer.java:194)
>
>         at
> org.apache.flink.api.java.typeutils.runtime.kryo.KryoSerializer.copy(KryoSerializer.java:247)
>
>         at
> org.apache.flink.api.java.typeutils.runtime.TupleSerializerBase.copy(TupleSerializerBase.java:73)
>
>         at
> org.apache.flink.api.java.typeutils.runtime.TupleSerializerBase.copy(TupleSerializerBase.java:73)
>
>         at
> org.apache.flink.api.java.typeutils.runtime.TupleSerializerBase.copy(TupleSerializerBase.java:73)
>
>         at
> org.apache.flink.runtime.operators.sort.NormalizedKeySorter.writeToOutput(NormalizedKeySorter.java:499)
>
>         at
> org.apache.flink.runtime.operators.sort.UnilateralSortMerger$SpillingThread.go(UnilateralSortMerger.java:1344)
>
>         at
> org.apache.flink.runtime.operators.sort.UnilateralSortMerger$ThreadBase.run(UnilateralSortMerger.java:796)
>
> Caused by: java.io.IOException: I/O channel already closed. Could not
> fulfill:
> org.apache.flink.runtime.io.disk.iomanager.SegmentWriteRequest@4e81c03c
>
>         at
> org.apache.flink.runtime.io.disk.iomanager.AsynchronousFileIOChannel.addRequest(AsynchronousFileIOChannel.java:249)
>
>         at
> org.apache.flink.runtime.io.disk.iomanager.AsynchronousBlockWriterWithCallback.writeBlock(AsynchronousBlockWriterWithCallback.java:54)
>
>         at
> org.apache.flink.runtime.io.disk.iomanager.AsynchronousBlockWriterWithCallback.writeBlock(AsynchronousBlockWriterWithCallback.java:29)
>
>         at
> org.apache.flink.runtime.io.disk.iomanager.ChannelWriterOutputView.writeSegment(ChannelWriterOutputView.java:217)
>
>         at
> org.apache.flink.runtime.io.disk.iomanager.ChannelWriterOutputView.nextSegment(ChannelWriterOutputView.java:203)
>
>         at
> org.apache.flink.runtime.memory.AbstractPagedOutputView.advance(AbstractPagedOutputView.java:140)
>
>         at
> org.apache.flink.runtime.memory.AbstractPagedOutputView.write(AbstractPagedOutputView.java:201)
>
>         at
> org.apache.flink.api.java.typeutils.runtime.DataOutputViewStream.write(DataOutputViewStream.java:39)
>
>         at com.esotericsoftware.kryo.io.Output.flush(Output.java:163)
>
>         ... 8 more
>
> 15:22:47,627 INFO
> org.apache.flink.runtime.taskmanager.Task                     - Freeing
> task resources for CHAIN GroupReduce (GroupReduce at
> process(TransfoStage2StageOnTaz.java:106)) -> Map (Map at
> writeExternalTable(HiveHCatDAO.java:206)) (87/95)
>
> 15:22:47,627 INFO
> org.apache.flink.runtime.taskmanager.Task                     - Freeing
> task resources for CHAIN GroupReduce (GroupReduce at
> process(TransfoStage2StageOnTaz.java:106)) -> Map (Map at
> writeExternalTable(HiveHCatDAO.java:206)) (89/95)
>
> 15:22:47,664 INFO
> org.apache.flink.yarn.YarnTaskManager                         -
> Unregistering task and sending final execution state FAILED to JobManager
> for task CHAIN GroupReduce (GroupReduce at
> process(TransfoStage2StageOnTaz.java:106)) -> Map (Map at
> writeExternalTable(HiveHCatDAO.java:206)) (8d6b8b11714a27ac1ca83b39bdee577f)
>
> 15:22:47,738 INFO
> org.apache.flink.yarn.YarnTaskManager                         -
> Unregistering task and sending final execution state FAILED to JobManager
> for task CHAIN GroupReduce (GroupReduce at
> process(TransfoStage2StageOnTaz.java:106)) -> Map (Map at
> writeExternalTable(HiveHCatDAO.java:206)) (73c6c2f15159dcb134e3899064a30f33)
>
> 15:22:47,841 ERROR
> org.apache.flink.runtime.operators.BatchTask                  - Error in
> task code:  CHAIN GroupReduce (GroupReduce at
> calculeMinArea(TransfoStage2StageOnTaz.java:159)) -> Map (Key Extractor 1)
> (88/95)
>
> com.esotericsoftware.kryo.KryoException: java.io.IOException: I/O channel already closed. Could not fulfill: org.apache.flink.runtime.io.disk.iomanager.SegmentReadRequest@26707c34
>         at com.esotericsoftware.kryo.io.Input.fill(Input.java:148)
>         at org.apache.flink.api.java.typeutils.runtime.NoFetchingInput.require(NoFetchingInput.java:74)
>         at com.esotericsoftware.kryo.io.Input.readLong_slow(Input.java:756)
>         at com.esotericsoftware.kryo.io.Input.readVarLong(Input.java:690)
>         at com.esotericsoftware.kryo.io.Input.readLong(Input.java:685)
>         at com.esotericsoftware.kryo.serializers.UnsafeCacheFields$UnsafeLongField.read(UnsafeCacheFields.java:160)
>         at com.esotericsoftware.kryo.serializers.FieldSerializer.read(FieldSerializer.java:528)
>         at com.esotericsoftware.kryo.Kryo.readClassAndObject(Kryo.java:761)
>         at org.apache.flink.api.java.typeutils.runtime.kryo.KryoSerializer.deserialize(KryoSerializer.java:222)
>         at org.apache.flink.api.java.typeutils.runtime.kryo.KryoSerializer.deserialize(KryoSerializer.java:236)
>         at org.apache.flink.api.java.typeutils.runtime.TupleSerializer.deserialize(TupleSerializer.java:136)
>         at org.apache.flink.api.java.typeutils.runtime.TupleSerializer.deserialize(TupleSerializer.java:30)
>         at org.apache.flink.api.java.typeutils.runtime.TupleSerializer.deserialize(TupleSerializer.java:136)
>         at org.apache.flink.api.java.typeutils.runtime.TupleSerializer.deserialize(TupleSerializer.java:30)
>         at org.apache.flink.runtime.io.disk.ChannelReaderInputViewIterator.next(ChannelReaderInputViewIterator.java:86)
>         at org.apache.flink.runtime.operators.sort.MergeIterator$HeadStream.nextHead(MergeIterator.java:151)
>         at org.apache.flink.runtime.operators.sort.MergeIterator.next(MergeIterator.java:85)
>         at org.apache.flink.runtime.util.ReusingKeyGroupedIterator$ValuesIterator.hasNext(ReusingKeyGroupedIterator.java:186)
>         at org.apache.flink.api.java.operators.translation.TupleUnwrappingIterator.hasNext(TupleUnwrappingIterator.java:49)
>         at com.bouygtel.kubera.processor.stage.TransfoStage2StageOnTaz$6.reduce(TransfoStage2StageOnTaz.java:170)
>         at org.apache.flink.api.java.operators.translation.PlanUnwrappingReduceGroupOperator$TupleUnwrappingNonCombinableGroupReducer.reduce(PlanUnwrappingReduceGroupOperator.java:101)
>         at org.apache.flink.runtime.operators.GroupReduceDriver.run(GroupReduceDriver.java:118)
>         at org.apache.flink.runtime.operators.BatchTask.run(BatchTask.java:489)
>         at org.apache.flink.runtime.operators.BatchTask.invoke(BatchTask.java:354)
>         at org.apache.flink.runtime.taskmanager.Task.run(Task.java:584)
>         at java.lang.Thread.run(Thread.java:744)
> Caused by: java.io.IOException: I/O channel already closed. Could not fulfill: org.apache.flink.runtime.io.disk.iomanager.SegmentReadRequest@26707c34
>         at org.apache.flink.runtime.io.disk.iomanager.AsynchronousFileIOChannel.addRequest(AsynchronousFileIOChannel.java:249)
>         at org.apache.flink.runtime.io.disk.iomanager.AsynchronousBlockReader.readBlock(AsynchronousBlockReader.java:75)
>         at org.apache.flink.runtime.io.disk.iomanager.AsynchronousBlockReader.readBlock(AsynchronousBlockReader.java:43)
>         at org.apache.flink.runtime.io.disk.iomanager.ChannelReaderInputView.sendReadRequest(ChannelReaderInputView.java:259)
>         at org.apache.flink.runtime.io.disk.iomanager.ChannelReaderInputView.nextSegment(ChannelReaderInputView.java:224)
>         at org.apache.flink.runtime.memory.AbstractPagedInputView.advance(AbstractPagedInputView.java:159)
>         at org.apache.flink.runtime.memory.AbstractPagedInputView.read(AbstractPagedInputView.java:213)
>         at org.apache.flink.api.java.typeutils.runtime.DataInputViewStream.read(DataInputViewStream.java:68)
>         at com.esotericsoftware.kryo.io.Input.fill(Input.java:146)
>         ... 25 more
>
>
>
>
>
> (...)
>
> ______________________
>
> 15:22:51,798 INFO
> org.apache.flink.yarn.YarnJobManager                          - Container
> container_e11_1453202008841_2868_01_000018 is completed with diagnostics:
> Container
> [pid=14548,containerID=container_e11_1453202008841_2868_01_000018] is
> running beyond physical memory limits. Current usage: 12.1 GB of 12 GB
> physical memory used; 13.0 GB of 25.2 GB virtual memory used. Killing
> container.
>
> Dump of the process-tree for container_e11_1453202008841_2868_01_000018 :
>
>         |- PID PPID PGRPID SESSID CMD_NAME USER_MODE_TIME(MILLIS)
> SYSTEM_TIME(MILLIS) VMEM_USAGE(BYTES) RSSMEM_USAGE(PAGES) FULL_CMD_LINE
>
>         |- 14548 14542 14548 14548 (bash) 0 0 108646400 310 /bin/bash -c
> /usr/java/default/bin/java -Xms7200m -Xmx7200m
> -XX:MaxDirectMemorySize=7200m
> -Dlog.file=/data/2/hadoop/yarn/log/application_1453202008841_2868/container_e11_1453202008841_2868_01_000018/taskmanager.log
> -Dlogback.configurationFile=file:logback.xml
> -Dlog4j.configuration=file:log4j.properties
> org.apache.flink.yarn.YarnTaskManagerRunner --configDir . 1>
> /data/2/hadoop/yarn/log/application_1453202008841_2868/container_e11_1453202008841_2868_01_000018/taskmanager.out
> 2>
> /data/2/hadoop/yarn/log/application_1453202008841_2868/container_e11_1453202008841_2868_01_000018/taskmanager.err
> --streamingMode batch
>
>         |- 14558 14548 14548 14548 (java) 631070 15142 13881634816 3163462
> /usr/java/default/bin/java -Xms7200m -Xmx7200m
> -XX:MaxDirectMemorySize=7200m
> -Dlog.file=/data/2/hadoop/yarn/log/application_1453202008841_2868/container_e11_1453202008841_2868_01_000018/taskmanager.log
> -Dlogback.configurationFile=file:logback.xml
> -Dlog4j.configuration=file:log4j.properties
> org.apache.flink.yarn.YarnTaskManagerRunner --configDir . --streamingMode
> batch
>
>
>
> Container killed on request. Exit code is 143
>
> Container exited with a non-zero exit code 143
>
>
>
>
>
>
>
>
>
> *From:* ewenstephan@gmail.com [mailto:ewenstephan@gmail.com] *On behalf
> of* Stephan Ewen
> *Sent:* Tuesday, February 2, 2016 20:20
> *To:* user@flink.apache.org
> *Subject:* Re: Left join with unbalanced dataset
>
>
>
> To make sure this discussion does not go in a wrong direction:
>
>
>
> There is no issue here with data size, or memory management. The
> MemoryManagement for sorting and hashing works, and Flink handles the
> spilling correctly, etc.
>
>
>
> The issue here is different:
>
>    - One possible reason is that the network stack (specifically the Netty
> library) allocates too much direct (= off heap) memory for buffering the
> TCP connections.
>
>    - Another reason could be leaky behavior in Hadoop's HDFS code.
>
>
>
>
>
> @Arnaud: We need the full log of the TaskManager that initially
> experiences that failure, then we can look into this. Ideally with memory
> logging activated, as suggested by Ufuk.
>
>
>
> Best,
>
> Stephan
>
>
>
>

RE: Left join with unbalanced dataset

Posted by "LINZ, Arnaud" <AL...@bouyguestelecom.fr>.
Hi,
Yes, I’m always a bit reluctant before installing a snapshot version « for everyone », and I was hoping it would suffice…
However, I’ve just recompiled everything and run it with a real 0.10.1 snapshot, and everything worked at an astounding speed with a reasonable amount of memory.
Thanks for the great work and the help, as always,
Arnaud

From: Fabian Hueske [mailto:fhueske@gmail.com]
Sent: Wednesday, February 3, 2016 10:51
To: user@flink.apache.org
Subject: Re: Left join with unbalanced dataset

Hi Arnaud,
in a previous mail you said:
"Note that I did not rebuild & reinstall flink, I just used a 0.10-SNAPSHOT compiled jar submitted as a batch job using the "0.10.0" flink installation"
This will not fix the Netty version error. You need to install a new Flink version, or submit the Flink job to YARN with a new Flink version, to make sure that the correct Netty version is used.
Best, Fabian
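
One way to check which Flink version the cluster side is really running is to read the startup banner that the TaskManager writes to its log. The following is only a minimal sketch: the banner format is copied from the TaskManager log quoted further down in this thread, and the function name `running_flink_version` is made up for illustration.

```python
import re

# Banner format as it appears in the TaskManager log quoted in this thread:
#   "Starting YARN TaskManager (Version: 0.10.0, Rev:ab2cca4, Date:...)"
BANNER = re.compile(
    r"Starting YARN TaskManager \(Version: (?P<version>[^,]+), Rev:(?P<rev>[^,]+),"
)


def running_flink_version(log_lines):
    """Return (version, revision) from the first startup banner, or None."""
    for line in log_lines:
        match = BANNER.search(line)
        if match:
            return match.group("version"), match.group("rev")
    return None


sample = [
    "15:04:01,236 INFO  org.apache.flink.yarn.YarnTaskManagerRunner"
    "                   -  Starting YARN TaskManager (Version: 0.10.0,"
    " Rev:ab2cca4, Date:10.11.2015 @ 13:50:14 UTC)",
]
print(running_flink_version(sample))
```

If the reported version is still 0.10.0, the job jar was built against a newer snapshot but the runtime (and its Netty dependency) was not replaced.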

2016-02-03 10:44 GMT+01:00 Stephan Ewen <se...@apache.org>:
Hi!

I think the closed channel is actually an effect of the process kill. Before the exception, you can see "15:22:47,592 ERROR org.apache.flink.yarn.YarnTaskManagerRunner - RECEIVED SIGNAL 15: SIGTERM" in the log, which means that UNIX is killing the process.
I assume that the first thing that happens is that UNIX closes the open file handles, while the JVM shutdown hooks are still in progress. Hence the exception.
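
The ordering argument above (SIGTERM first, I/O errors afterwards) can be checked mechanically on a TaskManager log. This is a rough sketch, not a Flink tool: the helper names are hypothetical, the sample lines are abbreviated from the log extract in this thread, and the timestamp parsing assumes the `HH:MM:SS,mmm` prefix shown there with no rollover past midnight.

```python
from datetime import datetime

SIGTERM_MARKER = "RECEIVED SIGNAL 15: SIGTERM"


def log_time(line):
    """Parse the leading HH:MM:SS,mmm timestamp, or return None (e.g. stack frames)."""
    try:
        return datetime.strptime(line[:12], "%H:%M:%S,%f")
    except ValueError:
        return None


def errors_after_sigterm(lines):
    """Return ERROR lines logged at or after the first SIGTERM line."""
    sigterm_at = None
    late_errors = []
    for line in lines:
        ts = log_time(line)
        if ts is None:
            continue  # continuation lines of a stack trace carry no timestamp
        if sigterm_at is None:
            if SIGTERM_MARKER in line:
                sigterm_at = ts
        elif " ERROR " in line and ts >= sigterm_at:
            late_errors.append(line)
    return late_errors


# Abbreviated lines from the log extract in this thread.
sample = [
    "15:17:22,197 INFO  org.apache.flink.yarn.YarnTaskManager - Unregistering task ...",
    "15:22:47,592 ERROR org.apache.flink.yarn.YarnTaskManagerRunner - RECEIVED SIGNAL 15: SIGTERM",
    "15:22:47,608 ERROR org.apache.flink.runtime.operators.BatchTask - Error in task code: ...",
    "        at org.apache.flink.runtime.operators.BatchTask.run(BatchTask.java:465)",
]
for line in errors_after_sigterm(sample):
    print(line)
```

Errors that only show up in this list are more likely a consequence of the shutdown than its cause.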

So, the root cause is still the YARN memory killer.
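
To see why the container can hit the YARN limit, it helps to add up the JVM flags from the process dump. The sketch below only does that arithmetic with the numbers quoted in this thread; it assumes YARN's "12 GB" means 12 × 1024 MiB and that a memory page is 4096 bytes, neither of which is stated in the logs.

```python
# Numbers taken from the YARN diagnostics and JVM flags quoted in this thread.
container_limit_mib = 12 * 1024   # "12 GB physical memory" (assumed to be GiB)
heap_max_mib = 7200               # -Xmx7200m
direct_max_mib = 7200             # -XX:MaxDirectMemorySize=7200m

# Upper bound the JVM is allowed to reach with heap plus direct memory alone.
worst_case_mib = heap_max_mib + direct_max_mib
headroom_mib = container_limit_mib - worst_case_mib

# Resident set size reported for the java process in the process-tree dump.
rss_pages = 3163462               # RSSMEM_USAGE(PAGES)
page_size_bytes = 4096            # assumption: typical Linux page size
rss_gib = rss_pages * page_size_bytes / 1024**3

print(f"allowed heap + direct: {worst_case_mib} MiB")
print(f"container limit:       {container_limit_mib} MiB")
print(f"headroom:              {headroom_mib} MiB")
print(f"observed RSS:          {rss_gib:.2f} GiB")
```

With these flags, heap and direct memory together are permitted to exceed the container size, so any component that allocates a lot of off-heap memory can push the process over the limit. The observed resident size works out to roughly the 12.1 GB that YARN reports.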

The log comes from release version 0.10.0.
The Netty fix came into Flink after version 0.10.1 - so it is currently only in 0.10-SNAPSHOT (and will be in 0.10.2 in a couple of days).

Greetings,
Stephan


On Wed, Feb 3, 2016 at 10:11 AM, LINZ, Arnaud <AL...@bouyguestelecom.fr> wrote:
Hi,

I see nothing wrong in the log of the killed container (it’s in fact strange that it fails with I/O channel closure before it is killed by yarn), but I’ll post new logs with memory debug as a web download within the day.

In the meantime, here is a log extract:

Container: container_e11_1453202008841_2868_01_000018 on h1r1dn06.bpa.bouyguestelecom.fr_45454
================================================================================================

…
15:04:01,234 INFO  org.apache.flink.yarn.YarnTaskManagerRunner                   - --------------------------------------------------------------------------------
15:04:01,236 INFO  org.apache.flink.yarn.YarnTaskManagerRunner                   -  Starting YARN TaskManager (Version: 0.10.0, Rev:ab2cca4, Date:10.11.2015 @ 13:50:14 UTC)
15:04:01,236 INFO  org.apache.flink.yarn.YarnTaskManagerRunner                   -  Current user: datcrypt
15:04:01,236 INFO  org.apache.flink.yarn.YarnTaskManagerRunner                   -  JVM: Java HotSpot(TM) 64-Bit Server VM - Oracle Corporation - 1.7/24.45-b08
15:04:01,236 INFO  org.apache.flink.yarn.YarnTaskManagerRunner                   -  Maximum heap size: 6900 MiBytes
15:04:01,236 INFO  org.apache.flink.yarn.YarnTaskManagerRunner                   -  JAVA_HOME: /usr/java/default
15:04:01,238 INFO  org.apache.flink.yarn.YarnTaskManagerRunner                   -  Hadoop version: 2.6.0
15:04:01,238 INFO  org.apache.flink.yarn.YarnTaskManagerRunner                   -  JVM Options:
15:04:01,238 INFO  org.apache.flink.yarn.YarnTaskManagerRunner                   -     -Xms7200m
15:04:01,238 INFO  org.apache.flink.yarn.YarnTaskManagerRunner                   -     -Xmx7200m
15:04:01,238 INFO  org.apache.flink.yarn.YarnTaskManagerRunner                   -     -XX:MaxDirectMemorySize=7200m
15:04:01,238 INFO  org.apache.flink.yarn.YarnTaskManagerRunner                   -     -Dlog.file=/data/2/hadoop/yarn/log/application_1453202008841_2868/container_e11_1453202008841_2868_01_000018/taskmanager.log
15:04:01,238 INFO  org.apache.flink.yarn.YarnTaskManagerRunner                   -     -Dlogback.configurationFile=file:logback.xml
15:04:01,238 INFO  org.apache.flink.yarn.YarnTaskManagerRunner                   -     -Dlog4j.configuration=file:log4j.properties
15:04:01,238 INFO  org.apache.flink.yarn.YarnTaskManagerRunner                   -  Program Arguments:
15:04:01,238 INFO  org.apache.flink.yarn.YarnTaskManagerRunner                   -     --configDir
15:04:01,238 INFO  org.apache.flink.yarn.YarnTaskManagerRunner                   -     .
15:04:01,238 INFO  org.apache.flink.yarn.YarnTaskManagerRunner                   -     --streamingMode
15:04:01,238 INFO  org.apache.flink.yarn.YarnTaskManagerRunner                   -     batch
15:04:01,238 INFO  org.apache.flink.yarn.YarnTaskManagerRunner                   - --------------------------------------------------------------------------------
…
15:04:02,215 INFO  org.apache.flink.runtime.taskmanager.TaskManager              - Starting TaskManager actor
15:04:02,224 INFO  org.apache.flink.runtime.io.network.netty.NettyConfig         - NettyConfig [server address: bt1shlhr/172.21.125.16, server port: 47002, memory segment size (bytes): 32768, transport type: NIO, number of server threads: 0 (use Netty's default), number of client threads: 0 (use Netty's default), server connect backlog: 0 (use Netty's default), client connect timeout (sec): 120, send/receive buffer size (bytes): 0 (use Netty's default)]
15:04:02,226 INFO  org.apache.flink.runtime.taskmanager.TaskManager              - Messages between TaskManager and JobManager have a max timeout of 100000 milliseconds
…

15:04:02,970 INFO  org.apache.flink.runtime.io.network.buffer.NetworkBufferPool  - Allocated 1024 MB for network buffer pool (number of memory segments: 32768, bytes per segment: 32768).
15:04:03,527 INFO  org.apache.flink.runtime.taskmanager.TaskManager              - Using 0.7 of the currently free heap space for Flink managed heap memory (4099 MB).
15:04:06,250 INFO  org.apache.flink.runtime.io.disk.iomanager.IOManager          - I/O manager uses directory /data/1/hadoop/yarn/local/usercache/datcrypt/appcache/application_1453202008841_2868/flink-io-5cc3aa50-6723-460e-a722-800dd908e9e8 for spill files.
…
15:04:06,429 INFO  org.apache.flink.yarn.YarnTaskManager                         - TaskManager data connection information: h1r1dn06.bpa.bouyguestelecom.fr (dataPort=47002)
15:04:06,430 INFO  org.apache.flink.yarn.YarnTaskManager                         - TaskManager has 2 task slot(s).
15:04:06,431 INFO  org.apache.flink.yarn.YarnTaskManager                         - Memory usage stats: [HEAP: 5186/6900/6900 MB, NON HEAP: 25/50/130 MB (used/committed/max)]
15:04:06,438 INFO  org.apache.flink.yarn.YarnTaskManager                         - Trying to register at JobManager akka.tcp://flink@172.21.125.31:36518/user/jobmanager (attempt 1, timeout: 500 milliseconds)
15:04:06,591 INFO  org.apache.flink.yarn.YarnTaskManager                         - Successful registration at JobManager (akka.tcp://flink@172.21.125.31:36518/user/jobmanager), starting network stack and library cache.
…

15:17:22,191 INFO  org.apache.flink.yarn.YarnTaskManager                         - Unregistering task and sending final execution state FINISHED to JobManager for task DataSink (Hive Output to DEV_VOY_KBR_TRV.EX_GOS_TP_STAGES_TRANSPORT_MODE) (c9dc588ceb209d98fd08b5144a59adfc)
15:17:22,196 INFO  org.apache.flink.runtime.taskmanager.Task                     - DataSink (Hive Output to DEV_VOY_KBR_TRV.EX_GOS_TP_STAGES_TRANSPORT_MODE) (76/95) switched to FINISHED
15:17:22,197 INFO  org.apache.flink.runtime.taskmanager.Task                     - Freeing task resources for DataSink (Hive Output to DEV_VOY_KBR_TRV.EX_GOS_TP_STAGES_TRANSPORT_MODE) (76/95)
15:17:22,197 INFO  org.apache.flink.yarn.YarnTaskManager                         - Unregistering task and sending final execution state FINISHED to JobManager for task DataSink (Hive Output to DEV_VOY_KBR_TRV.EX_GOS_TP_STAGES_TRANSPORT_MODE) (0c1c027e2ca5111e3e54c98b6d7265d7)
15:22:47,592 ERROR org.apache.flink.yarn.YarnTaskManagerRunner                   - RECEIVED SIGNAL 15: SIGTERM
15:22:47,608 ERROR org.apache.flink.runtime.operators.BatchTask                  - Error in task code:  CHAIN GroupReduce (GroupReduce at process(TransfoStage2StageOnTaz.java:106)) -> Map (Map at writeExternalTable(HiveHCatDAO.java:206)) (89/95)
java.lang.Exception: The data preparation for task 'CHAIN GroupReduce (GroupReduce at process(TransfoStage2StageOnTaz.java:106)) -> Map (Map at writeExternalTable(HiveHCatDAO.java:206))' , caused an error: Error obtaining the sorted input: Thread 'SortMerger spilling thread' terminated due to an exception: java.io.IOException: I/O channel already closed. Could not fulfill: org.apache.flink.runtime.io.disk.iomanager.SegmentWriteRequest@29864ea1
        at org.apache.flink.runtime.operators.BatchTask.run(BatchTask.java:465)
        at org.apache.flink.runtime.operators.BatchTask.invoke(BatchTask.java:354)
        at org.apache.flink.runtime.taskmanager.Task.run(Task.java:584)
        at java.lang.Thread.run(Thread.java:744)
Caused by: java.lang.RuntimeException: Error obtaining the sorted input: Thread 'SortMerger spilling thread' terminated due to an exception: java.io.IOException: I/O channel already closed. Could not fulfill: org.apache.flink.runtime.io.disk.iomanager.SegmentWriteRequest@29864ea1
        at org.apache.flink.runtime.operators.sort.UnilateralSortMerger.getIterator(UnilateralSortMerger.java:619)
        at org.apache.flink.runtime.operators.BatchTask.getInput(BatchTask.java:1089)
        at org.apache.flink.runtime.operators.GroupReduceDriver.prepare(GroupReduceDriver.java:94)
        at org.apache.flink.runtime.operators.BatchTask.run(BatchTask.java:459)
        ... 3 more
Caused by: java.io.IOException: Thread 'SortMerger spilling thread' terminated due to an exception: java.io.IOException: I/O channel already closed. Could not fulfill: org.apache.flink.runtime.io.disk.iomanager.SegmentWriteRequest@29864ea1
        at org.apache.flink.runtime.operators.sort.UnilateralSortMerger$ThreadBase.run(UnilateralSortMerger.java:800)
Caused by: com.esotericsoftware.kryo.KryoException: java.io.IOException: I/O channel already closed. Could not fulfill: org.apache.flink.runtime.io.disk.iomanager.SegmentWriteRequest@29864ea1
        at com.esotericsoftware.kryo.io.Output.flush(Output.java:165)
        at org.apache.flink.api.java.typeutils.runtime.kryo.KryoSerializer.serialize(KryoSerializer.java:194)
        at org.apache.flink.api.java.typeutils.runtime.kryo.KryoSerializer.copy(KryoSerializer.java:247)
        at org.apache.flink.api.java.typeutils.runtime.TupleSerializerBase.copy(TupleSerializerBase.java:73)
        at org.apache.flink.api.java.typeutils.runtime.TupleSerializerBase.copy(TupleSerializerBase.java:73)
        at org.apache.flink.api.java.typeutils.runtime.TupleSerializerBase.copy(TupleSerializerBase.java:73)
        at org.apache.flink.runtime.operators.sort.NormalizedKeySorter.writeToOutput(NormalizedKeySorter.java:499)
        at org.apache.flink.runtime.operators.sort.UnilateralSortMerger$SpillingThread.go(UnilateralSortMerger.java:1344)
        at org.apache.flink.runtime.operators.sort.UnilateralSortMerger$ThreadBase.run(UnilateralSortMerger.java:796)
Caused by: java.io.IOException: I/O channel already closed. Could not fulfill: org.apache.flink.runtime.io.disk.iomanager.SegmentWriteRequest@29864ea1
        at org.apache.flink.runtime.io.disk.iomanager.AsynchronousFileIOChannel.addRequest(AsynchronousFileIOChannel.java:249)
        at org.apache.flink.runtime.io.disk.iomanager.AsynchronousBlockWriterWithCallback.writeBlock(AsynchronousBlockWriterWithCallback.java:54)
        at org.apache.flink.runtime.io.disk.iomanager.AsynchronousBlockWriterWithCallback.writeBlock(AsynchronousBlockWriterWithCallback.java:29)
        at org.apache.flink.runtime.io.disk.iomanager.ChannelWriterOutputView.writeSegment(ChannelWriterOutputView.java:217)
        at org.apache.flink.runtime.io.disk.iomanager.ChannelWriterOutputView.nextSegment(ChannelWriterOutputView.java:203)
        at org.apache.flink.runtime.memory.AbstractPagedOutputView.advance(AbstractPagedOutputView.java:140)
        at org.apache.flink.runtime.memory.AbstractPagedOutputView.write(AbstractPagedOutputView.java:201)
        at org.apache.flink.api.java.typeutils.runtime.DataOutputViewStream.write(DataOutputViewStream.java:39)
        at com.esotericsoftware.kryo.io.Output.flush(Output.java:163)
        ... 8 more
15:22:47,608 ERROR org.apache.flink.runtime.operators.BatchTask                  - Error in task code:  CHAIN GroupReduce (GroupReduce at process(TransfoStage2StageOnTaz.java:106)) -> Map (Map at writeExternalTable(HiveHCatDAO.java:206)) (87/95)
java.lang.Exception: The data preparation for task 'CHAIN GroupReduce (GroupReduce at process(TransfoStage2StageOnTaz.java:106)) -> Map (Map at writeExternalTable(HiveHCatDAO.java:206))' , caused an error: Error obtaining the sorted input: Thread 'SortMerger spilling thread' terminated due to an exception: java.io.IOException: I/O channel already closed. Could not fulfill: org.apache.flink.runtime.io.disk.iomanager.SegmentWriteRequest@4e81c03c
        at org.apache.flink.runtime.operators.BatchTask.run(BatchTask.java:465)
        at org.apache.flink.runtime.operators.BatchTask.invoke(BatchTask.java:354)
        at org.apache.flink.runtime.taskmanager.Task.run(Task.java:584)
        at java.lang.Thread.run(Thread.java:744)
Caused by: java.lang.RuntimeException: Error obtaining the sorted input: Thread 'SortMerger spilling thread' terminated due to an exception: java.io.IOException: I/O channel already closed. Could not fulfill: org.apache.flink.runtime.io.disk.iomanager.SegmentWriteRequest@4e81c03c
        at org.apache.flink.runtime.operators.sort.UnilateralSortMerger.getIterator(UnilateralSortMerger.java:619)
        at org.apache.flink.runtime.operators.BatchTask.getInput(BatchTask.java:1089)
        at org.apache.flink.runtime.operators.GroupReduceDriver.prepare(GroupReduceDriver.java:94)
        at org.apache.flink.runtime.operators.BatchTask.run(BatchTask.java:459)
        ... 3 more
Caused by: java.io.IOException: Thread 'SortMerger spilling thread' terminated due to an exception: java.io.IOException: I/O channel already closed. Could not fulfill: org.apache.flink.runtime.io.disk.iomanager.SegmentWriteRequest@4e81c03c
        at org.apache.flink.runtime.operators.sort.UnilateralSortMerger$ThreadBase.run(UnilateralSortMerger.java:800)
Caused by: com.esotericsoftware.kryo.KryoException: java.io.IOException: I/O channel already closed. Could not fulfill: org.apache.flink.runtime.io.disk.iomanager.SegmentWriteRequest@4e81c03c
        at com.esotericsoftware.kryo.io.Output.flush(Output.java:165)
        at org.apache.flink.api.java.typeutils.runtime.kryo.KryoSerializer.serialize(KryoSerializer.java:194)
        at org.apache.flink.api.java.typeutils.runtime.kryo.KryoSerializer.copy(KryoSerializer.java:247)
        at org.apache.flink.api.java.typeutils.runtime.TupleSerializerBase.copy(TupleSerializerBase.java:73)
        at org.apache.flink.api.java.typeutils.runtime.TupleSerializerBase.copy(TupleSerializerBase.java:73)
        at org.apache.flink.api.java.typeutils.runtime.TupleSerializerBase.copy(TupleSerializerBase.java:73)
        at org.apache.flink.runtime.operators.sort.NormalizedKeySorter.writeToOutput(NormalizedKeySorter.java:499)
        at org.apache.flink.runtime.operators.sort.UnilateralSortMerger$SpillingThread.go(UnilateralSortMerger.java:1344)
        at org.apache.flink.runtime.operators.sort.UnilateralSortMerger$ThreadBase.run(UnilateralSortMerger.java:796)
Caused by: java.io.IOException: I/O channel already closed. Could not fulfill: org.apache.flink.runtime.io.disk.iomanager.SegmentWriteRequest@4e81c03c
        at org.apache.flink.runtime.io.disk.iomanager.AsynchronousFileIOChannel.addRequest(AsynchronousFileIOChannel.java:249)
        at org.apache.flink.runtime.io.disk.iomanager.AsynchronousBlockWriterWithCallback.writeBlock(AsynchronousBlockWriterWithCallback.java:54)
        at org.apache.flink.runtime.io.disk.iomanager.AsynchronousBlockWriterWithCallback.writeBlock(AsynchronousBlockWriterWithCallback.java:29)
        at org.apache.flink.runtime.io.disk.iomanager.ChannelWriterOutputView.writeSegment(ChannelWriterOutputView.java:217)
        at org.apache.flink.runtime.io.disk.iomanager.ChannelWriterOutputView.nextSegment(ChannelWriterOutputView.java:203)
        at org.apache.flink.runtime.memory.AbstractPagedOutputView.advance(AbstractPagedOutputView.java:140)
        at org.apache.flink.runtime.memory.AbstractPagedOutputView.write(AbstractPagedOutputView.java:201)
        at org.apache.flink.api.java.typeutils.runtime.DataOutputViewStream.write(DataOutputViewStream.java:39)
        at com.esotericsoftware.kryo.io.Output.flush(Output.java:163)
        ... 8 more
15:22:47,617 INFO  org.apache.flink.runtime.taskmanager.Task                     - CHAIN GroupReduce (GroupReduce at process(TransfoStage2StageOnTaz.java:106)) -> Map (Map at writeExternalTable(HiveHCatDAO.java:206)) (89/95) switched to FAILED with exception.
java.lang.Exception: The data preparation for task 'CHAIN GroupReduce (GroupReduce at process(TransfoStage2StageOnTaz.java:106)) -> Map (Map at writeExternalTable(HiveHCatDAO.java:206))' , caused an error: Error obtaining the sorted input: Thread 'SortMerger spilling thread' terminated due to an exception: java.io.IOException: I/O channel already closed. Could not fulfill: org.apache.flink.runtime.io.disk.iomanager.SegmentWriteRequest@29864ea1
        at org.apache.flink.runtime.operators.BatchTask.run(BatchTask.java:465)
        at org.apache.flink.runtime.operators.BatchTask.invoke(BatchTask.java:354)
        at org.apache.flink.runtime.taskmanager.Task.run(Task.java:584)
        at java.lang.Thread.run(Thread.java:744)
Caused by: java.lang.RuntimeException: Error obtaining the sorted input: Thread 'SortMerger spilling thread' terminated due to an exception: java.io.IOException: I/O channel already closed. Could not fulfill: org.apache.flink.runtime.io.disk.iomanager.SegmentWriteRequest@29864ea1
        at org.apache.flink.runtime.operators.sort.UnilateralSortMerger.getIterator(UnilateralSortMerger.java:619)
        at org.apache.flink.runtime.operators.BatchTask.getInput(BatchTask.java:1089)
        at org.apache.flink.runtime.operators.GroupReduceDriver.prepare(GroupReduceDriver.java:94)
        at org.apache.flink.runtime.operators.BatchTask.run(BatchTask.java:459)
        ... 3 more
Caused by: java.io.IOException: Thread 'SortMerger spilling thread' terminated due to an exception: java.io.IOException: I/O channel already closed. Could not fulfill: org.apache.flink.runtime.io.disk.iomanager.SegmentWriteRequest@29864ea1
        at org.apache.flink.runtime.operators.sort.UnilateralSortMerger$ThreadBase.run(UnilateralSortMerger.java:800)
Caused by: com.esotericsoftware.kryo.KryoException: java.io.IOException: I/O channel already closed. Could not fulfill: org.apache.flink.runtime.io.disk.iomanager.SegmentWriteRequest@29864ea1
        at com.esotericsoftware.kryo.io.Output.flush(Output.java:165)
        at org.apache.flink.api.java.typeutils.runtime.kryo.KryoSerializer.serialize(KryoSerializer.java:194)
        at org.apache.flink.api.java.typeutils.runtime.kryo.KryoSerializer.copy(KryoSerializer.java:247)
        at org.apache.flink.api.java.typeutils.runtime.TupleSerializerBase.copy(TupleSerializerBase.java:73)
        at org.apache.flink.api.java.typeutils.runtime.TupleSerializerBase.copy(TupleSerializerBase.java:73)
        at org.apache.flink.api.java.typeutils.runtime.TupleSerializerBase.copy(TupleSerializerBase.java:73)
        at org.apache.flink.runtime.operators.sort.NormalizedKeySorter.writeToOutput(NormalizedKeySorter.java:499)
        at org.apache.flink.runtime.operators.sort.UnilateralSortMerger$SpillingThread.go(UnilateralSortMerger.java:1344)
        at org.apache.flink.runtime.operators.sort.UnilateralSortMerger$ThreadBase.run(UnilateralSortMerger.java:796)
Caused by: java.io.IOException: I/O channel already closed. Could not fulfill: org.apache.flink.runtime.io.disk.iomanager.SegmentWriteRequest@29864ea1
        at org.apache.flink.runtime.io.disk.iomanager.AsynchronousFileIOChannel.addRequest(AsynchronousFileIOChannel.java:249)
        at org.apache.flink.runtime.io.disk.iomanager.AsynchronousBlockWriterWithCallback.writeBlock(AsynchronousBlockWriterWithCallback.java:54)
        at org.apache.flink.runtime.io.disk.iomanager.AsynchronousBlockWriterWithCallback.writeBlock(AsynchronousBlockWriterWithCallback.java:29)
        at org.apache.flink.runtime.io.disk.iomanager.ChannelWriterOutputView.writeSegment(ChannelWriterOutputView.java:217)
        at org.apache.flink.runtime.io.disk.iomanager.ChannelWriterOutputView.nextSegment(ChannelWriterOutputView.java:203)
        at org.apache.flink.runtime.memory.AbstractPagedOutputView.advance(AbstractPagedOutputView.java:140)
        at org.apache.flink.runtime.memory.AbstractPagedOutputView.write(AbstractPagedOutputView.java:201)
        at org.apache.flink.api.java.typeutils.runtime.DataOutputViewStream.write(DataOutputViewStream.java:39)
        at com.esotericsoftware.kryo.io.Output.flush(Output.java:163)
        ... 8 more
15:22:47,619 INFO  org.apache.flink.runtime.taskmanager.Task                     - CHAIN GroupReduce (GroupReduce at process(TransfoStage2StageOnTaz.java:106)) -> Map (Map at writeExternalTable(HiveHCatDAO.java:206)) (87/95) switched to FAILED with exception.
java.lang.Exception: The data preparation for task 'CHAIN GroupReduce (GroupReduce at process(TransfoStage2StageOnTaz.java:106)) -> Map (Map at writeExternalTable(HiveHCatDAO.java:206))' , caused an error: Error obtaining the sorted input: Thread 'SortMerger spilling thread' terminated due to an exception: java.io.IOException: I/O channel already closed. Could not fulfill: org.apache.flink.runtime.io.disk.iomanager.SegmentWriteRequest@4e81c03c
        at org.apache.flink.runtime.operators.BatchTask.run(BatchTask.java:465)
        at org.apache.flink.runtime.operators.BatchTask.invoke(BatchTask.java:354)
        at org.apache.flink.runtime.taskmanager.Task.run(Task.java:584)
        at java.lang.Thread.run(Thread.java:744)
Caused by: java.lang.RuntimeException: Error obtaining the sorted input: Thread 'SortMerger spilling thread' terminated due to an exception: java.io.IOException: I/O channel already closed. Could not fulfill: org.apache.flink.runtime.io.disk.iomanager.SegmentWriteRequest@4e81c03c
        at org.apache.flink.runtime.operators.sort.UnilateralSortMerger.getIterator(UnilateralSortMerger.java:619)
        at org.apache.flink.runtime.operators.BatchTask.getInput(BatchTask.java:1089)
        at org.apache.flink.runtime.operators.GroupReduceDriver.prepare(GroupReduceDriver.java:94)
        at org.apache.flink.runtime.operators.BatchTask.run(BatchTask.java:459)
        ... 3 more
Caused by: java.io.IOException: Thread 'SortMerger spilling thread' terminated due to an exception: java.io.IOException: I/O channel already closed. Could not fulfill: org.apache.flink.runtime.io.disk.iomanager.SegmentWriteRequest@4e81c03c
        at org.apache.flink.runtime.operators.sort.UnilateralSortMerger$ThreadBase.run(UnilateralSortMerger.java:800)
Caused by: com.esotericsoftware.kryo.KryoException: java.io.IOException: I/O channel already closed. Could not fulfill: org.apache.flink.runtime.io.disk.iomanager.SegmentWriteRequest@4e81c03c
        at com.esotericsoftware.kryo.io.Output.flush(Output.java:165)
        at org.apache.flink.api.java.typeutils.runtime.kryo.KryoSerializer.serialize(KryoSerializer.java:194)
        at org.apache.flink.api.java.typeutils.runtime.kryo.KryoSerializer.copy(KryoSerializer.java:247)
        at org.apache.flink.api.java.typeutils.runtime.TupleSerializerBase.copy(TupleSerializerBase.java:73)
        at org.apache.flink.api.java.typeutils.runtime.TupleSerializerBase.copy(TupleSerializerBase.java:73)
        at org.apache.flink.api.java.typeutils.runtime.TupleSerializerBase.copy(TupleSerializerBase.java:73)
        at org.apache.flink.runtime.operators.sort.NormalizedKeySorter.writeToOutput(NormalizedKeySorter.java:499)
        at org.apache.flink.runtime.operators.sort.UnilateralSortMerger$SpillingThread.go(UnilateralSortMerger.java:1344)
        at org.apache.flink.runtime.operators.sort.UnilateralSortMerger$ThreadBase.run(UnilateralSortMerger.java:796)
Caused by: java.io.IOException: I/O channel already closed. Could not fulfill: org.apache.flink.runtime.io.disk.iomanager.SegmentWriteRequest@4e81c03c
        at org.apache.flink.runtime.io.disk.iomanager.AsynchronousFileIOChannel.addRequest(AsynchronousFileIOChannel.java:249)
        at org.apache.flink.runtime.io.disk.iomanager.AsynchronousBlockWriterWithCallback.writeBlock(AsynchronousBlockWriterWithCallback.java:54)
        at org.apache.flink.runtime.io.disk.iomanager.AsynchronousBlockWriterWithCallback.writeBlock(AsynchronousBlockWriterWithCallback.java:29)
        at org.apache.flink.runtime.io.disk.iomanager.ChannelWriterOutputView.writeSegment(ChannelWriterOutputView.java:217)
        at org.apache.flink.runtime.io.disk.iomanager.ChannelWriterOutputView.nextSegment(ChannelWriterOutputView.java:203)
        at org.apache.flink.runtime.memory.AbstractPagedOutputView.advance(AbstractPagedOutputView.java:140)
        at org.apache.flink.runtime.memory.AbstractPagedOutputView.write(AbstractPagedOutputView.java:201)
        at org.apache.flink.api.java.typeutils.runtime.DataOutputViewStream.write(DataOutputViewStream.java:39)
        at com.esotericsoftware.kryo.io.Output.flush(Output.java:163)
        ... 8 more
15:22:47,627 INFO  org.apache.flink.runtime.taskmanager.Task                     - Freeing task resources for CHAIN GroupReduce (GroupReduce at process(TransfoStage2StageOnTaz.java:106)) -> Map (Map at writeExternalTable(HiveHCatDAO.java:206)) (87/95)
15:22:47,627 INFO  org.apache.flink.runtime.taskmanager.Task                     - Freeing task resources for CHAIN GroupReduce (GroupReduce at process(TransfoStage2StageOnTaz.java:106)) -> Map (Map at writeExternalTable(HiveHCatDAO.java:206)) (89/95)
15:22:47,664 INFO  org.apache.flink.yarn.YarnTaskManager                         - Unregistering task and sending final execution state FAILED to JobManager for task CHAIN GroupReduce (GroupReduce at process(TransfoStage2StageOnTaz.java:106)) -> Map (Map at writeExternalTable(HiveHCatDAO.java:206)) (8d6b8b11714a27ac1ca83b39bdee577f)
15:22:47,738 INFO  org.apache.flink.yarn.YarnTaskManager                         - Unregistering task and sending final execution state FAILED to JobManager for task CHAIN GroupReduce (GroupReduce at process(TransfoStage2StageOnTaz.java:106)) -> Map (Map at writeExternalTable(HiveHCatDAO.java:206)) (73c6c2f15159dcb134e3899064a30f33)
15:22:47,841 ERROR org.apache.flink.runtime.operators.BatchTask                  - Error in task code:  CHAIN GroupReduce (GroupReduce at calculeMinArea(TransfoStage2StageOnTaz.java:159)) -> Map (Key Extractor 1) (88/95)
com.esotericsoftware.kryo.KryoException: java.io.IOException: I/O channel already closed. Could not fulfill: org.apache.flink.runtime.io.disk.iomanager.SegmentReadRequest@26707c34
        at com.esotericsoftware.kryo.io.Input.fill(Input.java:148)
        at org.apache.flink.api.java.typeutils.runtime.NoFetchingInput.require(NoFetchingInput.java:74)
        at com.esotericsoftware.kryo.io.Input.readLong_slow(Input.java:756)
        at com.esotericsoftware.kryo.io.Input.readVarLong(Input.java:690)
        at com.esotericsoftware.kryo.io.Input.readLong(Input.java:685)
        at com.esotericsoftware.kryo.serializers.UnsafeCacheFields$UnsafeLongField.read(UnsafeCacheFields.java:160)
        at com.esotericsoftware.kryo.serializers.FieldSerializer.read(FieldSerializer.java:528)
        at com.esotericsoftware.kryo.Kryo.readClassAndObject(Kryo.java:761)
        at org.apache.flink.api.java.typeutils.runtime.kryo.KryoSerializer.deserialize(KryoSerializer.java:222)
        at org.apache.flink.api.java.typeutils.runtime.kryo.KryoSerializer.deserialize(KryoSerializer.java:236)
        at org.apache.flink.api.java.typeutils.runtime.TupleSerializer.deserialize(TupleSerializer.java:136)
        at org.apache.flink.api.java.typeutils.runtime.TupleSerializer.deserialize(TupleSerializer.java:30)
        at org.apache.flink.api.java.typeutils.runtime.TupleSerializer.deserialize(TupleSerializer.java:136)
        at org.apache.flink.api.java.typeutils.runtime.TupleSerializer.deserialize(TupleSerializer.java:30)
        at org.apache.flink.runtime.io.disk.ChannelReaderInputViewIterator.next(ChannelReaderInputViewIterator.java:86)
        at org.apache.flink.runtime.operators.sort.MergeIterator$HeadStream.nextHead(MergeIterator.java:151)
        at org.apache.flink.runtime.operators.sort.MergeIterator.next(MergeIterator.java:85)
        at org.apache.flink.runtime.util.ReusingKeyGroupedIterator$ValuesIterator.hasNext(ReusingKeyGroupedIterator.java:186)
        at org.apache.flink.api.java.operators.translation.TupleUnwrappingIterator.hasNext(TupleUnwrappingIterator.java:49)
        at com.bouygtel.kubera.processor.stage.TransfoStage2StageOnTaz$6.reduce(TransfoStage2StageOnTaz.java:170)
        at org.apache.flink.api.java.operators.translation.PlanUnwrappingReduceGroupOperator$TupleUnwrappingNonCombinableGroupReducer.reduce(PlanUnwrappingReduceGroupOperator.java:101)
        at org.apache.flink.runtime.operators.GroupReduceDriver.run(GroupReduceDriver.java:118)
        at org.apache.flink.runtime.operators.BatchTask.run(BatchTask.java:489)
        at org.apache.flink.runtime.operators.BatchTask.invoke(BatchTask.java:354)
        at org.apache.flink.runtime.taskmanager.Task.run(Task.java:584)
        at java.lang.Thread.run(Thread.java:744)
Caused by: java.io.IOException: I/O channel already closed. Could not fulfill: org.apache.flink.runtime.io.disk.iomanager.SegmentReadRequest@26707c34
        at org.apache.flink.runtime.io.disk.iomanager.AsynchronousFileIOChannel.addRequest(AsynchronousFileIOChannel.java:249)
        at org.apache.flink.runtime.io.disk.iomanager.AsynchronousBlockReader.readBlock(AsynchronousBlockReader.java:75)
        at org.apache.flink.runtime.io.disk.iomanager.AsynchronousBlockReader.readBlock(AsynchronousBlockReader.java:43)
        at org.apache.flink.runtime.io.disk.iomanager.ChannelReaderInputView.sendReadRequest(ChannelReaderInputView.java:259)
        at org.apache.flink.runtime.io.disk.iomanager.ChannelReaderInputView.nextSegment(ChannelReaderInputView.java:224)
        at org.apache.flink.runtime.memory.AbstractPagedInputView.advance(AbstractPagedInputView.java:159)
        at org.apache.flink.runtime.memory.AbstractPagedInputView.read(AbstractPagedInputView.java:213)
        at org.apache.flink.api.java.typeutils.runtime.DataInputViewStream.read(DataInputViewStream.java:68)
        at com.esotericsoftware.kryo.io.Input.fill(Input.java:146)
        ... 25 more


(...)
______________________
15:22:51,798 INFO  org.apache.flink.yarn.YarnJobManager                          - Container container_e11_1453202008841_2868_01_000018 is completed with diagnostics: Container [pid=14548,containerID=container_e11_1453202008841_2868_01_000018] is running beyond physical memory limits. Current usage: 12.1 GB of 12 GB physical memory used; 13.0 GB of 25.2 GB virtual memory used. Killing container.
Dump of the process-tree for container_e11_1453202008841_2868_01_000018 :
        |- PID PPID PGRPID SESSID CMD_NAME USER_MODE_TIME(MILLIS) SYSTEM_TIME(MILLIS) VMEM_USAGE(BYTES) RSSMEM_USAGE(PAGES) FULL_CMD_LINE
        |- 14548 14542 14548 14548 (bash) 0 0 108646400 310 /bin/bash -c /usr/java/default/bin/java -Xms7200m -Xmx7200m -XX:MaxDirectMemorySize=7200m  -Dlog.file=/data/2/hadoop/yarn/log/application_1453202008841_2868/container_e11_1453202008841_2868_01_000018/taskmanager.log -Dlogback.configurationFile=file:logback.xml -Dlog4j.configuration=file:log4j.properties org.apache.flink.yarn.YarnTaskManagerRunner --configDir . 1> /data/2/hadoop/yarn/log/application_1453202008841_2868/container_e11_1453202008841_2868_01_000018/taskmanager.out 2> /data/2/hadoop/yarn/log/application_1453202008841_2868/container_e11_1453202008841_2868_01_000018/taskmanager.err --streamingMode batch
        |- 14558 14548 14548 14548 (java) 631070 15142 13881634816 3163462 /usr/java/default/bin/java -Xms7200m -Xmx7200m -XX:MaxDirectMemorySize=7200m -Dlog.file=/data/2/hadoop/yarn/log/application_1453202008841_2868/container_e11_1453202008841_2868_01_000018/taskmanager.log -Dlogback.configurationFile=file:logback.xml -Dlog4j.configuration=file:log4j.properties org.apache.flink.yarn.YarnTaskManagerRunner --configDir . --streamingMode batch

Container killed on request. Exit code is 143
Container exited with a non-zero exit code 143
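
The figures in the YARN diagnostics above can be checked with a little arithmetic. The sketch below is only an illustration, not Flink or YARN code: the class and method names are made up, and the inputs are the -Xmx and -XX:MaxDirectMemorySize values from the process dump plus the 12 GB container limit. It ignores every other native allocation (thread stacks, PermGen, JIT code cache), so the real footprint can be higher still.

```java
public class ContainerMemoryBudget {

    /** Worst-case JVM footprint in MB if both the heap and the direct-memory cap are fully used. */
    public static long worstCaseFootprintMb(long heapMb, long maxDirectMb) {
        return heapMb + maxDirectMb;
    }

    /** Exit status conventionally reported for a process terminated by a signal: 128 + signal number. */
    public static int exitCodeForSignal(int signal) {
        return 128 + signal;
    }

    public static void main(String[] args) {
        long heapMb = 7200;           // -Xmx7200m, from the process dump above
        long maxDirectMb = 7200;      // -XX:MaxDirectMemorySize=7200m, from the process dump above
        long containerMb = 12 * 1024; // "12 GB physical memory" limit reported by YARN

        long worstCase = worstCaseFootprintMb(heapMb, maxDirectMb);
        System.out.println("Worst-case heap + direct: " + worstCase + " MB");
        System.out.println("Container limit:          " + containerMb + " MB");
        System.out.println("Can exceed limit:         " + (worstCase > containerMb));
        System.out.println("Exit code for SIGTERM:    " + exitCodeForSignal(15));
    }
}
```

With these settings the JVM is allowed to grow to 14400 MB of heap plus direct memory, which is above the 12288 MB container limit, and exit code 143 corresponds to signal 15 (SIGTERM).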




From: ewenstephan@gmail.com [mailto:ewenstephan@gmail.com] On behalf of Stephan Ewen
Sent: Tuesday, February 2, 2016 20:20
To: user@flink.apache.org
Subject: Re: Left join with unbalanced dataset

To make sure this discussion does not go in a wrong direction:

There is no issue here with data size or memory management. The memory management for sorting and hashing works, and Flink handles the spilling correctly.

The issue here is different:
   - One possible reason is that the network stack (specifically the Netty library) allocates too much direct (= off heap) memory for buffering the TCP connections.
   - Another reason could be leaky behavior in Hadoop's HDFS code.


@Arnaud: We need the full log of the TaskManager that first experiences the failure; then we can look into this. Ideally it would be taken with memory logging activated, as suggested by Ufuk.
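
To make the point about direct (off-heap) memory concrete, here is a minimal sketch of how a JVM can report how much direct buffer memory is in use, using the standard java.lang.management API. It is not Flink's memory logger; the class and method names are hypothetical. It also only counts buffers obtained through ByteBuffer.allocateDirect, so memory that a library allocates natively by other means would not show up here.

```java
import java.lang.management.BufferPoolMXBean;
import java.lang.management.ManagementFactory;
import java.nio.ByteBuffer;

public class DirectMemoryProbe {

    /** Bytes currently used by the JVM's "direct" buffer pool, or -1 if the pool is not exposed. */
    public static long directMemoryUsedBytes() {
        for (BufferPoolMXBean pool : ManagementFactory.getPlatformMXBeans(BufferPoolMXBean.class)) {
            if ("direct".equals(pool.getName())) {
                return pool.getMemoryUsed();
            }
        }
        return -1;
    }

    public static void main(String[] args) {
        long before = directMemoryUsedBytes();

        // Allocate 16 MB outside the Java heap; this is not limited by -Xmx,
        // only by -XX:MaxDirectMemorySize.
        ByteBuffer buffer = ByteBuffer.allocateDirect(16 * 1024 * 1024);

        long after = directMemoryUsedBytes();
        System.out.println("Direct memory before: " + before + " bytes");
        System.out.println("Direct memory after:  " + after + " bytes");
        System.out.println("Buffer capacity:      " + buffer.capacity() + " bytes");
    }
}
```

Logging such a value periodically next to the heap statistics would show whether the growth that pushes the container over its limit happens on the heap or off it.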

Best,
Stephan



Re: Left join with unbalanced dataset

Posted by Fabian Hueske <fh...@gmail.com>.
Hi Arnaud,

in a previous mail you said:
"Note that I did not rebuild & reinstall flink, I just used a 0.10-SNAPSHOT
compiled jar submitted as a batch job using the "0.10.0" flink installation"

This will not fix the Netty version error. You need to install a new Flink
version, or submit the job to YARN with a new Flink version, to make
sure that the correct Netty version is used.

Best, Fabian

2016-02-03 10:44 GMT+01:00 Stephan Ewen <se...@apache.org>:

> Hi!
>
> I think the closed channel is actually an effect of the process kill.
> Before the exception, you can see "15:22:47,592 ERROR
> org.apache.flink.yarn.YarnTaskManagerRunner - RECEIVED SIGNAL 15: SIGTERM"
> in the log, which means that UNIX is killing the process.
> I assume that the first thing that happens is that UNIX closes the open
> file handles, while the JVM shutdown hooks are still in progress. Hence the
> exception.
>
> So, the root cause is still the YARN memory killer.
>
> The log comes from release version 0.10.0.
> The Netty fix came into Flink after version 0.10.1 - so it is currently
> only in 0.10-SNAPSHOT (and will be in 0.10.2 in a couple of days).
>
> Greetings,
> Stephan
>
>
> On Wed, Feb 3, 2016 at 10:11 AM, LINZ, Arnaud <AL...@bouyguestelecom.fr>
> wrote:
>
>> Hi,
>>
>>
>>
>> I see nothing wrong in the log of the killed container (it’s in fact
>> strange that it fails with I/O channel closure before it is killed by
>> yarn), but I’ll post new logs with memory debug as a web download within
>> the day.
>>
>>
>>
>> In the meantime, here is a log extract:
>>
>>
>>
>> Container: container_e11_1453202008841_2868_01_000018 on
>> h1r1dn06.bpa.bouyguestelecom.fr_45454
>>
>>
>> ================================================================================================
>>
>>
>>
>> …
>>
>> 15:04:01,234 INFO
>> org.apache.flink.yarn.YarnTaskManagerRunner                   -
>> --------------------------------------------------------------------------------
>>
>> 15:04:01,236 INFO
>> org.apache.flink.yarn.YarnTaskManagerRunner                   -  Starting
>> YARN TaskManager (Version: 0.10.0, Rev:ab2cca4, Date:10.11.2015 @ 13:50:14
>> UTC)
>>
>> 15:04:01,236 INFO
>> org.apache.flink.yarn.YarnTaskManagerRunner                   -  Current
>> user: datcrypt
>>
>> 15:04:01,236 INFO
>> org.apache.flink.yarn.YarnTaskManagerRunner                   -  JVM: Java
>> HotSpot(TM) 64-Bit Server VM - Oracle Corporation - 1.7/24.45-b08
>>
>> 15:04:01,236 INFO
>> org.apache.flink.yarn.YarnTaskManagerRunner                   -  Maximum
>> heap size: 6900 MiBytes
>>
>> 15:04:01,236 INFO
>> org.apache.flink.yarn.YarnTaskManagerRunner                   -  JAVA_HOME:
>> /usr/java/default
>>
>> 15:04:01,238 INFO
>> org.apache.flink.yarn.YarnTaskManagerRunner                   -  Hadoop
>> version: 2.6.0
>>
>> 15:04:01,238 INFO
>> org.apache.flink.yarn.YarnTaskManagerRunner                   -  JVM
>> Options:
>>
>> 15:04:01,238 INFO
>> org.apache.flink.yarn.YarnTaskManagerRunner                   -
>> -Xms7200m
>>
>> 15:04:01,238 INFO
>> org.apache.flink.yarn.YarnTaskManagerRunner                   -
>> -Xmx7200m
>>
>> 15:04:01,238 INFO
>> org.apache.flink.yarn.YarnTaskManagerRunner                   -
>> -XX:MaxDirectMemorySize=7200m
>>
>> 15:04:01,238 INFO
>> org.apache.flink.yarn.YarnTaskManagerRunner                   -
>> -Dlog.file=/data/2/hadoop/yarn/log/application_1453202008841_2868/container_e11_1453202008841_2868_01_000018/taskmanager.log
>>
>> 15:04:01,238 INFO
>> org.apache.flink.yarn.YarnTaskManagerRunner                   -
>> -Dlogback.configurationFile=file:logback.xml
>>
>> 15:04:01,238 INFO
>> org.apache.flink.yarn.YarnTaskManagerRunner                   -
>> -Dlog4j.configuration=file:log4j.properties
>>
>> 15:04:01,238 INFO
>> org.apache.flink.yarn.YarnTaskManagerRunner                   -  Program
>> Arguments:
>>
>> 15:04:01,238 INFO
>> org.apache.flink.yarn.YarnTaskManagerRunner                   -
>> --configDir
>>
>> 15:04:01,238 INFO
>> org.apache.flink.yarn.YarnTaskManagerRunner                   -     .
>>
>> 15:04:01,238 INFO
>> org.apache.flink.yarn.YarnTaskManagerRunner                   -
>> --streamingMode
>>
>> 15:04:01,238 INFO
>> org.apache.flink.yarn.YarnTaskManagerRunner                   -     batch
>>
>> 15:04:01,238 INFO
>>  org.apache.flink.yarn.YarnTaskManagerRunner                   -
>> --------------------------------------------------------------------------------
>>
>> …
>>
>> 15:04:02,215 INFO
>> org.apache.flink.runtime.taskmanager.TaskManager              - Starting
>> TaskManager actor
>>
>> 15:04:02,224 INFO
>> org.apache.flink.runtime.io.network.netty.NettyConfig         - NettyConfig
>> [server address: bt1shlhr/172.21.125.16, server port: 47002, memory
>> segment size (bytes): 32768, transport type: NIO, number of server threads:
>> 0 (use Netty's default), number of client threads: 0 (use Netty's default),
>> server connect backlog: 0 (use Netty's default), client connect timeout
>> (sec): 120, send/receive buffer size (bytes): 0 (use Netty's default)]
>>
>> 15:04:02,226 INFO
>> org.apache.flink.runtime.taskmanager.TaskManager              - Messages
>> between TaskManager and JobManager have a max timeout of 100000 milliseconds
>>
>> …
>>
>>
>>
>> 15:04:02,970 INFO
>> org.apache.flink.runtime.io.network.buffer.NetworkBufferPool  - Allocated
>> 1024 MB for network buffer pool (number of memory segments: 32768, bytes
>> per segment: 32768).
>>
>> 15:04:03,527 INFO
>> org.apache.flink.runtime.taskmanager.TaskManager              - Using 0.7
>> of the currently free heap space for Flink managed heap memory (4099 MB).
>>
>> 15:04:06,250 INFO
>> org.apache.flink.runtime.io.disk.iomanager.IOManager          - I/O manager
>> uses directory
>> /data/1/hadoop/yarn/local/usercache/datcrypt/appcache/application_1453202008841_2868/flink-io-5cc3aa50-6723-460e-a722-800dd908e9e8
>> for spill files.
>>
>> …
>>
>> 15:04:06,429 INFO
>> org.apache.flink.yarn.YarnTaskManager                         - TaskManager
>> data connection information: h1r1dn06.bpa.bouyguestelecom.fr
>> (dataPort=47002)
>>
>> 15:04:06,430 INFO
>> org.apache.flink.yarn.YarnTaskManager                         - TaskManager
>> has 2 task slot(s).
>>
>> 15:04:06,431 INFO
>> org.apache.flink.yarn.YarnTaskManager                         - Memory
>> usage stats: [HEAP: 5186/6900/6900 MB, NON HEAP: 25/50/130 MB
>> (used/committed/max)]
>>
>> 15:04:06,438 INFO
>> org.apache.flink.yarn.YarnTaskManager                         - Trying to
>> register at JobManager akka.tcp://
>> flink@172.21.125.31:36518/user/jobmanager (attempt 1, timeout: 500
>> milliseconds)
>>
>> 15:04:06,591 INFO
>> org.apache.flink.yarn.YarnTaskManager                         - Successful
>> registration at JobManager (akka.tcp://
>> flink@172.21.125.31:36518/user/jobmanager), starting network stack and
>> library cache.
>>
>> …
>>
>>
>>
>> 15:17:22,191 INFO
>> org.apache.flink.yarn.YarnTaskManager                         -
>> Unregistering task and sending final execution state FINISHED to JobManager
>> for task DataSink (Hive Output to
>> DEV_VOY_KBR_TRV.EX_GOS_TP_STAGES_TRANSPORT_MODE)
>> (c9dc588ceb209d98fd08b5144a59adfc)
>>
>> 15:17:22,196 INFO
>> org.apache.flink.runtime.taskmanager.Task                     - DataSink
>> (Hive Output to DEV_VOY_KBR_TRV.EX_GOS_TP_STAGES_TRANSPORT_MODE) (76/95)
>> switched to FINISHED
>>
>> 15:17:22,197 INFO
>> org.apache.flink.runtime.taskmanager.Task                     - Freeing
>> task resources for DataSink (Hive Output to
>> DEV_VOY_KBR_TRV.EX_GOS_TP_STAGES_TRANSPORT_MODE) (76/95)
>>
>> 15:17:22,197 INFO
>> org.apache.flink.yarn.YarnTaskManager                         -
>> Unregistering task and sending final execution state FINISHED to JobManager
>> for task DataSink (Hive Output to
>> DEV_VOY_KBR_TRV.EX_GOS_TP_STAGES_TRANSPORT_MODE)
>> (0c1c027e2ca5111e3e54c98b6d7265d7)
>>
>> 15:22:47,592 ERROR
>> org.apache.flink.yarn.YarnTaskManagerRunner                   - RECEIVED
>> SIGNAL 15: SIGTERM
>>
>> 15:22:47,608 ERROR
>> org.apache.flink.runtime.operators.BatchTask                  - Error in
>> task code:  CHAIN GroupReduce (GroupReduce at
>> process(TransfoStage2StageOnTaz.java:106)) -> Map (Map at
>> writeExternalTable(HiveHCatDAO.java:206)) (89/95)
>>
>> java.lang.Exception: The data preparation for task 'CHAIN GroupReduce
>> (GroupReduce at process(TransfoStage2StageOnTaz.java:106)) -> Map (Map at
>> writeExternalTable(HiveHCatDAO.java:206))' , caused an error: Error
>> obtaining the sorted input: Thread 'SortMerger spilling thread' terminated
>> due to an exception: java.io.IOException: I/O channel already closed. Could
>> not fulfill:
>> org.apache.flink.runtime.io.disk.iomanager.SegmentWriteRequest@29864ea1
>>
>>         at
>> org.apache.flink.runtime.operators.BatchTask.run(BatchTask.java:465)
>>
>>         at
>> org.apache.flink.runtime.operators.BatchTask.invoke(BatchTask.java:354)
>>
>>         at org.apache.flink.runtime.taskmanager.Task.run(Task.java:584)
>>
>>         at java.lang.Thread.run(Thread.java:744)
>>
>> Caused by: java.lang.RuntimeException: Error obtaining the sorted input:
>> Thread 'SortMerger spilling thread' terminated due to an exception:
>> java.io.IOException: I/O channel already closed. Could not fulfill:
>> org.apache.flink.runtime.io.disk.iomanager.SegmentWriteRequest@29864ea1
>>
>>         at
>> org.apache.flink.runtime.operators.sort.UnilateralSortMerger.getIterator(UnilateralSortMerger.java:619)
>>
>>         at
>> org.apache.flink.runtime.operators.BatchTask.getInput(BatchTask.java:1089)
>>
>>         at
>> org.apache.flink.runtime.operators.GroupReduceDriver.prepare(GroupReduceDriver.java:94)
>>
>>         at
>> org.apache.flink.runtime.operators.BatchTask.run(BatchTask.java:459)
>>
>>         ... 3 more
>>
>> Caused by: java.io.IOException: Thread 'SortMerger spilling thread'
>> terminated due to an exception: java.io.IOException: I/O channel already
>> closed. Could not fulfill:
>> org.apache.flink.runtime.io.disk.iomanager.SegmentWriteRequest@29864ea1
>>
>>         at
>> org.apache.flink.runtime.operators.sort.UnilateralSortMerger$ThreadBase.run(UnilateralSortMerger.java:800)
>>
>> Caused by: com.esotericsoftware.kryo.KryoException: java.io.IOException:
>> I/O channel already closed. Could not fulfill:
>> org.apache.flink.runtime.io.disk.iomanager.SegmentWriteRequest@29864ea1
>>
>>         at com.esotericsoftware.kryo.io.Output.flush(Output.java:165)
>>
>>         at
>> org.apache.flink.api.java.typeutils.runtime.kryo.KryoSerializer.serialize(KryoSerializer.java:194)
>>
>>         at
>> org.apache.flink.api.java.typeutils.runtime.kryo.KryoSerializer.copy(KryoSerializer.java:247)
>>
>>         at
>> org.apache.flink.api.java.typeutils.runtime.TupleSerializerBase.copy(TupleSerializerBase.java:73)
>>
>>         at
>> org.apache.flink.api.java.typeutils.runtime.TupleSerializerBase.copy(TupleSerializerBase.java:73)
>>
>>         at
>> org.apache.flink.api.java.typeutils.runtime.TupleSerializerBase.copy(TupleSerializerBase.java:73)
>>
>>         at
>> org.apache.flink.runtime.operators.sort.NormalizedKeySorter.writeToOutput(NormalizedKeySorter.java:499)
>>
>>         at
>> org.apache.flink.runtime.operators.sort.UnilateralSortMerger$SpillingThread.go(UnilateralSortMerger.java:1344)
>>
>>         at
>> org.apache.flink.runtime.operators.sort.UnilateralSortMerger$ThreadBase.run(UnilateralSortMerger.java:796)
>>
>> Caused by: java.io.IOException: I/O channel already closed. Could not
>> fulfill:
>> org.apache.flink.runtime.io.disk.iomanager.SegmentWriteRequest@29864ea1
>>
>>         at
>> org.apache.flink.runtime.io.disk.iomanager.AsynchronousFileIOChannel.addRequest(AsynchronousFileIOChannel.java:249)
>>
>>         at
>> org.apache.flink.runtime.io.disk.iomanager.AsynchronousBlockWriterWithCallback.writeBlock(AsynchronousBlockWriterWithCallback.java:54)
>>
>>         at
>> org.apache.flink.runtime.io.disk.iomanager.AsynchronousBlockWriterWithCallback.writeBlock(AsynchronousBlockWriterWithCallback.java:29)
>>
>>         at
>> org.apache.flink.runtime.io.disk.iomanager.ChannelWriterOutputView.writeSegment(ChannelWriterOutputView.java:217)
>>
>>         at
>> org.apache.flink.runtime.io.disk.iomanager.ChannelWriterOutputView.nextSegment(ChannelWriterOutputView.java:203)
>>
>>         at
>> org.apache.flink.runtime.memory.AbstractPagedOutputView.advance(AbstractPagedOutputView.java:140)
>>
>>         at
>> org.apache.flink.runtime.memory.AbstractPagedOutputView.write(AbstractPagedOutputView.java:201)
>>
>>         at
>> org.apache.flink.api.java.typeutils.runtime.DataOutputViewStream.write(DataOutputViewStream.java:39)
>>
>>         at com.esotericsoftware.kryo.io.Output.flush(Output.java:163)
>>
>>         ... 8 more
>>
>> 15:22:47,608 ERROR
>> org.apache.flink.runtime.operators.BatchTask                  - Error in
>> task code:  CHAIN GroupReduce (GroupReduce at
>> process(TransfoStage2StageOnTaz.java:106)) -> Map (Map at
>> writeExternalTable(HiveHCatDAO.java:206)) (87/95)
>>
>> java.lang.Exception: The data preparation for task 'CHAIN GroupReduce
>> (GroupReduce at process(TransfoStage2StageOnTaz.java:106)) -> Map (Map at
>> writeExternalTable(HiveHCatDAO.java:206))' , caused an error: Error
>> obtaining the sorted input: Thread 'SortMerger spilling thread' terminated
>> due to an exception: java.io.IOException: I/O channel already closed. Could
>> not fulfill:
>> org.apache.flink.runtime.io.disk.iomanager.SegmentWriteRequest@4e81c03c
>>
>>         at
>> org.apache.flink.runtime.operators.BatchTask.run(BatchTask.java:465)
>>
>>         at
>> org.apache.flink.runtime.operators.BatchTask.invoke(BatchTask.java:354)
>>
>>         at org.apache.flink.runtime.taskmanager.Task.run(Task.java:584)
>>
>>         at java.lang.Thread.run(Thread.java:744)
>>
>> Caused by: java.lang.RuntimeException: Error obtaining the sorted input:
>> Thread 'SortMerger spilling thread' terminated due to an exception:
>> java.io.IOException: I/O channel already closed. Could not fulfill:
>> org.apache.flink.runtime.io.disk.iomanager.SegmentWriteRequest@4e81c03c
>>
>>         at
>> org.apache.flink.runtime.operators.sort.UnilateralSortMerger.getIterator(UnilateralSortMerger.java:619)
>>
>>         at
>> org.apache.flink.runtime.operators.BatchTask.getInput(BatchTask.java:1089)
>>
>>         at
>> org.apache.flink.runtime.operators.GroupReduceDriver.prepare(GroupReduceDriver.java:94)
>>
>>         at
>> org.apache.flink.runtime.operators.BatchTask.run(BatchTask.java:459)
>>
>>         ... 3 more
>>
>> Caused by: java.io.IOException: Thread 'SortMerger spilling thread'
>> terminated due to an exception: java.io.IOException: I/O channel already
>> closed. Could not fulfill:
>> org.apache.flink.runtime.io.disk.iomanager.SegmentWriteRequest@4e81c03c
>>
>>         at
>> org.apache.flink.runtime.operators.sort.UnilateralSortMerger$ThreadBase.run(UnilateralSortMerger.java:800)
>>
>> Caused by: com.esotericsoftware.kryo.KryoException: java.io.IOException:
>> I/O channel already closed. Could not fulfill:
>> org.apache.flink.runtime.io.disk.iomanager.SegmentWriteRequest@4e81c03c
>>
>>         at com.esotericsoftware.kryo.io.Output.flush(Output.java:165)
>>
>>         at
>> org.apache.flink.api.java.typeutils.runtime.kryo.KryoSerializer.serialize(KryoSerializer.java:194)
>>
>>         at
>> org.apache.flink.api.java.typeutils.runtime.kryo.KryoSerializer.copy(KryoSerializer.java:247)
>>
>>         at
>> org.apache.flink.api.java.typeutils.runtime.TupleSerializerBase.copy(TupleSerializerBase.java:73)
>>
>>         at
>> org.apache.flink.api.java.typeutils.runtime.TupleSerializerBase.copy(TupleSerializerBase.java:73)
>>
>>         at
>> org.apache.flink.api.java.typeutils.runtime.TupleSerializerBase.copy(TupleSerializerBase.java:73)
>>
>>         at
>> org.apache.flink.runtime.operators.sort.NormalizedKeySorter.writeToOutput(NormalizedKeySorter.java:499)
>>
>>         at
>> org.apache.flink.runtime.operators.sort.UnilateralSortMerger$SpillingThread.go(UnilateralSortMerger.java:1344)
>>
>>         at
>> org.apache.flink.runtime.operators.sort.UnilateralSortMerger$ThreadBase.run(UnilateralSortMerger.java:796)
>>
>> Caused by: java.io.IOException: I/O channel already closed. Could not
>> fulfill:
>> org.apache.flink.runtime.io.disk.iomanager.SegmentWriteRequest@4e81c03c
>>
>>         at
>> org.apache.flink.runtime.io.disk.iomanager.AsynchronousFileIOChannel.addRequest(AsynchronousFileIOChannel.java:249)
>>
>>         at
>> org.apache.flink.runtime.io.disk.iomanager.AsynchronousBlockWriterWithCallback.writeBlock(AsynchronousBlockWriterWithCallback.java:54)
>>
>>         at
>> org.apache.flink.runtime.io.disk.iomanager.AsynchronousBlockWriterWithCallback.writeBlock(AsynchronousBlockWriterWithCallback.java:29)
>>
>>         at
>> org.apache.flink.runtime.io.disk.iomanager.ChannelWriterOutputView.writeSegment(ChannelWriterOutputView.java:217)
>>
>>         at
>> org.apache.flink.runtime.io.disk.iomanager.ChannelWriterOutputView.nextSegment(ChannelWriterOutputView.java:203)
>>
>>         at
>> org.apache.flink.runtime.memory.AbstractPagedOutputView.advance(AbstractPagedOutputView.java:140)
>>
>>         at
>> org.apache.flink.runtime.memory.AbstractPagedOutputView.write(AbstractPagedOutputView.java:201)
>>
>>         at
>> org.apache.flink.api.java.typeutils.runtime.DataOutputViewStream.write(DataOutputViewStream.java:39)
>>
>>         at com.esotericsoftware.kryo.io.Output.flush(Output.java:163)
>>
>>         ... 8 more
>>
>> 15:22:47,617 INFO
>> org.apache.flink.runtime.taskmanager.Task                     - CHAIN
>> GroupReduce (GroupReduce at process(TransfoStage2StageOnTaz.java:106)) ->
>> Map (Map at writeExternalTable(HiveHCatDAO.java:206)) (89/95) switched to
>> FAILED with exception.
>>
>> java.lang.Exception: The data preparation for task 'CHAIN GroupReduce
>> (GroupReduce at process(TransfoStage2StageOnTaz.java:106)) -> Map (Map at
>> writeExternalTable(HiveHCatDAO.java:206))' , caused an error: Error
>> obtaining the sorted input: Thread 'SortMerger spilling thread' terminated
>> due to an exception: java.io.IOException: I/O channel already closed. Could
>> not fulfill:
>> org.apache.flink.runtime.io.disk.iomanager.SegmentWriteRequest@29864ea1
>>
>>         at
>> org.apache.flink.runtime.operators.BatchTask.run(BatchTask.java:465)
>>
>>         at
>> org.apache.flink.runtime.operators.BatchTask.invoke(BatchTask.java:354)
>>
>>         at org.apache.flink.runtime.taskmanager.Task.run(Task.java:584)
>>
>>         at java.lang.Thread.run(Thread.java:744)
>>
>> Caused by: java.lang.RuntimeException: Error obtaining the sorted input:
>> Thread 'SortMerger spilling thread' terminated due to an exception:
>> java.io.IOException: I/O channel already closed. Could not fulfill:
>> org.apache.flink.runtime.io.disk.iomanager.SegmentWriteRequest@29864ea1
>>
>>         at
>> org.apache.flink.runtime.operators.sort.UnilateralSortMerger.getIterator(UnilateralSortMerger.java:619)
>>
>>         at
>> org.apache.flink.runtime.operators.BatchTask.getInput(BatchTask.java:1089)
>>
>>         at
>> org.apache.flink.runtime.operators.GroupReduceDriver.prepare(GroupReduceDriver.java:94)
>>
>>         at
>> org.apache.flink.runtime.operators.BatchTask.run(BatchTask.java:459)
>>
>>         ... 3 more
>>
>> Caused by: java.io.IOException: Thread 'SortMerger spilling thread'
>> terminated due to an exception: java.io.IOException: I/O channel already
>> closed. Could not fulfill:
>> org.apache.flink.runtime.io.disk.iomanager.SegmentWriteRequest@29864ea1
>>
>>         at
>> org.apache.flink.runtime.operators.sort.UnilateralSortMerger$ThreadBase.run(UnilateralSortMerger.java:800)
>>
>> Caused by: com.esotericsoftware.kryo.KryoException: java.io.IOException:
>> I/O channel already closed. Could not fulfill:
>> org.apache.flink.runtime.io.disk.iomanager.SegmentWriteRequest@29864ea1
>>
>>         at com.esotericsoftware.kryo.io.Output.flush(Output.java:165)
>>
>>         at
>> org.apache.flink.api.java.typeutils.runtime.kryo.KryoSerializer.serialize(KryoSerializer.java:194)
>>
>>         at
>> org.apache.flink.api.java.typeutils.runtime.kryo.KryoSerializer.copy(KryoSerializer.java:247)
>>
>>         at
>> org.apache.flink.api.java.typeutils.runtime.TupleSerializerBase.copy(TupleSerializerBase.java:73)
>>
>>         at
>> org.apache.flink.api.java.typeutils.runtime.TupleSerializerBase.copy(TupleSerializerBase.java:73)
>>
>>         at
>> org.apache.flink.api.java.typeutils.runtime.TupleSerializerBase.copy(TupleSerializerBase.java:73)
>>
>>         at
>> org.apache.flink.runtime.operators.sort.NormalizedKeySorter.writeToOutput(NormalizedKeySorter.java:499)
>>
>>         at
>> org.apache.flink.runtime.operators.sort.UnilateralSortMerger$SpillingThread.go(UnilateralSortMerger.java:1344)
>>
>>         at
>> org.apache.flink.runtime.operators.sort.UnilateralSortMerger$ThreadBase.run(UnilateralSortMerger.java:796)
>>
>> Caused by: java.io.IOException: I/O channel already closed. Could not
>> fulfill:
>> org.apache.flink.runtime.io.disk.iomanager.SegmentWriteRequest@29864ea1
>>
>>         at
>> org.apache.flink.runtime.io.disk.iomanager.AsynchronousFileIOChannel.addRequest(AsynchronousFileIOChannel.java:249)
>>
>>         at
>> org.apache.flink.runtime.io.disk.iomanager.AsynchronousBlockWriterWithCallback.writeBlock(AsynchronousBlockWriterWithCallback.java:54)
>>
>>         at
>> org.apache.flink.runtime.io.disk.iomanager.AsynchronousBlockWriterWithCallback.writeBlock(AsynchronousBlockWriterWithCallback.java:29)
>>
>>         at
>> org.apache.flink.runtime.io.disk.iomanager.ChannelWriterOutputView.writeSegment(ChannelWriterOutputView.java:217)
>>
>>         at
>> org.apache.flink.runtime.io.disk.iomanager.ChannelWriterOutputView.nextSegment(ChannelWriterOutputView.java:203)
>>
>>         at
>> org.apache.flink.runtime.memory.AbstractPagedOutputView.advance(AbstractPagedOutputView.java:140)
>>
>>         at
>> org.apache.flink.runtime.memory.AbstractPagedOutputView.write(AbstractPagedOutputView.java:201)
>>
>>         at
>> org.apache.flink.api.java.typeutils.runtime.DataOutputViewStream.write(DataOutputViewStream.java:39)
>>
>>         at com.esotericsoftware.kryo.io.Output.flush(Output.java:163)
>>
>>         ... 8 more
>>
>> 15:22:47,619 INFO
>> org.apache.flink.runtime.taskmanager.Task                     - CHAIN
>> GroupReduce (GroupReduce at process(TransfoStage2StageOnTaz.java:106)) ->
>> Map (Map at writeExternalTable(HiveHCatDAO.java:206)) (87/95) switched to
>> FAILED with exception.
>>
>> java.lang.Exception: The data preparation for task 'CHAIN GroupReduce
>> (GroupReduce at process(TransfoStage2StageOnTaz.java:106)) -> Map (Map at
>> writeExternalTable(HiveHCatDAO.java:206))' , caused an error: Error
>> obtaining the sorted input: Thread 'SortMerger spilling thread' terminated
>> due to an exception: java.io.IOException: I/O channel already closed. Could
>> not fulfill:
>> org.apache.flink.runtime.io.disk.iomanager.SegmentWriteRequest@4e81c03c
>>
>>         at
>> org.apache.flink.runtime.operators.BatchTask.run(BatchTask.java:465)
>>
>>         at
>> org.apache.flink.runtime.operators.BatchTask.invoke(BatchTask.java:354)
>>
>>         at org.apache.flink.runtime.taskmanager.Task.run(Task.java:584)
>>
>>         at java.lang.Thread.run(Thread.java:744)
>>
>> Caused by: java.lang.RuntimeException: Error obtaining the sorted input:
>> Thread 'SortMerger spilling thread' terminated due to an exception:
>> java.io.IOException: I/O channel already closed. Could not fulfill:
>> org.apache.flink.runtime.io.disk.iomanager.SegmentWriteRequest@4e81c03c
>>
>>         at
>> org.apache.flink.runtime.operators.sort.UnilateralSortMerger.getIterator(UnilateralSortMerger.java:619)
>>
>>         at
>> org.apache.flink.runtime.operators.BatchTask.getInput(BatchTask.java:1089)
>>
>>         at
>> org.apache.flink.runtime.operators.GroupReduceDriver.prepare(GroupReduceDriver.java:94)
>>
>>         at
>> org.apache.flink.runtime.operators.BatchTask.run(BatchTask.java:459)
>>
>>         ... 3 more
>>
>> Caused by: java.io.IOException: Thread 'SortMerger spilling thread'
>> terminated due to an exception: java.io.IOException: I/O channel already
>> closed. Could not fulfill:
>> org.apache.flink.runtime.io.disk.iomanager.SegmentWriteRequest@4e81c03c
>>
>>         at
>> org.apache.flink.runtime.operators.sort.UnilateralSortMerger$ThreadBase.run(UnilateralSortMerger.java:800)
>>
>> Caused by: com.esotericsoftware.kryo.KryoException: java.io.IOException:
>> I/O channel already closed. Could not fulfill:
>> org.apache.flink.runtime.io.disk.iomanager.SegmentWriteRequest@4e81c03c
>>
>>         at com.esotericsoftware.kryo.io.Output.flush(Output.java:165)
>>
>>         at
>> org.apache.flink.api.java.typeutils.runtime.kryo.KryoSerializer.serialize(KryoSerializer.java:194)
>>
>>         at
>> org.apache.flink.api.java.typeutils.runtime.kryo.KryoSerializer.copy(KryoSerializer.java:247)
>>
>>         at
>> org.apache.flink.api.java.typeutils.runtime.TupleSerializerBase.copy(TupleSerializerBase.java:73)
>>
>>         at
>> org.apache.flink.api.java.typeutils.runtime.TupleSerializerBase.copy(TupleSerializerBase.java:73)
>>
>>         at
>> org.apache.flink.api.java.typeutils.runtime.TupleSerializerBase.copy(TupleSerializerBase.java:73)
>>
>>         at
>> org.apache.flink.runtime.operators.sort.NormalizedKeySorter.writeToOutput(NormalizedKeySorter.java:499)
>>
>>         at
>> org.apache.flink.runtime.operators.sort.UnilateralSortMerger$SpillingThread.go(UnilateralSortMerger.java:1344)
>>
>>         at
>> org.apache.flink.runtime.operators.sort.UnilateralSortMerger$ThreadBase.run(UnilateralSortMerger.java:796)
>>
>> Caused by: java.io.IOException: I/O channel already closed. Could not
>> fulfill:
>> org.apache.flink.runtime.io.disk.iomanager.SegmentWriteRequest@4e81c03c
>>
>>         at
>> org.apache.flink.runtime.io.disk.iomanager.AsynchronousFileIOChannel.addRequest(AsynchronousFileIOChannel.java:249)
>>
>>         at
>> org.apache.flink.runtime.io.disk.iomanager.AsynchronousBlockWriterWithCallback.writeBlock(AsynchronousBlockWriterWithCallback.java:54)
>>
>>         at
>> org.apache.flink.runtime.io.disk.iomanager.AsynchronousBlockWriterWithCallback.writeBlock(AsynchronousBlockWriterWithCallback.java:29)
>>
>>         at
>> org.apache.flink.runtime.io.disk.iomanager.ChannelWriterOutputView.writeSegment(ChannelWriterOutputView.java:217)
>>
>>         at
>> org.apache.flink.runtime.io.disk.iomanager.ChannelWriterOutputView.nextSegment(ChannelWriterOutputView.java:203)
>>
>>         at
>> org.apache.flink.runtime.memory.AbstractPagedOutputView.advance(AbstractPagedOutputView.java:140)
>>
>>         at
>> org.apache.flink.runtime.memory.AbstractPagedOutputView.write(AbstractPagedOutputView.java:201)
>>
>>         at
>> org.apache.flink.api.java.typeutils.runtime.DataOutputViewStream.write(DataOutputViewStream.java:39)
>>
>>         at com.esotericsoftware.kryo.io.Output.flush(Output.java:163)
>>
>>         ... 8 more
>>
>> 15:22:47,627 INFO
>> org.apache.flink.runtime.taskmanager.Task                     - Freeing
>> task resources for CHAIN GroupReduce (GroupReduce at
>> process(TransfoStage2StageOnTaz.java:106)) -> Map (Map at
>> writeExternalTable(HiveHCatDAO.java:206)) (87/95)
>>
>> 15:22:47,627 INFO
>> org.apache.flink.runtime.taskmanager.Task                     - Freeing
>> task resources for CHAIN GroupReduce (GroupReduce at
>> process(TransfoStage2StageOnTaz.java:106)) -> Map (Map at
>> writeExternalTable(HiveHCatDAO.java:206)) (89/95)
>>
>> 15:22:47,664 INFO
>> org.apache.flink.yarn.YarnTaskManager                         -
>> Unregistering task and sending final execution state FAILED to JobManager
>> for task CHAIN GroupReduce (GroupReduce at
>> process(TransfoStage2StageOnTaz.java:106)) -> Map (Map at
>> writeExternalTable(HiveHCatDAO.java:206)) (8d6b8b11714a27ac1ca83b39bdee577f)
>>
>> 15:22:47,738 INFO
>> org.apache.flink.yarn.YarnTaskManager                         -
>> Unregistering task and sending final execution state FAILED to JobManager
>> for task CHAIN GroupReduce (GroupReduce at
>> process(TransfoStage2StageOnTaz.java:106)) -> Map (Map at
>> writeExternalTable(HiveHCatDAO.java:206)) (73c6c2f15159dcb134e3899064a30f33)
>>
>> 15:22:47,841 ERROR
>> org.apache.flink.runtime.operators.BatchTask                  - Error in
>> task code:  CHAIN GroupReduce (GroupReduce at
>> calculeMinArea(TransfoStage2StageOnTaz.java:159)) -> Map (Key Extractor 1)
>> (88/95)
>>
>> com.esotericsoftware.kryo.KryoException: java.io.IOException: I/O channel
>> already closed. Could not fulfill:
>> org.apache.flink.runtime.io.disk.iomanager.SegmentReadRequest@26707c34
>>
>>         at com.esotericsoftware.kryo.io.Input.fill(Input.java:148)
>>
>>         at
>> org.apache.flink.api.java.typeutils.runtime.NoFetchingInput.require(NoFetchingInput.java:74)
>>
>>         at
>> com.esotericsoftware.kryo.io.Input.readLong_slow(Input.java:756)
>>
>>         at com.esotericsoftware.kryo.io.Input.readVarLong(Input.java:690)
>>
>>         at com.esotericsoftware.kryo.io.Input.readLong(Input.java:685)
>>
>>         at
>> com.esotericsoftware.kryo.serializers.UnsafeCacheFields$UnsafeLongField.read(UnsafeCacheFields.java:160)
>>
>>         at
>> com.esotericsoftware.kryo.serializers.FieldSerializer.read(FieldSerializer.java:528)
>>
>>         at
>> com.esotericsoftware.kryo.Kryo.readClassAndObject(Kryo.java:761)
>>
>>         at
>> org.apache.flink.api.java.typeutils.runtime.kryo.KryoSerializer.deserialize(KryoSerializer.java:222)
>>
>>         at
>> org.apache.flink.api.java.typeutils.runtime.kryo.KryoSerializer.deserialize(KryoSerializer.java:236)
>>
>>         at
>> org.apache.flink.api.java.typeutils.runtime.TupleSerializer.deserialize(TupleSerializer.java:136)
>>
>>         at
>> org.apache.flink.api.java.typeutils.runtime.TupleSerializer.deserialize(TupleSerializer.java:30)
>>
>>         at
>> org.apache.flink.api.java.typeutils.runtime.TupleSerializer.deserialize(TupleSerializer.java:136)
>>
>>         at
>> org.apache.flink.api.java.typeutils.runtime.TupleSerializer.deserialize(TupleSerializer.java:30)
>>
>>         at
>> org.apache.flink.runtime.io.disk.ChannelReaderInputViewIterator.next(ChannelReaderInputViewIterator.java:86)
>>
>>         at
>> org.apache.flink.runtime.operators.sort.MergeIterator$HeadStream.nextHead(MergeIterator.java:151)
>>
>>         at
>> org.apache.flink.runtime.operators.sort.MergeIterator.next(MergeIterator.java:85)
>>
>>         at
>> org.apache.flink.runtime.util.ReusingKeyGroupedIterator$ValuesIterator.hasNext(ReusingKeyGroupedIterator.java:186)
>>
>>         at
>> org.apache.flink.api.java.operators.translation.TupleUnwrappingIterator.hasNext(TupleUnwrappingIterator.java:49)
>>
>>         at
>> com.bouygtel.kubera.processor.stage.TransfoStage2StageOnTaz$6.reduce(TransfoStage2StageOnTaz.java:170)
>>
>>         at
>> org.apache.flink.api.java.operators.translation.PlanUnwrappingReduceGroupOperator$TupleUnwrappingNonCombinableGroupReducer.reduce(PlanUnwrappingReduceGroupOperator.java:101)
>>
>>         at
>> org.apache.flink.runtime.operators.GroupReduceDriver.run(GroupReduceDriver.java:118)
>>
>>         at
>> org.apache.flink.runtime.operators.BatchTask.run(BatchTask.java:489)
>>
>>         at
>> org.apache.flink.runtime.operators.BatchTask.invoke(BatchTask.java:354)
>>
>>         at org.apache.flink.runtime.taskmanager.Task.run(Task.java:584)
>>
>>         at java.lang.Thread.run(Thread.java:744)
>>
>> Caused by: java.io.IOException: I/O channel already closed. Could not
>> fulfill:
>> org.apache.flink.runtime.io.disk.iomanager.SegmentReadRequest@26707c34
>>
>>         at
>> org.apache.flink.runtime.io.disk.iomanager.AsynchronousFileIOChannel.addRequest(AsynchronousFileIOChannel.java:249)
>>
>>         at
>> org.apache.flink.runtime.io.disk.iomanager.AsynchronousBlockReader.readBlock(AsynchronousBlockReader.java:75)
>>
>>         at
>> org.apache.flink.runtime.io.disk.iomanager.AsynchronousBlockReader.readBlock(AsynchronousBlockReader.java:43)
>>
>>         at
>> org.apache.flink.runtime.io.disk.iomanager.ChannelReaderInputView.sendReadRequest(ChannelReaderInputView.java:259)
>>
>>         at
>> org.apache.flink.runtime.io.disk.iomanager.ChannelReaderInputView.nextSegment(ChannelReaderInputView.java:224)
>>
>>         at
>> org.apache.flink.runtime.memory.AbstractPagedInputView.advance(AbstractPagedInputView.java:159)
>>
>>         at
>> org.apache.flink.runtime.memory.AbstractPagedInputView.read(AbstractPagedInputView.java:213)
>>
>>         at
>> org.apache.flink.api.java.typeutils.runtime.DataInputViewStream.read(DataInputViewStream.java:68)
>>
>>         at com.esotericsoftware.kryo.io.Input.fill(Input.java:146)
>>
>>         ... 25 more
>>
>>
>>
>>
>>
>> (...)
>>
>> ______________________
>>
>> 15:22:51,798 INFO
>> org.apache.flink.yarn.YarnJobManager                          - Container
>> container_e11_1453202008841_2868_01_000018 is completed with diagnostics:
>> Container
>> [pid=14548,containerID=container_e11_1453202008841_2868_01_000018] is
>> running beyond physical memory limits. Current usage: 12.1 GB of 12 GB
>> physical memory used; 13.0 GB of 25.2 GB virtual memory used. Killing
>> container.
>>
>> Dump of the process-tree for container_e11_1453202008841_2868_01_000018 :
>>
>>         |- PID PPID PGRPID SESSID CMD_NAME USER_MODE_TIME(MILLIS)
>> SYSTEM_TIME(MILLIS) VMEM_USAGE(BYTES) RSSMEM_USAGE(PAGES) FULL_CMD_LINE
>>
>>         |- 14548 14542 14548 14548 (bash) 0 0 108646400 310 /bin/bash -c
>> /usr/java/default/bin/java -Xms7200m -Xmx7200m
>> -XX:MaxDirectMemorySize=7200m
>> -Dlog.file=/data/2/hadoop/yarn/log/application_1453202008841_2868/container_e11_1453202008841_2868_01_000018/taskmanager.log
>> -Dlogback.configurationFile=file:logback.xml
>> -Dlog4j.configuration=file:log4j.properties
>> org.apache.flink.yarn.YarnTaskManagerRunner --configDir . 1>
>> /data/2/hadoop/yarn/log/application_1453202008841_2868/container_e11_1453202008841_2868_01_000018/taskmanager.out
>> 2>
>> /data/2/hadoop/yarn/log/application_1453202008841_2868/container_e11_1453202008841_2868_01_000018/taskmanager.err
>> --streamingMode batch
>>
>>         |- 14558 14548 14548 14548 (java) 631070 15142 13881634816
>> 3163462 /usr/java/default/bin/java -Xms7200m -Xmx7200m
>> -XX:MaxDirectMemorySize=7200m
>> -Dlog.file=/data/2/hadoop/yarn/log/application_1453202008841_2868/container_e11_1453202008841_2868_01_000018/taskmanager.log
>> -Dlogback.configurationFile=file:logback.xml
>> -Dlog4j.configuration=file:log4j.properties
>> org.apache.flink.yarn.YarnTaskManagerRunner --configDir . --streamingMode
>> batch
>>
>>
>>
>> Container killed on request. Exit code is 143
>>
>> Container exited with a non-zero exit code 143
>>
>>
>>
>>
>>
>>
>>
>>
>>
>> *From:* ewenstephan@gmail.com [mailto:ewenstephan@gmail.com] *On behalf
>> of* Stephan Ewen
>> *Sent:* Tuesday, February 2, 2016 20:20
>> *To:* user@flink.apache.org
>> *Subject:* Re: Left join with unbalanced dataset
>>
>>
>>
>> To make sure this discussion does not go in a wrong direction:
>>
>>
>>
>> There is no issue here with data size, or memory management. The
>> MemoryManagement for sorting and hashing works, and Flink handles the
>> spilling correctly, etc.
>>
>>
>>
>> The issue here is different:
>>
>>    - One possible reason is that the network stack (specifically the
>> Netty library) allocates too much direct (= off heap) memory for buffering
>> the TCP connections.
>>
>>    - Another reason could be leaky behavior in Hadoop's HDFS code.
>>
>>
>>
>>
>>
>> @Arnaud: We need the full log of the TaskManager that initially
>> experiences that failure, then we can look into this. Ideally with
>> memory logging activated, as suggested by Ufuk.
>>
>>
>>
>> Best,
>>
>> Stephan
>>
>>
>>
>>
>
>

Re: Left join with unbalanced dataset

Posted by Stephan Ewen <se...@apache.org>.
Hi!

I think the closed channel is actually an effect of the process kill.
Before the exception, you can see "15:22:47,592 ERROR
org.apache.flink.yarn.YarnTaskManagerRunner - RECEIVED SIGNAL 15: SIGTERM"
in the log, which means that UNIX is killing the process.
I assume that the first thing that happens is that UNIX closes the open
file handles, while the JVM shutdown hooks are still in progress. Hence the
exception.

So, the root cause is still the YARN memory killer.
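
To make the memory-killer diagnosis concrete, here is a minimal sketch of the arithmetic. It uses only figures visible in the quoted log: the 12 GB container limit reported by YARN and the -Xmx7200m / -XX:MaxDirectMemorySize=7200m JVM flags. The class and method names are hypothetical, and treating 12 GB as 12 * 1024 MB is an assumption. The sketch shows only that the configured ceilings for heap plus direct memory add up to more than the container allows; it does not prove which component actually consumed the memory.

```java
// Sketch only: the numbers are copied from the YARN diagnostics and JVM flags in the log.
// ContainerMemoryBudget and its methods are hypothetical names, not part of Flink or YARN.
final class ContainerMemoryBudget {

    /** Upper bound (in MB) that heap plus direct buffers may reach under the given JVM flags. */
    static long worstCaseMb(long maxHeapMb, long maxDirectMb) {
        return maxHeapMb + maxDirectMb;
    }

    /** True when the JVM flags alone permit more memory than the container limit. */
    static boolean canExceed(long containerLimitMb, long maxHeapMb, long maxDirectMb) {
        return worstCaseMb(maxHeapMb, maxDirectMb) > containerLimitMb;
    }

    public static void main(String[] args) {
        long containerLimitMb = 12L * 1024; // "12 GB physical memory" (assuming 1 GB = 1024 MB)
        long maxHeapMb = 7200;              // -Xmx7200m
        long maxDirectMb = 7200;            // -XX:MaxDirectMemorySize=7200m

        System.out.println("worst case heap + direct: "
                + worstCaseMb(maxHeapMb, maxDirectMb) + " MB");
        System.out.println("container limit: " + containerLimitMb + " MB");
        System.out.println("flags allow exceeding the limit: "
                + canExceed(containerLimitMb, maxHeapMb, maxDirectMb));
    }
}
```

With the values from the log, the worst case is 14400 MB against a limit of 12288 MB, so a fully used heap leaves about 5088 MB for everything that lives outside it (direct buffers, thread stacks, native libraries).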

The log comes from release version 0.10.0.
The Netty fix came into Flink after version 0.10.1 - so it is currently
only in 0.10-SNAPSHOT (and will be in 0.10.2 in a couple of days).
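
Since the suspected culprit is direct (off-heap) memory, the following sketch shows one way to watch it from inside a JVM, using the standard BufferPoolMXBean available since Java 7. The class name DirectMemoryProbe is hypothetical and this is not the memory logging built into Flink. It also has a known blind spot: it counts only buffers obtained through ByteBuffer.allocateDirect, so native memory that a library allocates by other means will not show up.

```java
import java.lang.management.BufferPoolMXBean;
import java.lang.management.ManagementFactory;
import java.nio.ByteBuffer;

// Sketch only: DirectMemoryProbe is a hypothetical helper, not part of Flink.
final class DirectMemoryProbe {

    /** Bytes currently held by the JVM's "direct" buffer pool, or -1 if the pool is not exposed. */
    static long directBytesUsed() {
        for (BufferPoolMXBean pool : ManagementFactory.getPlatformMXBeans(BufferPoolMXBean.class)) {
            if ("direct".equals(pool.getName())) {
                return pool.getMemoryUsed();
            }
        }
        return -1L;
    }

    public static void main(String[] args) {
        long before = directBytesUsed();

        // Allocate 32 MB off-heap; this memory is outside the -Xmx heap limit.
        ByteBuffer buffer = ByteBuffer.allocateDirect(32 * 1024 * 1024);

        long after = directBytesUsed();
        System.out.println("direct memory before: " + before + " bytes");
        System.out.println("direct memory after:  " + after + " bytes");
        System.out.println("buffer capacity:      " + buffer.capacity() + " bytes");
    }
}
```

Logging this value periodically next to the heap statistics would show whether direct buffers grow toward the container limit while the heap stays flat.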

Greetings,
Stephan


On Wed, Feb 3, 2016 at 10:11 AM, LINZ, Arnaud <AL...@bouyguestelecom.fr>
wrote:

> Hi,
>
>
>
> I see nothing wrong in the log of the killed container (in fact, it is
> strange that it fails with an I/O channel closure before it is killed by
> YARN), but I’ll post new logs with memory debugging enabled as a web
> download within the day.
>
>
>
> In the meantime, here is a log extract:
>
>
>
> Container: container_e11_1453202008841_2868_01_000018 on
> h1r1dn06.bpa.bouyguestelecom.fr_45454
>
>
> ================================================================================================
>
>
>
> …
>
> 15:04:01,234 INFO
> org.apache.flink.yarn.YarnTaskManagerRunner                   -
> --------------------------------------------------------------------------------
>
> 15:04:01,236 INFO
> org.apache.flink.yarn.YarnTaskManagerRunner                   -  Starting
> YARN TaskManager (Version: 0.10.0, Rev:ab2cca4, Date:10.11.2015 @ 13:50:14
> UTC)
>
> 15:04:01,236 INFO
> org.apache.flink.yarn.YarnTaskManagerRunner                   -  Current
> user: datcrypt
>
> 15:04:01,236 INFO
> org.apache.flink.yarn.YarnTaskManagerRunner                   -  JVM: Java
> HotSpot(TM) 64-Bit Server VM - Oracle Corporation - 1.7/24.45-b08
>
> 15:04:01,236 INFO
> org.apache.flink.yarn.YarnTaskManagerRunner                   -  Maximum
> heap size: 6900 MiBytes
>
> 15:04:01,236 INFO  org.apache.flink.yarn.YarnTaskManagerRunner
>       -  JAVA_HOME: /usr/java/default
>
> 15:04:01,238 INFO
> org.apache.flink.yarn.YarnTaskManagerRunner                   -  Hadoop
> version: 2.6.0
>
> 15:04:01,238 INFO
> org.apache.flink.yarn.YarnTaskManagerRunner                   -  JVM
> Options:
>
> 15:04:01,238 INFO
> org.apache.flink.yarn.YarnTaskManagerRunner                   -
> -Xms7200m
>
> 15:04:01,238 INFO
> org.apache.flink.yarn.YarnTaskManagerRunner                   -
> -Xmx7200m
>
> 15:04:01,238 INFO
> org.apache.flink.yarn.YarnTaskManagerRunner                   -
> -XX:MaxDirectMemorySize=7200m
>
> 15:04:01,238 INFO
> org.apache.flink.yarn.YarnTaskManagerRunner                   -
> -Dlog.file=/data/2/hadoop/yarn/log/application_1453202008841_2868/container_e11_1453202008841_2868_01_000018/taskmanager.log
>
> 15:04:01,238 INFO
> org.apache.flink.yarn.YarnTaskManagerRunner                   -
> -Dlogback.configurationFile=file:logback.xml
>
> 15:04:01,238 INFO
> org.apache.flink.yarn.YarnTaskManagerRunner                   -
> -Dlog4j.configuration=file:log4j.properties
>
> 15:04:01,238 INFO
> org.apache.flink.yarn.YarnTaskManagerRunner                   -  Program
> Arguments:
>
> 15:04:01,238 INFO
> org.apache.flink.yarn.YarnTaskManagerRunner                   -
> --configDir
>
> 15:04:01,238 INFO
> org.apache.flink.yarn.YarnTaskManagerRunner                   -     .
>
> 15:04:01,238 INFO
> org.apache.flink.yarn.YarnTaskManagerRunner                   -
> --streamingMode
>
> 15:04:01,238 INFO
> org.apache.flink.yarn.YarnTaskManagerRunner                   -     batch
>
> 15:04:01,238 INFO
>  org.apache.flink.yarn.YarnTaskManagerRunner                   -
> --------------------------------------------------------------------------------
>
> …
>
> 15:04:02,215 INFO
> org.apache.flink.runtime.taskmanager.TaskManager              - Starting
> TaskManager actor
>
> 15:04:02,224 INFO
> org.apache.flink.runtime.io.network.netty.NettyConfig         - NettyConfig
> [server address: bt1shlhr/172.21.125.16, server port: 47002, memory
> segment size (bytes): 32768, transport type: NIO, number of server threads:
> 0 (use Netty's default), number of client threads: 0 (use Netty's default),
> server connect backlog: 0 (use Netty's default), client connect timeout
> (sec): 120, send/receive buffer size (bytes): 0 (use Netty's default)]
>
> 15:04:02,226 INFO
> org.apache.flink.runtime.taskmanager.TaskManager              - Messages
> between TaskManager and JobManager have a max timeout of 100000 milliseconds
>
> …
>
>
>
> 15:04:02,970 INFO
> org.apache.flink.runtime.io.network.buffer.NetworkBufferPool  - Allocated
> 1024 MB for network buffer pool (number of memory segments: 32768, bytes
> per segment: 32768).
>
> 15:04:03,527 INFO
> org.apache.flink.runtime.taskmanager.TaskManager              - Using 0.7
> of the currently free heap space for Flink managed heap memory (4099 MB).
>
> 15:04:06,250 INFO
> org.apache.flink.runtime.io.disk.iomanager.IOManager          - I/O manager
> uses directory
> /data/1/hadoop/yarn/local/usercache/datcrypt/appcache/application_1453202008841_2868/flink-io-5cc3aa50-6723-460e-a722-800dd908e9e8
> for spill files.
>
> …
>
> 15:04:06,429 INFO
> org.apache.flink.yarn.YarnTaskManager                         - TaskManager
> data connection information: h1r1dn06.bpa.bouyguestelecom.fr
> (dataPort=47002)
>
> 15:04:06,430 INFO
> org.apache.flink.yarn.YarnTaskManager                         - TaskManager
> has 2 task slot(s).
>
> 15:04:06,431 INFO
> org.apache.flink.yarn.YarnTaskManager                         - Memory
> usage stats: [HEAP: 5186/6900/6900 MB, NON HEAP: 25/50/130 MB
> (used/committed/max)]
>
> 15:04:06,438 INFO
> org.apache.flink.yarn.YarnTaskManager                         - Trying to
> register at JobManager akka.tcp://
> flink@172.21.125.31:36518/user/jobmanager (attempt 1, timeout: 500
> milliseconds)
>
> 15:04:06,591 INFO
> org.apache.flink.yarn.YarnTaskManager                         - Successful
> registration at JobManager (akka.tcp://
> flink@172.21.125.31:36518/user/jobmanager), starting network stack and
> library cache.
>
> …
>
>
>
> 15:17:22,191 INFO
> org.apache.flink.yarn.YarnTaskManager                         -
> Unregistering task and sending final execution state FINISHED to JobManager
> for task DataSink (Hive Output to
> DEV_VOY_KBR_TRV.EX_GOS_TP_STAGES_TRANSPORT_MODE)
> (c9dc588ceb209d98fd08b5144a59adfc)
>
> 15:17:22,196 INFO
> org.apache.flink.runtime.taskmanager.Task                     - DataSink
> (Hive Output to DEV_VOY_KBR_TRV.EX_GOS_TP_STAGES_TRANSPORT_MODE) (76/95)
> switched to FINISHED
>
> 15:17:22,197 INFO
> org.apache.flink.runtime.taskmanager.Task                     - Freeing
> task resources for DataSink (Hive Output to
> DEV_VOY_KBR_TRV.EX_GOS_TP_STAGES_TRANSPORT_MODE) (76/95)
>
> 15:17:22,197 INFO
> org.apache.flink.yarn.YarnTaskManager                         -
> Unregistering task and sending final execution state FINISHED to JobManager
> for task DataSink (Hive Output to
> DEV_VOY_KBR_TRV.EX_GOS_TP_STAGES_TRANSPORT_MODE)
> (0c1c027e2ca5111e3e54c98b6d7265d7)
>
> 15:22:47,592 ERROR
> org.apache.flink.yarn.YarnTaskManagerRunner                   - RECEIVED
> SIGNAL 15: SIGTERM
>
> 15:22:47,608 ERROR
> org.apache.flink.runtime.operators.BatchTask                  - Error in
> task code:  CHAIN GroupReduce (GroupReduce at
> process(TransfoStage2StageOnTaz.java:106)) -> Map (Map at
> writeExternalTable(HiveHCatDAO.java:206)) (89/95)
>
> java.lang.Exception: The data preparation for task 'CHAIN GroupReduce
> (GroupReduce at process(TransfoStage2StageOnTaz.java:106)) -> Map (Map at
> writeExternalTable(HiveHCatDAO.java:206))' , caused an error: Error
> obtaining the sorted input: Thread 'SortMerger spilling thread' terminated
> due to an exception: java.io.IOException: I/O channel already closed. Could
> not fulfill:
> org.apache.flink.runtime.io.disk.iomanager.SegmentWriteRequest@29864ea1
>
>         at
> org.apache.flink.runtime.operators.BatchTask.run(BatchTask.java:465)
>
>         at
> org.apache.flink.runtime.operators.BatchTask.invoke(BatchTask.java:354)
>
>         at org.apache.flink.runtime.taskmanager.Task.run(Task.java:584)
>
>         at java.lang.Thread.run(Thread.java:744)
>
> Caused by: java.lang.RuntimeException: Error obtaining the sorted input:
> Thread 'SortMerger spilling thread' terminated due to an exception:
> java.io.IOException: I/O channel already closed. Could not fulfill:
> org.apache.flink.runtime.io.disk.iomanager.SegmentWriteRequest@29864ea1
>
>         at
> org.apache.flink.runtime.operators.sort.UnilateralSortMerger.getIterator(UnilateralSortMerger.java:619)
>
>         at
> org.apache.flink.runtime.operators.BatchTask.getInput(BatchTask.java:1089)
>
>         at
> org.apache.flink.runtime.operators.GroupReduceDriver.prepare(GroupReduceDriver.java:94)
>
>         at
> org.apache.flink.runtime.operators.BatchTask.run(BatchTask.java:459)
>
>         ... 3 more
>
> Caused by: java.io.IOException: Thread 'SortMerger spilling thread'
> terminated due to an exception: java.io.IOException: I/O channel already
> closed. Could not fulfill:
> org.apache.flink.runtime.io.disk.iomanager.SegmentWriteRequest@29864ea1
>
>         at
> org.apache.flink.runtime.operators.sort.UnilateralSortMerger$ThreadBase.run(UnilateralSortMerger.java:800)
>
> Caused by: com.esotericsoftware.kryo.KryoException: java.io.IOException:
> I/O channel already closed. Could not fulfill:
> org.apache.flink.runtime.io.disk.iomanager.SegmentWriteRequest@29864ea1
>
>         at com.esotericsoftware.kryo.io.Output.flush(Output.java:165)
>
>         at
> org.apache.flink.api.java.typeutils.runtime.kryo.KryoSerializer.serialize(KryoSerializer.java:194)
>
>         at
> org.apache.flink.api.java.typeutils.runtime.kryo.KryoSerializer.copy(KryoSerializer.java:247)
>
>         at
> org.apache.flink.api.java.typeutils.runtime.TupleSerializerBase.copy(TupleSerializerBase.java:73)
>
>         at
> org.apache.flink.api.java.typeutils.runtime.TupleSerializerBase.copy(TupleSerializerBase.java:73)
>
>         at
> org.apache.flink.api.java.typeutils.runtime.TupleSerializerBase.copy(TupleSerializerBase.java:73)
>
>         at
> org.apache.flink.runtime.operators.sort.NormalizedKeySorter.writeToOutput(NormalizedKeySorter.java:499)
>
>         at
> org.apache.flink.runtime.operators.sort.UnilateralSortMerger$SpillingThread.go(UnilateralSortMerger.java:1344)
>
>         at
> org.apache.flink.runtime.operators.sort.UnilateralSortMerger$ThreadBase.run(UnilateralSortMerger.java:796)
>
> Caused by: java.io.IOException: I/O channel already closed. Could not
> fulfill:
> org.apache.flink.runtime.io.disk.iomanager.SegmentWriteRequest@29864ea1
>
>         at
> org.apache.flink.runtime.io.disk.iomanager.AsynchronousFileIOChannel.addRequest(AsynchronousFileIOChannel.java:249)
>
>         at
> org.apache.flink.runtime.io.disk.iomanager.AsynchronousBlockWriterWithCallback.writeBlock(AsynchronousBlockWriterWithCallback.java:54)
>
>         at
> org.apache.flink.runtime.io.disk.iomanager.AsynchronousBlockWriterWithCallback.writeBlock(AsynchronousBlockWriterWithCallback.java:29)
>
>         at
> org.apache.flink.runtime.io.disk.iomanager.ChannelWriterOutputView.writeSegment(ChannelWriterOutputView.java:217)
>
>         at
> org.apache.flink.runtime.io.disk.iomanager.ChannelWriterOutputView.nextSegment(ChannelWriterOutputView.java:203)
>
>         at
> org.apache.flink.runtime.memory.AbstractPagedOutputView.advance(AbstractPagedOutputView.java:140)
>
>         at
> org.apache.flink.runtime.memory.AbstractPagedOutputView.write(AbstractPagedOutputView.java:201)
>
>         at
> org.apache.flink.api.java.typeutils.runtime.DataOutputViewStream.write(DataOutputViewStream.java:39)
>
>         at com.esotericsoftware.kryo.io.Output.flush(Output.java:163)
>
>         ... 8 more
>
> 15:22:47,608 ERROR
> org.apache.flink.runtime.operators.BatchTask                  - Error in
> task code:  CHAIN GroupReduce (GroupReduce at
> process(TransfoStage2StageOnTaz.java:106)) -> Map (Map at
> writeExternalTable(HiveHCatDAO.java:206)) (87/95)
>
> java.lang.Exception: The data preparation for task 'CHAIN GroupReduce
> (GroupReduce at process(TransfoStage2StageOnTaz.java:106)) -> Map (Map at
> writeExternalTable(HiveHCatDAO.java:206))' , caused an error: Error
> obtaining the sorted input: Thread 'SortMerger spilling thread' terminated
> due to an exception: java.io.IOException: I/O channel already closed. Could
> not fulfill:
> org.apache.flink.runtime.io.disk.iomanager.SegmentWriteRequest@4e81c03c
>
>         at
> org.apache.flink.runtime.operators.BatchTask.run(BatchTask.java:465)
>
>         at
> org.apache.flink.runtime.operators.BatchTask.invoke(BatchTask.java:354)
>
>         at org.apache.flink.runtime.taskmanager.Task.run(Task.java:584)
>
>         at java.lang.Thread.run(Thread.java:744)
>
> Caused by: java.lang.RuntimeException: Error obtaining the sorted input:
> Thread 'SortMerger spilling thread' terminated due to an exception:
> java.io.IOException: I/O channel already closed. Could not fulfill:
> org.apache.flink.runtime.io.disk.iomanager.SegmentWriteRequest@4e81c03c
>
>         at
> org.apache.flink.runtime.operators.sort.UnilateralSortMerger.getIterator(UnilateralSortMerger.java:619)
>
>         at
> org.apache.flink.runtime.operators.BatchTask.getInput(BatchTask.java:1089)
>
>         at
> org.apache.flink.runtime.operators.GroupReduceDriver.prepare(GroupReduceDriver.java:94)
>
>         at
> org.apache.flink.runtime.operators.BatchTask.run(BatchTask.java:459)
>
>         ... 3 more
>
> Caused by: java.io.IOException: Thread 'SortMerger spilling thread'
> terminated due to an exception: java.io.IOException: I/O channel already
> closed. Could not fulfill:
> org.apache.flink.runtime.io.disk.iomanager.SegmentWriteRequest@4e81c03c
>
>         at
> org.apache.flink.runtime.operators.sort.UnilateralSortMerger$ThreadBase.run(UnilateralSortMerger.java:800)
>
> Caused by: com.esotericsoftware.kryo.KryoException: java.io.IOException:
> I/O channel already closed. Could not fulfill:
> org.apache.flink.runtime.io.disk.iomanager.SegmentWriteRequest@4e81c03c
>
>         at com.esotericsoftware.kryo.io.Output.flush(Output.java:165)
>
>         at
> org.apache.flink.api.java.typeutils.runtime.kryo.KryoSerializer.serialize(KryoSerializer.java:194)
>
>         at
> org.apache.flink.api.java.typeutils.runtime.kryo.KryoSerializer.copy(KryoSerializer.java:247)
>
>         at
> org.apache.flink.api.java.typeutils.runtime.TupleSerializerBase.copy(TupleSerializerBase.java:73)
>
>         at
> org.apache.flink.api.java.typeutils.runtime.TupleSerializerBase.copy(TupleSerializerBase.java:73)
>
>         at
> org.apache.flink.api.java.typeutils.runtime.TupleSerializerBase.copy(TupleSerializerBase.java:73)
>
>         at
> org.apache.flink.runtime.operators.sort.NormalizedKeySorter.writeToOutput(NormalizedKeySorter.java:499)
>
>         at
> org.apache.flink.runtime.operators.sort.UnilateralSortMerger$SpillingThread.go(UnilateralSortMerger.java:1344)
>
>         at
> org.apache.flink.runtime.operators.sort.UnilateralSortMerger$ThreadBase.run(UnilateralSortMerger.java:796)
>
> Caused by: java.io.IOException: I/O channel already closed. Could not
> fulfill:
> org.apache.flink.runtime.io.disk.iomanager.SegmentWriteRequest@4e81c03c
>
>         at
> org.apache.flink.runtime.io.disk.iomanager.AsynchronousFileIOChannel.addRequest(AsynchronousFileIOChannel.java:249)
>
>         at
> org.apache.flink.runtime.io.disk.iomanager.AsynchronousBlockWriterWithCallback.writeBlock(AsynchronousBlockWriterWithCallback.java:54)
>
>         at
> org.apache.flink.runtime.io.disk.iomanager.AsynchronousBlockWriterWithCallback.writeBlock(AsynchronousBlockWriterWithCallback.java:29)
>
>         at
> org.apache.flink.runtime.io.disk.iomanager.ChannelWriterOutputView.writeSegment(ChannelWriterOutputView.java:217)
>
>         at
> org.apache.flink.runtime.io.disk.iomanager.ChannelWriterOutputView.nextSegment(ChannelWriterOutputView.java:203)
>
>         at
> org.apache.flink.runtime.memory.AbstractPagedOutputView.advance(AbstractPagedOutputView.java:140)
>
>         at
> org.apache.flink.runtime.memory.AbstractPagedOutputView.write(AbstractPagedOutputView.java:201)
>
>         at
> org.apache.flink.api.java.typeutils.runtime.DataOutputViewStream.write(DataOutputViewStream.java:39)
>
>         at com.esotericsoftware.kryo.io.Output.flush(Output.java:163)
>
>         ... 8 more
>
> 15:22:47,617 INFO
> org.apache.flink.runtime.taskmanager.Task                     - CHAIN
> GroupReduce (GroupReduce at process(TransfoStage2StageOnTaz.java:106)) ->
> Map (Map at writeExternalTable(HiveHCatDAO.java:206)) (89/95) switched to
> FAILED with exception.
>
> java.lang.Exception: The data preparation for task 'CHAIN GroupReduce
> (GroupReduce at process(TransfoStage2StageOnTaz.java:106)) -> Map (Map at
> writeExternalTable(HiveHCatDAO.java:206))' , caused an error: Error
> obtaining the sorted input: Thread 'SortMerger spilling thread' terminated
> due to an exception: java.io.IOException: I/O channel already closed. Could
> not fulfill:
> org.apache.flink.runtime.io.disk.iomanager.SegmentWriteRequest@29864ea1
>
>         at
> org.apache.flink.runtime.operators.BatchTask.run(BatchTask.java:465)
>
>         at
> org.apache.flink.runtime.operators.BatchTask.invoke(BatchTask.java:354)
>
>         at org.apache.flink.runtime.taskmanager.Task.run(Task.java:584)
>
>         at java.lang.Thread.run(Thread.java:744)
>
> Caused by: java.lang.RuntimeException: Error obtaining the sorted input:
> Thread 'SortMerger spilling thread' terminated due to an exception:
> java.io.IOException: I/O channel already closed. Could not fulfill:
> org.apache.flink.runtime.io.disk.iomanager.SegmentWriteRequest@29864ea1
>
>         at
> org.apache.flink.runtime.operators.sort.UnilateralSortMerger.getIterator(UnilateralSortMerger.java:619)
>
>         at
> org.apache.flink.runtime.operators.BatchTask.getInput(BatchTask.java:1089)
>
>         at
> org.apache.flink.runtime.operators.GroupReduceDriver.prepare(GroupReduceDriver.java:94)
>
>         at
> org.apache.flink.runtime.operators.BatchTask.run(BatchTask.java:459)
>
>         ... 3 more
>
> Caused by: java.io.IOException: Thread 'SortMerger spilling thread'
> terminated due to an exception: java.io.IOException: I/O channel already
> closed. Could not fulfill:
> org.apache.flink.runtime.io.disk.iomanager.SegmentWriteRequest@29864ea1
>
>         at
> org.apache.flink.runtime.operators.sort.UnilateralSortMerger$ThreadBase.run(UnilateralSortMerger.java:800)
>
> Caused by: com.esotericsoftware.kryo.KryoException: java.io.IOException:
> I/O channel already closed. Could not fulfill:
> org.apache.flink.runtime.io.disk.iomanager.SegmentWriteRequest@29864ea1
>
>         at com.esotericsoftware.kryo.io.Output.flush(Output.java:165)
>
>         at
> org.apache.flink.api.java.typeutils.runtime.kryo.KryoSerializer.serialize(KryoSerializer.java:194)
>
>         at
> org.apache.flink.api.java.typeutils.runtime.kryo.KryoSerializer.copy(KryoSerializer.java:247)
>
>         at
> org.apache.flink.api.java.typeutils.runtime.TupleSerializerBase.copy(TupleSerializerBase.java:73)
>
>         at
> org.apache.flink.api.java.typeutils.runtime.TupleSerializerBase.copy(TupleSerializerBase.java:73)
>
>         at
> org.apache.flink.api.java.typeutils.runtime.TupleSerializerBase.copy(TupleSerializerBase.java:73)
>
>         at
> org.apache.flink.runtime.operators.sort.NormalizedKeySorter.writeToOutput(NormalizedKeySorter.java:499)
>
>         at
> org.apache.flink.runtime.operators.sort.UnilateralSortMerger$SpillingThread.go(UnilateralSortMerger.java:1344)
>
>         at
> org.apache.flink.runtime.operators.sort.UnilateralSortMerger$ThreadBase.run(UnilateralSortMerger.java:796)
>
> Caused by: java.io.IOException: I/O channel already closed. Could not
> fulfill:
> org.apache.flink.runtime.io.disk.iomanager.SegmentWriteRequest@29864ea1
>
>         at
> org.apache.flink.runtime.io.disk.iomanager.AsynchronousFileIOChannel.addRequest(AsynchronousFileIOChannel.java:249)
>
>         at
> org.apache.flink.runtime.io.disk.iomanager.AsynchronousBlockWriterWithCallback.writeBlock(AsynchronousBlockWriterWithCallback.java:54)
>
>         at
> org.apache.flink.runtime.io.disk.iomanager.AsynchronousBlockWriterWithCallback.writeBlock(AsynchronousBlockWriterWithCallback.java:29)
>
>         at
> org.apache.flink.runtime.io.disk.iomanager.ChannelWriterOutputView.writeSegment(ChannelWriterOutputView.java:217)
>
>         at
> org.apache.flink.runtime.io.disk.iomanager.ChannelWriterOutputView.nextSegment(ChannelWriterOutputView.java:203)
>
>         at
> org.apache.flink.runtime.memory.AbstractPagedOutputView.advance(AbstractPagedOutputView.java:140)
>
>         at
> org.apache.flink.runtime.memory.AbstractPagedOutputView.write(AbstractPagedOutputView.java:201)
>
>         at
> org.apache.flink.api.java.typeutils.runtime.DataOutputViewStream.write(DataOutputViewStream.java:39)
>
>         at com.esotericsoftware.kryo.io.Output.flush(Output.java:163)
>
>         ... 8 more
>
> 15:22:47,619 INFO
> org.apache.flink.runtime.taskmanager.Task                     - CHAIN
> GroupReduce (GroupReduce at process(TransfoStage2StageOnTaz.java:106)) ->
> Map (Map at writeExternalTable(HiveHCatDAO.java:206)) (87/95) switched to
> FAILED with exception.
>
> java.lang.Exception: The data preparation for task 'CHAIN GroupReduce
> (GroupReduce at process(TransfoStage2StageOnTaz.java:106)) -> Map (Map at
> writeExternalTable(HiveHCatDAO.java:206))' , caused an error: Error
> obtaining the sorted input: Thread 'SortMerger spilling thread' terminated
> due to an exception: java.io.IOException: I/O channel already closed. Could
> not fulfill:
> org.apache.flink.runtime.io.disk.iomanager.SegmentWriteRequest@4e81c03c
>
>         at
> org.apache.flink.runtime.operators.BatchTask.run(BatchTask.java:465)
>
>         at
> org.apache.flink.runtime.operators.BatchTask.invoke(BatchTask.java:354)
>
>         at org.apache.flink.runtime.taskmanager.Task.run(Task.java:584)
>
>         at java.lang.Thread.run(Thread.java:744)
>
> Caused by: java.lang.RuntimeException: Error obtaining the sorted input:
> Thread 'SortMerger spilling thread' terminated due to an exception:
> java.io.IOException: I/O channel already closed. Could not fulfill:
> org.apache.flink.runtime.io.disk.iomanager.SegmentWriteRequest@4e81c03c
>
>         at
> org.apache.flink.runtime.operators.sort.UnilateralSortMerger.getIterator(UnilateralSortMerger.java:619)
>
>         at
> org.apache.flink.runtime.operators.BatchTask.getInput(BatchTask.java:1089)
>
>         at
> org.apache.flink.runtime.operators.GroupReduceDriver.prepare(GroupReduceDriver.java:94)
>
>         at
> org.apache.flink.runtime.operators.BatchTask.run(BatchTask.java:459)
>
>         ... 3 more
>
> Caused by: java.io.IOException: Thread 'SortMerger spilling thread'
> terminated due to an exception: java.io.IOException: I/O channel already
> closed. Could not fulfill:
> org.apache.flink.runtime.io.disk.iomanager.SegmentWriteRequest@4e81c03c
>
>         at
> org.apache.flink.runtime.operators.sort.UnilateralSortMerger$ThreadBase.run(UnilateralSortMerger.java:800)
>
> Caused by: com.esotericsoftware.kryo.KryoException: java.io.IOException:
> I/O channel already closed. Could not fulfill:
> org.apache.flink.runtime.io.disk.iomanager.SegmentWriteRequest@4e81c03c
>
>         at com.esotericsoftware.kryo.io.Output.flush(Output.java:165)
>
>         at
> org.apache.flink.api.java.typeutils.runtime.kryo.KryoSerializer.serialize(KryoSerializer.java:194)
>
>         at
> org.apache.flink.api.java.typeutils.runtime.kryo.KryoSerializer.copy(KryoSerializer.java:247)
>
>         at
> org.apache.flink.api.java.typeutils.runtime.TupleSerializerBase.copy(TupleSerializerBase.java:73)
>
>         at
> org.apache.flink.api.java.typeutils.runtime.TupleSerializerBase.copy(TupleSerializerBase.java:73)
>
>         at
> org.apache.flink.api.java.typeutils.runtime.TupleSerializerBase.copy(TupleSerializerBase.java:73)
>
>         at
> org.apache.flink.runtime.operators.sort.NormalizedKeySorter.writeToOutput(NormalizedKeySorter.java:499)
>
>         at
> org.apache.flink.runtime.operators.sort.UnilateralSortMerger$SpillingThread.go(UnilateralSortMerger.java:1344)
>
>         at
> org.apache.flink.runtime.operators.sort.UnilateralSortMerger$ThreadBase.run(UnilateralSortMerger.java:796)
>
> Caused by: java.io.IOException: I/O channel already closed. Could not
> fulfill:
> org.apache.flink.runtime.io.disk.iomanager.SegmentWriteRequest@4e81c03c
>
>         at
> org.apache.flink.runtime.io.disk.iomanager.AsynchronousFileIOChannel.addRequest(AsynchronousFileIOChannel.java:249)
>
>         at
> org.apache.flink.runtime.io.disk.iomanager.AsynchronousBlockWriterWithCallback.writeBlock(AsynchronousBlockWriterWithCallback.java:54)
>
>         at
> org.apache.flink.runtime.io.disk.iomanager.AsynchronousBlockWriterWithCallback.writeBlock(AsynchronousBlockWriterWithCallback.java:29)
>
>         at
> org.apache.flink.runtime.io.disk.iomanager.ChannelWriterOutputView.writeSegment(ChannelWriterOutputView.java:217)
>
>         at
> org.apache.flink.runtime.io.disk.iomanager.ChannelWriterOutputView.nextSegment(ChannelWriterOutputView.java:203)
>
>         at
> org.apache.flink.runtime.memory.AbstractPagedOutputView.advance(AbstractPagedOutputView.java:140)
>
>         at
> org.apache.flink.runtime.memory.AbstractPagedOutputView.write(AbstractPagedOutputView.java:201)
>
>         at
> org.apache.flink.api.java.typeutils.runtime.DataOutputViewStream.write(DataOutputViewStream.java:39)
>
>         at com.esotericsoftware.kryo.io.Output.flush(Output.java:163)
>
>         ... 8 more
>
> 15:22:47,627 INFO
> org.apache.flink.runtime.taskmanager.Task                     - Freeing
> task resources for CHAIN GroupReduce (GroupReduce at
> process(TransfoStage2StageOnTaz.java:106)) -> Map (Map at
> writeExternalTable(HiveHCatDAO.java:206)) (87/95)
>
> 15:22:47,627 INFO
> org.apache.flink.runtime.taskmanager.Task                     - Freeing
> task resources for CHAIN GroupReduce (GroupReduce at
> process(TransfoStage2StageOnTaz.java:106)) -> Map (Map at
> writeExternalTable(HiveHCatDAO.java:206)) (89/95)
>
> 15:22:47,664 INFO
> org.apache.flink.yarn.YarnTaskManager                         -
> Unregistering task and sending final execution state FAILED to JobManager
> for task CHAIN GroupReduce (GroupReduce at
> process(TransfoStage2StageOnTaz.java:106)) -> Map (Map at
> writeExternalTable(HiveHCatDAO.java:206)) (8d6b8b11714a27ac1ca83b39bdee577f)
>
> 15:22:47,738 INFO
> org.apache.flink.yarn.YarnTaskManager                         -
> Unregistering task and sending final execution state FAILED to JobManager
> for task CHAIN GroupReduce (GroupReduce at
> process(TransfoStage2StageOnTaz.java:106)) -> Map (Map at
> writeExternalTable(HiveHCatDAO.java:206)) (73c6c2f15159dcb134e3899064a30f33)
>
> 15:22:47,841 ERROR
> org.apache.flink.runtime.operators.BatchTask                  - Error in
> task code:  CHAIN GroupReduce (GroupReduce at
> calculeMinArea(TransfoStage2StageOnTaz.java:159)) -> Map (Key Extractor 1)
> (88/95)
>
> com.esotericsoftware.kryo.KryoException: java.io.IOException: I/O channel
> already closed. Could not fulfill:
> org.apache.flink.runtime.io.disk.iomanager.SegmentReadRequest@26707c34
>
>         at com.esotericsoftware.kryo.io.Input.fill(Input.java:148)
>
>         at
> org.apache.flink.api.java.typeutils.runtime.NoFetchingInput.require(NoFetchingInput.java:74)
>
>         at com.esotericsoftware.kryo.io.Input.readLong_slow(Input.java:756)
>
>         at com.esotericsoftware.kryo.io.Input.readVarLong(Input.java:690)
>
>         at com.esotericsoftware.kryo.io.Input.readLong(Input.java:685)
>
>         at
> com.esotericsoftware.kryo.serializers.UnsafeCacheFields$UnsafeLongField.read(UnsafeCacheFields.java:160)
>
>         at
> com.esotericsoftware.kryo.serializers.FieldSerializer.read(FieldSerializer.java:528)
>
>         at com.esotericsoftware.kryo.Kryo.readClassAndObject(Kryo.java:761)
>
>         at
> org.apache.flink.api.java.typeutils.runtime.kryo.KryoSerializer.deserialize(KryoSerializer.java:222)
>
>         at
> org.apache.flink.api.java.typeutils.runtime.kryo.KryoSerializer.deserialize(KryoSerializer.java:236)
>
>         at
> org.apache.flink.api.java.typeutils.runtime.TupleSerializer.deserialize(TupleSerializer.java:136)
>
>         at
> org.apache.flink.api.java.typeutils.runtime.TupleSerializer.deserialize(TupleSerializer.java:30)
>
>         at
> org.apache.flink.api.java.typeutils.runtime.TupleSerializer.deserialize(TupleSerializer.java:136)
>
>         at
> org.apache.flink.api.java.typeutils.runtime.TupleSerializer.deserialize(TupleSerializer.java:30)
>
>         at
> org.apache.flink.runtime.io.disk.ChannelReaderInputViewIterator.next(ChannelReaderInputViewIterator.java:86)
>
>         at
> org.apache.flink.runtime.operators.sort.MergeIterator$HeadStream.nextHead(MergeIterator.java:151)
>
>         at
> org.apache.flink.runtime.operators.sort.MergeIterator.next(MergeIterator.java:85)
>
>         at
> org.apache.flink.runtime.util.ReusingKeyGroupedIterator$ValuesIterator.hasNext(ReusingKeyGroupedIterator.java:186)
>
>         at
> org.apache.flink.api.java.operators.translation.TupleUnwrappingIterator.hasNext(TupleUnwrappingIterator.java:49)
>
>         at
> com.bouygtel.kubera.processor.stage.TransfoStage2StageOnTaz$6.reduce(TransfoStage2StageOnTaz.java:170)
>
>         at
> org.apache.flink.api.java.operators.translation.PlanUnwrappingReduceGroupOperator$TupleUnwrappingNonCombinableGroupReducer.reduce(PlanUnwrappingReduceGroupOperator.java:101)
>
>         at
> org.apache.flink.runtime.operators.GroupReduceDriver.run(GroupReduceDriver.java:118)
>
>         at
> org.apache.flink.runtime.operators.BatchTask.run(BatchTask.java:489)
>
>         at
> org.apache.flink.runtime.operators.BatchTask.invoke(BatchTask.java:354)
>
>         at org.apache.flink.runtime.taskmanager.Task.run(Task.java:584)
>
>         at java.lang.Thread.run(Thread.java:744)
>
> Caused by: java.io.IOException: I/O channel already closed. Could not
> fulfill:
> org.apache.flink.runtime.io.disk.iomanager.SegmentReadRequest@26707c34
>
>         at
> org.apache.flink.runtime.io.disk.iomanager.AsynchronousFileIOChannel.addRequest(AsynchronousFileIOChannel.java:249)
>
>         at
> org.apache.flink.runtime.io.disk.iomanager.AsynchronousBlockReader.readBlock(AsynchronousBlockReader.java:75)
>
>         at
> org.apache.flink.runtime.io.disk.iomanager.AsynchronousBlockReader.readBlock(AsynchronousBlockReader.java:43)
>
>         at
> org.apache.flink.runtime.io.disk.iomanager.ChannelReaderInputView.sendReadRequest(ChannelReaderInputView.java:259)
>
>         at
> org.apache.flink.runtime.io.disk.iomanager.ChannelReaderInputView.nextSegment(ChannelReaderInputView.java:224)
>
>         at
> org.apache.flink.runtime.memory.AbstractPagedInputView.advance(AbstractPagedInputView.java:159)
>
>         at
> org.apache.flink.runtime.memory.AbstractPagedInputView.read(AbstractPagedInputView.java:213)
>
>         at
> org.apache.flink.api.java.typeutils.runtime.DataInputViewStream.read(DataInputViewStream.java:68)
>
>         at com.esotericsoftware.kryo.io.Input.fill(Input.java:146)
>
>         ... 25 more
>
>
>
>
>
> (...)
>
> ______________________
>
> 15:22:51,798 INFO
> org.apache.flink.yarn.YarnJobManager                          - Container
> container_e11_1453202008841_2868_01_000018 is completed with diagnostics:
> Container
> [pid=14548,containerID=container_e11_1453202008841_2868_01_000018] is
> running beyond physical memory limits. Current usage: 12.1 GB of 12 GB
> physical memory used; 13.0 GB of 25.2 GB virtual memory used. Killing
> container.
>
> Dump of the process-tree for container_e11_1453202008841_2868_01_000018 :
>
>         |- PID PPID PGRPID SESSID CMD_NAME USER_MODE_TIME(MILLIS)
> SYSTEM_TIME(MILLIS) VMEM_USAGE(BYTES) RSSMEM_USAGE(PAGES) FULL_CMD_LINE
>
>         |- 14548 14542 14548 14548 (bash) 0 0 108646400 310 /bin/bash -c
> /usr/java/default/bin/java -Xms7200m -Xmx7200m
> -XX:MaxDirectMemorySize=7200m
> -Dlog.file=/data/2/hadoop/yarn/log/application_1453202008841_2868/container_e11_1453202008841_2868_01_000018/taskmanager.log
> -Dlogback.configurationFile=file:logback.xml
> -Dlog4j.configuration=file:log4j.properties
> org.apache.flink.yarn.YarnTaskManagerRunner --configDir . 1>
> /data/2/hadoop/yarn/log/application_1453202008841_2868/container_e11_1453202008841_2868_01_000018/taskmanager.out
> 2>
> /data/2/hadoop/yarn/log/application_1453202008841_2868/container_e11_1453202008841_2868_01_000018/taskmanager.err
> --streamingMode batch
>
>         |- 14558 14548 14548 14548 (java) 631070 15142 13881634816 3163462
> /usr/java/default/bin/java -Xms7200m -Xmx7200m
> -XX:MaxDirectMemorySize=7200m
> -Dlog.file=/data/2/hadoop/yarn/log/application_1453202008841_2868/container_e11_1453202008841_2868_01_000018/taskmanager.log
> -Dlogback.configurationFile=file:logback.xml
> -Dlog4j.configuration=file:log4j.properties
> org.apache.flink.yarn.YarnTaskManagerRunner --configDir . --streamingMode
> batch
>
>
>
> Container killed on request. Exit code is 143
>
> Container exited with a non-zero exit code 143
>
>
>
>
>
>
>
>
>
> *From:* ewenstephan@gmail.com [mailto:ewenstephan@gmail.com] *On behalf
> of* Stephan Ewen
> *Sent:* Tuesday, February 2, 2016 20:20
> *To:* user@flink.apache.org
> *Subject:* Re: Left join with unbalanced dataset
>
>
>
> To make sure this discussion does not go in a wrong direction:
>
>
>
> There is no issue here with data size, or memory management. The
> MemoryManagement for sorting and hashing works, and Flink handles the
> spilling correctly, etc.
>
>
>
> The issue here is different:
>
>    - One possible reason is that the network stack (specifically the Netty
> library) allocates too much direct (= off heap) memory for buffering the
> TCP connections.
>
>    - Another reason could be leaky behavior in Hadoop's HDFS code.
>
>
>
>
>
> @Arnaud: We need the full log of the TaskManager that initially
> experienced that failure; then we can look into this. Ideally it would be
> captured with memory logging activated, as Ufuk suggested.
>
>
>
> Best,
>
> Stephan
>
>
>

RE: Left join with unbalanced dataset

Posted by "LINZ, Arnaud" <AL...@bouyguestelecom.fr>.
Hi,

I see nothing wrong in the log of the killed container (in fact, it is strange that it fails with an I/O channel closure before it is killed by YARN), but I will post new logs with memory debugging enabled as a web download later today.

In the meantime, here is a log extract:

Container: container_e11_1453202008841_2868_01_000018 on h1r1dn06.bpa.bouyguestelecom.fr_45454
================================================================================================

…
15:04:01,234 INFO  org.apache.flink.yarn.YarnTaskManagerRunner                   - --------------------------------------------------------------------------------
15:04:01,236 INFO  org.apache.flink.yarn.YarnTaskManagerRunner                   -  Starting YARN TaskManager (Version: 0.10.0, Rev:ab2cca4, Date:10.11.2015 @ 13:50:14 UTC)
15:04:01,236 INFO  org.apache.flink.yarn.YarnTaskManagerRunner                   -  Current user: datcrypt
15:04:01,236 INFO  org.apache.flink.yarn.YarnTaskManagerRunner                   -  JVM: Java HotSpot(TM) 64-Bit Server VM - Oracle Corporation - 1.7/24.45-b08
15:04:01,236 INFO  org.apache.flink.yarn.YarnTaskManagerRunner                   -  Maximum heap size: 6900 MiBytes
15:04:01,236 INFO  org.apache.flink.yarn.YarnTaskManagerRunner                   -  JAVA_HOME: /usr/java/default
15:04:01,238 INFO  org.apache.flink.yarn.YarnTaskManagerRunner                   -  Hadoop version: 2.6.0
15:04:01,238 INFO  org.apache.flink.yarn.YarnTaskManagerRunner                   -  JVM Options:
15:04:01,238 INFO  org.apache.flink.yarn.YarnTaskManagerRunner                   -     -Xms7200m
15:04:01,238 INFO  org.apache.flink.yarn.YarnTaskManagerRunner                   -     -Xmx7200m
15:04:01,238 INFO  org.apache.flink.yarn.YarnTaskManagerRunner                   -     -XX:MaxDirectMemorySize=7200m
15:04:01,238 INFO  org.apache.flink.yarn.YarnTaskManagerRunner                   -     -Dlog.file=/data/2/hadoop/yarn/log/application_1453202008841_2868/container_e11_1453202008841_2868_01_000018/taskmanager.log
15:04:01,238 INFO  org.apache.flink.yarn.YarnTaskManagerRunner                   -     -Dlogback.configurationFile=file:logback.xml
15:04:01,238 INFO  org.apache.flink.yarn.YarnTaskManagerRunner                   -     -Dlog4j.configuration=file:log4j.properties
15:04:01,238 INFO  org.apache.flink.yarn.YarnTaskManagerRunner                   -  Program Arguments:
15:04:01,238 INFO  org.apache.flink.yarn.YarnTaskManagerRunner                   -     --configDir
15:04:01,238 INFO  org.apache.flink.yarn.YarnTaskManagerRunner                   -     .
15:04:01,238 INFO  org.apache.flink.yarn.YarnTaskManagerRunner                   -     --streamingMode
15:04:01,238 INFO  org.apache.flink.yarn.YarnTaskManagerRunner                   -     batch
15:04:01,238 INFO  org.apache.flink.yarn.YarnTaskManagerRunner                   - --------------------------------------------------------------------------------
…
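
A rough worked calculation on the JVM options above may help when reading the YARN diagnostics quoted earlier in the thread. The sketch below is only an illustration: the class and method names are hypothetical, it assumes that "12 GB" means 12 × 1024 MB, and it ignores other native memory (thread stacks, class metadata, and so on). It shows that a heap of 7200 MB plus a direct-memory allowance of 7200 MB can, in the worst case, exceed the container limit without the JVM ever hitting its own limits.

```java
public class ContainerMemoryBudget {

    /** Worst-case JVM footprint in MB: full heap plus full direct-memory allowance. */
    public static long worstCaseFootprintMb(long heapMb, long maxDirectMb) {
        return heapMb + maxDirectMb;
    }

    /** Returns true if the worst-case footprint still fits inside the container limit. */
    public static boolean fitsInContainer(long heapMb, long maxDirectMb, long containerLimitMb) {
        return worstCaseFootprintMb(heapMb, maxDirectMb) <= containerLimitMb;
    }

    public static void main(String[] args) {
        long heapMb = 7200;                 // -Xmx7200m, taken from the log
        long maxDirectMb = 7200;            // -XX:MaxDirectMemorySize=7200m, taken from the log
        long containerLimitMb = 12 * 1024;  // "12 GB physical memory" (assumed to be 12288 MB)

        System.out.println("Worst-case heap + direct: "
                + worstCaseFootprintMb(heapMb, maxDirectMb) + " MB");
        System.out.println("Container limit:          " + containerLimitMb + " MB");
        System.out.println("Fits in container:        "
                + fitsInContainer(heapMb, maxDirectMb, containerLimitMb));
    }
}
```

This only bounds what the JVM is allowed to use; it says nothing about which component actually consumed the memory.
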
15:04:02,215 INFO  org.apache.flink.runtime.taskmanager.TaskManager              - Starting TaskManager actor
15:04:02,224 INFO  org.apache.flink.runtime.io.network.netty.NettyConfig         - NettyConfig [server address: bt1shlhr/172.21.125.16, server port: 47002, memory segment size (bytes): 32768, transport type: NIO, number of server threads: 0 (use Netty's default), number of client threads: 0 (use Netty's default), server connect backlog: 0 (use Netty's default), client connect timeout (sec): 120, send/receive buffer size (bytes): 0 (use Netty's default)]
15:04:02,226 INFO  org.apache.flink.runtime.taskmanager.TaskManager              - Messages between TaskManager and JobManager have a max timeout of 100000 milliseconds
…

15:04:02,970 INFO  org.apache.flink.runtime.io.network.buffer.NetworkBufferPool  - Allocated 1024 MB for network buffer pool (number of memory segments: 32768, bytes per segment: 32768).
15:04:03,527 INFO  org.apache.flink.runtime.taskmanager.TaskManager              - Using 0.7 of the currently free heap space for Flink managed heap memory (4099 MB).
15:04:06,250 INFO  org.apache.flink.runtime.io.disk.iomanager.IOManager          - I/O manager uses directory /data/1/hadoop/yarn/local/usercache/datcrypt/appcache/application_1453202008841_2868/flink-io-5cc3aa50-6723-460e-a722-800dd908e9e8 for spill files.
…
15:04:06,429 INFO  org.apache.flink.yarn.YarnTaskManager                         - TaskManager data connection information: h1r1dn06.bpa.bouyguestelecom.fr (dataPort=47002)
15:04:06,430 INFO  org.apache.flink.yarn.YarnTaskManager                         - TaskManager has 2 task slot(s).
15:04:06,431 INFO  org.apache.flink.yarn.YarnTaskManager                         - Memory usage stats: [HEAP: 5186/6900/6900 MB, NON HEAP: 25/50/130 MB (used/committed/max)]
15:04:06,438 INFO  org.apache.flink.yarn.YarnTaskManager                         - Trying to register at JobManager akka.tcp://flink@172.21.125.31:36518/user/jobmanager (attempt 1, timeout: 500 milliseconds)
15:04:06,591 INFO  org.apache.flink.yarn.YarnTaskManager                         - Successful registration at JobManager (akka.tcp://flink@172.21.125.31:36518/user/jobmanager), starting network stack and library cache.
…
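
The "Memory usage stats" line above reports heap and non-heap usage only. Since direct (off-heap) memory is one of the suspects in this thread, here is a minimal sketch of how similar figures, plus the JVM's direct buffer pools, can be read with the standard `java.lang.management` API. This is not Flink's own logging code; the class name `MemoryStats` and the output format are assumptions made for illustration, and the buffer pool figures may not account for every native allocation a library makes.

```java
import java.lang.management.BufferPoolMXBean;
import java.lang.management.ManagementFactory;
import java.lang.management.MemoryMXBean;
import java.lang.management.MemoryUsage;

public class MemoryStats {

    /** Converts bytes to whole MB; negative values (meaning "undefined") are kept as -1. */
    private static long toMb(long bytes) {
        return bytes < 0 ? -1 : bytes >> 20;
    }

    /** Formats one memory area as "label: used/committed/max MB". */
    public static String format(String label, MemoryUsage usage) {
        return label + ": " + toMb(usage.getUsed()) + "/" + toMb(usage.getCommitted())
                + "/" + toMb(usage.getMax()) + " MB";
    }

    /** Builds a one-line snapshot of heap, non-heap, and NIO buffer pool usage. */
    public static String snapshot() {
        MemoryMXBean memory = ManagementFactory.getMemoryMXBean();
        StringBuilder sb = new StringBuilder("[");
        sb.append(format("HEAP", memory.getHeapMemoryUsage()));
        sb.append(", ").append(format("NON HEAP", memory.getNonHeapMemoryUsage()));
        // The JVM exposes its NIO buffer pools (typically "direct" and "mapped") as MXBeans.
        for (BufferPoolMXBean pool : ManagementFactory.getPlatformMXBeans(BufferPoolMXBean.class)) {
            sb.append(", ").append(pool.getName()).append(" buffers: ")
              .append(toMb(pool.getMemoryUsed())).append(" MB");
        }
        return sb.append("]").toString();
    }

    public static void main(String[] args) {
        System.out.println("Memory usage stats: " + snapshot());
    }
}
```

Calling `snapshot()` periodically from a background thread would give a time series to compare against the moment YARN kills the container.
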

15:17:22,191 INFO  org.apache.flink.yarn.YarnTaskManager                         - Unregistering task and sending final execution state FINISHED to JobManager for task DataSink (Hive Output to DEV_VOY_KBR_TRV.EX_GOS_TP_STAGES_TRANSPORT_MODE) (c9dc588ceb209d98fd08b5144a59adfc)
15:17:22,196 INFO  org.apache.flink.runtime.taskmanager.Task                     - DataSink (Hive Output to DEV_VOY_KBR_TRV.EX_GOS_TP_STAGES_TRANSPORT_MODE) (76/95) switched to FINISHED
15:17:22,197 INFO  org.apache.flink.runtime.taskmanager.Task                     - Freeing task resources for DataSink (Hive Output to DEV_VOY_KBR_TRV.EX_GOS_TP_STAGES_TRANSPORT_MODE) (76/95)
15:17:22,197 INFO  org.apache.flink.yarn.YarnTaskManager                         - Unregistering task and sending final execution state FINISHED to JobManager for task DataSink (Hive Output to DEV_VOY_KBR_TRV.EX_GOS_TP_STAGES_TRANSPORT_MODE) (0c1c027e2ca5111e3e54c98b6d7265d7)
15:22:47,592 ERROR org.apache.flink.yarn.YarnTaskManagerRunner                   - RECEIVED SIGNAL 15: SIGTERM
15:22:47,608 ERROR org.apache.flink.runtime.operators.BatchTask                  - Error in task code:  CHAIN GroupReduce (GroupReduce at process(TransfoStage2StageOnTaz.java:106)) -> Map (Map at writeExternalTable(HiveHCatDAO.java:206)) (89/95)
java.lang.Exception: The data preparation for task 'CHAIN GroupReduce (GroupReduce at process(TransfoStage2StageOnTaz.java:106)) -> Map (Map at writeExternalTable(HiveHCatDAO.java:206))' , caused an error: Error obtaining the sorted input: Thread 'SortMerger spilling thread' terminated due to an exception: java.io.IOException: I/O channel already closed. Could not fulfill: org.apache.flink.runtime.io.disk.iomanager.SegmentWriteRequest@29864ea1
        at org.apache.flink.runtime.operators.BatchTask.run(BatchTask.java:465)
        at org.apache.flink.runtime.operators.BatchTask.invoke(BatchTask.java:354)
        at org.apache.flink.runtime.taskmanager.Task.run(Task.java:584)
        at java.lang.Thread.run(Thread.java:744)
Caused by: java.lang.RuntimeException: Error obtaining the sorted input: Thread 'SortMerger spilling thread' terminated due to an exception: java.io.IOException: I/O channel already closed. Could not fulfill: org.apache.flink.runtime.io.disk.iomanager.SegmentWriteRequest@29864ea1
        at org.apache.flink.runtime.operators.sort.UnilateralSortMerger.getIterator(UnilateralSortMerger.java:619)
        at org.apache.flink.runtime.operators.BatchTask.getInput(BatchTask.java:1089)
        at org.apache.flink.runtime.operators.GroupReduceDriver.prepare(GroupReduceDriver.java:94)
        at org.apache.flink.runtime.operators.BatchTask.run(BatchTask.java:459)
        ... 3 more
Caused by: java.io.IOException: Thread 'SortMerger spilling thread' terminated due to an exception: java.io.IOException: I/O channel already closed. Could not fulfill: org.apache.flink.runtime.io.disk.iomanager.SegmentWriteRequest@29864ea1
        at org.apache.flink.runtime.operators.sort.UnilateralSortMerger$ThreadBase.run(UnilateralSortMerger.java:800)
Caused by: com.esotericsoftware.kryo.KryoException: java.io.IOException: I/O channel already closed. Could not fulfill: org.apache.flink.runtime.io.disk.iomanager.SegmentWriteRequest@29864ea1
        at com.esotericsoftware.kryo.io.Output.flush(Output.java:165)
        at org.apache.flink.api.java.typeutils.runtime.kryo.KryoSerializer.serialize(KryoSerializer.java:194)
        at org.apache.flink.api.java.typeutils.runtime.kryo.KryoSerializer.copy(KryoSerializer.java:247)
        at org.apache.flink.api.java.typeutils.runtime.TupleSerializerBase.copy(TupleSerializerBase.java:73)
        at org.apache.flink.api.java.typeutils.runtime.TupleSerializerBase.copy(TupleSerializerBase.java:73)
        at org.apache.flink.api.java.typeutils.runtime.TupleSerializerBase.copy(TupleSerializerBase.java:73)
        at org.apache.flink.runtime.operators.sort.NormalizedKeySorter.writeToOutput(NormalizedKeySorter.java:499)
        at org.apache.flink.runtime.operators.sort.UnilateralSortMerger$SpillingThread.go(UnilateralSortMerger.java:1344)
        at org.apache.flink.runtime.operators.sort.UnilateralSortMerger$ThreadBase.run(UnilateralSortMerger.java:796)
Caused by: java.io.IOException: I/O channel already closed. Could not fulfill: org.apache.flink.runtime.io.disk.iomanager.SegmentWriteRequest@29864ea1
        at org.apache.flink.runtime.io.disk.iomanager.AsynchronousFileIOChannel.addRequest(AsynchronousFileIOChannel.java:249)
        at org.apache.flink.runtime.io.disk.iomanager.AsynchronousBlockWriterWithCallback.writeBlock(AsynchronousBlockWriterWithCallback.java:54)
        at org.apache.flink.runtime.io.disk.iomanager.AsynchronousBlockWriterWithCallback.writeBlock(AsynchronousBlockWriterWithCallback.java:29)
        at org.apache.flink.runtime.io.disk.iomanager.ChannelWriterOutputView.writeSegment(ChannelWriterOutputView.java:217)
        at org.apache.flink.runtime.io.disk.iomanager.ChannelWriterOutputView.nextSegment(ChannelWriterOutputView.java:203)
        at org.apache.flink.runtime.memory.AbstractPagedOutputView.advance(AbstractPagedOutputView.java:140)
        at org.apache.flink.runtime.memory.AbstractPagedOutputView.write(AbstractPagedOutputView.java:201)
        at org.apache.flink.api.java.typeutils.runtime.DataOutputViewStream.write(DataOutputViewStream.java:39)
        at com.esotericsoftware.kryo.io.Output.flush(Output.java:163)
        ... 8 more
15:22:47,608 ERROR org.apache.flink.runtime.operators.BatchTask                  - Error in task code:  CHAIN GroupReduce (GroupReduce at process(TransfoStage2StageOnTaz.java:106)) -> Map (Map at writeExternalTable(HiveHCatDAO.java:206)) (87/95)
java.lang.Exception: The data preparation for task 'CHAIN GroupReduce (GroupReduce at process(TransfoStage2StageOnTaz.java:106)) -> Map (Map at writeExternalTable(HiveHCatDAO.java:206))' , caused an error: Error obtaining the sorted input: Thread 'SortMerger spilling thread' terminated due to an exception: java.io.IOException: I/O channel already closed. Could not fulfill: org.apache.flink.runtime.io.disk.iomanager.SegmentWriteRequest@4e81c03c
        at org.apache.flink.runtime.operators.BatchTask.run(BatchTask.java:465)
        at org.apache.flink.runtime.operators.BatchTask.invoke(BatchTask.java:354)
        at org.apache.flink.runtime.taskmanager.Task.run(Task.java:584)
        at java.lang.Thread.run(Thread.java:744)
Caused by: java.lang.RuntimeException: Error obtaining the sorted input: Thread 'SortMerger spilling thread' terminated due to an exception: java.io.IOException: I/O channel already closed. Could not fulfill: org.apache.flink.runtime.io.disk.iomanager.SegmentWriteRequest@4e81c03c
        at org.apache.flink.runtime.operators.sort.UnilateralSortMerger.getIterator(UnilateralSortMerger.java:619)
        at org.apache.flink.runtime.operators.BatchTask.getInput(BatchTask.java:1089)
        at org.apache.flink.runtime.operators.GroupReduceDriver.prepare(GroupReduceDriver.java:94)
        at org.apache.flink.runtime.operators.BatchTask.run(BatchTask.java:459)
        ... 3 more
Caused by: java.io.IOException: Thread 'SortMerger spilling thread' terminated due to an exception: java.io.IOException: I/O channel already closed. Could not fulfill: org.apache.flink.runtime.io.disk.iomanager.SegmentWriteRequest@4e81c03c
        at org.apache.flink.runtime.operators.sort.UnilateralSortMerger$ThreadBase.run(UnilateralSortMerger.java:800)
Caused by: com.esotericsoftware.kryo.KryoException: java.io.IOException: I/O channel already closed. Could not fulfill: org.apache.flink.runtime.io.disk.iomanager.SegmentWriteRequest@4e81c03c
        at com.esotericsoftware.kryo.io.Output.flush(Output.java:165)
        at org.apache.flink.api.java.typeutils.runtime.kryo.KryoSerializer.serialize(KryoSerializer.java:194)
        at org.apache.flink.api.java.typeutils.runtime.kryo.KryoSerializer.copy(KryoSerializer.java:247)
        at org.apache.flink.api.java.typeutils.runtime.TupleSerializerBase.copy(TupleSerializerBase.java:73)
        at org.apache.flink.api.java.typeutils.runtime.TupleSerializerBase.copy(TupleSerializerBase.java:73)
        at org.apache.flink.api.java.typeutils.runtime.TupleSerializerBase.copy(TupleSerializerBase.java:73)
        at org.apache.flink.runtime.operators.sort.NormalizedKeySorter.writeToOutput(NormalizedKeySorter.java:499)
        at org.apache.flink.runtime.operators.sort.UnilateralSortMerger$SpillingThread.go(UnilateralSortMerger.java:1344)
        at org.apache.flink.runtime.operators.sort.UnilateralSortMerger$ThreadBase.run(UnilateralSortMerger.java:796)
Caused by: java.io.IOException: I/O channel already closed. Could not fulfill: org.apache.flink.runtime.io.disk.iomanager.SegmentWriteRequest@4e81c03c
        at org.apache.flink.runtime.io.disk.iomanager.AsynchronousFileIOChannel.addRequest(AsynchronousFileIOChannel.java:249)
        at org.apache.flink.runtime.io.disk.iomanager.AsynchronousBlockWriterWithCallback.writeBlock(AsynchronousBlockWriterWithCallback.java:54)
        at org.apache.flink.runtime.io.disk.iomanager.AsynchronousBlockWriterWithCallback.writeBlock(AsynchronousBlockWriterWithCallback.java:29)
        at org.apache.flink.runtime.io.disk.iomanager.ChannelWriterOutputView.writeSegment(ChannelWriterOutputView.java:217)
        at org.apache.flink.runtime.io.disk.iomanager.ChannelWriterOutputView.nextSegment(ChannelWriterOutputView.java:203)
        at org.apache.flink.runtime.memory.AbstractPagedOutputView.advance(AbstractPagedOutputView.java:140)
        at org.apache.flink.runtime.memory.AbstractPagedOutputView.write(AbstractPagedOutputView.java:201)
        at org.apache.flink.api.java.typeutils.runtime.DataOutputViewStream.write(DataOutputViewStream.java:39)
        at com.esotericsoftware.kryo.io.Output.flush(Output.java:163)
        ... 8 more
15:22:47,617 INFO  org.apache.flink.runtime.taskmanager.Task                     - CHAIN GroupReduce (GroupReduce at process(TransfoStage2StageOnTaz.java:106)) -> Map (Map at writeExternalTable(HiveHCatDAO.java:206)) (89/95) switched to FAILED with exception.
java.lang.Exception: The data preparation for task 'CHAIN GroupReduce (GroupReduce at process(TransfoStage2StageOnTaz.java:106)) -> Map (Map at writeExternalTable(HiveHCatDAO.java:206))' , caused an error: Error obtaining the sorted input: Thread 'SortMerger spilling thread' terminated due to an exception: java.io.IOException: I/O channel already closed. Could not fulfill: org.apache.flink.runtime.io.disk.iomanager.SegmentWriteRequest@29864ea1
        at org.apache.flink.runtime.operators.BatchTask.run(BatchTask.java:465)
        at org.apache.flink.runtime.operators.BatchTask.invoke(BatchTask.java:354)
        at org.apache.flink.runtime.taskmanager.Task.run(Task.java:584)
        at java.lang.Thread.run(Thread.java:744)
Caused by: java.lang.RuntimeException: Error obtaining the sorted input: Thread 'SortMerger spilling thread' terminated due to an exception: java.io.IOException: I/O channel already closed. Could not fulfill: org.apache.flink.runtime.io.disk.iomanager.SegmentWriteRequest@29864ea1
        at org.apache.flink.runtime.operators.sort.UnilateralSortMerger.getIterator(UnilateralSortMerger.java:619)
        at org.apache.flink.runtime.operators.BatchTask.getInput(BatchTask.java:1089)
        at org.apache.flink.runtime.operators.GroupReduceDriver.prepare(GroupReduceDriver.java:94)
        at org.apache.flink.runtime.operators.BatchTask.run(BatchTask.java:459)
        ... 3 more
Caused by: java.io.IOException: Thread 'SortMerger spilling thread' terminated due to an exception: java.io.IOException: I/O channel already closed. Could not fulfill: org.apache.flink.runtime.io.disk.iomanager.SegmentWriteRequest@29864ea1
        at org.apache.flink.runtime.operators.sort.UnilateralSortMerger$ThreadBase.run(UnilateralSortMerger.java:800)
Caused by: com.esotericsoftware.kryo.KryoException: java.io.IOException: I/O channel already closed. Could not fulfill: org.apache.flink.runtime.io.disk.iomanager.SegmentWriteRequest@29864ea1
        at com.esotericsoftware.kryo.io.Output.flush(Output.java:165)
        at org.apache.flink.api.java.typeutils.runtime.kryo.KryoSerializer.serialize(KryoSerializer.java:194)
        at org.apache.flink.api.java.typeutils.runtime.kryo.KryoSerializer.copy(KryoSerializer.java:247)
        at org.apache.flink.api.java.typeutils.runtime.TupleSerializerBase.copy(TupleSerializerBase.java:73)
        at org.apache.flink.api.java.typeutils.runtime.TupleSerializerBase.copy(TupleSerializerBase.java:73)
        at org.apache.flink.api.java.typeutils.runtime.TupleSerializerBase.copy(TupleSerializerBase.java:73)
        at org.apache.flink.runtime.operators.sort.NormalizedKeySorter.writeToOutput(NormalizedKeySorter.java:499)
        at org.apache.flink.runtime.operators.sort.UnilateralSortMerger$SpillingThread.go(UnilateralSortMerger.java:1344)
        at org.apache.flink.runtime.operators.sort.UnilateralSortMerger$ThreadBase.run(UnilateralSortMerger.java:796)
Caused by: java.io.IOException: I/O channel already closed. Could not fulfill: org.apache.flink.runtime.io.disk.iomanager.SegmentWriteRequest@29864ea1
        at org.apache.flink.runtime.io.disk.iomanager.AsynchronousFileIOChannel.addRequest(AsynchronousFileIOChannel.java:249)
        at org.apache.flink.runtime.io.disk.iomanager.AsynchronousBlockWriterWithCallback.writeBlock(AsynchronousBlockWriterWithCallback.java:54)
        at org.apache.flink.runtime.io.disk.iomanager.AsynchronousBlockWriterWithCallback.writeBlock(AsynchronousBlockWriterWithCallback.java:29)
        at org.apache.flink.runtime.io.disk.iomanager.ChannelWriterOutputView.writeSegment(ChannelWriterOutputView.java:217)
        at org.apache.flink.runtime.io.disk.iomanager.ChannelWriterOutputView.nextSegment(ChannelWriterOutputView.java:203)
        at org.apache.flink.runtime.memory.AbstractPagedOutputView.advance(AbstractPagedOutputView.java:140)
        at org.apache.flink.runtime.memory.AbstractPagedOutputView.write(AbstractPagedOutputView.java:201)
        at org.apache.flink.api.java.typeutils.runtime.DataOutputViewStream.write(DataOutputViewStream.java:39)
        at com.esotericsoftware.kryo.io.Output.flush(Output.java:163)
        ... 8 more
15:22:47,619 INFO  org.apache.flink.runtime.taskmanager.Task                     - CHAIN GroupReduce (GroupReduce at process(TransfoStage2StageOnTaz.java:106)) -> Map (Map at writeExternalTable(HiveHCatDAO.java:206)) (87/95) switched to FAILED with exception.
java.lang.Exception: The data preparation for task 'CHAIN GroupReduce (GroupReduce at process(TransfoStage2StageOnTaz.java:106)) -> Map (Map at writeExternalTable(HiveHCatDAO.java:206))' , caused an error: Error obtaining the sorted input: Thread 'SortMerger spilling thread' terminated due to an exception: java.io.IOException: I/O channel already closed. Could not fulfill: org.apache.flink.runtime.io.disk.iomanager.SegmentWriteRequest@4e81c03c
        at org.apache.flink.runtime.operators.BatchTask.run(BatchTask.java:465)
        at org.apache.flink.runtime.operators.BatchTask.invoke(BatchTask.java:354)
        at org.apache.flink.runtime.taskmanager.Task.run(Task.java:584)
        at java.lang.Thread.run(Thread.java:744)
Caused by: java.lang.RuntimeException: Error obtaining the sorted input: Thread 'SortMerger spilling thread' terminated due to an exception: java.io.IOException: I/O channel already closed. Could not fulfill: org.apache.flink.runtime.io.disk.iomanager.SegmentWriteRequest@4e81c03c
        at org.apache.flink.runtime.operators.sort.UnilateralSortMerger.getIterator(UnilateralSortMerger.java:619)
        at org.apache.flink.runtime.operators.BatchTask.getInput(BatchTask.java:1089)
        at org.apache.flink.runtime.operators.GroupReduceDriver.prepare(GroupReduceDriver.java:94)
        at org.apache.flink.runtime.operators.BatchTask.run(BatchTask.java:459)
        ... 3 more
Caused by: java.io.IOException: Thread 'SortMerger spilling thread' terminated due to an exception: java.io.IOException: I/O channel already closed. Could not fulfill: org.apache.flink.runtime.io.disk.iomanager.SegmentWriteRequest@4e81c03c
        at org.apache.flink.runtime.operators.sort.UnilateralSortMerger$ThreadBase.run(UnilateralSortMerger.java:800)
Caused by: com.esotericsoftware.kryo.KryoException: java.io.IOException: I/O channel already closed. Could not fulfill: org.apache.flink.runtime.io.disk.iomanager.SegmentWriteRequest@4e81c03c
        at com.esotericsoftware.kryo.io.Output.flush(Output.java:165)
        at org.apache.flink.api.java.typeutils.runtime.kryo.KryoSerializer.serialize(KryoSerializer.java:194)
        at org.apache.flink.api.java.typeutils.runtime.kryo.KryoSerializer.copy(KryoSerializer.java:247)
        at org.apache.flink.api.java.typeutils.runtime.TupleSerializerBase.copy(TupleSerializerBase.java:73)
        at org.apache.flink.api.java.typeutils.runtime.TupleSerializerBase.copy(TupleSerializerBase.java:73)
        at org.apache.flink.api.java.typeutils.runtime.TupleSerializerBase.copy(TupleSerializerBase.java:73)
        at org.apache.flink.runtime.operators.sort.NormalizedKeySorter.writeToOutput(NormalizedKeySorter.java:499)
        at org.apache.flink.runtime.operators.sort.UnilateralSortMerger$SpillingThread.go(UnilateralSortMerger.java:1344)
        at org.apache.flink.runtime.operators.sort.UnilateralSortMerger$ThreadBase.run(UnilateralSortMerger.java:796)
Caused by: java.io.IOException: I/O channel already closed. Could not fulfill: org.apache.flink.runtime.io.disk.iomanager.SegmentWriteRequest@4e81c03c
        at org.apache.flink.runtime.io.disk.iomanager.AsynchronousFileIOChannel.addRequest(AsynchronousFileIOChannel.java:249)
        at org.apache.flink.runtime.io.disk.iomanager.AsynchronousBlockWriterWithCallback.writeBlock(AsynchronousBlockWriterWithCallback.java:54)
        at org.apache.flink.runtime.io.disk.iomanager.AsynchronousBlockWriterWithCallback.writeBlock(AsynchronousBlockWriterWithCallback.java:29)
        at org.apache.flink.runtime.io.disk.iomanager.ChannelWriterOutputView.writeSegment(ChannelWriterOutputView.java:217)
        at org.apache.flink.runtime.io.disk.iomanager.ChannelWriterOutputView.nextSegment(ChannelWriterOutputView.java:203)
        at org.apache.flink.runtime.memory.AbstractPagedOutputView.advance(AbstractPagedOutputView.java:140)
        at org.apache.flink.runtime.memory.AbstractPagedOutputView.write(AbstractPagedOutputView.java:201)
        at org.apache.flink.api.java.typeutils.runtime.DataOutputViewStream.write(DataOutputViewStream.java:39)
        at com.esotericsoftware.kryo.io.Output.flush(Output.java:163)
        ... 8 more
15:22:47,627 INFO  org.apache.flink.runtime.taskmanager.Task                     - Freeing task resources for CHAIN GroupReduce (GroupReduce at process(TransfoStage2StageOnTaz.java:106)) -> Map (Map at writeExternalTable(HiveHCatDAO.java:206)) (87/95)
15:22:47,627 INFO  org.apache.flink.runtime.taskmanager.Task                     - Freeing task resources for CHAIN GroupReduce (GroupReduce at process(TransfoStage2StageOnTaz.java:106)) -> Map (Map at writeExternalTable(HiveHCatDAO.java:206)) (89/95)
15:22:47,664 INFO  org.apache.flink.yarn.YarnTaskManager                         - Unregistering task and sending final execution state FAILED to JobManager for task CHAIN GroupReduce (GroupReduce at process(TransfoStage2StageOnTaz.java:106)) -> Map (Map at writeExternalTable(HiveHCatDAO.java:206)) (8d6b8b11714a27ac1ca83b39bdee577f)
15:22:47,738 INFO  org.apache.flink.yarn.YarnTaskManager                         - Unregistering task and sending final execution state FAILED to JobManager for task CHAIN GroupReduce (GroupReduce at process(TransfoStage2StageOnTaz.java:106)) -> Map (Map at writeExternalTable(HiveHCatDAO.java:206)) (73c6c2f15159dcb134e3899064a30f33)
15:22:47,841 ERROR org.apache.flink.runtime.operators.BatchTask                  - Error in task code:  CHAIN GroupReduce (GroupReduce at calculeMinArea(TransfoStage2StageOnTaz.java:159)) -> Map (Key Extractor 1) (88/95)
com.esotericsoftware.kryo.KryoException: java.io.IOException: I/O channel already closed. Could not fulfill: org.apache.flink.runtime.io.disk.iomanager.SegmentReadRequest@26707c34
        at com.esotericsoftware.kryo.io.Input.fill(Input.java:148)
        at org.apache.flink.api.java.typeutils.runtime.NoFetchingInput.require(NoFetchingInput.java:74)
        at com.esotericsoftware.kryo.io.Input.readLong_slow(Input.java:756)
        at com.esotericsoftware.kryo.io.Input.readVarLong(Input.java:690)
        at com.esotericsoftware.kryo.io.Input.readLong(Input.java:685)
        at com.esotericsoftware.kryo.serializers.UnsafeCacheFields$UnsafeLongField.read(UnsafeCacheFields.java:160)
        at com.esotericsoftware.kryo.serializers.FieldSerializer.read(FieldSerializer.java:528)
        at com.esotericsoftware.kryo.Kryo.readClassAndObject(Kryo.java:761)
        at org.apache.flink.api.java.typeutils.runtime.kryo.KryoSerializer.deserialize(KryoSerializer.java:222)
        at org.apache.flink.api.java.typeutils.runtime.kryo.KryoSerializer.deserialize(KryoSerializer.java:236)
        at org.apache.flink.api.java.typeutils.runtime.TupleSerializer.deserialize(TupleSerializer.java:136)
        at org.apache.flink.api.java.typeutils.runtime.TupleSerializer.deserialize(TupleSerializer.java:30)
        at org.apache.flink.api.java.typeutils.runtime.TupleSerializer.deserialize(TupleSerializer.java:136)
        at org.apache.flink.api.java.typeutils.runtime.TupleSerializer.deserialize(TupleSerializer.java:30)
        at org.apache.flink.runtime.io.disk.ChannelReaderInputViewIterator.next(ChannelReaderInputViewIterator.java:86)
        at org.apache.flink.runtime.operators.sort.MergeIterator$HeadStream.nextHead(MergeIterator.java:151)
        at org.apache.flink.runtime.operators.sort.MergeIterator.next(MergeIterator.java:85)
        at org.apache.flink.runtime.util.ReusingKeyGroupedIterator$ValuesIterator.hasNext(ReusingKeyGroupedIterator.java:186)
        at org.apache.flink.api.java.operators.translation.TupleUnwrappingIterator.hasNext(TupleUnwrappingIterator.java:49)
        at com.bouygtel.kubera.processor.stage.TransfoStage2StageOnTaz$6.reduce(TransfoStage2StageOnTaz.java:170)
        at org.apache.flink.api.java.operators.translation.PlanUnwrappingReduceGroupOperator$TupleUnwrappingNonCombinableGroupReducer.reduce(PlanUnwrappingReduceGroupOperator.java:101)
        at org.apache.flink.runtime.operators.GroupReduceDriver.run(GroupReduceDriver.java:118)
        at org.apache.flink.runtime.operators.BatchTask.run(BatchTask.java:489)
        at org.apache.flink.runtime.operators.BatchTask.invoke(BatchTask.java:354)
        at org.apache.flink.runtime.taskmanager.Task.run(Task.java:584)
        at java.lang.Thread.run(Thread.java:744)
Caused by: java.io.IOException: I/O channel already closed. Could not fulfill: org.apache.flink.runtime.io.disk.iomanager.SegmentReadRequest@26707c34
        at org.apache.flink.runtime.io.disk.iomanager.AsynchronousFileIOChannel.addRequest(AsynchronousFileIOChannel.java:249)
        at org.apache.flink.runtime.io.disk.iomanager.AsynchronousBlockReader.readBlock(AsynchronousBlockReader.java:75)
        at org.apache.flink.runtime.io.disk.iomanager.AsynchronousBlockReader.readBlock(AsynchronousBlockReader.java:43)
        at org.apache.flink.runtime.io.disk.iomanager.ChannelReaderInputView.sendReadRequest(ChannelReaderInputView.java:259)
        at org.apache.flink.runtime.io.disk.iomanager.ChannelReaderInputView.nextSegment(ChannelReaderInputView.java:224)
        at org.apache.flink.runtime.memory.AbstractPagedInputView.advance(AbstractPagedInputView.java:159)
        at org.apache.flink.runtime.memory.AbstractPagedInputView.read(AbstractPagedInputView.java:213)
        at org.apache.flink.api.java.typeutils.runtime.DataInputViewStream.read(DataInputViewStream.java:68)
        at com.esotericsoftware.kryo.io.Input.fill(Input.java:146)
        ... 25 more


(...)
______________________
15:22:51,798 INFO  org.apache.flink.yarn.YarnJobManager                          - Container container_e11_1453202008841_2868_01_000018 is completed with diagnostics: Container [pid=14548,containerID=container_e11_1453202008841_2868_01_000018] is running beyond physical memory limits. Current usage: 12.1 GB of 12 GB physical memory used; 13.0 GB of 25.2 GB virtual memory used. Killing container.
Dump of the process-tree for container_e11_1453202008841_2868_01_000018 :
        |- PID PPID PGRPID SESSID CMD_NAME USER_MODE_TIME(MILLIS) SYSTEM_TIME(MILLIS) VMEM_USAGE(BYTES) RSSMEM_USAGE(PAGES) FULL_CMD_LINE
        |- 14548 14542 14548 14548 (bash) 0 0 108646400 310 /bin/bash -c /usr/java/default/bin/java -Xms7200m -Xmx7200m -XX:MaxDirectMemorySize=7200m  -Dlog.file=/data/2/hadoop/yarn/log/application_1453202008841_2868/container_e11_1453202008841_2868_01_000018/taskmanager.log -Dlogback.configurationFile=file:logback.xml -Dlog4j.configuration=file:log4j.properties org.apache.flink.yarn.YarnTaskManagerRunner --configDir . 1> /data/2/hadoop/yarn/log/application_1453202008841_2868/container_e11_1453202008841_2868_01_000018/taskmanager.out 2> /data/2/hadoop/yarn/log/application_1453202008841_2868/container_e11_1453202008841_2868_01_000018/taskmanager.err --streamingMode batch
        |- 14558 14548 14548 14548 (java) 631070 15142 13881634816 3163462 /usr/java/default/bin/java -Xms7200m -Xmx7200m -XX:MaxDirectMemorySize=7200m -Dlog.file=/data/2/hadoop/yarn/log/application_1453202008841_2868/container_e11_1453202008841_2868_01_000018/taskmanager.log -Dlogback.configurationFile=file:logback.xml -Dlog4j.configuration=file:log4j.properties org.apache.flink.yarn.YarnTaskManagerRunner --configDir . --streamingMode batch

Container killed on request. Exit code is 143
Container exited with a non-zero exit code 143
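
A quick check of the numbers in the dump above, as a minimal sketch. The JVM was started with -Xmx7200m and -XX:MaxDirectMemorySize=7200m inside a 12 GB container, so heap plus direct memory alone are allowed to grow past the container limit. The class and method names are made up for illustration, and the sum is an upper bound on what the JVM may use, not a measurement of what it did use.

```java
// Sketch: compare the memory the JVM is permitted to use with the YARN container limit.
// All values are copied from the container dump; nothing here is measured usage.
final class ContainerMemoryBudget {

    /** Returns how many MB heap + direct memory may exceed the container limit (negative means headroom). */
    static long overcommitMb(long containerLimitMb, long maxHeapMb, long maxDirectMb) {
        return (maxHeapMb + maxDirectMb) - containerLimitMb;
    }

    public static void main(String[] args) {
        long containerLimitMb = 12L * 1024; // "12 GB physical memory" reported by YARN
        long maxHeapMb = 7200;              // -Xmx7200m
        long maxDirectMb = 7200;            // -XX:MaxDirectMemorySize=7200m

        long over = overcommitMb(containerLimitMb, maxHeapMb, maxDirectMb);
        System.out.println("Permitted heap + direct: " + (maxHeapMb + maxDirectMb) + " MB");
        System.out.println("Container limit:         " + containerLimitMb + " MB");
        System.out.println("Possible overshoot:      " + over + " MB");
    }
}
```

With these settings the permitted total is 14400 MB against a 12288 MB limit, which would be consistent with YARN killing the container once off-heap usage grows.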




From: ewenstephan@gmail.com [mailto:ewenstephan@gmail.com] On behalf of Stephan Ewen
Sent: Tuesday, February 2, 2016 20:20
To: user@flink.apache.org
Subject: Re: Left join with unbalanced dataset

To make sure this discussion does not go in a wrong direction:

There is no issue here with data size, or memory management. The MemoryManagement for sorting and hashing works, and Flink handles the spilling correctly, etc.

The issue here is different:
   - One possible reason is that the network stack (specifically the Netty library) allocates too much direct (= off heap) memory for buffering the TCP connections.
   - Another reason could be leaky behavior in Hadoop's HDFS code.


@Arnaud: We need the full log of the TaskManager that initially experiences that failure, then we can look into this. Ideally with memory logging activated, as suggested by Ufuk.

Best,
Stephan


Re: Left join with unbalanced dataset

Posted by Stephan Ewen <se...@apache.org>.
To make sure this discussion does not go in a wrong direction:

There is no issue here with data size, or memory management. The
MemoryManagement for sorting and hashing works, and Flink handles the
spilling correctly, etc.

The issue here is different:
   - One possible reason is that the network stack (specifically the Netty
library) allocates too much direct (= off heap) memory for buffering the
TCP connections.
   - Another reason could be leaky behavior in Hadoop's HDFS code.


@Arnaud: We need the full log of the TaskManager that initially experiences
that failure, then we can look into this. Ideally with memory logging
activated, as suggested by Ufuk.

Best,
Stephan


On Tue, Feb 2, 2016 at 6:21 PM, Gábor Gévay <gg...@gmail.com> wrote:

> Hello Arnaud,
>
> > Flink does not start the reduce operation until all lines have
> > been created (memory bottleneck is during the collection
> > of all lines); but theoretically it is possible.
>
> The problem that `S.groupBy(...).reduce(...)` needs to fully
> materialize S comes from the fact that the implementation of reduce is
> currently sort based. But this PR will partially solve this:
> https://github.com/apache/flink/pull/1517
> It implements a hash-based combiner, which will not materialize the
> input, but instead needs memory proportional to only the number of
> different keys occurring. You might want to try rebasing to this PR,
> to see whether it improves your situation.
>
> (I also plan to extend this implementation to the actual reduce after
> the combine, but I'm not sure when I will get around to that.)
>
> Best,
> Gábor
>
>
>
> 2016-02-02 16:56 GMT+01:00 LINZ, Arnaud <AL...@bouyguestelecom.fr>:
> > Thanks,
> >
> > Giving the batch an outrageous amount of memory with a 0.5 heap ratio
> leads to the success of the batch.
> >
> > I've figured out which dataset is consuming the most memory: I have a
> big join that multiplies the size of the input set before a group reduce.
> > I am willing to optimize my code by reducing the size of the join
> output.
> >
> > The outline of the processing is:
> > DataSet A = (K1, K2, V1) where (K1,K2) is the key. A is huge.
> > DataSet B = (K1, V2)  where there are multiple values V2 for the same K1
> (say 5)
> >
> > I do something like : A.join(B).on(K1).groupBy(K1,K2).reduce()
> > As B contains 5 lines for one key of A, A.join(B) is 5 times the size of
> A.
> >
> > Flink does not start the reduce operation until all lines have been
> created (memory bottleneck is during the collection of all lines); but
> theoretically it is possible.
> > I see no "join group" operator that could do something like
> "A.groupBy(K1,K2).join(B).on(K1).reduce()"
> >
> > Is there a way to do this ?
> >
> > The other way I see is to load B in memory for all nodes and use a hash
> map upon reduction to get all A.join(B) lines. B is not that small, but I
> think it will still save RAM.
> >
> > Best regards,
> > Arnaud
> >
> > -----Original Message-----
> > From: Ufuk Celebi [mailto:uce@apache.org]
> > Sent: Tuesday, February 2, 2016 15:27
> > To: user@flink.apache.org
> > Subject: Re: Left join with unbalanced dataset
> >
> >
> >> On 02 Feb 2016, at 15:15, LINZ, Arnaud <AL...@bouyguestelecom.fr>
> wrote:
> >>
> >> Hi,
> >>
> >> Running again with more RAM made the processing go further, but Yarn
> still killed one container for memory consumption. I will experiment with
> various memory parameters.
> >
> > OK, the killing of the container probably triggered the
> RemoteTransportException.
> >
> > Can you tell me how many containers you are using, how much physical
> memory the machines have and how much the containers get?
> >
> > You can monitor memory usage by setting
> >
> > taskmanager.debug.memory.startLogThread: true
> >
> > in the config. This will periodically log the memory consumption to the
> task manager logs. Can you try this and check the logs for the memory
> consumption?
> >
> > You can also have a look at it in the web frontend under the Task
> Manager tab.
> >
> > – Ufuk
> >
> >

Re: Left join with unbalanced dataset

Posted by Gábor Gévay <gg...@gmail.com>.
Hello Arnaud,

> Flink does not start the reduce operation until all lines have
> been created (memory bottleneck is during the collection
> of all lines); but theoretically it is possible.

The problem that `S.groupBy(...).reduce(...)` needs to fully
materialize S comes from the fact that the implementation of reduce is
currently sort based. But this PR will partially solve this:
https://github.com/apache/flink/pull/1517
It implements a hash-based combiner, which will not materialize the
input, but instead needs memory proportional to only the number of
different keys occurring. You might want to try rebasing to this PR,
to see whether it improves your situation.

(I also plan to extend this implementation to the actual reduce after
the combine, but I'm not sure when I will get around to that.)
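
To illustrate the difference in memory behaviour, here is a minimal sketch of a hash-based combine in plain Java. It is not Flink's implementation and not the code of the PR; the class and method names are made up for illustration. The point is only that one running aggregate is kept per distinct key, so the input is consumed record by record and never has to be sorted or held in full.

```java
import java.util.AbstractMap.SimpleEntry;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.function.BinaryOperator;

// Illustration only: a toy hash-based combiner, not Flink's operator.
final class HashCombineSketch {

    /** Folds (key, value) records into one aggregate per key; memory grows with the number of distinct keys. */
    static <K, V> Map<K, V> combine(Iterator<Map.Entry<K, V>> records, BinaryOperator<V> reduceFn) {
        Map<K, V> aggregates = new HashMap<>();
        while (records.hasNext()) {
            Map.Entry<K, V> record = records.next();
            // merge() stores the value for a new key, or applies reduceFn(old, new) for a known key.
            aggregates.merge(record.getKey(), record.getValue(), reduceFn);
        }
        return aggregates;
    }

    public static void main(String[] args) {
        List<Map.Entry<String, Integer>> input = new ArrayList<>();
        input.add(new SimpleEntry<>("a", 1));
        input.add(new SimpleEntry<>("b", 10));
        input.add(new SimpleEntry<>("a", 2));
        input.add(new SimpleEntry<>("b", 20));

        Map<String, Integer> sums = combine(input.iterator(), Integer::sum);
        System.out.println("a=" + sums.get("a") + ", b=" + sums.get("b"));
    }
}
```

A sort-based reduce, by contrast, has to see (and possibly spill) the whole input before the first group can be emitted.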

Best,
Gábor



2016-02-02 16:56 GMT+01:00 LINZ, Arnaud <AL...@bouyguestelecom.fr>:
> Thanks,
>
> Giving the batch an outrageous amount of memory with a 0.5 heap ratio leads to the success of the batch.
>
> I've figured out which dataset is consuming the most memory: I have a big join that multiplies the size of the input set before a group reduce.
> I am willing to optimize my code by reducing the size of the join output.
>
> The outline of the processing is:
> DataSet A = (K1, K2, V1) where (K1,K2) is the key. A is huge.
> DataSet B = (K1, V2)  where there are multiple values V2 for the same K1 (say 5)
>
> I do something like : A.join(B).on(K1).groupBy(K1,K2).reduce()
> As B contains 5 lines for one key of A, A.join(B) is 5 times the size of A.
>
> Flink does not start the reduce operation until all lines have been created (memory bottleneck is during the collection of all lines); but theoretically it is possible.
> I see no "join group" operator that could do something like "A.groupBy(K1,K2).join(B).on(K1).reduce()"
>
> Is there a way to do this ?
>
> The other way I see is to load B in memory for all nodes and use a hash map upon reduction to get all A.join(B) lines. B is not that small, but I think it will still save RAM.
>
> Best regards,
> Arnaud
>
> -----Original Message-----
> From: Ufuk Celebi [mailto:uce@apache.org]
> Sent: Tuesday, February 2, 2016 15:27
> To: user@flink.apache.org
> Subject: Re: Left join with unbalanced dataset
>
>
>> On 02 Feb 2016, at 15:15, LINZ, Arnaud <AL...@bouyguestelecom.fr> wrote:
>>
>> Hi,
>>
>> Running again with more RAM made the processing go further, but Yarn still killed one container for memory consumption. I will experiment with various memory parameters.
>
> OK, the killing of the container probably triggered the RemoteTransportException.
>
> Can you tell me how many containers you are using, how much physical memory the machines have and how much the containers get?
>
> You can monitor memory usage by setting
>
> taskmanager.debug.memory.startLogThread: true
>
> in the config. This will periodically log the memory consumption to the task manager logs. Can you try this and check the logs for the memory consumption?
>
> You can also have a look at it in the web frontend under the Task Manager tab.
>
> – Ufuk
>
>

RE: Left join with unbalanced dataset

Posted by "LINZ, Arnaud" <AL...@bouyguestelecom.fr>.
Thanks,

Giving the batch an outrageous amount of memory with a 0.5 heap ratio leads to the success of the batch.

I've figured out which dataset is consuming the most memory: I have a big join that multiplies the size of the input set before a group reduce.
I am willing to optimize my code by reducing the size of the join output.

The outline of the processing is:
DataSet A = (K1, K2, V1) where (K1,K2) is the key. A is huge.
DataSet B = (K1, V2)  where there are multiple values V2 for the same K1 (say 5)

I do something like : A.join(B).on(K1).groupBy(K1,K2).reduce()
As B contains 5 lines for one key of A, A.join(B) is 5 times the size of A.

Flink does not start the reduce operation until all lines have been created (memory bottleneck is during the collection of all lines); but theoretically it is possible.
I see no "join group" operator that could do something like "A.groupBy(K1,K2).join(B).on(K1).reduce()"

Is there a way to do this ?

The other way I see is to load B in memory for all nodes and use a hash map upon reduction to get all A.join(B) lines. B is not that small, but I think it will still save RAM.
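
A rough sketch of that last idea in plain Java, under stated assumptions: B fits in memory as a map from K1 to its V2 values, and the reduce can be folded over the matches of one A record at a time. The class name, the method names and the reduce function (a sum of v1 * v2) are made up for illustration. In the DataSet API, broadcast variables (withBroadcastSet on the operator, getRuntimeContext().getBroadcastVariable(...) in a rich function) would be the usual way to ship B to every task.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Illustration only: join-by-lookup with an immediate reduce, so A.join(B) is never stored.
final class BroadcastLookupSketch {

    /** Builds the in-memory index K1 -> all V2 values of B (done once per task in a real job). */
    static Map<String, List<Long>> indexByK1(String[] k1s, long[] v2s) {
        Map<String, List<Long>> index = new HashMap<>();
        for (int i = 0; i < k1s.length; i++) {
            index.computeIfAbsent(k1s[i], k -> new ArrayList<>()).add(v2s[i]);
        }
        return index;
    }

    /**
     * Handles one record of A: looks up its matches in B and reduces them on the fly.
     * The sum of v1 * v2 is a hypothetical reduce function. A key without a match
     * yields 0 here, whereas an inner join would drop the record.
     */
    static long joinAndReduce(String k1, long v1, Map<String, List<Long>> bIndex) {
        long acc = 0;
        List<Long> matches = bIndex.getOrDefault(k1, Collections.<Long>emptyList());
        for (long v2 : matches) {
            acc += v1 * v2;
        }
        return acc;
    }

    public static void main(String[] args) {
        Map<String, List<Long>> bIndex =
                indexByK1(new String[] {"x", "x", "y"}, new long[] {1, 2, 3});
        System.out.println(joinAndReduce("x", 10, bIndex));
        System.out.println(joinAndReduce("y", 5, bIndex));
    }
}
```

The memory cost moves from the joined output (about 5 times A) to one copy of B per task, which only pays off while B stays comfortably below the per-task memory budget.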

Best regards,
Arnaud

-----Original Message-----
From: Ufuk Celebi [mailto:uce@apache.org]
Sent: Tuesday, February 2, 2016 15:27
To: user@flink.apache.org
Subject: Re: Left join with unbalanced dataset


> On 02 Feb 2016, at 15:15, LINZ, Arnaud <AL...@bouyguestelecom.fr> wrote:
>
> Hi,
>
> Running again with more RAM made the processing go further, but Yarn still killed one container for memory consumption. I will experiment with various memory parameters.

OK, the killing of the container probably triggered the RemoteTransportException.

Can you tell me how many containers you are using, how much physical memory the machines have and how much the containers get?

You can monitor memory usage by setting

taskmanager.debug.memory.startLogThread: true

in the config. This will periodically log the memory consumption to the task manager logs. Can you try this and check the logs for the memory consumption?

You can also have a look at it in the web frontend under the Task Manager tab.

– Ufuk


Re: Left join with unbalanced dataset

Posted by Ufuk Celebi <uc...@apache.org>.
> On 02 Feb 2016, at 15:15, LINZ, Arnaud <AL...@bouyguestelecom.fr> wrote:
> 
> Hi,
> 
> Running again with more RAM made the processing go further, but Yarn still killed one container for memory consumption. I will experiment with various memory parameters.

OK, the killing of the container probably triggered the RemoteTransportException.

Can you tell me how many containers you are using, how much physical memory the machines have and how much the containers get?

You can monitor memory usage by setting 

taskmanager.debug.memory.startLogThread: true

in the config. This will periodically log the memory consumption to the task manager logs. Can you try this and check the logs for the memory consumption?

You can also have a look at it in the web frontend under the Task Manager tab.

– Ufuk


Re: Left join with unbalanced dataset

Posted by Robert Metzger <rm...@apache.org>.
Hi Arnaud,

you can retrieve the logs of a yarn application by calling "yarn logs
-applicationId <id>".

It will output the logs of all TaskManagers and the JobManager in
one stream. I would pipe the output into a file and then search for the
position where the log for the failing TaskManager starts.

On Tue, Feb 2, 2016 at 3:15 PM, LINZ, Arnaud <AL...@bouyguestelecom.fr>
wrote:

> Hi,
>
> Running again with more RAM made the processing go further, but Yarn still
> killed one container for memory consumption. I will experiment with various
> memory parameters.
>
> How do I retrieve the log of a specific task manager post-mortem? I don't
> use a permanent Flink/Yarn container (it's killed upon batch completion).
>
>
> -----Original Message-----
> From: Ufuk Celebi [mailto:uce@apache.org]
> Sent: Tuesday, February 2, 2016 14:41
> This means that the task at task manager bt1shli2/172.21.125.27:49771
> failed during the production of the intermediate data. It’s independent of
> the memory problem.
>
> Could you please check the logs of that task manager? Sorry for the
> inconvenience! I hope that we can resolve this shortly.
>
> – Ufuk
>
>

RE: Left join with unbalanced dataset

Posted by "LINZ, Arnaud" <AL...@bouyguestelecom.fr>.
Hi,

Running again with more RAM made the processing go further, but Yarn still killed one container for memory consumption. I will experiment with various memory parameters.

How do I retrieve the log of a specific task manager post-mortem? I don't use a permanent Flink/Yarn container (it's killed upon batch completion).


-----Original Message-----
From: Ufuk Celebi [mailto:uce@apache.org]
Sent: Tuesday, 2 February 2016 14:41
This means that the task at task manager bt1shli2/172.21.125.27:49771 failed during the production of the intermediate data. It’s independent of the memory problem.

Could you please check the logs of that task manager? Sorry for the inconvenience! I hope that we can resolve this shortly.

– Ufuk


Re: Left join with unbalanced dataset

Posted by Ufuk Celebi <uc...@apache.org>.
> On 02 Feb 2016, at 14:31, LINZ, Arnaud <AL...@bouyguestelecom.fr> wrote:
> 
> Hi,
> 
> Unfortunately, it still fails, but with a different error (see below).
> Note that I did not rebuild and reinstall Flink; I just submitted a jar compiled against 0.10-SNAPSHOT as a batch job using the "0.10.0" Flink installation.

This means that the task at task manager bt1shli2/172.21.125.27:49771 failed during the production of the intermediate data. It’s independent of the memory problem.

Could you please check the logs of that task manager? Sorry for the inconvenience! I hope that we can resolve this shortly.

– Ufuk


RE: Left join with unbalanced dataset

Posted by "LINZ, Arnaud" <AL...@bouyguestelecom.fr>.
Hi,

Unfortunately, it still fails, but with a different error (see below).
Note that I did not rebuild and reinstall Flink; I just submitted a jar compiled against 0.10-SNAPSHOT as a batch job using the "0.10.0" Flink installation.

org.apache.flink.runtime.io.network.netty.exception.RemoteTransportException: Error at remote task manager 'bt1shli2/172.21.125.27:49771'.
        at org.apache.flink.runtime.io.network.netty.PartitionRequestClientHandler.decodeMsg(PartitionRequestClientHandler.java:241)
        at org.apache.flink.runtime.io.network.netty.PartitionRequestClientHandler.channelRead(PartitionRequestClientHandler.java:164)
        at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:308)
        at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:294)
        at io.netty.handler.codec.MessageToMessageDecoder.channelRead(MessageToMessageDecoder.java:103)
        at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:308)
        at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:294)
        at io.netty.handler.codec.ByteToMessageDecoder.channelRead(ByteToMessageDecoder.java:244)
        at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:308)
        at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:294)
        at io.netty.channel.DefaultChannelPipeline.fireChannelRead(DefaultChannelPipeline.java:846)
        at io.netty.channel.nio.AbstractNioByteChannel$NioByteUnsafe.read(AbstractNioByteChannel.java:131)
        at io.netty.channel.nio.NioEventLoop.processSelectedKey(NioEventLoop.java:511)
        at io.netty.channel.nio.NioEventLoop.processSelectedKeysOptimized(NioEventLoop.java:468)
        at io.netty.channel.nio.NioEventLoop.processSelectedKeys(NioEventLoop.java:382)
        at io.netty.channel.nio.NioEventLoop.run(NioEventLoop.java:354)
        at io.netty.util.concurrent.SingleThreadEventExecutor$2.run(SingleThreadEventExecutor.java:112)
        at java.lang.Thread.run(Thread.java:744)

Caused by: org.apache.flink.runtime.io.network.partition.ProducerFailedException
        at org.apache.flink.runtime.io.network.netty.PartitionRequestQueue.writeAndFlushNextMessageIfPossible(PartitionRequestQueue.java:164)
        at org.apache.flink.runtime.io.network.netty.PartitionRequestQueue.userEventTriggered(PartitionRequestQueue.java:96)
        at io.netty.channel.AbstractChannelHandlerContext.invokeUserEventTriggered(AbstractChannelHandlerContext.java:279)
        at io.netty.channel.AbstractChannelHandlerContext.fireUserEventTriggered(AbstractChannelHandlerContext.java:265)
        at io.netty.channel.ChannelInboundHandlerAdapter.userEventTriggered(ChannelInboundHandlerAdapter.java:108)
        at io.netty.channel.AbstractChannelHandlerContext.invokeUserEventTriggered(AbstractChannelHandlerContext.java:279)
        at io.netty.channel.AbstractChannelHandlerContext.fireUserEventTriggered(AbstractChannelHandlerContext.java:265)
        at io.netty.channel.ChannelInboundHandlerAdapter.userEventTriggered(ChannelInboundHandlerAdapter.java:108)
        at io.netty.channel.AbstractChannelHandlerContext.invokeUserEventTriggered(AbstractChannelHandlerContext.java:279)
        at io.netty.channel.AbstractChannelHandlerContext.fireUserEventTriggered(AbstractChannelHandlerContext.java:265)
        at io.netty.channel.ChannelInboundHandlerAdapter.userEventTriggered(ChannelInboundHandlerAdapter.java:108)
        at io.netty.channel.AbstractChannelHandlerContext.invokeUserEventTriggered(AbstractChannelHandlerContext.java:279)
        at io.netty.channel.AbstractChannelHandlerContext.access$500(AbstractChannelHandlerContext.java:32)
        at io.netty.channel.AbstractChannelHandlerContext$6.run(AbstractChannelHandlerContext.java:270)
        at io.netty.util.concurrent.SingleThreadEventExecutor.runAllTasks(SingleThreadEventExecutor.java:358)
        at io.netty.channel.nio.NioEventLoop.run(NioEventLoop.java:357)


-----Original Message-----
From: Ufuk Celebi [mailto:uce@apache.org]
Sent: Tuesday, 2 February 2016 13:52
To: user@flink.apache.org
Subject: Re: Left join with unbalanced dataset


> On 02 Feb 2016, at 13:28, LINZ, Arnaud <AL...@bouyguestelecom.fr> wrote:
>
> Thanks,
> I’m using the official 0.10 release. I will try to use the 0.10 snapshot.
>
> FYI, setting the heap cut-off ratio to 0.5 led to the following error:

That’s the error Stephan was referring to. Does the snapshot version fix it for you?

I will prepare a 0.10.2 bug fix release, which includes the fix.

– Ufuk


Re: Left join with unbalanced dataset

Posted by Ufuk Celebi <uc...@apache.org>.
> On 02 Feb 2016, at 13:28, LINZ, Arnaud <AL...@bouyguestelecom.fr> wrote:
> 
> Thanks,
> I’m using the official 0.10 release. I will try to use the 0.10 snapshot.
>  
> FYI, setting the heap cut-off ratio to 0.5 led to the following error:

That’s the error Stephan was referring to. Does the snapshot version fix it for you?

I will prepare a 0.10.2 bug fix release, which includes the fix.

– Ufuk


RE: Left join with unbalanced dataset

Posted by "LINZ, Arnaud" <AL...@bouyguestelecom.fr>.
Thanks,
I’m using the official 0.10 release. I will try to use the 0.10 snapshot.

FYI, setting the heap cut-off ratio to 0.5 led to the following error:

12:20:17,313 INFO  org.apache.flink.yarn.YarnJobManager                          - Status of job c55216ab9383fd14e1d287a69a6e0f7e (KUBERA-GEO-BRUT2SEGMENT) changed to FAILING.
java.lang.Exception: The data preparation for task 'CHAIN GroupReduce (GroupReduce at calculeMinArea(TransfoStage2StageOnTaz.java:153)) -> Map (Key Extractor 1)' , caused an error: Error obtaining the sorted input: Thread 'SortMerger Reading Thread' terminated due to an exception: java.lang.OutOfMemoryError: Direct buffer memory
         at org.apache.flink.runtime.operators.BatchTask.run(BatchTask.java:465)
         at org.apache.flink.runtime.operators.BatchTask.invoke(BatchTask.java:354)
         at org.apache.flink.runtime.taskmanager.Task.run(Task.java:584)
         at java.lang.Thread.run(Thread.java:744)
Caused by: java.lang.RuntimeException: Error obtaining the sorted input: Thread 'SortMerger Reading Thread' terminated due to an exception: java.lang.OutOfMemoryError: Direct buffer memory
         at org.apache.flink.runtime.operators.sort.UnilateralSortMerger.getIterator(UnilateralSortMerger.java:619)
         at org.apache.flink.runtime.operators.BatchTask.getInput(BatchTask.java:1089)
         at org.apache.flink.runtime.operators.GroupReduceDriver.prepare(GroupReduceDriver.java:94)
         at org.apache.flink.runtime.operators.BatchTask.run(BatchTask.java:459)
         ... 3 more
Caused by: java.io.IOException: Thread 'SortMerger Reading Thread' terminated due to an exception: java.lang.OutOfMemoryError: Direct buffer memory
         at org.apache.flink.runtime.operators.sort.UnilateralSortMerger$ThreadBase.run(UnilateralSortMerger.java:800)
Caused by: org.apache.flink.runtime.io.network.netty.exception.LocalTransportException: java.lang.OutOfMemoryError: Direct buffer memory
         at org.apache.flink.runtime.io.network.netty.PartitionRequestClientHandler.exceptionCaught(PartitionRequestClientHandler.java:153)
         at io.netty.channel.AbstractChannelHandlerContext.invokeExceptionCaught(AbstractChannelHandlerContext.java:246)
         at io.netty.channel.AbstractChannelHandlerContext.fireExceptionCaught(AbstractChannelHandlerContext.java:224)
         at io.netty.channel.ChannelInboundHandlerAdapter.exceptionCaught(ChannelInboundHandlerAdapter.java:131)
         at io.netty.channel.AbstractChannelHandlerContext.invokeExceptionCaught(AbstractChannelHandlerContext.java:246)
         at io.netty.channel.AbstractChannelHandlerContext.fireExceptionCaught(AbstractChannelHandlerContext.java:224)
         at io.netty.channel.ChannelInboundHandlerAdapter.exceptionCaught(ChannelInboundHandlerAdapter.java:131)
         at io.netty.channel.AbstractChannelHandlerContext.invokeExceptionCaught(AbstractChannelHandlerContext.java:246)
         at io.netty.channel.AbstractChannelHandlerContext.notifyHandlerException(AbstractChannelHandlerContext.java:737)
         at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:310)
         at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:294)
         at io.netty.channel.DefaultChannelPipeline.fireChannelRead(DefaultChannelPipeline.java:846)
         at io.netty.channel.nio.AbstractNioByteChannel$NioByteUnsafe.read(AbstractNioByteChannel.java:131)
         at io.netty.channel.nio.NioEventLoop.processSelectedKey(NioEventLoop.java:511)
         at io.netty.channel.nio.NioEventLoop.processSelectedKeysOptimized(NioEventLoop.java:468)
         at io.netty.channel.nio.NioEventLoop.processSelectedKeys(NioEventLoop.java:382)
         at io.netty.channel.nio.NioEventLoop.run(NioEventLoop.java:354)
         at io.netty.util.concurrent.SingleThreadEventExecutor$2.run(SingleThreadEventExecutor.java:112)
         at java.lang.Thread.run(Thread.java:744)
Caused by: io.netty.handler.codec.DecoderException: java.lang.OutOfMemoryError: Direct buffer memory
         at io.netty.handler.codec.ByteToMessageDecoder.channelRead(ByteToMessageDecoder.java:234)
         at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:308)
         ... 9 more
Caused by: java.lang.OutOfMemoryError: Direct buffer memory
         at java.nio.Bits.reserveMemory(Bits.java:658)
         at java.nio.DirectByteBuffer.<init>(DirectByteBuffer.java:123)
         at java.nio.ByteBuffer.allocateDirect(ByteBuffer.java:306)
         at io.netty.buffer.PoolArena$DirectArena.newUnpooledChunk(PoolArena.java:651)
         at io.netty.buffer.PoolArena.allocateHuge(PoolArena.java:237)
         at io.netty.buffer.PoolArena.allocate(PoolArena.java:215)
         at io.netty.buffer.PoolArena.reallocate(PoolArena.java:358)
         at io.netty.buffer.PooledByteBuf.capacity(PooledByteBuf.java:111)
         at io.netty.buffer.AbstractByteBuf.ensureWritable(AbstractByteBuf.java:251)
         at io.netty.buffer.AbstractByteBuf.writeBytes(AbstractByteBuf.java:849)
         at io.netty.buffer.AbstractByteBuf.writeBytes(AbstractByteBuf.java:841)
         at io.netty.buffer.AbstractByteBuf.writeBytes(AbstractByteBuf.java:831)
         at io.netty.handler.codec.ByteToMessageDecoder$1.cumulate(ByteToMessageDecoder.java:92)
         at io.netty.handler.codec.ByteToMessageDecoder.channelRead(ByteToMessageDecoder.java:228)


From: ewenstephan@gmail.com [mailto:ewenstephan@gmail.com] On behalf of Stephan Ewen
Sent: Tuesday, 2 February 2016 11:30
To: user@flink.apache.org
Subject: Re: Left join with unbalanced dataset

Hi Arnaud!

Which version of Flink are you using? In 0.10.1, the Netty library version that we use has changed behavior and allocates a lot of off-heap memory. My guess is that this is the cause. That should be fixed in 1.0-SNAPSHOT and also in 0.10-SNAPSHOT.

If that turns out to be the cause, the good news is that we started discussing a 0.10.2 maintenance release that should also have a fix for that.

Greetings,
Stephan


On Tue, Feb 2, 2016 at 11:12 AM, LINZ, Arnaud <AL...@bouyguestelecom.fr> wrote:
Hi,

Switching to an outer join did not change the error; neither did balancing the join with another dataset, halving the parallelism, or doubling the memory.
Heap size and thread count look OK in JVisualVM, so the problem is elsewhere.

Does Flink use off-heap memory? How can I monitor it?

Thanks,
Arnaud


10:58:53,384 INFO  org.apache.flink.yarn.YarnJobManager                          - Status of job 8b2ea62e16b82ccc2242bb5549d434a5 (KUBERA-GEO-BRUT2SEGMENT) changed to FAILING.
java.lang.Exception: The data preparation for task 'CHAIN GroupReduce (GroupReduce at process(TransfoStage2StageOnTaz.java:106)) -> Map (Map at writeExternalTable(HiveHCatDAO.java:206))' , caused an error: Error obtaining the sorted input: Thread 'SortMerger spilling thread' terminated due to an exception: java.io.IOException: I/O channel already closed. Could not fulfill: org.apache.flink.runtime.io.disk.iomanager.SegmentWriteRequest@2327bac5
          at org.apache.flink.runtime.operators.BatchTask.run(BatchTask.java:465)
          at org.apache.flink.runtime.operators.BatchTask.invoke(BatchTask.java:354)
          at org.apache.flink.runtime.taskmanager.Task.run(Task.java:584)
          at java.lang.Thread.run(Thread.java:744)
Caused by: java.lang.RuntimeException: Error obtaining the sorted input: Thread 'SortMerger spilling thread' terminated due to an exception: java.io.IOException: I/O channel already closed. Could not fulfill: org.apache.flink.runtime.io.disk.iomanager.SegmentWriteRequest@2327bac5
          at org.apache.flink.runtime.operators.sort.UnilateralSortMerger.getIterator(UnilateralSortMerger.java:619)
          at org.apache.flink.runtime.operators.BatchTask.getInput(BatchTask.java:1089)
          at org.apache.flink.runtime.operators.GroupReduceDriver.prepare(GroupReduceDriver.java:94)
          at org.apache.flink.runtime.operators.BatchTask.run(BatchTask.java:459)
          ... 3 more
Caused by: java.io.IOException: Thread 'SortMerger spilling thread' terminated due to an exception: java.io.IOException: I/O channel already closed. Could not fulfill: org.apache.flink.runtime.io.disk.iomanager.SegmentWriteRequest@2327bac5
          at org.apache.flink.runtime.operators.sort.UnilateralSortMerger$ThreadBase.run(UnilateralSortMerger.java:800)
Caused by: com.esotericsoftware.kryo.KryoException: java.io.IOException: I/O channel already closed. Could not fulfill: org.apache.flink.runtime.io.disk.iomanager.SegmentWriteRequest@2327bac5
          at com.esotericsoftware.kryo.io.Output.flush(Output.java:165)
          at org.apache.flink.api.java.typeutils.runtime.kryo.KryoSerializer.serialize(KryoSerializer.java:194)
          at org.apache.flink.api.java.typeutils.runtime.kryo.KryoSerializer.copy(KryoSerializer.java:247)
          at org.apache.flink.api.java.typeutils.runtime.TupleSerializerBase.copy(TupleSerializerBase.java:73)
          at org.apache.flink.api.java.typeutils.runtime.TupleSerializerBase.copy(TupleSerializerBase.java:73)
          at org.apache.flink.api.java.typeutils.runtime.TupleSerializerBase.copy(TupleSerializerBase.java:73)
          at org.apache.flink.runtime.operators.sort.NormalizedKeySorter.writeToOutput(NormalizedKeySorter.java:499)
          at org.apache.flink.runtime.operators.sort.UnilateralSortMerger$SpillingThread.go(UnilateralSortMerger.java:1344)
          at org.apache.flink.runtime.operators.sort.UnilateralSortMerger$ThreadBase.run(UnilateralSortMerger.java:796)
Caused by: java.io.IOException: I/O channel already closed. Could not fulfill: org.apache.flink.runtime.io.disk.iomanager.SegmentWriteRequest@2327bac5
          at org.apache.flink.runtime.io.disk.iomanager.AsynchronousFileIOChannel.addRequest(AsynchronousFileIOChannel.java:249)
          at org.apache.flink.runtime.io.disk.iomanager.AsynchronousBlockWriterWithCallback.writeBlock(AsynchronousBlockWriterWithCallback.java:54)
          at org.apache.flink.runtime.io.disk.iomanager.AsynchronousBlockWriterWithCallback.writeBlock(AsynchronousBlockWriterWithCallback.java:29)
          at org.apache.flink.runtime.io.disk.iomanager.ChannelWriterOutputView.writeSegment(ChannelWriterOutputView.java:217)
          at org.apache.flink.runtime.io.disk.iomanager.ChannelWriterOutputView.nextSegment(ChannelWriterOutputView.java:203)
          at org.apache.flink.runtime.memory.AbstractPagedOutputView.advance(AbstractPagedOutputView.java:140)
          at org.apache.flink.runtime.memory.AbstractPagedOutputView.write(AbstractPagedOutputView.java:201)
          at org.apache.flink.api.java.typeutils.runtime.DataOutputViewStream.write(DataOutputViewStream.java:39)
          at com.esotericsoftware.kryo.io.Output.flush(Output.java:163)

(…)

10:58:54,423 WARN  akka.remote.ReliableDeliverySupervisor                        - Association with remote system [akka.tcp://flink@172.21.125.13:40286] has failed, address is now gated for [5000] ms. Reason is: [Disassociated].

10:58:54,470 INFO  org.apache.flink.yarn.YarnJobManager                          - Container container_e11_1453202008841_2794_01_000025 is completed with diagnostics: Container [pid=14331,containerID=container_e11_1453202008841_2794_01_000025] is running beyond physical memory limits. Current usage: 8.0 GB of 8 GB physical memory used; 9.1 GB of 16.8 GB virtual memory used. Killing container.

Dump of the process-tree for container_e11_1453202008841_2794_01_000025 :

          |- PID PPID PGRPID SESSID CMD_NAME USER_MODE_TIME(MILLIS) SYSTEM_TIME(MILLIS) VMEM_USAGE(BYTES) RSSMEM_USAGE(PAGES) FULL_CMD_LINE

          |- 14331 14329 14331 14331 (bash) 0 0 108646400 308 /bin/bash -c /usr/java/default/bin/java -Xms5376m -Xmx5376m -XX:MaxDirectMemorySize=5376m  -Dlog.file=/data/3/hadoop/yarn/log/application_1453202008841_2794/container_e11_1453202008841_2794_01_000025/taskmanager.log -Dlogback.configurationFile=file:logback.xml -Dlog4j.configuration=file:log4j.properties org.apache.flink.yarn.YarnTaskManagerRunner --configDir . 1> /data/3/hadoop/yarn/log/application_1453202008841_2794/container_e11_1453202008841_2794_01_000025/taskmanager.out 2> /data/3/hadoop/yarn/log/application_1453202008841_2794/container_e11_1453202008841_2794_01_000025/taskmanager.err --streamingMode batch

          |- 14348 14331 14331 14331 (java) 565583 11395 9636184064 2108473 /usr/java/default/bin/java -Xms5376m -Xmx5376m -XX:MaxDirectMemorySize=5376m -Dlog.file=/data/3/hadoop/yarn/log/application_1453202008841_2794/container_e11_1453202008841_2794_01_000025/taskmanager.log -Dlogback.configurationFile=file:logback.xml -Dlog4j.configuration=file:log4j.properties org.apache.flink.yarn.YarnTaskManagerRunner --configDir . --streamingMode batch



Container killed on request. Exit code is 143

Container exited with a non-zero exit code 143



10:58:54,471 INFO  org.apache.flink.yarn.YarnJobManager



From: LINZ, Arnaud
Sent: Monday, 1 February 2016 09:40
To: user@flink.apache.org
Subject: RE: Left join with unbalanced dataset

Hi,
Thanks, I can’t believe I missed the outer join operators… Will try them and will keep you informed.
I use the “official” 0.10 release from the maven repo. The off-heap memory I use is the one HDFS I/O uses (codec, DFSOutputstream threads…), but I don’t have many open files at once, and doubling the amount of memory did not solve the problem.
Arnaud


From: ewenstephan@gmail.com [mailto:ewenstephan@gmail.com] On behalf of Stephan Ewen
Sent: Sunday, 31 January 2016 20:57
To: user@flink.apache.org
Subject: Re: Left join with unbalanced dataset

Hi!

YARN killing the application seems strange. The memory use that YARN sees should not change even when one node gets a lot of data.

Can you share what version of Flink (plus commit hash) you are using and whether you use off-heap memory or not?

Thanks,
Stephan


On Sun, Jan 31, 2016 at 10:47 AM, Till Rohrmann <tr...@apache.org> wrote:
Hi Arnaud,

The unmatched elements of A will only end up on the same worker node if they all share the same key. Otherwise, they will be evenly spread out across your cluster. However, I would also recommend using Flink's leftOuterJoin.
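
To illustrate the point about keys, here is a small plain-Java sketch (it is not Flink code). It uses a simple hash-modulo assignment as a stand-in for the real partitioner, which is an assumption, and the names KeySpread and spread are hypothetical.

```java
import java.util.Arrays;

/** Sketch: how hashing on the join key distributes records over workers. */
final class KeySpread {

    /** Counts how many of the given keys land on each of the workers. */
    static int[] spread(long[] keys, int workers) {
        int[] perWorker = new int[workers];
        for (long key : keys) {
            // floorMod keeps the index non-negative even for negative hash codes.
            perWorker[Math.floorMod(Long.hashCode(key), workers)]++;
        }
        return perWorker;
    }

    public static void main(String[] args) {
        int workers = 4;

        // 1000 unmatched records that all have different keys (0..999).
        long[] distinctKeys = new long[1000];
        for (int i = 0; i < distinctKeys.length; i++) {
            distinctKeys[i] = i;
        }

        // 1000 unmatched records that all share one key (0).
        long[] sameKey = new long[1000];

        System.out.println(Arrays.toString(spread(distinctKeys, workers))); // [250, 250, 250, 250]
        System.out.println(Arrays.toString(spread(sameKey, workers)));      // [1000, 0, 0, 0]
    }
}
```

Only the second case piles everything onto one worker, and that needs all the unmatched records to carry the same key value.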

Cheers,
Till

On Sun, Jan 31, 2016 at 5:27 AM, Chiwan Park <ch...@apache.org> wrote:
Hi Arnaud,

To join two datasets, the community recommends using the join operation rather than the cogroup operation. For a left join, you can use the leftOuterJoin method. Flink’s optimizer chooses the distributed join execution strategy using statistics about the datasets, such as their size. Additionally, you can set a join hint to help the optimizer choose the strategy.

In the transformations section [1] of the Flink documentation, you can find details about the outer join operation.

I hope this helps.

[1]: https://ci.apache.org/projects/flink/flink-docs-release-0.10/apis/programming_guide.html#transformations

Regards,
Chiwan Park

> On Jan 30, 2016, at 6:43 PM, LINZ, Arnaud <AL...@bouyguestelecom.fr> wrote:
>
> Hello,
>
> I have a very big dataset A to left join with a dataset B that is half its size. That is to say, half of A records will be matched with one record of B, and the other half with null values.
>
> I used a CoGroup for that, but my batch fails because yarn kills the container due to memory problems.
>
> I guess that’s because one worker will get half of A dataset (the unmatched ones), and that’s too much for a single JVM
>
> Am I right in my diagnostic ? Is there a better way to left join unbalanced datasets ?
>
> Best regards,
>
> Arnaud




Re: Left join with unbalanced dataset

Posted by Stephan Ewen <se...@apache.org>.
Hi Arnaud!

Which version of Flink are you using? In 0.10.1, the Netty library version
that we use has changed behavior and allocates a lot of off-heap memory.
My guess is that this is the cause. That should be fixed in 1.0-SNAPSHOT
and also in 0.10-SNAPSHOT.

If that turns out to be the cause, the good news is that we started
discussing a 0.10.2 maintenance release that should also have a fix for
that.
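
To make the numbers concrete, here is a rough back-of-the-envelope check in Java. It only uses values from the YARN diagnostics in this thread (-Xmx5376m, -XX:MaxDirectMemorySize=5376m, an 8 GB container, 2108473 resident pages for the java process). The 4 KiB page size and reading "8 GB" as 8192 MiB are assumptions, and the class and method names are made up for illustration.

```java
/** Sketch: compare the JVM limits on the task manager command line with the container size. */
final class ContainerMemoryCheck {

    static final long MIB = 1024L * 1024L;

    /** Upper bound that the JVM flags allow for heap plus direct buffers, in MiB. */
    static long jvmUpperBoundMiB(long xmxMiB, long maxDirectMiB) {
        return xmxMiB + maxDirectMiB;
    }

    /** Resident set size in bytes for a page count and a page size. */
    static long rssBytes(long pages, long pageSizeBytes) {
        return pages * pageSizeBytes;
    }

    public static void main(String[] args) {
        long containerMiB = 8L * 1024L; // assumption: "8 GB physical memory" means 8192 MiB
        long xmxMiB = 5376;             // from -Xmx5376m
        long maxDirectMiB = 5376;       // from -XX:MaxDirectMemorySize=5376m
        long pages = 2108473;           // RSSMEM_USAGE(PAGES) of the java process
        long pageSize = 4096;           // assumption: 4 KiB pages

        long bound = jvmUpperBoundMiB(xmxMiB, maxDirectMiB);
        long rssMiB = rssBytes(pages, pageSize) / MIB;

        // prints: heap + direct limit: 10752 MiB, container: 8192 MiB
        System.out.println("heap + direct limit: " + bound + " MiB, container: " + containerMiB + " MiB");
        // prints: resident memory: 8236 MiB
        System.out.println("resident memory: " + rssMiB + " MiB");
    }
}
```

Under those assumptions, the heap and direct-memory caps together allow more than the container holds, and the resident size sits a few tens of MiB above the 8192 MiB limit. That fits a process whose heap looks fine while off-heap allocations keep growing.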

Greetings,
Stephan


On Tue, Feb 2, 2016 at 11:12 AM, LINZ, Arnaud <AL...@bouyguestelecom.fr>
wrote:

> Hi,
>
>
>
> Switching to an outer join did not change the error; neither did balancing
> the join with another dataset, halving the parallelism, or doubling the
> memory.
>
> Heap size and thread count look OK in JVisualVM, so the problem is
> elsewhere.
>
>
>
> Does Flink use off-heap memory? How can I monitor it?
>
>
>
> Thanks,
>
> Arnaud
>
>
>
> 10:58:53,384 INFO  org.apache.flink.yarn.YarnJobManager                          - Status of job 8b2ea62e16b82ccc2242bb5549d434a5 (KUBERA-GEO-BRUT2SEGMENT) changed to FAILING.
>
> java.lang.Exception: The data preparation for task 'CHAIN GroupReduce (GroupReduce at process(TransfoStage2StageOnTaz.java:106)) -> Map (Map at writeExternalTable(HiveHCatDAO.java:206))' , caused an error: Error obtaining the sorted input: Thread 'SortMerger spilling thread' terminated due to an exception: java.io.IOException: I/O channel already closed. Could not fulfill: org.apache.flink.runtime.io.disk.iomanager.SegmentWriteRequest@2327bac5
>           at org.apache.flink.runtime.operators.BatchTask.run(BatchTask.java:465)
>           at org.apache.flink.runtime.operators.BatchTask.invoke(BatchTask.java:354)
>           at org.apache.flink.runtime.taskmanager.Task.run(Task.java:584)
>           at java.lang.Thread.run(Thread.java:744)
> Caused by: java.lang.RuntimeException: Error obtaining the sorted input: Thread 'SortMerger spilling thread' terminated due to an exception: java.io.IOException: I/O channel already closed. Could not fulfill: org.apache.flink.runtime.io.disk.iomanager.SegmentWriteRequest@2327bac5
>           at org.apache.flink.runtime.operators.sort.UnilateralSortMerger.getIterator(UnilateralSortMerger.java:619)
>           at org.apache.flink.runtime.operators.BatchTask.getInput(BatchTask.java:1089)
>           at org.apache.flink.runtime.operators.GroupReduceDriver.prepare(GroupReduceDriver.java:94)
>           at org.apache.flink.runtime.operators.BatchTask.run(BatchTask.java:459)
>           ... 3 more
> Caused by: java.io.IOException: Thread 'SortMerger spilling thread' terminated due to an exception: java.io.IOException: I/O channel already closed. Could not fulfill: org.apache.flink.runtime.io.disk.iomanager.SegmentWriteRequest@2327bac5
>           at org.apache.flink.runtime.operators.sort.UnilateralSortMerger$ThreadBase.run(UnilateralSortMerger.java:800)
> Caused by: com.esotericsoftware.kryo.KryoException: java.io.IOException: I/O channel already closed. Could not fulfill: org.apache.flink.runtime.io.disk.iomanager.SegmentWriteRequest@2327bac5
>           at com.esotericsoftware.kryo.io.Output.flush(Output.java:165)
>           at org.apache.flink.api.java.typeutils.runtime.kryo.KryoSerializer.serialize(KryoSerializer.java:194)
>           at org.apache.flink.api.java.typeutils.runtime.kryo.KryoSerializer.copy(KryoSerializer.java:247)
>           at org.apache.flink.api.java.typeutils.runtime.TupleSerializerBase.copy(TupleSerializerBase.java:73)
>           at org.apache.flink.api.java.typeutils.runtime.TupleSerializerBase.copy(TupleSerializerBase.java:73)
>           at org.apache.flink.api.java.typeutils.runtime.TupleSerializerBase.copy(TupleSerializerBase.java:73)
>           at org.apache.flink.runtime.operators.sort.NormalizedKeySorter.writeToOutput(NormalizedKeySorter.java:499)
>           at org.apache.flink.runtime.operators.sort.UnilateralSortMerger$SpillingThread.go(UnilateralSortMerger.java:1344)
>           at org.apache.flink.runtime.operators.sort.UnilateralSortMerger$ThreadBase.run(UnilateralSortMerger.java:796)
> Caused by: java.io.IOException: I/O channel already closed. Could not fulfill: org.apache.flink.runtime.io.disk.iomanager.SegmentWriteRequest@2327bac5
>           at org.apache.flink.runtime.io.disk.iomanager.AsynchronousFileIOChannel.addRequest(AsynchronousFileIOChannel.java:249)
>           at org.apache.flink.runtime.io.disk.iomanager.AsynchronousBlockWriterWithCallback.writeBlock(AsynchronousBlockWriterWithCallback.java:54)
>           at org.apache.flink.runtime.io.disk.iomanager.AsynchronousBlockWriterWithCallback.writeBlock(AsynchronousBlockWriterWithCallback.java:29)
>           at org.apache.flink.runtime.io.disk.iomanager.ChannelWriterOutputView.writeSegment(ChannelWriterOutputView.java:217)
>           at org.apache.flink.runtime.io.disk.iomanager.ChannelWriterOutputView.nextSegment(ChannelWriterOutputView.java:203)
>           at org.apache.flink.runtime.memory.AbstractPagedOutputView.advance(AbstractPagedOutputView.java:140)
>           at org.apache.flink.runtime.memory.AbstractPagedOutputView.write(AbstractPagedOutputView.java:201)
>           at org.apache.flink.api.java.typeutils.runtime.DataOutputViewStream.write(DataOutputViewStream.java:39)
>           at com.esotericsoftware.kryo.io.Output.flush(Output.java:163)
>
>
>
> (…)
>
> 10:58:54,423 WARN  akka.remote.ReliableDeliverySupervisor                        - Association with remote system [akka.tcp://flink@172.21.125.13:40286] has failed, address is now gated for [5000] ms. Reason is: [Disassociated].
>
> 10:58:54,470 INFO  org.apache.flink.yarn.YarnJobManager                          - Container container_e11_1453202008841_2794_01_000025 is completed with diagnostics: Container [pid=14331,containerID=container_e11_1453202008841_2794_01_000025] is running beyond physical memory limits. Current usage: 8.0 GB of 8 GB physical memory used; 9.1 GB of 16.8 GB virtual memory used. Killing container.
>
> Dump of the process-tree for container_e11_1453202008841_2794_01_000025 :
>
>           |- PID PPID PGRPID SESSID CMD_NAME USER_MODE_TIME(MILLIS) SYSTEM_TIME(MILLIS) VMEM_USAGE(BYTES) RSSMEM_USAGE(PAGES) FULL_CMD_LINE
>
>           |- 14331 14329 14331 14331 (bash) 0 0 108646400 308 /bin/bash -c /usr/java/default/bin/java -Xms5376m -Xmx5376m -XX:MaxDirectMemorySize=5376m  -Dlog.file=/data/3/hadoop/yarn/log/application_1453202008841_2794/container_e11_1453202008841_2794_01_000025/taskmanager.log -Dlogback.configurationFile=file:logback.xml -Dlog4j.configuration=file:log4j.properties org.apache.flink.yarn.YarnTaskManagerRunner --configDir . 1> /data/3/hadoop/yarn/log/application_1453202008841_2794/container_e11_1453202008841_2794_01_000025/taskmanager.out 2> /data/3/hadoop/yarn/log/application_1453202008841_2794/container_e11_1453202008841_2794_01_000025/taskmanager.err --streamingMode batch
>
>           |- 14348 14331 14331 14331 (java) 565583 11395 9636184064 2108473 /usr/java/default/bin/java -Xms5376m -Xmx5376m -XX:MaxDirectMemorySize=5376m -Dlog.file=/data/3/hadoop/yarn/log/application_1453202008841_2794/container_e11_1453202008841_2794_01_000025/taskmanager.log -Dlogback.configurationFile=file:logback.xml -Dlog4j.configuration=file:log4j.properties org.apache.flink.yarn.YarnTaskManagerRunner --configDir . --streamingMode batch
>
>
>
> Container killed on request. Exit code is 143
>
> Container exited with a non-zero exit code 143
>
>
>
> 10:58:54,471 INFO  org.apache.flink.yarn.YarnJobManager
>
>
>
>
>
>
>
> *From:* LINZ, Arnaud
> *Sent:* Monday, 1 February 2016 09:40
> *To:* user@flink.apache.org
> *Subject:* RE: Left join with unbalanced dataset
>
>
>
> Hi,
>
> Thanks, I can’t believe I missed the outer join operators… Will try them
> and will keep you informed.
>
> I use the “official” 0.10 release from the maven repo. The off-heap memory
> I use is the one HDFS I/O uses (codec, DFSOutputstream threads…), but I
> don’t have many open files at once, and doubling the amount of memory did
> not solve the problem.
>
> Arnaud
>
>
>
>
>
> *From:* ewenstephan@gmail.com [mailto:ewenstephan@gmail.com]
> *On behalf of* Stephan Ewen
> *Sent:* Sunday, 31 January 2016 20:57
> *To:* user@flink.apache.org
> *Subject:* Re: Left join with unbalanced dataset
>
>
>
> Hi!
>
>
>
> YARN killing the application seems strange. The memory use that YARN sees
> should not change even when one node gets a lot of data.
>
>
>
> Can you share what version of Flink (plus commit hash) you are using and
> whether you use off-heap memory or not?
>
>
>
> Thanks,
>
> Stephan
>
>
>
>
>
> On Sun, Jan 31, 2016 at 10:47 AM, Till Rohrmann <tr...@apache.org>
> wrote:
>
> Hi Arnaud,
>
>
>
> the unmatched elements of A will only end up on the same worker node if
> they all share the same key. Otherwise, they will be evenly spread out
> across your cluster. However, I would also recommend you to use Flink's
> leftOuterJoin.
>
>
>
> Cheers,
>
> Till
>
>
>
> On Sun, Jan 31, 2016 at 5:27 AM, Chiwan Park <ch...@apache.org>
> wrote:
>
> Hi Arnaud,
>
> To join two datasets, the community recommends using join operation rather
> than cogroup operation. For left join, you can use leftOuterJoin method.
> Flink’s optimizer decides distributed join execution strategy using some
> statistics of the datasets such as size of the dataset. Additionally, you
> can set join hint to help optimizer decide the strategy.
>
> In transformations section [1] of Flink documentation, you can find about
> outer join operation in detail.
>
> I hope this helps.
>
> [1]:
> https://ci.apache.org/projects/flink/flink-docs-release-0.10/apis/programming_guide.html#transformations
>
> Regards,
> Chiwan Park
>
> > On Jan 30, 2016, at 6:43 PM, LINZ, Arnaud <AL...@bouyguestelecom.fr>
> wrote:
> >
> > Hello,
> >
> > I have a very big dataset A to left join with a dataset B that is half
> its size. That is to say, half of A records will be matched with one record
> of B, and the other half with null values.
> >
> > I used a CoGroup for that, but my batch fails because yarn kills the
> container due to memory problems.
> >
> > I guess that’s because one worker will get half of A dataset (the
> unmatched ones), and that’s too much for a single JVM
> >
> > Am I right in my diagnostic ? Is there a better way to left join
> unbalanced datasets ?
> >
> > Best regards,
> >
> > Arnaud
> >
> >
> >
>
>
>
>
>
>

RE: Left join with unbalanced dataset

Posted by "LINZ, Arnaud" <AL...@bouyguestelecom.fr>.
Hi,

Switching to an outer join did not change the error; neither did balancing the join with another dataset, halving the parallelism, or doubling the memory.
Heap size and thread count look OK in JVisualVM, so the problem is elsewhere.

Does Flink use off-heap memory? How can I monitor it?
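
On the monitoring question, one option (an illustration, not something proposed in this thread) is the JDK's BufferPoolMXBean from java.lang.management, available since Java 7, which reports the memory held by NIO direct buffers. The class name DirectMemoryProbe is hypothetical. This only covers direct ByteBuffers; memory that native libraries allocate on their own does not show up here.

```java
import java.lang.management.BufferPoolMXBean;
import java.lang.management.ManagementFactory;
import java.nio.ByteBuffer;
import java.util.List;

// Hypothetical helper: reads the JVM's "direct" buffer pool statistics.
public class DirectMemoryProbe {

    /** Returns the bytes currently held by the "direct" buffer pool, or -1 if the pool is not exposed. */
    public static long directBytesUsed() {
        List<BufferPoolMXBean> pools =
                ManagementFactory.getPlatformMXBeans(BufferPoolMXBean.class);
        for (BufferPoolMXBean pool : pools) {
            if ("direct".equals(pool.getName())) {
                return pool.getMemoryUsed();
            }
        }
        return -1L;
    }

    public static void main(String[] args) {
        long before = directBytesUsed();
        // Allocate 16 MiB off-heap so the change is visible in the pool statistics.
        ByteBuffer buffer = ByteBuffer.allocateDirect(16 * 1024 * 1024);
        long after = directBytesUsed();
        System.out.println("direct pool before: " + before + " bytes");
        System.out.println("direct pool after:  " + after + " bytes");
        System.out.println("allocated capacity: " + buffer.capacity() + " bytes");
    }
}
```

The same bean is also visible over JMX, so a tool such as JVisualVM with an MBean browser should be able to show it for a running process.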

Thanks,
Arnaud


10:58:53,384 INFO  org.apache.flink.yarn.YarnJobManager                          - Status of job 8b2ea62e16b82ccc2242bb5549d434a5 (KUBERA-GEO-BRUT2SEGMENT) changed to FAILING.
java.lang.Exception: The data preparation for task 'CHAIN GroupReduce (GroupReduce at process(TransfoStage2StageOnTaz.java:106)) -> Map (Map at writeExternalTable(HiveHCatDAO.java:206))' , caused an error: Error obtaining the sorted input: Thread 'SortMerger spilling thread' terminated due to an exception: java.io.IOException: I/O channel already closed. Could not fulfill: org.apache.flink.runtime.io.disk.iomanager.SegmentWriteRequest@2327bac5
          at org.apache.flink.runtime.operators.BatchTask.run(BatchTask.java:465)
          at org.apache.flink.runtime.operators.BatchTask.invoke(BatchTask.java:354)
          at org.apache.flink.runtime.taskmanager.Task.run(Task.java:584)
          at java.lang.Thread.run(Thread.java:744)
Caused by: java.lang.RuntimeException: Error obtaining the sorted input: Thread 'SortMerger spilling thread' terminated due to an exception: java.io.IOException: I/O channel already closed. Could not fulfill: org.apache.flink.runtime.io.disk.iomanager.SegmentWriteRequest@2327bac5
          at org.apache.flink.runtime.operators.sort.UnilateralSortMerger.getIterator(UnilateralSortMerger.java:619)
          at org.apache.flink.runtime.operators.BatchTask.getInput(BatchTask.java:1089)
          at org.apache.flink.runtime.operators.GroupReduceDriver.prepare(GroupReduceDriver.java:94)
          at org.apache.flink.runtime.operators.BatchTask.run(BatchTask.java:459)
          ... 3 more
Caused by: java.io.IOException: Thread 'SortMerger spilling thread' terminated due to an exception: java.io.IOException: I/O channel already closed. Could not fulfill: org.apache.flink.runtime.io.disk.iomanager.SegmentWriteRequest@2327bac5
          at org.apache.flink.runtime.operators.sort.UnilateralSortMerger$ThreadBase.run(UnilateralSortMerger.java:800)
Caused by: com.esotericsoftware.kryo.KryoException: java.io.IOException: I/O channel already closed. Could not fulfill: org.apache.flink.runtime.io.disk.iomanager.SegmentWriteRequest@2327bac5
          at com.esotericsoftware.kryo.io.Output.flush(Output.java:165)
          at org.apache.flink.api.java.typeutils.runtime.kryo.KryoSerializer.serialize(KryoSerializer.java:194)
          at org.apache.flink.api.java.typeutils.runtime.kryo.KryoSerializer.copy(KryoSerializer.java:247)
          at org.apache.flink.api.java.typeutils.runtime.TupleSerializerBase.copy(TupleSerializerBase.java:73)
          at org.apache.flink.api.java.typeutils.runtime.TupleSerializerBase.copy(TupleSerializerBase.java:73)
          at org.apache.flink.api.java.typeutils.runtime.TupleSerializerBase.copy(TupleSerializerBase.java:73)
          at org.apache.flink.runtime.operators.sort.NormalizedKeySorter.writeToOutput(NormalizedKeySorter.java:499)
          at org.apache.flink.runtime.operators.sort.UnilateralSortMerger$SpillingThread.go(UnilateralSortMerger.java:1344)
          at org.apache.flink.runtime.operators.sort.UnilateralSortMerger$ThreadBase.run(UnilateralSortMerger.java:796)
Caused by: java.io.IOException: I/O channel already closed. Could not fulfill: org.apache.flink.runtime.io.disk.iomanager.SegmentWriteRequest@2327bac5
          at org.apache.flink.runtime.io.disk.iomanager.AsynchronousFileIOChannel.addRequest(AsynchronousFileIOChannel.java:249)
          at org.apache.flink.runtime.io.disk.iomanager.AsynchronousBlockWriterWithCallback.writeBlock(AsynchronousBlockWriterWithCallback.java:54)
          at org.apache.flink.runtime.io.disk.iomanager.AsynchronousBlockWriterWithCallback.writeBlock(AsynchronousBlockWriterWithCallback.java:29)
          at org.apache.flink.runtime.io.disk.iomanager.ChannelWriterOutputView.writeSegment(ChannelWriterOutputView.java:217)
          at org.apache.flink.runtime.io.disk.iomanager.ChannelWriterOutputView.nextSegment(ChannelWriterOutputView.java:203)
          at org.apache.flink.runtime.memory.AbstractPagedOutputView.advance(AbstractPagedOutputView.java:140)
          at org.apache.flink.runtime.memory.AbstractPagedOutputView.write(AbstractPagedOutputView.java:201)
          at org.apache.flink.api.java.typeutils.runtime.DataOutputViewStream.write(DataOutputViewStream.java:39)
          at com.esotericsoftware.kryo.io.Output.flush(Output.java:163)

(…)

10:58:54,423 WARN  akka.remote.ReliableDeliverySupervisor                        - Association with remote system [akka.tcp://flink@172.21.125.13:40286] has failed, address is now gated for [5000] ms. Reason is: [Disassociated].

10:58:54,470 INFO  org.apache.flink.yarn.YarnJobManager                          - Container container_e11_1453202008841_2794_01_000025 is completed with diagnostics: Container [pid=14331,containerID=container_e11_1453202008841_2794_01_000025] is running beyond physical memory limits. Current usage: 8.0 GB of 8 GB physical memory used; 9.1 GB of 16.8 GB virtual memory used. Killing container.

Dump of the process-tree for container_e11_1453202008841_2794_01_000025 :

          |- PID PPID PGRPID SESSID CMD_NAME USER_MODE_TIME(MILLIS) SYSTEM_TIME(MILLIS) VMEM_USAGE(BYTES) RSSMEM_USAGE(PAGES) FULL_CMD_LINE

          |- 14331 14329 14331 14331 (bash) 0 0 108646400 308 /bin/bash -c /usr/java/default/bin/java -Xms5376m -Xmx5376m -XX:MaxDirectMemorySize=5376m  -Dlog.file=/data/3/hadoop/yarn/log/application_1453202008841_2794/container_e11_1453202008841_2794_01_000025/taskmanager.log -Dlogback.configurationFile=file:logback.xml -Dlog4j.configuration=file:log4j.properties org.apache.flink.yarn.YarnTaskManagerRunner --configDir . 1> /data/3/hadoop/yarn/log/application_1453202008841_2794/container_e11_1453202008841_2794_01_000025/taskmanager.out 2> /data/3/hadoop/yarn/log/application_1453202008841_2794/container_e11_1453202008841_2794_01_000025/taskmanager.err --streamingMode batch

          |- 14348 14331 14331 14331 (java) 565583 11395 9636184064 2108473 /usr/java/default/bin/java -Xms5376m -Xmx5376m -XX:MaxDirectMemorySize=5376m -Dlog.file=/data/3/hadoop/yarn/log/application_1453202008841_2794/container_e11_1453202008841_2794_01_000025/taskmanager.log -Dlogback.configurationFile=file:logback.xml -Dlog4j.configuration=file:log4j.properties org.apache.flink.yarn.YarnTaskManagerRunner --configDir . --streamingMode batch



Container killed on request. Exit code is 143

Container exited with a non-zero exit code 143



10:58:54,471 INFO  org.apache.flink.yarn.YarnJobManager
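
The figures in the container dump above can be checked with a little arithmetic. The sketch below assumes a 4096-byte page size (not stated in the log) and reads "8 GB" as 8 GiB; the class and method names are made up for illustration.

```java
// Hypothetical helper: redoes the arithmetic on the numbers printed in the YARN dump.
public class ContainerMemoryCheck {

    static final long MIB = 1024L * 1024L;

    /** Converts a resident-set size given in pages to bytes. */
    public static long rssBytes(long pages, long pageSizeBytes) {
        return pages * pageSizeBytes;
    }

    /** Sum of the heap cap (-Xmx) and the direct-memory cap (-XX:MaxDirectMemorySize), in MiB. */
    public static long heapPlusDirectMiB(long xmxMiB, long maxDirectMiB) {
        return xmxMiB + maxDirectMiB;
    }

    public static void main(String[] args) {
        long rss = rssBytes(2108473L, 4096L);   // RSSMEM_USAGE(PAGES) of the java process
        long limit = 8L * 1024L * MIB;          // container limit, assuming 8 GiB
        System.out.println("resident set: " + rss + " bytes");
        System.out.println("limit:        " + limit + " bytes");
        System.out.println("over limit:   " + (rss > limit));

        long caps = heapPlusDirectMiB(5376L, 5376L); // values from the java command line
        System.out.println("heap + direct caps: " + caps + " MiB vs container " + (limit / MIB) + " MiB");
    }
}
```

Under these assumptions the resident set comes to 8,636,305,408 bytes against a limit of 8,589,934,592 bytes, and the two JVM caps add up to 10,752 MiB against 8,192 MiB. This does not show which component used the memory, only that the JVM flags alone do not keep the process below the container limit.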



From: LINZ, Arnaud
Sent: Monday, 1 February 2016 09:40
To: user@flink.apache.org
Subject: RE: Left join with unbalanced dataset

Hi,
Thanks, I can’t believe I missed the outer join operators… Will try them and will keep you informed.
I use the “official” 0.10 release from the maven repo. The off-heap memory I use is the one HDFS I/O uses (codec, DFSOutputstream threads…), but I don’t have many open files at once, and doubling the amount of memory did not solve the problem.
Arnaud


From: ewenstephan@gmail.com [mailto:ewenstephan@gmail.com] On behalf of Stephan Ewen
Sent: Sunday, 31 January 2016 20:57
To: user@flink.apache.org
Subject: Re: Left join with unbalanced dataset

Hi!

YARN killing the application seems strange. The memory use that YARN sees should not change even when one node gets a lot of data.

Can you share what version of Flink (plus commit hash) you are using and whether you use off-heap memory or not?

Thanks,
Stephan


On Sun, Jan 31, 2016 at 10:47 AM, Till Rohrmann <tr...@apache.org> wrote:
Hi Arnaud,

the unmatched elements of A will only end up on the same worker node if they all share the same key. Otherwise, they will be evenly spread out across your cluster. However, I would also recommend you to use Flink's leftOuterJoin.

Cheers,
Till

On Sun, Jan 31, 2016 at 5:27 AM, Chiwan Park <ch...@apache.org> wrote:
Hi Arnaud,

To join two datasets, the community recommends using join operation rather than cogroup operation. For left join, you can use leftOuterJoin method. Flink’s optimizer decides distributed join execution strategy using some statistics of the datasets such as size of the dataset. Additionally, you can set join hint to help optimizer decide the strategy.

In transformations section [1] of Flink documentation, you can find about outer join operation in detail.

I hope this helps.

[1]: https://ci.apache.org/projects/flink/flink-docs-release-0.10/apis/programming_guide.html#transformations

Regards,
Chiwan Park

> On Jan 30, 2016, at 6:43 PM, LINZ, Arnaud <AL...@bouyguestelecom.fr> wrote:
>
> Hello,
>
> I have a very big dataset A to left join with a dataset B that is half its size. That is to say, half of A records will be matched with one record of B, and the other half with null values.
>
> I used a CoGroup for that, but my batch fails because yarn kills the container due to memory problems.
>
> I guess that’s because one worker will get half of A dataset (the unmatched ones), and that’s too much for a single JVM
>
> Am I right in my diagnostic ? Is there a better way to left join unbalanced datasets ?
>
> Best regards,
>
> Arnaud
>
>
>



RE: Left join with unbalanced dataset

Posted by "LINZ, Arnaud" <AL...@bouyguestelecom.fr>.
Hi,
Thanks, I can’t believe I missed the outer join operators… Will try them and will keep you informed.
I use the “official” 0.10 release from the Maven repo. The only off-heap memory I use is what HDFS I/O uses (codecs, DFSOutputStream threads…), but I don’t have many open files at once, and doubling the amount of memory did not solve the problem.
Arnaud


From: ewenstephan@gmail.com [mailto:ewenstephan@gmail.com] On behalf of Stephan Ewen
Sent: Sunday, 31 January 2016 20:57
To: user@flink.apache.org
Subject: Re: Left join with unbalanced dataset

Hi!

YARN killing the application seems strange. The memory use that YARN sees should not change even when one node gets a lot of data.

Can you share what version of Flink (plus commit hash) you are using and whether you use off-heap memory or not?

Thanks,
Stephan


On Sun, Jan 31, 2016 at 10:47 AM, Till Rohrmann <tr...@apache.org> wrote:
Hi Arnaud,

the unmatched elements of A will only end up on the same worker node if they all share the same key. Otherwise, they will be evenly spread out across your cluster. However, I would also recommend you to use Flink's leftOuterJoin.

Cheers,
Till

On Sun, Jan 31, 2016 at 5:27 AM, Chiwan Park <ch...@apache.org> wrote:
Hi Arnaud,

To join two datasets, the community recommends using join operation rather than cogroup operation. For left join, you can use leftOuterJoin method. Flink’s optimizer decides distributed join execution strategy using some statistics of the datasets such as size of the dataset. Additionally, you can set join hint to help optimizer decide the strategy.

In transformations section [1] of Flink documentation, you can find about outer join operation in detail.

I hope this helps.

[1]: https://ci.apache.org/projects/flink/flink-docs-release-0.10/apis/programming_guide.html#transformations

Regards,
Chiwan Park

> On Jan 30, 2016, at 6:43 PM, LINZ, Arnaud <AL...@bouyguestelecom.fr> wrote:
>
> Hello,
>
> I have a very big dataset A to left join with a dataset B that is half its size. That is to say, half of A records will be matched with one record of B, and the other half with null values.
>
> I used a CoGroup for that, but my batch fails because yarn kills the container due to memory problems.
>
> I guess that’s because one worker will get half of A dataset (the unmatched ones), and that’s too much for a single JVM
>
> Am I right in my diagnostic ? Is there a better way to left join unbalanced datasets ?
>
> Best regards,
>
> Arnaud
>
>
>



Re: Left join with unbalanced dataset

Posted by Stephan Ewen <se...@apache.org>.
Hi!

YARN killing the application seems strange. The memory use that YARN sees
should not change even when one node gets a lot of data.

Can you share what version of Flink (plus commit hash) you are using and
whether you use off-heap memory or not?

Thanks,
Stephan


On Sun, Jan 31, 2016 at 10:47 AM, Till Rohrmann <tr...@apache.org>
wrote:

> Hi Arnaud,
>
> the unmatched elements of A will only end up on the same worker node if
> they all share the same key. Otherwise, they will be evenly spread out
> across your cluster. However, I would also recommend you to use Flink's
> leftOuterJoin.
>
> Cheers,
> Till
>
> On Sun, Jan 31, 2016 at 5:27 AM, Chiwan Park <ch...@apache.org>
> wrote:
>
>> Hi Arnaud,
>>
>> To join two datasets, the community recommends using join operation
>> rather than cogroup operation. For left join, you can use leftOuterJoin
>> method. Flink’s optimizer decides distributed join execution strategy using
>> some statistics of the datasets such as size of the dataset. Additionally,
>> you can set join hint to help optimizer decide the strategy.
>>
>> In transformations section [1] of Flink documentation, you can find about
>> outer join operation in detail.
>>
>> I hope this helps.
>>
>> [1]:
>> https://ci.apache.org/projects/flink/flink-docs-release-0.10/apis/programming_guide.html#transformations
>>
>> Regards,
>> Chiwan Park
>>
>> > On Jan 30, 2016, at 6:43 PM, LINZ, Arnaud <AL...@bouyguestelecom.fr>
>> wrote:
>> >
>> > Hello,
>> >
>> > I have a very big dataset A to left join with a dataset B that is half
>> its size. That is to say, half of A records will be matched with one record
>> of B, and the other half with null values.
>> >
>> > I used a CoGroup for that, but my batch fails because yarn kills the
>> container due to memory problems.
>> >
>> > I guess that’s because one worker will get half of A dataset (the
>> unmatched ones), and that’s too much for a single JVM
>> >
>> > Am I right in my diagnostic ? Is there a better way to left join
>> unbalanced datasets ?
>> >
>> > Best regards,
>> >
>> > Arnaud
>> >
>> >
>> >
>>
>>
>

Re: Left join with unbalanced dataset

Posted by Till Rohrmann <tr...@apache.org>.
Hi Arnaud,

The unmatched elements of A will only end up on the same worker node if
they all share the same key. Otherwise, they will be spread evenly
across your cluster. However, I would also recommend that you use Flink's
leftOuterJoin.
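
To illustrate the point about keys, the sketch below uses a simple hash-modulo assignment as a stand-in for hash partitioning. It is not Flink's actual partitioner, and the class and method names are hypothetical.

```java
import java.util.Arrays;

// Hypothetical illustration of how records are spread over workers by key.
public class KeyPartitioningSketch {

    /** Assigns a key to one of `parallelism` workers by hashing it. */
    public static int workerFor(Object key, int parallelism) {
        return Math.floorMod(key.hashCode(), parallelism);
    }

    /** Counts how many of the given keys land on each worker. */
    public static int[] countPerWorker(Object[] keys, int parallelism) {
        int[] counts = new int[parallelism];
        for (Object key : keys) {
            counts[workerFor(key, parallelism)]++;
        }
        return counts;
    }

    public static void main(String[] args) {
        Object[] distinctKeys = new Object[1000];
        Object[] sameKey = new Object[1000];
        for (int i = 0; i < 1000; i++) {
            distinctKeys[i] = i;   // every record has its own key
            sameKey[i] = 42;       // every record shares one key
        }
        System.out.println("distinct keys: " + Arrays.toString(countPerWorker(distinctKeys, 4)));
        System.out.println("single key:    " + Arrays.toString(countPerWorker(sameKey, 4)));
    }
}
```

With 1000 distinct integer keys and 4 workers, each worker receives 250 records; with 1000 records sharing one key, a single worker receives all of them.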

Cheers,
Till

On Sun, Jan 31, 2016 at 5:27 AM, Chiwan Park <ch...@apache.org> wrote:

> Hi Arnaud,
>
> To join two datasets, the community recommends using join operation rather
> than cogroup operation. For left join, you can use leftOuterJoin method.
> Flink’s optimizer decides distributed join execution strategy using some
> statistics of the datasets such as size of the dataset. Additionally, you
> can set join hint to help optimizer decide the strategy.
>
> In transformations section [1] of Flink documentation, you can find about
> outer join operation in detail.
>
> I hope this helps.
>
> [1]:
> https://ci.apache.org/projects/flink/flink-docs-release-0.10/apis/programming_guide.html#transformations
>
> Regards,
> Chiwan Park
>
> > On Jan 30, 2016, at 6:43 PM, LINZ, Arnaud <AL...@bouyguestelecom.fr>
> wrote:
> >
> > Hello,
> >
> > I have a very big dataset A to left join with a dataset B that is half
> its size. That is to say, half of A records will be matched with one record
> of B, and the other half with null values.
> >
> > I used a CoGroup for that, but my batch fails because yarn kills the
> container due to memory problems.
> >
> > I guess that’s because one worker will get half of A dataset (the
> unmatched ones), and that’s too much for a single JVM
> >
> > Am I right in my diagnostic ? Is there a better way to left join
> unbalanced datasets ?
> >
> > Best regards,
> >
> > Arnaud
> >
> >
> >
>
>

Re: Left join with unbalanced dataset

Posted by Chiwan Park <ch...@apache.org>.
Hi Arnaud,

To join two datasets, the community recommends using the join operation rather than the cogroup operation. For a left join, you can use the leftOuterJoin method. Flink’s optimizer chooses the distributed join execution strategy using statistics about the datasets, such as their size. Additionally, you can set a join hint to help the optimizer choose the strategy.
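
As a plain-Java illustration of what a left outer join produces (this is not the Flink API; the class and method names are hypothetical): every record of A is emitted exactly once, paired with its match from B or with null. The sketch assumes at most one B record per key, as in the question.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical illustration of left outer join semantics on in-memory collections.
public class LeftOuterJoinSketch {

    /**
     * Joins each left key with the right-side value for the same key.
     * Left keys without a match are kept and paired with null.
     */
    public static List<String> leftOuterJoin(List<Integer> leftKeys, Map<Integer, String> rightByKey) {
        List<String> joined = new ArrayList<>();
        for (Integer key : leftKeys) {
            String match = rightByKey.get(key); // null when B has no record for this key
            joined.add(key + "=" + match);
        }
        return joined;
    }

    public static void main(String[] args) {
        List<Integer> a = new ArrayList<>();
        for (int i = 1; i <= 4; i++) {
            a.add(i);
        }
        Map<Integer, String> b = new HashMap<>();
        b.put(1, "a");
        b.put(3, "c");
        System.out.println(leftOuterJoin(a, b));
    }
}
```

Here half of the left records find a match and the other half are emitted with null, which mirrors the situation described in the original question.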

In the transformations section [1] of the Flink documentation, you can find details about the outer join operation.

I hope this helps.

[1]: https://ci.apache.org/projects/flink/flink-docs-release-0.10/apis/programming_guide.html#transformations

Regards,
Chiwan Park

> On Jan 30, 2016, at 6:43 PM, LINZ, Arnaud <AL...@bouyguestelecom.fr> wrote:
> 
> Hello,
> 
> I have a very big dataset A to left join with a dataset B that is half its size. That is to say, half of A records will be matched with one record of B, and the other half with null values.
> 
> I used a CoGroup for that, but my batch fails because yarn kills the container due to memory problems.
> 
> I guess that’s because one worker will get half of A dataset (the unmatched ones), and that’s too much for a single JVM
> 
> Am I right in my diagnostic ? Is there a better way to left join unbalanced datasets ?
> 
> Best regards,
> 
> Arnaud
> 
> 
> 