Posted to user@flink.apache.org by Sourigna Phetsarath <gn...@teamaol.com> on 2016/03/03 22:27:25 UTC

Setting taskmanager.network.numberOfBuffers and getting errors...

All:

I'm running a Flink 0.10.2 App by submitting to YARN as an application.
I'm using an AWS EMR cluster of 1 Master and 10 d2.8xlarge.  When I submit
the job using:


bin/flink run \
    -m yarn-cluster \
    -yjm 20480 \
    -yn 10 \
    -ytm 80960 \
    -ys 36 \
    -yD taskmanager.network.numberOfBuffers=51840 \
...

I'm seeing this error:

Caused by: java.io.IOException: Insufficient number of network buffers:
required 360, but only 315 available. The total number of network
buffers is currently set to 51840. You can increase this number by
setting the configuration key 'taskmanager.network.numberOfBuffers'.
    at org.apache.flink.runtime.io.network.buffer.NetworkBufferPool.createBufferPool(NetworkBufferPool.java:196)
    at org.apache.flink.runtime.io.network.NetworkEnvironment.registerTask(NetworkEnvironment.java:325)
    at org.apache.flink.runtime.taskmanager.Task.run(Task.java:488)
    at java.lang.Thread.run(Thread.java:745)

The error message does not seem to be conveying the correct information.

Can someone explain to me what reasonable values are for
taskmanager.network.numberOfBuffers and
taskmanager.network.bufferSizeInBytes?

I've read this:
https://ci.apache.org/projects/flink/flink-docs-master/setup/config.html#configuring-the-network-buffers
and this:
http://stackoverflow.com/questions/33589710/flink-cluster-params-how-to-set

But I am still unclear on the calculation. Is it supposed to be:

 #cores ^ 2 * #machines * 4

So, in my case  36 ^ 2 * 10 * 4 = 51840

Thanks in advance for any help you can provide.

-- 


Gna Phetsarath
System Architect // AOL Platforms // Data Services // Applied Research Chapter
770 Broadway, 5th Floor, New York, NY 10003
o: 212.402.4871 // m: 917.373.7363
vvmr: 8890237 aim: sphetsarath20 t: @sourigna

<http://www.aolplatforms.com>

Re: Setting taskmanager.network.numberOfBuffers and getting errors...

Posted by Fabian Hueske <fh...@gmail.com>.
Hi Sourigna,

You are using the formula correctly: #cores should be translated into
slots per TaskManager (TM), and #machines into the number of TMs. So
36 ^ 2 * 10 * 4 = 51840 appears to be right.
The constant 4 refers to the total number of concurrently active full
network shuffles (partitioning or broadcasting). If your job is more
complex, e.g., it has several inputs which are joined, reduced, etc., the
constant needs to be adapted accordingly.
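
As a rough sketch of the rule of thumb above, the arithmetic can be written
out as follows. The class and method names are hypothetical and not part of
Flink, and the second call only illustrates a made-up job with 8 concurrent
shuffles; it is not a recommendation.

```java
public class NetworkBufferEstimate {

    // Rule of thumb from this thread (hypothetical helper, not a Flink API):
    // slotsPerTaskManager^2 * numTaskManagers * concurrentShuffles
    static long estimateBuffers(int slotsPerTaskManager,
                                int numTaskManagers,
                                int concurrentShuffles) {
        return (long) slotsPerTaskManager * slotsPerTaskManager
                * numTaskManagers * concurrentShuffles;
    }

    public static void main(String[] args) {
        // Values from the original question: -ys 36, -yn 10, constant 4
        System.out.println(estimateBuffers(36, 10, 4)); // prints 51840

        // Assumed example: a more complex job with 8 concurrent shuffles
        System.out.println(estimateBuffers(36, 10, 8)); // prints 103680
    }
}
```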

The high number of network buffers is due to Flink's pipelined data
exchange: sender tasks ship records to receiver tasks while the data is
still being produced. Pipelining can significantly improve the performance
of jobs, but at high parallelism it requires quite a bit of memory. Hadoop
and Spark use a different technique to ship data. They collect data on the
sender and ship it in batches to the receiver. This technique is less
memory intensive but has a higher latency. Flink also supports batched
data exchange. If you do not want to allocate so much memory for pipelined
shuffles, you can activate batched data exchanges by calling:

ExecutionEnvironment env = ...
env.getConfig().setExecutionMode(ExecutionMode.BATCH);
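
To get a feel for how much memory a buffer setting like the one in this
thread reserves, the buffer count can be multiplied by
taskmanager.network.bufferSizeInBytes. The sketch below assumes a buffer
size of 32768 bytes, which I believe is the default but may differ in your
setup. The class and method names are hypothetical.

```java
public class NetworkBufferMemory {

    // Memory reserved for a buffer pool: number of buffers * size of each buffer.
    // Hypothetical helper, not a Flink API.
    static long bufferMemoryBytes(long numberOfBuffers, int bufferSizeInBytes) {
        return numberOfBuffers * bufferSizeInBytes;
    }

    public static void main(String[] args) {
        // 51840 buffers as in this thread; 32768 bytes per buffer is an assumption
        long bytes = bufferMemoryBytes(51840, 32768);
        long mib = bytes / (1024L * 1024L);
        System.out.println(bytes + " bytes = " + mib + " MiB");
        // prints "1698693120 bytes = 1620 MiB"
    }
}
```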

Best,
Fabian
