You are viewing a plain text version of this content. The canonical link for it is here.
Posted to user@flink.apache.org by "Schneider, Thilo" <T....@Fraport.de> on 2020/10/30 06:53:18 UTC

Insufficient number of network buffers for simple last_value aggregate

Dear list,

when trying to compute a simple last_value aggregate, flink fails with an IOException. The query is defined as follows:

from pyflink.table import EnvironmentSettings, StreamTableEnvironment
env_settings = EnvironmentSettings.new_instance().in_streaming_mode().use_blink_planner().build()
t_env = StreamTableEnvironment.create(environment_settings=env_settings)


t_env.execute_sql("""
CREATE TABLE key_change_test (
  id INT,  val1 STRING,  val2 STRING,  t AS proctime()
) WITH (
  'connector' = 'kafka',
  'format' = 'csv',
  'topic' = 'flink_test',
  'properties.bootstrap.servers' = 'localhost:9192',
  'properties.group.id' = 'foo'
)
""")

tt = t_env.sql_query("SELECT id, LAST_VALUE(val1) AS val1, LAST_VALUE(val2) AS val2 FROM key_change_test GROUP BY id")

t_env.execute_sql("CREATE TABLE debug (id INT, val1 VARCHAR, val2 VARCHAR) WITH ('connector' = 'print')")
tt.execute_insert("debug")


Looking at the logs I get the following error message:
[…]
2020-10-30 07:45:46,474 WARN  org.apache.flink.runtime.taskmanager.Task                    [] - Source: TableSourceScan(table=[[default_catalog, default_database, key_change_test]], fields=[id, val1, val2]) (21/88) (02f23a929919c200dbd54b7dcef635e2) switched from DEPLOYING to FAILED.
java.io.IOException: Insufficient number of network buffers: required 89, but only 67 available. The total number of network buffers is currently set to 2048 of 32768 bytes each. You can increase this number by setting the configuration keys 'taskmanager.memory.network.fraction', 'taskmanager.memory.network.min', and 'taskmanager.memory.network.max'.
        at org.apache.flink.runtime.io.network.buffer.NetworkBufferPool.internalCreateBufferPool(NetworkBufferPool.java:357) ~[flink-dist_2.11-1.11.2.jar:1.11.2]
        at org.apache.flink.runtime.io.network.buffer.NetworkBufferPool.createBufferPool(NetworkBufferPool.java:332) ~[flink-dist_2.11-1.11.2.jar:1.11.2]
        at org.apache.flink.runtime.io.network.partition.ResultPartitionFactory.lambda$createBufferPoolFactory$0(ResultPartitionFactory.java:224) ~[flink-dist_2.11-1.11.2.jar:1.11.2]
        at org.apache.flink.runtime.io.network.partition.ResultPartition.setup(ResultPartition.java:146) ~[flink-dist_2.11-1.11.2.jar:1.11.2]
        at org.apache.flink.runtime.taskmanager.Task.setupPartitionsAndGates(Task.java:869) [flink-dist_2.11-1.11.2.jar:1.11.2]
        at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:635) [flink-dist_2.11-1.11.2.jar:1.11.2]
        at org.apache.flink.runtime.taskmanager.Task.run(Task.java:546) [flink-dist_2.11-1.11.2.jar:1.11.2]
        at java.lang.Thread.run(Thread.java:748) [?:1.8.0_141]
[…]

What is happening there? For me it seems that flink is requesting an awful lot of resources for a simple query (the kafka topic has only one partition and is used for manual injection only, so no big traffic there).
Can you help me with any way around that problem?

Thanks in advance
Thilo

Fraport AG Frankfurt Airport Services Worldwide, 60547 Frankfurt am Main, Sitz der Gesellschaft: Frankfurt am Main, Amtsgericht Frankfurt am Main: HRB 7042, Umsatzsteuer-Identifikationsnummer: DE 114150623, Vorsitzender des Aufsichtsrates: Michael Boddenberg – Hessischer Minister der Finanzen; Vorstand: Dr. Stefan Schulte (Vorsitzender), Anke Giesen, Michael Müller, Dr. Pierre Dominique Prümm, Dr. Matthias Zieschang

Re: Insufficient number of network buffers for simple last_value aggregate

Posted by Arvid Heise <ar...@ververica.com>.
Hi Thilo,

the number of required network buffers depends on your data exchanges and
parallelism.
For each shuffling data exchange (what you need for group by), you ideally
have #slots-per-TM^2 * #TMs * 4 buffers.

So I'm assuming you have 11 machines and 8 slots per machine. Then for best
performance, you should give the network stack
8 * 8 * 11 * 4 = 2816 buffers.

It may work with less buffers, but depending on the final topology you may
also have more than one shuffling data exchange.

So what can you do:
1) Downscale your job. If your data flow is low, maybe you don't need such
a high parallelism.
2) More smaller nodes. If you are running in the cloud, you may use more
TMs with a lower number of slots. That decreases the number of network
connections per TM significantly.
3) Increase memory size (see Xintong's answer). For larger setups, it's
usually enough to just increase taskmanager.memory.network.fraction to 0.2
or 0.3. Be aware that you may need to decrease other memory fractions
accordingly.
4) Decrease buffer size. Smaller buffers = more buffers. Set
taskmanager.memory.segment-size to a lower value than the default 32kb.



On Fri, Oct 30, 2020 at 8:08 AM Xintong Song <to...@gmail.com> wrote:

> Hi Schneider,
>
> The error message suggests that your task managers are not configured with
> enough network memory. You would need to increase the network memory
> configuration. See this doc [1] for more details.
>
> Thank you~
>
> Xintong Song
>
>
> [1]
> https://ci.apache.org/projects/flink/flink-docs-release-1.11/ops/memory/mem_trouble.html#ioexception-insufficient-number-of-network-buffers
>
> On Fri, Oct 30, 2020 at 2:53 PM Schneider, Thilo <T....@fraport.de>
> wrote:
>
>> Dear list,
>>
>>
>>
>> when trying to compute a simple last_value aggregate, flink fails with an
>> IOException. The query is defined as follows:
>>
>>
>>
>> from pyflink.table import EnvironmentSettings, StreamTableEnvironment
>> env_settings =
>> EnvironmentSettings.new_instance().in_streaming_mode().use_blink_planner().build()
>> t_env = StreamTableEnvironment.create(environment_settings=env_settings)
>>
>>
>>
>> t_env.execute_sql("""
>> CREATE TABLE key_change_test (
>>   id INT,  val1 STRING,  val2 STRING,  t AS proctime()
>> ) WITH (
>>   'connector' = 'kafka',
>>   'format' = 'csv',
>>   'topic' = 'flink_test',
>>   'properties.bootstrap.servers' = 'localhost:9192',
>>   'properties.group.id' = 'foo'
>> )
>> """)
>>
>> tt = t_env.sql_query("SELECT id, LAST_VALUE(val1) AS val1, LAST_VALUE(val2) AS val2 FROM key_change_test GROUP BY id")
>>
>> t_env.execute_sql("CREATE TABLE debug (id INT, val1 VARCHAR, val2 VARCHAR) WITH ('connector' = 'print')")
>> tt.execute_insert("debug")
>>
>>
>>
>>
>>
>> Looking at the logs I get the following error message:
>>
>> […]
>>
>> 2020-10-30 07:45:46,474 WARN
>> org.apache.flink.runtime.taskmanager.Task                    [] - Source:
>> TableSourceScan(table=[[default_catalog, default_database,
>> key_change_test]], fields=[id, val1, val2]) (21/88)
>> (02f23a929919c200dbd54b7dcef635e2) switched from DEPLOYING to FAILED.
>>
>> java.io.IOException: Insufficient number of network buffers: required 89,
>> but only 67 available. The total number of network buffers is currently set
>> to 2048 of 32768 bytes each. You can increase this number by setting the
>> configuration keys 'taskmanager.memory.network.fraction',
>> 'taskmanager.memory.network.min', and 'taskmanager.memory.network.max'.
>>
>>         at
>> org.apache.flink.runtime.io.network.buffer.NetworkBufferPool.internalCreateBufferPool(NetworkBufferPool.java:357)
>> ~[flink-dist_2.11-1.11.2.jar:1.11.2]
>>
>>         at
>> org.apache.flink.runtime.io.network.buffer.NetworkBufferPool.createBufferPool(NetworkBufferPool.java:332)
>> ~[flink-dist_2.11-1.11.2.jar:1.11.2]
>>
>>         at
>> org.apache.flink.runtime.io.network.partition.ResultPartitionFactory.lambda$createBufferPoolFactory$0(ResultPartitionFactory.java:224)
>> ~[flink-dist_2.11-1.11.2.jar:1.11.2]
>>
>>         at
>> org.apache.flink.runtime.io.network.partition.ResultPartition.setup(ResultPartition.java:146)
>> ~[flink-dist_2.11-1.11.2.jar:1.11.2]
>>
>>         at
>> org.apache.flink.runtime.taskmanager.Task.setupPartitionsAndGates(Task.java:869)
>> [flink-dist_2.11-1.11.2.jar:1.11.2]
>>
>>         at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:635)
>> [flink-dist_2.11-1.11.2.jar:1.11.2]
>>
>>         at org.apache.flink.runtime.taskmanager.Task.run(Task.java:546)
>> [flink-dist_2.11-1.11.2.jar:1.11.2]
>>
>>         at java.lang.Thread.run(Thread.java:748) [?:1.8.0_141]
>>
>> […]
>>
>>
>>
>> What is happening there? For me it seems that flink is requesting an
>> awful lot of resources for a simple query (the kafka topic has only one
>> partition and is used for manual injection only, so no big traffic there).
>>
>> Can you help me with any way around that problem?
>>
>>
>>
>> Thanks in advance
>>
>> Thilo
>>
>> Fraport AG Frankfurt Airport Services Worldwide, 60547 Frankfurt am Main,
>> Sitz der Gesellschaft: Frankfurt am Main, Amtsgericht Frankfurt am Main:
>> HRB 7042, Umsatzsteuer-Identifikationsnummer: DE 114150623, Vorsitzender
>> des Aufsichtsrates: Michael Boddenberg – Hessischer Minister der Finanzen;
>> Vorstand: Dr. Stefan Schulte (Vorsitzender), Anke Giesen, Michael Müller,
>> Dr. Pierre Dominique Prümm, Dr. Matthias Zieschang
>>
>

-- 

Arvid Heise | Senior Java Developer

<https://www.ververica.com/>

Follow us @VervericaData

--

Join Flink Forward <https://flink-forward.org/> - The Apache Flink
Conference

Stream Processing | Event Driven | Real Time

--

Ververica GmbH | Invalidenstrasse 115, 10115 Berlin, Germany

--
Ververica GmbH
Registered at Amtsgericht Charlottenburg: HRB 158244 B
Managing Directors: Timothy Alexander Steinert, Yip Park Tung Jason, Ji
(Toni) Cheng

Re: Insufficient number of network buffers for simple last_value aggregate

Posted by Xintong Song <to...@gmail.com>.
Hi Schneider,

The error message suggests that your task managers are not configured with
enough network memory. You would need to increase the network memory
configuration. See this doc [1] for more details.

Thank you~

Xintong Song


[1]
https://ci.apache.org/projects/flink/flink-docs-release-1.11/ops/memory/mem_trouble.html#ioexception-insufficient-number-of-network-buffers

On Fri, Oct 30, 2020 at 2:53 PM Schneider, Thilo <T....@fraport.de>
wrote:

> Dear list,
>
>
>
> when trying to compute a simple last_value aggregate, flink fails with an
> IOException. The query is defined as follows:
>
>
>
> from pyflink.table import EnvironmentSettings, StreamTableEnvironment
> env_settings =
> EnvironmentSettings.new_instance().in_streaming_mode().use_blink_planner().build()
> t_env = StreamTableEnvironment.create(environment_settings=env_settings)
>
>
>
> t_env.execute_sql("""
> CREATE TABLE key_change_test (
>   id INT,  val1 STRING,  val2 STRING,  t AS proctime()
> ) WITH (
>   'connector' = 'kafka',
>   'format' = 'csv',
>   'topic' = 'flink_test',
>   'properties.bootstrap.servers' = 'localhost:9192',
>   'properties.group.id' = 'foo'
> )
> """)
>
> tt = t_env.sql_query("SELECT id, LAST_VALUE(val1) AS val1, LAST_VALUE(val2) AS val2 FROM key_change_test GROUP BY id")
>
> t_env.execute_sql("CREATE TABLE debug (id INT, val1 VARCHAR, val2 VARCHAR) WITH ('connector' = 'print')")
> tt.execute_insert("debug")
>
>
>
>
>
> Looking at the logs I get the following error message:
>
> […]
>
> 2020-10-30 07:45:46,474 WARN
> org.apache.flink.runtime.taskmanager.Task                    [] - Source:
> TableSourceScan(table=[[default_catalog, default_database,
> key_change_test]], fields=[id, val1, val2]) (21/88)
> (02f23a929919c200dbd54b7dcef635e2) switched from DEPLOYING to FAILED.
>
> java.io.IOException: Insufficient number of network buffers: required 89,
> but only 67 available. The total number of network buffers is currently set
> to 2048 of 32768 bytes each. You can increase this number by setting the
> configuration keys 'taskmanager.memory.network.fraction',
> 'taskmanager.memory.network.min', and 'taskmanager.memory.network.max'.
>
>         at
> org.apache.flink.runtime.io.network.buffer.NetworkBufferPool.internalCreateBufferPool(NetworkBufferPool.java:357)
> ~[flink-dist_2.11-1.11.2.jar:1.11.2]
>
>         at
> org.apache.flink.runtime.io.network.buffer.NetworkBufferPool.createBufferPool(NetworkBufferPool.java:332)
> ~[flink-dist_2.11-1.11.2.jar:1.11.2]
>
>         at
> org.apache.flink.runtime.io.network.partition.ResultPartitionFactory.lambda$createBufferPoolFactory$0(ResultPartitionFactory.java:224)
> ~[flink-dist_2.11-1.11.2.jar:1.11.2]
>
>         at
> org.apache.flink.runtime.io.network.partition.ResultPartition.setup(ResultPartition.java:146)
> ~[flink-dist_2.11-1.11.2.jar:1.11.2]
>
>         at
> org.apache.flink.runtime.taskmanager.Task.setupPartitionsAndGates(Task.java:869)
> [flink-dist_2.11-1.11.2.jar:1.11.2]
>
>         at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:635)
> [flink-dist_2.11-1.11.2.jar:1.11.2]
>
>         at org.apache.flink.runtime.taskmanager.Task.run(Task.java:546)
> [flink-dist_2.11-1.11.2.jar:1.11.2]
>
>         at java.lang.Thread.run(Thread.java:748) [?:1.8.0_141]
>
> […]
>
>
>
> What is happening there? For me it seems that flink is requesting an awful
> lot of resources for a simple query (the kafka topic has only one partition
> and is used for manual injection only, so no big traffic there).
>
> Can you help me with any way around that problem?
>
>
>
> Thanks in advance
>
> Thilo
>
> Fraport AG Frankfurt Airport Services Worldwide, 60547 Frankfurt am Main,
> Sitz der Gesellschaft: Frankfurt am Main, Amtsgericht Frankfurt am Main:
> HRB 7042, Umsatzsteuer-Identifikationsnummer: DE 114150623, Vorsitzender
> des Aufsichtsrates: Michael Boddenberg – Hessischer Minister der Finanzen;
> Vorstand: Dr. Stefan Schulte (Vorsitzender), Anke Giesen, Michael Müller,
> Dr. Pierre Dominique Prümm, Dr. Matthias Zieschang
>