Posted to user@flink.apache.org by Xintong Song <to...@gmail.com> on 2020/06/01 03:08:42 UTC

Re: Flink Dashboard UI Tasks hard limit

Hi Vijay,

The error message suggests that another task manager (10.127.106.54) is not
responding. This could happen when the remote task manager has failed or is
under severe GC pressure. You would need to find the log of the remote task
manager to understand what is happening.

Thank you~

Xintong Song



On Mon, Jun 1, 2020 at 4:57 AM Vijay Balakrishnan <bv...@gmail.com>
wrote:

> Hi All,
> The Job takes forever to start up and now fails at startup every
> time.
> Physical Memory:62.1 GB
> JVM Heap Size:15.0 GB
> Flink Managed Memory:10.5 GB
> Attached a TM screenshot.
>
> Tried increasing the following:
>
> taskmanager.numberOfTaskSlots: 10
> parallelism.default: 1
> rest.server.max-content-length: 314572800
> taskmanager.network.memory.fraction: 0.45
> taskmanager.network.memory.max: 24gb
> taskmanager.network.memory.min: 500mb
> akka.ask.timeout: 240s
> cluster.evenly-spread-out-slots: true
> taskmanager.network.netty.client.connectTimeoutSec: 240
> taskmanager.network.detailed-metrics: true
> taskmanager.network.memory.floating-buffers-per-gate: 16
> akka.tcp.timeout: 30s
>
> There are more than enough slots. The issue seems to be communicating over
> TCP with the remote Task Managers?
>
> Getting this exception on a TaskManager:
>
> 2020-05-31 20:37:31,436 INFO  org.apache.flink.runtime.taskmanager.Task
>                   - Window(TumblingEventTimeWindows(5000),
> EventTimeTrigger, MGroupingWindowAggregate,
> MGroupingAggregateWindowProcessing) (36/440)
> (921fe6761ce844a6850c5fc67326b221) switched from DEPLOYING to FAILED.
> org.apache.flink.runtime.io.network.partition.consumer.PartitionConnectionException:
> Connection for partition
> faea47916a206dc8d014694ec72ab577@95f71b39868d4e23a180ce11653dc4ca not
> reachable.
> at
> org.apache.flink.runtime.io.network.partition.consumer.RemoteInputChannel.requestSubpartition(RemoteInputChannel.java:168)
> at
> org.apache.flink.runtime.io.network.partition.consumer.SingleInputGate.requestPartitions(SingleInputGate.java:237)
> at
> org.apache.flink.runtime.io.network.partition.consumer.SingleInputGate.setup(SingleInputGate.java:215)
> at
> org.apache.flink.runtime.taskmanager.InputGateWithMetrics.setup(InputGateWithMetrics.java:65)
> at
> org.apache.flink.runtime.taskmanager.Task.setupPartitionsAndGates(Task.java:866)
> at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:621)
> at org.apache.flink.runtime.taskmanager.Task.run(Task.java:530)
> at java.lang.Thread.run(Thread.java:748)
> --
> at
> org.apache.flink.runtime.io.network.partition.consumer.RemoteInputChannel.requestSubpartition(RemoteInputChannel.java:168)
> at
> org.apache.flink.runtime.io.network.partition.consumer.SingleInputGate.requestPartitions(SingleInputGate.java:237)
> at
> org.apache.flink.runtime.io.network.partition.consumer.SingleInputGate.setup(SingleInputGate.java:215)
> at
> org.apache.flink.runtime.taskmanager.InputGateWithMetrics.setup(InputGateWithMetrics.java:65)
> at
> org.apache.flink.runtime.taskmanager.Task.setupPartitionsAndGates(Task.java:866)
> at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:621)
> at org.apache.flink.runtime.taskmanager.Task.run(Task.java:530)
> at java.lang.Thread.run(Thread.java:748)
> Caused by: java.io.IOException: Connecting the channel failed: Connecting
> to remote task manager + '/10.127.106.54:33564' has failed. This might
> indicate that the remote task manager has been lost.
> at
> org.apache.flink.runtime.io.network.netty.PartitionRequestClientFactory$ConnectingChannel.waitForChannel(PartitionRequestClientFactory.java:197)
> at
> org.apache.flink.runtime.io.network.netty.PartitionRequestClientFactory$ConnectingChannel.access$000(PartitionRequestClientFactory.java:134)
> at
> org.apache.flink.runtime.io.network.netty.PartitionRequestClientFactory.createPartitionRequestClient(PartitionRequestClientFactory.java:70)
> at
> org.apache.flink.runtime.io.network.netty.NettyConnectionManager.createPartitionRequestClient(NettyConnectionManager.java:68)
> at
> org.apache.flink.runtime.io.network.partition.consumer.RemoteInputChannel.requestSubpartition(RemoteInputChannel.java:165)
> ... 7 more
> --
> at
> org.apache.flink.runtime.io.network.netty.PartitionRequestClientFactory$ConnectingChannel.waitForChannel(PartitionRequestClientFactory.java:197)
> at
> org.apache.flink.runtime.io.network.netty.PartitionRequestClientFactory$ConnectingChannel.access$000(PartitionRequestClientFactory.java:134)
> at
> org.apache.flink.runtime.io.network.netty.PartitionRequestClientFactory.createPartitionRequestClient(PartitionRequestClientFactory.java:70)
> at
> org.apache.flink.runtime.io.network.netty.NettyConnectionManager.createPartitionRequestClient(NettyConnectionManager.java:68)
> at
> org.apache.flink.runtime.io.network.partition.consumer.RemoteInputChannel.requestSubpartition(RemoteInputChannel.java:165)
> ... 7 more
> Caused by:
> org.apache.flink.runtime.io.network.netty.exception.RemoteTransportException:
> Connecting to remote task manager + '/10.127.106.54:33564' has failed.
> This might indicate that the remote task manager has been lost.
> at
> org.apache.flink.runtime.io.network.netty.PartitionRequestClientFactory$ConnectingChannel.operationComplete(PartitionRequestClientFactory.java:220)
> at
> org.apache.flink.runtime.io.network.netty.PartitionRequestClientFactory$ConnectingChannel.operationComplete(PartitionRequestClientFactory.java:134)
> at
> org.apache.flink.shaded.netty4.io.netty.util.concurrent.DefaultPromise.notifyListener0(DefaultPromise.java:511)
> at
> org.apache.flink.shaded.netty4.io.netty.util.concurrent.DefaultPromise.notifyListeners0(DefaultPromise.java:504)
> at
> org.apache.flink.shaded.netty4.io.netty.util.concurrent.DefaultPromise.notifyListenersNow(DefaultPromise.java:483)
> at
> org.apache.flink.shaded.netty4.io.netty.util.concurrent.DefaultPromise.notifyListeners(DefaultPromise.java:424)
> at
> org.apache.flink.shaded.netty4.io.netty.util.concurrent.DefaultPromise.tryFailure(DefaultPromise.java:121)
> at
> org.apache.flink.shaded.netty4.io.netty.channel.nio.AbstractNioChannel$AbstractNioUnsafe.fulfillConnectPromise(AbstractNioChannel.java:327)
> at
> org.apache.flink.shaded.netty4.io.netty.channel.nio.AbstractNioChannel$AbstractNioUnsafe.finishConnect(AbstractNioChannel.java:343)
> at
> org.apache.flink.shaded.netty4.io.netty.channel.nio.NioEventLoop.processSelectedKey(NioEventLoop.java:644)
> at
> org.apache.flink.shaded.netty4.io.netty.channel.nio.NioEventLoop.processSelectedKeysOptimized(NioEventLoop.java:591)
> at
> org.apache.flink.shaded.netty4.io.netty.channel.nio.NioEventLoop.processSelectedKeys(NioEventLoop.java:508)
> at
> org.apache.flink.shaded.netty4.io.netty.channel.nio.NioEventLoop.run(NioEventLoop.java:470)
> at
> org.apache.flink.shaded.netty4.io.netty.util.concurrent.SingleThreadEventExecutor$5.run(SingleThreadEventExecutor.java:909)
> ... 1 more
> --
> at
> org.apache.flink.runtime.io.network.netty.PartitionRequestClientFactory$ConnectingChannel.operationComplete(PartitionRequestClientFactory.java:220)
> at
> org.apache.flink.runtime.io.network.netty.PartitionRequestClientFactory$ConnectingChannel.operationComplete(PartitionRequestClientFactory.java:134)
> at
> org.apache.flink.shaded.netty4.io.netty.util.concurrent.DefaultPromise.notifyListener0(DefaultPromise.java:511)
> at
> org.apache.flink.shaded.netty4.io.netty.util.concurrent.DefaultPromise.notifyListeners0(DefaultPromise.java:504)
> at
> org.apache.flink.shaded.netty4.io.netty.util.concurrent.DefaultPromise.notifyListenersNow(DefaultPromise.java:483)
> at
> org.apache.flink.shaded.netty4.io.netty.util.concurrent.DefaultPromise.notifyListeners(DefaultPromise.java:424)
> at
> org.apache.flink.shaded.netty4.io.netty.util.concurrent.DefaultPromise.tryFailure(DefaultPromise.java:121)
> at
> org.apache.flink.shaded.netty4.io.netty.channel.nio.AbstractNioChannel$AbstractNioUnsafe.fulfillConnectPromise(AbstractNioChannel.java:327)
> at
> org.apache.flink.shaded.netty4.io.netty.channel.nio.AbstractNioChannel$AbstractNioUnsafe.finishConnect(AbstractNioChannel.java:343)
> at
> org.apache.flink.shaded.netty4.io.netty.channel.nio.NioEventLoop.processSelectedKey(NioEventLoop.java:644)
> at
> org.apache.flink.shaded.netty4.io.netty.channel.nio.NioEventLoop.processSelectedKeysOptimized(NioEventLoop.java:591)
> at
> org.apache.flink.shaded.netty4.io.netty.channel.nio.NioEventLoop.processSelectedKeys(NioEventLoop.java:508)
> at
> org.apache.flink.shaded.netty4.io.netty.channel.nio.NioEventLoop.run(NioEventLoop.java:470)
> at
> org.apache.flink.shaded.netty4.io.netty.util.concurrent.SingleThreadEventExecutor$5.run(SingleThreadEventExecutor.java:909)
> ... 1 more
> Caused by:
> org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannel$AnnotatedConnectException:
> Connection timed out: /10.127.106.54:33564
>
>
> On Fri, May 29, 2020 at 12:43 PM Vijay Balakrishnan <bv...@gmail.com>
> wrote:
>
>> Thx, Xintong for the detailed explanation of memory fraction. I increased
>> the mem fraction now.
>>
>> As I increase the defaultParallelism, I keep getting this error:
>>
>> org.apache.flink.runtime.io.network.partition.consumer.
>> PartitionConnectionException: Connection for partition
>> e312b2db4d1d0c65224664f620d06f7d@c3a4fb1e56a0996d9f2ff86dac6e483f not
>> reachable.
>>     at org.apache.flink.runtime.io.network.partition.consumer.
>> RemoteInputChannel.requestSubpartition(RemoteInputChannel.java:168)
>>     at org.apache.flink.runtime.io.network.partition.consumer.
>> SingleInputGate.requestPartitions(SingleInputGate.java:237)
>>     at org.apache.flink.runtime.io.network.partition.consumer.
>> SingleInputGate.setup(SingleInputGate.java:215)
>>     at org.apache.flink.runtime.taskmanager.InputGateWithMetrics.setup(
>> InputGateWithMetrics.java:65)
>>     at org.apache.flink.runtime.taskmanager.Task.setupPartitionsAndGates(
>> Task.java:866)
>>     at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:621)
>>     at org.apache.flink.runtime.taskmanager.Task.run(Task.java:530)
>>     at java.lang.Thread.run(Thread.java:748)
>> Caused by: java.io.IOException: Connecting the channel failed: Connecting
>> to remote task manager + '/10.9.239.218:45544' has failed. This might
>> indicate that the remote task manager has been lost.
>>     at org.apache.flink.runtime.io.network.netty.
>> PartitionRequestClientFactory$ConnectingChannel.waitForChannel(
>> PartitionRequestClientFactory.java:197)
>>     at org.apache.flink.runtime.io.network.netty.
>> PartitionRequestClientFactory$ConnectingChannel.access$000(
>> PartitionRequestClientFactory.java:134)
>>     at org.apache.flink.runtime.io.network.netty.
>> PartitionRequestClientFactory.createPartitionRequestClient(
>> PartitionRequestClientFactory.java:70)
>>     at org.apache.flink.runtime.io.network.netty.NettyConnectionManager
>> .createPartitionRequestClient(NettyConnectionManager.java:68)
>>     at org.apache.flink.runtime.io.network.partition.consumer.
>> RemoteInputChannel.requestSubpartition(RemoteInputChannel.java:165)
>>     ... 7 more
>> Caused by: org.apache.flink.runtime.io.network.netty.exception.
>> RemoteTransportException: Connecting to remote task manager + '/
>> 10.9.239.218:45544' has failed. This might indicate that the remote task
>> manager has been lost.
>>     at org.apache.flink.runtime.io.network.netty.
>> PartitionRequestClientFactory$ConnectingChannel.operationComplete(
>> PartitionRequestClientFactory.java:220)
>>     at org.apache.flink.runtime.io.network.netty.
>> PartitionRequestClientFactory$ConnectingChannel.operationComplete(
>> PartitionRequestClientFactory.java:134)
>>     at org.apache.flink.shaded.netty4.io.netty.util.concurrent.
>> DefaultPromise.notifyListener0(DefaultPromise.java:511)
>>     at org.apache.flink.shaded.netty4.io.netty.util.concurrent.
>> DefaultPromise.notifyListeners0(DefaultPromise.java:504)
>>     at org.apache.flink.shaded.netty4.io.netty.util.concurrent.
>> DefaultPromise.notifyListenersNow(DefaultPromise.java:483)
>>     at org.apache.flink.shaded.netty4.io.netty.util.concurrent.
>> DefaultPromise.notifyListeners(DefaultPromise.java:424)
>>     at org.apache.flink.shaded.netty4.io.netty.util.concurrent.
>> DefaultPromise.tryFailure(DefaultPromise.java:121)
>>     at org.apache.flink.shaded.netty4.io.netty.channel.nio.
>> AbstractNioChannel$AbstractNioUnsafe.fulfillConnectPromise(
>> AbstractNioChannel.java:327)
>>     at org.apache.flink.shaded.netty4.io.netty.channel.nio.
>> AbstractNioChannel$AbstractNioUnsafe.finishConnect(AbstractNioChannel
>> .java:343)
>>     at org.apache.flink.shaded.netty4.io.netty.channel.nio.NioEventLoop
>> .processSelectedKey(NioEventLoop.java:644)
>>     at org.apache.flink.shaded.netty4.io.netty.channel.nio.NioEventLoop
>> .processSelectedKeysOptimized(NioEventLoop.java:591)
>>     at org.apache.flink.shaded.netty4.io.netty.channel.nio.NioEventLoop
>> .processSelectedKeys(NioEventLoop.java:508)
>>     at org.apache.flink.shaded.netty4.io.netty.channel.nio.NioEventLoop
>> .run(NioEventLoop.java:470)
>>     at org.apache.flink.shaded.netty4.io.netty.util.concurrent.
>> SingleThreadEventExecutor$5.run(SingleThreadEventExecutor.java:909)
>>     ... 1 more
>> Caused by: org.apache.flink.shaded.netty4.io.netty.channel.
>> AbstractChannel$AnnotatedConnectException: Connection timed out: /10.9.
>> 239.218:45544
>>     at sun.nio.ch.SocketChannelImpl.checkConnect(Native Method)
>>     at sun.nio.ch.SocketChannelImpl.finishConnect(SocketChannelImpl.java:
>> 714)
>>     at org.apache.flink.shaded.netty4.io.netty.channel.socket.nio.
>> NioSocketChannel.doFinishConnect(NioSocketChannel.java:327)
>>     at org.apache.flink.shaded.netty4.io.netty.channel.nio.
>> AbstractNioChannel$AbstractNioUnsafe.finishConnect(AbstractNioChannel
>> .java:340)
>>     ... 6 more
>> Caused by: java.net.ConnectException: Connection timed out
>>     ... 10 more
>>
>>
>> On Wed, May 27, 2020 at 7:14 PM Xintong Song <to...@gmail.com>
>> wrote:
>>
>>> Ah, I guess I had misunderstood what you meant.
>>>
>>> Below 18000 tasks, the Flink Job is able to start up.
>>>> Even though I increased the number of slots, it still works when 312
>>>> slots are being used.
>>>>
>>> When you say "it still works", I thought you meant that when you increased
>>> the parallelism, the job was still executed as if the parallelism had not
>>> been increased. From your latest reply, it seems the job's parallelism is
>>> indeed increased, but then it runs into failures.
>>>
>>> The reason you run into the "Insufficient number of network buffers"
>>> exception is that with more tasks in your job, more inter-task data
>>> transmission channels, and thus more memory for network buffers, are needed.
>>>
>>> To increase the network memory size, the following configuration
>>> options, as you already found, are related.
>>>
>>>    - taskmanager.network.memory.fraction
>>>    - taskmanager.network.memory.max
>>>    - taskmanager.network.memory.min
>>>
>>> Please be aware that `taskmanager.memory.task.off-heap.size` is not
>>> related to network memory, and is only available in Flink 1.10 and above
>>> while you're using 1.9.1 as suggested by the screenshots.
>>>
>>> The network memory size is calculated as `min(max(some_total_value *
>>> network_fraction, network_min), network_max)`. According to the error
>>> message, your current network memory size is `85922 buffers *
>>> 32KB/buffer = 2685MB`, smaller than your "max" (4gb). That means
>>> increasing the "max" does not help in your case. It is the "fraction" that
>>> you need to increase.
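>>>
>>> To make that concrete, here is a rough, back-of-the-envelope calculation
>>> derived from the numbers above (the base value Flink derives from the TM
>>> memory configuration, so treat this purely as an illustration, not exact
>>> sizing):
>>>
>>>     current size:   85922 buffers * 32 KB                   ~ 2685 MB
>>>     implied base:   2685 MB / 0.15 (your fraction)          ~ 17.5 GB
>>>     fraction 0.30:  min(max(17.5 GB * 0.30, 500 MB), 4 GB)  = 4 GB (capped)
>>>
>>> i.e. doubling the fraction roughly doubles the buffers, but only until the
>>> "max" cap is reached, after which "max" has to be raised as well. As a rough
>>> rule of thumb from the docs' network buffer guidance, a repartitioning
>>> between two vertices needs on the order of slots-per-TM^2 * number-of-TMs *
>>> 4 buffers, which is why the demand grows quickly with parallelism.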
>>>
>>> Thank you~
>>>
>>> Xintong Song
>>>
>>>
>>>
>>> On Thu, May 28, 2020 at 9:30 AM Vijay Balakrishnan <bv...@gmail.com>
>>> wrote:
>>>
>>>> Hi Xintong,
>>>> Looks like the issue is not fully resolved :( Attaching 2 screenshots
>>>> of the memory consumption of 1 of the TaskManagers.
>>>>
>>>> To increase the used-up Direct memory off heap, do I change this:
>>>>  taskmanager.memory.task.off-heap.size: 5gb
>>>>
>>>> I had increased the taskmanager.network.memory.max: 24gb
>>>> which seems excessive.
>>>>
>>>> 1 of the errors I saw in the Flink logs:
>>>>
>>>> java.io.IOException: Insufficient number of network buffers: required
>>>> 1, but only 0 available. The total number of network buffers is currently
>>>> set to 85922 of 32768 bytes each. You can increase this number by setting
>>>> the configuration keys 'taskmanager.network.memory.fraction',
>>>> 'taskmanager.network.memory.min', and 'taskmanager.network.memory.max'.
>>>> at
>>>> org.apache.flink.runtime.io.network.buffer.NetworkBufferPool.createBufferPool(NetworkBufferPool.java:281)
>>>> at
>>>> org.apache.flink.runtime.io.network.partition.ResultPartitionFactory.lambda$createBufferPoolFactory$0(ResultPartitionFactory.java:191)
>>>>
>>>> TIA,
>>>>
>>>>
>>>> On Wed, May 27, 2020 at 9:06 AM Vijay Balakrishnan <bv...@gmail.com>
>>>> wrote:
>>>>
>>>>> Thanks so much, Xintong for guiding me through this. I looked at the
>>>>> Flink logs to see the errors.
>>>>> I had to change taskmanager.network.memory.max: 4gb
>>>>> and akka.ask.timeout: 240s to increase the number of tasks.
>>>>> Now, I am able to increase the number of Tasks, aka Task vertices.
>>>>>
>>>>> taskmanager.network.memory.fraction: 0.15
>>>>> taskmanager.network.memory.max: 4gb
>>>>> taskmanager.network.memory.min: 500mb
>>>>> akka.ask.timeout: 240s
>>>>>
>>>>> On Tue, May 26, 2020 at 8:42 PM Xintong Song <to...@gmail.com>
>>>>> wrote:
>>>>>
>>>>>> Could you also explain how you set the parallelism when getting
>>>>>> this execution plan?
>>>>>> I'm asking because this json file itself only shows the resulting
>>>>>> execution plan. It is not clear to me what is not working as expected in
>>>>>> your case. E.g., you set the parallelism for an operator to 10 but the
>>>>>> execution plan only shows 5.
>>>>>>
>>>>>> Thank you~
>>>>>>
>>>>>> Xintong Song
>>>>>>
>>>>>>
>>>>>>
>>>>>> On Wed, May 27, 2020 at 3:16 AM Vijay Balakrishnan <
>>>>>> bvijaykr@gmail.com> wrote:
>>>>>>
>>>>>>> Hi Xintong,
>>>>>>> Thanks for the excellent clarification for tasks.
>>>>>>>
>>>>>>> I attached a sample screenshot above, but it didn't reflect the slots
>>>>>>> used and the tasks limit I was running into in that pic.
>>>>>>>
>>>>>>> I am attaching my Execution plan here. Please let me know how I can
>>>>>>> increase the number of tasks, aka parallelism. As I increase the
>>>>>>> parallelism, I run into this bottleneck with the tasks.
>>>>>>>
>>>>>>> BTW - The https://flink.apache.org/visualizer/ is a great start to
>>>>>>> see this.
>>>>>>> TIA,
>>>>>>>
>>>>>>> On Sun, May 24, 2020 at 7:52 PM Xintong Song <to...@gmail.com>
>>>>>>> wrote:
>>>>>>>
>>>>>>>> Increasing network memory buffers (fraction, min, max) seems to
>>>>>>>>> increase tasks slightly.
>>>>>>>>
>>>>>>>> That's weird. I don't think the number of network memory buffers
>>>>>>>> has anything to do with the task amount.
>>>>>>>>
>>>>>>>> Let me try to clarify a few things.
>>>>>>>>
>>>>>>>> Please be aware that how many tasks a Flink job has, and how many
>>>>>>>> slots a Flink cluster has, are two different things.
>>>>>>>> - The number of tasks is decided by your job's parallelism and
>>>>>>>> topology. E.g., if your job graph has 3 vertices A, B and C, with
>>>>>>>> parallelism 2, 3, 4 respectively, then you would have 9 (2+3+4)
>>>>>>>> tasks in total.
>>>>>>>> - The number of slots is decided by the number of TMs and slots-per-TM.
>>>>>>>> - For streaming jobs, you have to make sure the number of slots is
>>>>>>>> enough for executing all your tasks. The number of slots needed for
>>>>>>>> executing your job is by default the max parallelism of your job graph
>>>>>>>> vertices. Taking the above example, you would need 4 slots, because it's
>>>>>>>> the max among all the vertices' parallelisms (2, 3, 4).
>>>>>>>>
>>>>>>>> In your case, the screenshot shows that your job has 9621 tasks in
>>>>>>>> total (not around 18000; the dark box shows total tasks while the green box
>>>>>>>> shows running tasks), and 600 slots are in use (658 - 58), suggesting that
>>>>>>>> the max parallelism of your job graph vertices is 600.
>>>>>>>>
>>>>>>>> If you want to increase the number of tasks, you should increase
>>>>>>>> your job parallelism. There are several ways to do that.
>>>>>>>>
>>>>>>>>    - In your job codes (assuming you are using DataStream API)
>>>>>>>>       - Use `StreamExecutionEnvironment#setParallelism()` to set
>>>>>>>>       parallelism for all operators.
>>>>>>>>       - Use `SingleOutputStreamOperator#setParallelism()` to set
>>>>>>>>       parallelism for a specific operator. (Only supported for subclasses of
>>>>>>>>       `SingleOutputStreamOperator`.)
>>>>>>>>    - When submitting your job, use `-p <parallelism>` as an
>>>>>>>>    argument for the `flink run` command, to set parallelism for all operators.
>>>>>>>>    - Set `parallelism.default` in your `flink-conf.yaml`, to set a
>>>>>>>>    default parallelism for your jobs. This will be used for jobs that have not
>>>>>>>>    set the parallelism with any of the above methods.
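>>>>>>>>
>>>>>>>> For the first option, a minimal DataStream sketch (the class name and
>>>>>>>> the toy pipeline below are made up for illustration; only the two
>>>>>>>> setParallelism() calls are the point):
>>>>>>>>
>>>>>>>> import org.apache.flink.api.common.functions.MapFunction;
>>>>>>>> import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
>>>>>>>>
>>>>>>>> public class ParallelismExample {
>>>>>>>>     public static void main(String[] args) throws Exception {
>>>>>>>>         StreamExecutionEnvironment env =
>>>>>>>>                 StreamExecutionEnvironment.getExecutionEnvironment();
>>>>>>>>         // Default parallelism for every operator in this job.
>>>>>>>>         env.setParallelism(600);
>>>>>>>>
>>>>>>>>         env.fromElements(1, 2, 3)
>>>>>>>>                 .map(new MapFunction<Integer, Integer>() {
>>>>>>>>                     @Override
>>>>>>>>                     public Integer map(Integer value) {
>>>>>>>>                         return value * 2;
>>>>>>>>                     }
>>>>>>>>                 })
>>>>>>>>                 // Override the parallelism of this single operator.
>>>>>>>>                 .setParallelism(40)
>>>>>>>>                 .print();
>>>>>>>>
>>>>>>>>         env.execute("parallelism-example");
>>>>>>>>     }
>>>>>>>> }
>>>>>>>>
>>>>>>>> Submitting with `flink run -p 600 job.jar` would have the same effect as
>>>>>>>> the env.setParallelism(600) call above.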
>>>>>>>>
>>>>>>>>
>>>>>>>> Thank you~
>>>>>>>>
>>>>>>>> Xintong Song
>>>>>>>>
>>>>>>>>
>>>>>>>>
>>>>>>>> On Sat, May 23, 2020 at 1:11 AM Vijay Balakrishnan <
>>>>>>>> bvijaykr@gmail.com> wrote:
>>>>>>>>
>>>>>>>>> Hi Xintong,
>>>>>>>>> Thx for your reply.  Increasing network memory buffers (fraction,
>>>>>>>>> min, max) seems to increase tasks slightly.
>>>>>>>>>
>>>>>>>>> Streaming job
>>>>>>>>> Standalone
>>>>>>>>>
>>>>>>>>> Vijay
>>>>>>>>>
>>>>>>>>> On Fri, May 22, 2020 at 2:49 AM Xintong Song <
>>>>>>>>> tonysong820@gmail.com> wrote:
>>>>>>>>>
>>>>>>>>>> Hi Vijay,
>>>>>>>>>>
>>>>>>>>>> I don't think your problem is related to the number of open files.
>>>>>>>>>> The parallelism of your job is decided before it actually tries to open the
>>>>>>>>>> files. And if the OS limit for open files is reached, you should see a
>>>>>>>>>> job execution failure, instead of a successful execution with a lower
>>>>>>>>>> parallelism.
>>>>>>>>>>
>>>>>>>>>> Could you share some more information about your use case?
>>>>>>>>>>
>>>>>>>>>>    - What kind of job are you executing? Is it a streaming or
>>>>>>>>>>    batch processing job?
>>>>>>>>>>    - Which Flink deployment do you use? Standalone? Yarn?
>>>>>>>>>>    - It would be helpful if you can share the Flink logs.
>>>>>>>>>>
>>>>>>>>>>
>>>>>>>>>> Thank you~
>>>>>>>>>>
>>>>>>>>>> Xintong Song
>>>>>>>>>>
>>>>>>>>>>
>>>>>>>>>>
>>>>>>>>>> On Wed, May 20, 2020 at 11:50 PM Vijay Balakrishnan <
>>>>>>>>>> bvijaykr@gmail.com> wrote:
>>>>>>>>>>
>>>>>>>>>>> Hi,
>>>>>>>>>>> I have increased the number of slots available, but the Job is
>>>>>>>>>>> not using all the slots and runs into this approximate 18000 Tasks limit.
>>>>>>>>>>> Looking into the source code, it seems to be opening a file -
>>>>>>>>>>> https://github.com/apache/flink/blob/master/flink-core/src/main/java/org/apache/flink/api/common/io/FileOutputFormat.java#L203
>>>>>>>>>>> So, do I have to tune the ulimit or something similar at the
>>>>>>>>>>> Ubuntu O/S level to increase the number of tasks available? What I am confused
>>>>>>>>>>> about is that the ulimit is per machine but the ExecutionGraph is across many
>>>>>>>>>>> machines? Please pardon my ignorance here. Does the number of tasks equate to
>>>>>>>>>>> the number of open files? I am using 15 slots per TaskManager on AWS m5.4xlarge,
>>>>>>>>>>> which has 16 vCPUs.
>>>>>>>>>>>
>>>>>>>>>>> TIA.
>>>>>>>>>>>
>>>>>>>>>>> On Tue, May 19, 2020 at 3:22 PM Vijay Balakrishnan <
>>>>>>>>>>> bvijaykr@gmail.com> wrote:
>>>>>>>>>>>
>>>>>>>>>>>> Hi,
>>>>>>>>>>>>
>>>>>>>>>>>> The Flink Dashboard UI seems to show a hard limit of around 18000
>>>>>>>>>>>> for the Tasks column on an Ubuntu Linux box.
>>>>>>>>>>>> I kept increasing the number of slots per task manager to 15,
>>>>>>>>>>>> and the number of slots increased to 705, but the tasks
>>>>>>>>>>>> stayed at around 18000. Below 18000 tasks, the Flink Job is
>>>>>>>>>>>> able to start up.
>>>>>>>>>>>> Even though I increased the number of slots, it still works
>>>>>>>>>>>> when 312 slots are being used.
>>>>>>>>>>>>
>>>>>>>>>>>> taskmanager.numberOfTaskSlots: 15
>>>>>>>>>>>>
>>>>>>>>>>>> What knob can I tune to increase the number of Tasks ?
>>>>>>>>>>>>
>>>>>>>>>>>> Pls find attached the Flink Dashboard UI.
>>>>>>>>>>>>
>>>>>>>>>>>> TIA,
>>>>>>>>>>>>
>>>>>>>>>>>>

Re: Flink Dashboard UI Tasks hard limit

Posted by Xintong Song <to...@gmail.com>.
Hi Vijay,

From the information you provided (the configurations, error message &
screenshot), I'm not able to find out what the problem is or how to
resolve it.

The error message comes from a healthy task manager, which discovered that
another task manager is not responding. We would need to look into the *log
of the task manager that is not responding* to understand what's wrong with
it.

Thank you~

Xintong Song



On Fri, Jun 5, 2020 at 6:06 AM Vijay Balakrishnan <bv...@gmail.com>
wrote:

> Thx a ton, Xintong.
> I am using this configuration now:
>  taskmanager.numberOfTaskSlots: 14
>     rest.server.max-content-length: 314572800
>     taskmanager.network.memory.fraction: 0.45
>     taskmanager.network.memory.max: 24gb
>     taskmanager.network.memory.min: 500mb
>     akka.ask.timeout: 240s
>     cluster.evenly-spread-out-slots: true
>     akka.tcp.timeout: 240s
> taskmanager.network.request-backoff.initial: 5000
> taskmanager.network.request-backoff.max: 30000
> web.timeout:1000000
>
> I still get an error on startup when loading the Flink jar. It resolves
> itself after failing on the 1st few tries. This is
> where taskmanager.network.request-backoff.initial: 5000 helped a little
> bit. Would like to get this Job starting successfully on the 1st try
> itself. Also attaching a screenshot of the error on Job failure.
> Exception:
> org.apache.flink.runtime.io.network.partition.consumer.PartitionConnectionException:
> Connection for partition
> ce6b601e14b959de21d8351a6c5cf70c@1f2cd0d827586a4bc7b6f40ad2609db1 not
> reachable.
> at
> org.apache.flink.runtime.io.network.partition.consumer.RemoteInputChannel.requestSubpartition(RemoteInputChannel.java:168)
> ...
> Caused by: java.io.IOException: Connecting the channel failed: Connecting
> to remote task manager + '/10.128.49.96:43060' has failed. This might
> indicate that the remote task manager has been lost.
> at
> org.apache.flink.runtime.io.network.netty.PartitionRequestClientFactory$ConnectingChannel.waitForChannel(PartitionRequestClientFactory.java:197)
> ... 7 more
> Caused by:
> org.apache.flink.runtime.io.network.netty.exception.RemoteTransportException:
> Connecting to remote task manager + '/10.128.49.96:43060' has failed.
> This might indicate that the remote task manager has been lost.
> at
> org.apache.flink.runtime.io.network.netty.PartitionRequestClientFactory$ConnectingChannel.operationComplete(PartitionRequestClientFactory.java:220)
> ...
> Caused by:
> org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannel$AnnotatedConnectException:
> Connection timed out: /10.128.49.96:43060
> at sun.nio.ch.SocketChannelImpl.checkConnect(Native Method)
> at sun.nio.ch.SocketChannelImpl.finishConnect(SocketChannelImpl.java:714)
> at
> org.apache.flink.shaded.netty4.io.netty.channel.socket.nio.NioSocketChannel.doFinishConnect(NioSocketChannel.java:327)
> at
> org.apache.flink.shaded.netty4.io.netty.channel.nio.AbstractNioChannel$AbstractNioUnsafe.finishConnect(AbstractNioChannel.java:340)
> ... 6 more
> Caused by: java.net.ConnectException: Connection timed out
> ... 10 more
>
> TIA,
>
>
>
> On Sun, May 31, 2020 at 8:08 PM Xintong Song <to...@gmail.com>
> wrote:
>
>> Hi Vijay,
>>
>> The error message suggests that another task manager (10.127.106.54) is
>> not responding. This could happen when the remote task manager has failed
>> or is under severe GC pressure. You would need to find the log of the remote
>> task manager to understand what is happening.
>>
>> Thank you~
>>
>> Xintong Song
>>
>>
>>
>> On Mon, Jun 1, 2020 at 4:57 AM Vijay Balakrishnan <bv...@gmail.com>
>> wrote:
>>
>>> Hi All,
>>> The Job takes forever to start up and now fails at startup every
>>> time.
>>> Physical Memory:62.1 GB
>>> JVM Heap Size:15.0 GB
>>> Flink Managed Memory:10.5 GB
>>> Attached a TM screenshot.
>>>
>>> Tried increasing the following:
>>>
>>> taskmanager.numberOfTaskSlots: 10
>>> parallelism.default: 1
>>> rest.server.max-content-length: 314572800
>>> taskmanager.network.memory.fraction: 0.45
>>> taskmanager.network.memory.max: 24gb
>>> taskmanager.network.memory.min: 500mb
>>> akka.ask.timeout: 240s
>>> cluster.evenly-spread-out-slots: true
>>> taskmanager.network.netty.client.connectTimeoutSec: 240
>>> taskmanager.network.detailed-metrics: true
>>> taskmanager.network.memory.floating-buffers-per-gate: 16
>>> akka.tcp.timeout: 30s
>>>
>>> There are more than enough slots. The issue seems to be communicating over
>>> TCP with the remote Task Managers?
>>>
>>> Getting this exception on a TaskManager:
>>>
>>> 2020-05-31 20:37:31,436 INFO  org.apache.flink.runtime.taskmanager.Task
>>>                     - Window(TumblingEventTimeWindows(5000),
>>> EventTimeTrigger, MGroupingWindowAggregate,
>>> MGroupingAggregateWindowProcessing) (36/440)
>>> (921fe6761ce844a6850c5fc67326b221) switched from DEPLOYING to FAILED.
>>> org.apache.flink.runtime.io.network.partition.consumer.PartitionConnectionException:
>>> Connection for partition
>>> faea47916a206dc8d014694ec72ab577@95f71b39868d4e23a180ce11653dc4ca not
>>> reachable.
>>> at
>>> org.apache.flink.runtime.io.network.partition.consumer.RemoteInputChannel.requestSubpartition(RemoteInputChannel.java:168)
>>> at
>>> org.apache.flink.runtime.io.network.partition.consumer.SingleInputGate.requestPartitions(SingleInputGate.java:237)
>>> at
>>> org.apache.flink.runtime.io.network.partition.consumer.SingleInputGate.setup(SingleInputGate.java:215)
>>> at
>>> org.apache.flink.runtime.taskmanager.InputGateWithMetrics.setup(InputGateWithMetrics.java:65)
>>> at
>>> org.apache.flink.runtime.taskmanager.Task.setupPartitionsAndGates(Task.java:866)
>>> at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:621)
>>> at org.apache.flink.runtime.taskmanager.Task.run(Task.java:530)
>>> at java.lang.Thread.run(Thread.java:748)
>>> --
>>> at
>>> org.apache.flink.runtime.io.network.partition.consumer.RemoteInputChannel.requestSubpartition(RemoteInputChannel.java:168)
>>> at
>>> org.apache.flink.runtime.io.network.partition.consumer.SingleInputGate.requestPartitions(SingleInputGate.java:237)
>>> at
>>> org.apache.flink.runtime.io.network.partition.consumer.SingleInputGate.setup(SingleInputGate.java:215)
>>> at
>>> org.apache.flink.runtime.taskmanager.InputGateWithMetrics.setup(InputGateWithMetrics.java:65)
>>> at
>>> org.apache.flink.runtime.taskmanager.Task.setupPartitionsAndGates(Task.java:866)
>>> at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:621)
>>> at org.apache.flink.runtime.taskmanager.Task.run(Task.java:530)
>>> at java.lang.Thread.run(Thread.java:748)
>>> Caused by: java.io.IOException: Connecting the channel failed:
>>> Connecting to remote task manager + '/10.127.106.54:33564' has failed.
>>> This might indicate that the remote task manager has been lost.
>>> at
>>> org.apache.flink.runtime.io.network.netty.PartitionRequestClientFactory$ConnectingChannel.waitForChannel(PartitionRequestClientFactory.java:197)
>>> at
>>> org.apache.flink.runtime.io.network.netty.PartitionRequestClientFactory$ConnectingChannel.access$000(PartitionRequestClientFactory.java:134)
>>> at
>>> org.apache.flink.runtime.io.network.netty.PartitionRequestClientFactory.createPartitionRequestClient(PartitionRequestClientFactory.java:70)
>>> at
>>> org.apache.flink.runtime.io.network.netty.NettyConnectionManager.createPartitionRequestClient(NettyConnectionManager.java:68)
>>> at
>>> org.apache.flink.runtime.io.network.partition.consumer.RemoteInputChannel.requestSubpartition(RemoteInputChannel.java:165)
>>> ... 7 more
>>> --
>>> at
>>> org.apache.flink.runtime.io.network.netty.PartitionRequestClientFactory$ConnectingChannel.waitForChannel(PartitionRequestClientFactory.java:197)
>>> at
>>> org.apache.flink.runtime.io.network.netty.PartitionRequestClientFactory$ConnectingChannel.access$000(PartitionRequestClientFactory.java:134)
>>> at
>>> org.apache.flink.runtime.io.network.netty.PartitionRequestClientFactory.createPartitionRequestClient(PartitionRequestClientFactory.java:70)
>>> at
>>> org.apache.flink.runtime.io.network.netty.NettyConnectionManager.createPartitionRequestClient(NettyConnectionManager.java:68)
>>> at
>>> org.apache.flink.runtime.io.network.partition.consumer.RemoteInputChannel.requestSubpartition(RemoteInputChannel.java:165)
>>> ... 7 more
>>> Caused by:
>>> org.apache.flink.runtime.io.network.netty.exception.RemoteTransportException:
>>> Connecting to remote task manager + '/10.127.106.54:33564' has failed.
>>> This might indicate that the remote task manager has been lost.
>>> at
>>> org.apache.flink.runtime.io.network.netty.PartitionRequestClientFactory$ConnectingChannel.operationComplete(PartitionRequestClientFactory.java:220)
>>> at
>>> org.apache.flink.runtime.io.network.netty.PartitionRequestClientFactory$ConnectingChannel.operationComplete(PartitionRequestClientFactory.java:134)
>>> at
>>> org.apache.flink.shaded.netty4.io.netty.util.concurrent.DefaultPromise.notifyListener0(DefaultPromise.java:511)
>>> at
>>> org.apache.flink.shaded.netty4.io.netty.util.concurrent.DefaultPromise.notifyListeners0(DefaultPromise.java:504)
>>> at
>>> org.apache.flink.shaded.netty4.io.netty.util.concurrent.DefaultPromise.notifyListenersNow(DefaultPromise.java:483)
>>> at
>>> org.apache.flink.shaded.netty4.io.netty.util.concurrent.DefaultPromise.notifyListeners(DefaultPromise.java:424)
>>> at
>>> org.apache.flink.shaded.netty4.io.netty.util.concurrent.DefaultPromise.tryFailure(DefaultPromise.java:121)
>>> at
>>> org.apache.flink.shaded.netty4.io.netty.channel.nio.AbstractNioChannel$AbstractNioUnsafe.fulfillConnectPromise(AbstractNioChannel.java:327)
>>> at
>>> org.apache.flink.shaded.netty4.io.netty.channel.nio.AbstractNioChannel$AbstractNioUnsafe.finishConnect(AbstractNioChannel.java:343)
>>> at
>>> org.apache.flink.shaded.netty4.io.netty.channel.nio.NioEventLoop.processSelectedKey(NioEventLoop.java:644)
>>> at
>>> org.apache.flink.shaded.netty4.io.netty.channel.nio.NioEventLoop.processSelectedKeysOptimized(NioEventLoop.java:591)
>>> at
>>> org.apache.flink.shaded.netty4.io.netty.channel.nio.NioEventLoop.processSelectedKeys(NioEventLoop.java:508)
>>> at
>>> org.apache.flink.shaded.netty4.io.netty.channel.nio.NioEventLoop.run(NioEventLoop.java:470)
>>> at
>>> org.apache.flink.shaded.netty4.io.netty.util.concurrent.SingleThreadEventExecutor$5.run(SingleThreadEventExecutor.java:909)
>>> ... 1 more
>>> --
>>> at
>>> org.apache.flink.runtime.io.network.netty.PartitionRequestClientFactory$ConnectingChannel.operationComplete(PartitionRequestClientFactory.java:220)
>>> at
>>> org.apache.flink.runtime.io.network.netty.PartitionRequestClientFactory$ConnectingChannel.operationComplete(PartitionRequestClientFactory.java:134)
>>> at
>>> org.apache.flink.shaded.netty4.io.netty.util.concurrent.DefaultPromise.notifyListener0(DefaultPromise.java:511)
>>> at
>>> org.apache.flink.shaded.netty4.io.netty.util.concurrent.DefaultPromise.notifyListeners0(DefaultPromise.java:504)
>>> at
>>> org.apache.flink.shaded.netty4.io.netty.util.concurrent.DefaultPromise.notifyListenersNow(DefaultPromise.java:483)
>>> at
>>> org.apache.flink.shaded.netty4.io.netty.util.concurrent.DefaultPromise.notifyListeners(DefaultPromise.java:424)
>>> at
>>> org.apache.flink.shaded.netty4.io.netty.util.concurrent.DefaultPromise.tryFailure(DefaultPromise.java:121)
>>> at
>>> org.apache.flink.shaded.netty4.io.netty.channel.nio.AbstractNioChannel$AbstractNioUnsafe.fulfillConnectPromise(AbstractNioChannel.java:327)
>>> at
>>> org.apache.flink.shaded.netty4.io.netty.channel.nio.AbstractNioChannel$AbstractNioUnsafe.finishConnect(AbstractNioChannel.java:343)
>>> at
>>> org.apache.flink.shaded.netty4.io.netty.channel.nio.NioEventLoop.processSelectedKey(NioEventLoop.java:644)
>>> at
>>> org.apache.flink.shaded.netty4.io.netty.channel.nio.NioEventLoop.processSelectedKeysOptimized(NioEventLoop.java:591)
>>> at
>>> org.apache.flink.shaded.netty4.io.netty.channel.nio.NioEventLoop.processSelectedKeys(NioEventLoop.java:508)
>>> at
>>> org.apache.flink.shaded.netty4.io.netty.channel.nio.NioEventLoop.run(NioEventLoop.java:470)
>>> at
>>> org.apache.flink.shaded.netty4.io.netty.util.concurrent.SingleThreadEventExecutor$5.run(SingleThreadEventExecutor.java:909)
>>> ... 1 more
>>> Caused by:
>>> org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannel$AnnotatedConnectException:
>>> Connection timed out: /10.127.106.54:33564
>>>
>>>
>>> On Fri, May 29, 2020 at 12:43 PM Vijay Balakrishnan <bv...@gmail.com>
>>> wrote:
>>>
>>>> Thx, Xintong for the detailed explanation of memory fraction. I
>>>> increased the mem fraction now.
>>>>
>>>> As I increase the defaultParallelism, I keep getting this error:
>>>>
>>>> org.apache.flink.runtime.io.network.partition.consumer.
>>>> PartitionConnectionException: Connection for partition
>>>> e312b2db4d1d0c65224664f620d06f7d@c3a4fb1e56a0996d9f2ff86dac6e483f not
>>>> reachable.
>>>>     at org.apache.flink.runtime.io.network.partition.consumer.
>>>> RemoteInputChannel.requestSubpartition(RemoteInputChannel.java:168)
>>>>     at org.apache.flink.runtime.io.network.partition.consumer.
>>>> SingleInputGate.requestPartitions(SingleInputGate.java:237)
>>>>     at org.apache.flink.runtime.io.network.partition.consumer.
>>>> SingleInputGate.setup(SingleInputGate.java:215)
>>>>     at org.apache.flink.runtime.taskmanager.InputGateWithMetrics.setup(
>>>> InputGateWithMetrics.java:65)
>>>>     at org.apache.flink.runtime.taskmanager.Task
>>>> .setupPartitionsAndGates(Task.java:866)
>>>>     at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:621)
>>>>     at org.apache.flink.runtime.taskmanager.Task.run(Task.java:530)
>>>>     at java.lang.Thread.run(Thread.java:748)
>>>> Caused by: java.io.IOException: Connecting the channel failed:
>>>> Connecting to remote task manager + '/10.9.239.218:45544' has failed.
>>>> This might indicate that the remote task manager has been lost.
>>>>     at org.apache.flink.runtime.io.network.netty.
>>>> PartitionRequestClientFactory$ConnectingChannel.waitForChannel(
>>>> PartitionRequestClientFactory.java:197)
>>>>     at org.apache.flink.runtime.io.network.netty.
>>>> PartitionRequestClientFactory$ConnectingChannel.access$000(
>>>> PartitionRequestClientFactory.java:134)
>>>>     at org.apache.flink.runtime.io.network.netty.
>>>> PartitionRequestClientFactory.createPartitionRequestClient(
>>>> PartitionRequestClientFactory.java:70)
>>>>     at org.apache.flink.runtime.io.network.netty.NettyConnectionManager
>>>> .createPartitionRequestClient(NettyConnectionManager.java:68)
>>>>     at org.apache.flink.runtime.io.network.partition.consumer.
>>>> RemoteInputChannel.requestSubpartition(RemoteInputChannel.java:165)
>>>>     ... 7 more
>>>> Caused by: org.apache.flink.runtime.io.network.netty.exception.
>>>> RemoteTransportException: Connecting to remote task manager + '/
>>>> 10.9.239.218:45544' has failed. This might indicate that the remote
>>>> task manager has been lost.
>>>>     at org.apache.flink.runtime.io.network.netty.
>>>> PartitionRequestClientFactory$ConnectingChannel.operationComplete(
>>>> PartitionRequestClientFactory.java:220)
>>>>     at org.apache.flink.runtime.io.network.netty.
>>>> PartitionRequestClientFactory$ConnectingChannel.operationComplete(
>>>> PartitionRequestClientFactory.java:134)
>>>>     at org.apache.flink.shaded.netty4.io.netty.util.concurrent.
>>>> DefaultPromise.notifyListener0(DefaultPromise.java:511)
>>>>     at org.apache.flink.shaded.netty4.io.netty.util.concurrent.
>>>> DefaultPromise.notifyListeners0(DefaultPromise.java:504)
>>>>     at org.apache.flink.shaded.netty4.io.netty.util.concurrent.
>>>> DefaultPromise.notifyListenersNow(DefaultPromise.java:483)
>>>>     at org.apache.flink.shaded.netty4.io.netty.util.concurrent.
>>>> DefaultPromise.notifyListeners(DefaultPromise.java:424)
>>>>     at org.apache.flink.shaded.netty4.io.netty.util.concurrent.
>>>> DefaultPromise.tryFailure(DefaultPromise.java:121)
>>>>     at org.apache.flink.shaded.netty4.io.netty.channel.nio.
>>>> AbstractNioChannel$AbstractNioUnsafe.fulfillConnectPromise(
>>>> AbstractNioChannel.java:327)
>>>>     at org.apache.flink.shaded.netty4.io.netty.channel.nio.
>>>> AbstractNioChannel$AbstractNioUnsafe.finishConnect(AbstractNioChannel
>>>> .java:343)
>>>>     at org.apache.flink.shaded.netty4.io.netty.channel.nio.NioEventLoop
>>>> .processSelectedKey(NioEventLoop.java:644)
>>>>     at org.apache.flink.shaded.netty4.io.netty.channel.nio.NioEventLoop
>>>> .processSelectedKeysOptimized(NioEventLoop.java:591)
>>>>     at org.apache.flink.shaded.netty4.io.netty.channel.nio.NioEventLoop
>>>> .processSelectedKeys(NioEventLoop.java:508)
>>>>     at org.apache.flink.shaded.netty4.io.netty.channel.nio.NioEventLoop
>>>> .run(NioEventLoop.java:470)
>>>>     at org.apache.flink.shaded.netty4.io.netty.util.concurrent.
>>>> SingleThreadEventExecutor$5.run(SingleThreadEventExecutor.java:909)
>>>>     ... 1 more
>>>> Caused by: org.apache.flink.shaded.netty4.io.netty.channel.
>>>> AbstractChannel$AnnotatedConnectException: Connection timed out: /10.9.
>>>> 239.218:45544
>>>>     at sun.nio.ch.SocketChannelImpl.checkConnect(Native Method)
>>>>     at sun.nio.ch.SocketChannelImpl.finishConnect(SocketChannelImpl
>>>> .java:714)
>>>>     at org.apache.flink.shaded.netty4.io.netty.channel.socket.nio.
>>>> NioSocketChannel.doFinishConnect(NioSocketChannel.java:327)
>>>>     at org.apache.flink.shaded.netty4.io.netty.channel.nio.
>>>> AbstractNioChannel$AbstractNioUnsafe.finishConnect(AbstractNioChannel
>>>> .java:340)
>>>>     ... 6 more
>>>> Caused by: java.net.ConnectException: Connection timed out
>>>>     ... 10 more
>>>>
>>>>
>>>> On Wed, May 27, 2020 at 7:14 PM Xintong Song <to...@gmail.com>
>>>> wrote:
>>>>
>>>>> Ah, I guess I had misunderstood what you meant.
>>>>>
>>>>> Below 18000 tasks, the Flink Job is able to start up.
>>>>>> Even though I increased the number of slots, it still works when 312
>>>>>> slots are being used.
>>>>>>
>>>>> When you say "it still works", I thought you meant that when you increased
>>>>> the parallelism, the job was still executed as if the parallelism had not
>>>>> been increased. From your latest reply, it seems the job's parallelism is
>>>>> indeed increased, but then it runs into failures.
>>>>>
>>>>> The reason you run into the "Insufficient number of network buffers"
>>>>> exception is that with more tasks in your job, more inter-task data
>>>>> transmission channels, and thus more memory for network buffers, are needed.
>>>>>
>>>>> To increase the network memory size, the following configuration
>>>>> options, as you already found, are related.
>>>>>
>>>>>    - taskmanager.network.memory.fraction
>>>>>    - taskmanager.network.memory.max
>>>>>    - taskmanager.network.memory.min
>>>>>
>>>>> Please be aware that `taskmanager.memory.task.off-heap.size` is not
>>>>> related to network memory, and is only available in Flink 1.10 and above
>>>>> while you're using 1.9.1 as suggested by the screenshots.
>>>>>
>>>>> The network memory size is calculated as `min(max(some_total_value *
>>>>> network_fraction, network_min), network_max)`. According to the error
>>>>> message, your current network memory size is `85922 buffers *
>>>>> 32KB/buffer = 2685MB`, smaller than your "max" (4gb). That means
>>>>> increasing the "max" does not help in your case. It is the "fraction" that
>>>>> you need to increase.
>>>>>
>>>>> Thank you~
>>>>>
>>>>> Xintong Song
>>>>>
>>>>>
>>>>>
>>>>> On Thu, May 28, 2020 at 9:30 AM Vijay Balakrishnan <bv...@gmail.com>
>>>>> wrote:
>>>>>
>>>>>> Hi Xintong,
>>>>>> Looks like the issue is not fully resolved :( Attaching 2 screenshots
>>>>>> of the memory consumption of 1 of the TaskManagers.
>>>>>>
>>>>>> To increase the used-up Direct memory off heap, do I change this:
>>>>>>  taskmanager.memory.task.off-heap.size: 5gb
>>>>>>
>>>>>> I had increased the taskmanager.network.memory.max: 24gb
>>>>>> which seems excessive.
>>>>>>
>>>>>> 1 of the errors I saw in the Flink logs:
>>>>>>
>>>>>> java.io.IOException: Insufficient number of network buffers: required
>>>>>> 1, but only 0 available. The total number of network buffers is currently
>>>>>> set to 85922 of 32768 bytes each. You can increase this number by setting
>>>>>> the configuration keys 'taskmanager.network.memory.fraction',
>>>>>> 'taskmanager.network.memory.min', and 'taskmanager.network.memory.max'.
>>>>>> at
>>>>>> org.apache.flink.runtime.io.network.buffer.NetworkBufferPool.createBufferPool(NetworkBufferPool.java:281)
>>>>>> at
>>>>>> org.apache.flink.runtime.io.network.partition.ResultPartitionFactory.lambda$createBufferPoolFactory$0(ResultPartitionFactory.java:191)
>>>>>>
>>>>>> TIA,
>>>>>>
>>>>>>
>>>>>> On Wed, May 27, 2020 at 9:06 AM Vijay Balakrishnan <
>>>>>> bvijaykr@gmail.com> wrote:
>>>>>>
>>>>>>> Thanks so much, Xintong for guiding me through this. I looked at the
>>>>>>> Flink logs to see the errors.
>>>>>>> I had to change taskmanager.network.memory.max: 4gb
>>>>>>> and akka.ask.timeout: 240s to increase the number of tasks.
>>>>>>> Now, I am able to increase the number of Tasks, aka Task vertices.
>>>>>>>
>>>>>>> taskmanager.network.memory.fraction: 0.15
>>>>>>> taskmanager.network.memory.max: 4gb
>>>>>>> taskmanager.network.memory.min: 500mb
>>>>>>> akka.ask.timeout: 240s
>>>>>>>
>>>>>>> On Tue, May 26, 2020 at 8:42 PM Xintong Song <to...@gmail.com>
>>>>>>> wrote:
>>>>>>>
>>>>>>>> Could you also explain how you set the parallelism when getting
>>>>>>>> this execution plan?
>>>>>>>> I'm asking because this json file itself only shows the resulting
>>>>>>>> execution plan. It is not clear to me what is not working as expected in
>>>>>>>> your case. E.g., you set the parallelism for an operator to 10 but the
>>>>>>>> execution plan only shows 5.
>>>>>>>>
>>>>>>>> Thank you~
>>>>>>>>
>>>>>>>> Xintong Song
>>>>>>>>
>>>>>>>>
>>>>>>>>
>>>>>>>> On Wed, May 27, 2020 at 3:16 AM Vijay Balakrishnan <
>>>>>>>> bvijaykr@gmail.com> wrote:
>>>>>>>>
>>>>>>>>> Hi Xintong,
>>>>>>>>> Thanks for the excellent clarification for tasks.
>>>>>>>>>
>>>>>>>>> I attached a sample screenshot above, but it didn't reflect the slots
>>>>>>>>> used and the tasks limit I was running into in that pic.
>>>>>>>>>
>>>>>>>>> I am attaching my Execution plan here. Please let me know how I
>>>>>>>>> can increase the number of tasks, aka parallelism. As I increase the
>>>>>>>>> parallelism, I run into this bottleneck with the tasks.
>>>>>>>>>
>>>>>>>>> BTW - The https://flink.apache.org/visualizer/ is a great start
>>>>>>>>> to see this.
>>>>>>>>> TIA,
>>>>>>>>>
>>>>>>>>> On Sun, May 24, 2020 at 7:52 PM Xintong Song <
>>>>>>>>> tonysong820@gmail.com> wrote:
>>>>>>>>>
>>>>>>>>>> Increasing network memory buffers (fraction, min, max) seems to
>>>>>>>>>>> increase tasks slightly.
>>>>>>>>>>
>>>>>>>>>> That's weird. I don't think the number of network memory buffers
>>>>>>>>>> has anything to do with the task amount.
>>>>>>>>>>
>>>>>>>>>> Let me try to clarify a few things.
>>>>>>>>>>
>>>>>>>>>> Please be aware that how many tasks a Flink job has, and how
>>>>>>>>>> many slots a Flink cluster has, are two different things.
>>>>>>>>>> - The number of tasks is decided by your job's parallelism and
>>>>>>>>>> topology. E.g., if your job graph has 3 vertices A, B and C, with
>>>>>>>>>> parallelism 2, 3, 4 respectively, then you would have 9 (2+3+4)
>>>>>>>>>> tasks in total.
>>>>>>>>>> - The number of slots is decided by the number of TMs and
>>>>>>>>>> slots-per-TM.
>>>>>>>>>> - For streaming jobs, you have to make sure the number of slots
>>>>>>>>>> is enough for executing all your tasks. The number of slots needed for
>>>>>>>>>> executing your job is by default the max parallelism of your job graph
>>>>>>>>>> vertices. Taking the above example, you would need 4 slots, because it's
>>>>>>>>>> the max among all the vertices' parallelisms (2, 3, 4).
>>>>>>>>>>
>>>>>>>>>> In your case, the screenshot shows that your job has 9621 tasks in
>>>>>>>>>> total (not around 18000; the dark box shows total tasks while the green box
>>>>>>>>>> shows running tasks), and 600 slots are in use (658 - 58), suggesting that
>>>>>>>>>> the max parallelism of your job graph vertices is 600.
>>>>>>>>>>
>>>>>>>>>> If you want to increase the number of tasks, you should increase
>>>>>>>>>> your job parallelism. There are several ways to do that.
>>>>>>>>>>
>>>>>>>>>>    - In your job codes (assuming you are using DataStream API)
>>>>>>>>>>       - Use `StreamExecutionEnvironment#setParallelism()` to set
>>>>>>>>>>       parallelism for all operators.
>>>>>>>>>>       - Use `SingleOutputStreamOperator#setParallelism()` to set
>>>>>>>>>>       parallelism for a specific operator. (Only supported for subclasses of
>>>>>>>>>>       `SingleOutputStreamOperator`.)
>>>>>>>>>>    - When submitting your job, use `-p <parallelism>` as an
>>>>>>>>>>    argument for the `flink run` command, to set parallelism for all operators.
>>>>>>>>>>    - Set `parallelism.default` in your `flink-conf.yaml`, to set
>>>>>>>>>>    a default parallelism for your jobs. This will be used for jobs that have
>>>>>>>>>>    not set the parallelism with any of the above methods.
>>>>>>>>>>
>>>>>>>>>>
>>>>>>>>>> Thank you~
>>>>>>>>>>
>>>>>>>>>> Xintong Song
>>>>>>>>>>
>>>>>>>>>>
>>>>>>>>>>
>>>>>>>>>> On Sat, May 23, 2020 at 1:11 AM Vijay Balakrishnan <
>>>>>>>>>> bvijaykr@gmail.com> wrote:
>>>>>>>>>>
>>>>>>>>>>> Hi Xintong,
>>>>>>>>>>> Thx for your reply.  Increasing network memory buffers
>>>>>>>>>>> (fraction, min, max) seems to increase tasks slightly.
>>>>>>>>>>>
>>>>>>>>>>> Streaming job
>>>>>>>>>>> Standalone
>>>>>>>>>>>
>>>>>>>>>>> Vijay
>>>>>>>>>>>
>>>>>>>>>>> On Fri, May 22, 2020 at 2:49 AM Xintong Song <
>>>>>>>>>>> tonysong820@gmail.com> wrote:
>>>>>>>>>>>
>>>>>>>>>>>> Hi Vijay,
>>>>>>>>>>>>
>>>>>>>>>>>> I don't think your problem is related to the number of open
>>>>>>>>>>>> files. The parallelism of your job is decided before it actually tries to open
>>>>>>>>>>>> the files. And if the OS limit for open files is reached, you should see
>>>>>>>>>>>> a job execution failure, instead of a successful execution with a lower
>>>>>>>>>>>> parallelism.
>>>>>>>>>>>>
>>>>>>>>>>>> Could you share some more information about your use case?
>>>>>>>>>>>>
>>>>>>>>>>>>    - What kind of job are you executing? Is it a streaming or
>>>>>>>>>>>>    batch processing job?
>>>>>>>>>>>>    - Which Flink deployment do you use? Standalone? Yarn?
>>>>>>>>>>>>    - It would be helpful if you can share the Flink logs.
>>>>>>>>>>>>
>>>>>>>>>>>>
>>>>>>>>>>>> Thank you~
>>>>>>>>>>>>
>>>>>>>>>>>> Xintong Song
>>>>>>>>>>>>
>>>>>>>>>>>>
>>>>>>>>>>>>
>>>>>>>>>>>> On Wed, May 20, 2020 at 11:50 PM Vijay Balakrishnan <
>>>>>>>>>>>> bvijaykr@gmail.com> wrote:
>>>>>>>>>>>>
>>>>>>>>>>>>> Hi,
>>>>>>>>>>>>> I have increased the number of slots available, but the Job is
>>>>>>>>>>>>> not using all the slots and runs into this approximate 18000 Tasks limit.
>>>>>>>>>>>>> Looking into the source code, it seems to be opening a file -
>>>>>>>>>>>>> https://github.com/apache/flink/blob/master/flink-core/src/main/java/org/apache/flink/api/common/io/FileOutputFormat.java#L203
>>>>>>>>>>>>> So, do I have to tune the ulimit or something similar at the
>>>>>>>>>>>>> Ubuntu O/S level to increase the number of tasks available? What I am confused
>>>>>>>>>>>>> about is that the ulimit is per machine but the ExecutionGraph is across many
>>>>>>>>>>>>> machines? Please pardon my ignorance here. Does the number of tasks equate to
>>>>>>>>>>>>> the number of open files? I am using 15 slots per TaskManager on AWS m5.4xlarge,
>>>>>>>>>>>>> which has 16 vCPUs.
>>>>>>>>>>>>>
>>>>>>>>>>>>> TIA.
>>>>>>>>>>>>>
>>>>>>>>>>>>> On Tue, May 19, 2020 at 3:22 PM Vijay Balakrishnan <
>>>>>>>>>>>>> bvijaykr@gmail.com> wrote:
>>>>>>>>>>>>>
>>>>>>>>>>>>>> Hi,
>>>>>>>>>>>>>>
>>>>>>>>>>>>>> The Flink Dashboard UI seems to show a hard limit of around
>>>>>>>>>>>>>> 18000 for the Tasks column on an Ubuntu Linux box.
>>>>>>>>>>>>>> I kept increasing the number of slots per task manager to 15,
>>>>>>>>>>>>>> and the number of slots increased to 705, but the tasks
>>>>>>>>>>>>>> stayed at around 18000. Below 18000 tasks, the Flink Job is
>>>>>>>>>>>>>> able to start up.
>>>>>>>>>>>>>> Even though I increased the number of slots, it still works
>>>>>>>>>>>>>> when 312 slots are being used.
>>>>>>>>>>>>>>
>>>>>>>>>>>>>> taskmanager.numberOfTaskSlots: 15
>>>>>>>>>>>>>>
>>>>>>>>>>>>>> What knob can I tune to increase the number of Tasks ?
>>>>>>>>>>>>>>
>>>>>>>>>>>>>> Pls find attached the Flink Dashboard UI.
>>>>>>>>>>>>>>
>>>>>>>>>>>>>> TIA,
>>>>>>>>>>>>>>
>>>>>>>>>>>>>>

Re: Flink Dashboard UI Tasks hard limit

Posted by Vijay Balakrishnan <bv...@gmail.com>.
Thx a ton, Xintong.
I am using this configuration now:
 taskmanager.numberOfTaskSlots: 14
    rest.server.max-content-length: 314572800
    taskmanager.network.memory.fraction: 0.45
    taskmanager.network.memory.max: 24gb
    taskmanager.network.memory.min: 500mb
    akka.ask.timeout: 240s
    cluster.evenly-spread-out-slots: true
    akka.tcp.timeout: 240s
taskmanager.network.request-backoff.initial: 5000
taskmanager.network.request-backoff.max: 30000
web.timeout:1000000
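
Of the settings above, the two request-backoff keys are the ones that control
how long a consuming task backs off and retries when a producer's partition is
not yet available (my reading of the behavior; the values below are simply the
ones from my list, not a recommendation):

    # first retry waits 5 s, and the wait roughly doubles on each retry ...
    taskmanager.network.request-backoff.initial: 5000
    # ... until it reaches the 30 s cap, after which the request fails
    taskmanager.network.request-backoff.max: 30000

So larger values give a slow-starting producer task more time to register its
result partition before the consumer gives up.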

I still get an error on startup when loading the Flink jar. It resolves
itself after failing on the 1st few tries. This is
where taskmanager.network.request-backoff.initial: 5000 helped a little
bit. Would like to get this Job starting successfully on the 1st try
itself. Also attaching a screenshot of the error on Job failure.
Exception:
org.apache.flink.runtime.io.network.partition.consumer.PartitionConnectionException:
Connection for partition
ce6b601e14b959de21d8351a6c5cf70c@1f2cd0d827586a4bc7b6f40ad2609db1 not
reachable.
at
org.apache.flink.runtime.io.network.partition.consumer.RemoteInputChannel.requestSubpartition(RemoteInputChannel.java:168)
...
Caused by: java.io.IOException: Connecting the channel failed: Connecting
to remote task manager + '/10.128.49.96:43060' has failed. This might
indicate that the remote task manager has been lost.
at
org.apache.flink.runtime.io.network.netty.PartitionRequestClientFactory$ConnectingChannel.waitForChannel(PartitionRequestClientFactory.java:197)
... 7 more
Caused by:
org.apache.flink.runtime.io.network.netty.exception.RemoteTransportException:
Connecting to remote task manager + '/10.128.49.96:43060' has failed. This
might indicate that the remote task manager has been lost.
at
org.apache.flink.runtime.io.network.netty.PartitionRequestClientFactory$ConnectingChannel.operationComplete(PartitionRequestClientFactory.java:220)
...
Caused by:
org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannel$AnnotatedConnectException:
Connection timed out: /10.128.49.96:43060
at sun.nio.ch.SocketChannelImpl.checkConnect(Native Method)
at sun.nio.ch.SocketChannelImpl.finishConnect(SocketChannelImpl.java:714)
at
org.apache.flink.shaded.netty4.io.netty.channel.socket.nio.NioSocketChannel.doFinishConnect(NioSocketChannel.java:327)
at
org.apache.flink.shaded.netty4.io.netty.channel.nio.AbstractNioChannel$AbstractNioUnsafe.finishConnect(AbstractNioChannel.java:340)
... 6 more
Caused by: java.net.ConnectException: Connection timed out
... 10 more

TIA,



On Sun, May 31, 2020 at 8:08 PM Xintong Song <to...@gmail.com> wrote:

> Hi Vijay,
>
> The error message suggests that another task manager (10.127.106.54) is
> not responding. This could happen when the remote task manager has failed
> or is under severe GC pressure. You would need to find the log of the remote
> task manager to understand what is happening.
>
> Thank you~
>
> Xintong Song
>
>
>
> On Mon, Jun 1, 2020 at 4:57 AM Vijay Balakrishnan <bv...@gmail.com>
> wrote:
>
>> Hi All,
>> The Job takes forever to start up and now fails at startup every
>> time.
>> Physical Memory:62.1 GB
>> JVM Heap Size:15.0 GB
>> Flink Managed Memory:10.5 GB
>> Attached a TM screenshot.
>>
>> Tried increasing the following:
>>
>> taskmanager.numberOfTaskSlots: 10
>> parallelism.default: 1
>> rest.server.max-content-length: 314572800
>> taskmanager.network.memory.fraction: 0.45
>> taskmanager.network.memory.max: 24gb
>> taskmanager.network.memory.min: 500mb
>> akka.ask.timeout: 240s
>> cluster.evenly-spread-out-slots: true
>> taskmanager.network.netty.client.connectTimeoutSec: 240
>> taskmanager.network.detailed-metrics: true
>> taskmanager.network.memory.floating-buffers-per-gate: 16
>> akka.tcp.timeout: 30s
>>
>> There are more than enough slots. Issue seems to be communicating over
>> TCP with Remote Task managers ??
>>
>> Getting this exception on a TaskManager:
>>
>> 2020-05-31 20:37:31,436 INFO  org.apache.flink.runtime.taskmanager.Task
>>                   - Window(TumblingEventTimeWindows(5000),
>> EventTimeTrigger, MGroupingWindowAggregate,
>> MGroupingAggregateWindowProcessing) (36/440)
>> (921fe6761ce844a6850c5fc67326b221) switched from DEPLOYING to FAILED.
>> org.apache.flink.runtime.io.network.partition.consumer.PartitionConnectionException:
>> Connection for partition
>> faea47916a206dc8d014694ec72ab577@95f71b39868d4e23a180ce11653dc4ca not
>> reachable.
>> at
>> org.apache.flink.runtime.io.network.partition.consumer.RemoteInputChannel.requestSubpartition(RemoteInputChannel.java:168)
>> at
>> org.apache.flink.runtime.io.network.partition.consumer.SingleInputGate.requestPartitions(SingleInputGate.java:237)
>> at
>> org.apache.flink.runtime.io.network.partition.consumer.SingleInputGate.setup(SingleInputGate.java:215)
>> at
>> org.apache.flink.runtime.taskmanager.InputGateWithMetrics.setup(InputGateWithMetrics.java:65)
>> at
>> org.apache.flink.runtime.taskmanager.Task.setupPartitionsAndGates(Task.java:866)
>> at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:621)
>> at org.apache.flink.runtime.taskmanager.Task.run(Task.java:530)
>> at java.lang.Thread.run(Thread.java:748)
>> --
>> at
>> org.apache.flink.runtime.io.network.partition.consumer.RemoteInputChannel.requestSubpartition(RemoteInputChannel.java:168)
>> at
>> org.apache.flink.runtime.io.network.partition.consumer.SingleInputGate.requestPartitions(SingleInputGate.java:237)
>> at
>> org.apache.flink.runtime.io.network.partition.consumer.SingleInputGate.setup(SingleInputGate.java:215)
>> at
>> org.apache.flink.runtime.taskmanager.InputGateWithMetrics.setup(InputGateWithMetrics.java:65)
>> at
>> org.apache.flink.runtime.taskmanager.Task.setupPartitionsAndGates(Task.java:866)
>> at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:621)
>> at org.apache.flink.runtime.taskmanager.Task.run(Task.java:530)
>> at java.lang.Thread.run(Thread.java:748)
>> Caused by: java.io.IOException: Connecting the channel failed: Connecting
>> to remote task manager + '/10.127.106.54:33564' has failed. This might
>> indicate that the remote task manager has been lost.
>> at
>> org.apache.flink.runtime.io.network.netty.PartitionRequestClientFactory$ConnectingChannel.waitForChannel(PartitionRequestClientFactory.java:197)
>> at
>> org.apache.flink.runtime.io.network.netty.PartitionRequestClientFactory$ConnectingChannel.access$000(PartitionRequestClientFactory.java:134)
>> at
>> org.apache.flink.runtime.io.network.netty.PartitionRequestClientFactory.createPartitionRequestClient(PartitionRequestClientFactory.java:70)
>> at
>> org.apache.flink.runtime.io.network.netty.NettyConnectionManager.createPartitionRequestClient(NettyConnectionManager.java:68)
>> at
>> org.apache.flink.runtime.io.network.partition.consumer.RemoteInputChannel.requestSubpartition(RemoteInputChannel.java:165)
>> ... 7 more
>> --
>> at
>> org.apache.flink.runtime.io.network.netty.PartitionRequestClientFactory$ConnectingChannel.waitForChannel(PartitionRequestClientFactory.java:197)
>> at
>> org.apache.flink.runtime.io.network.netty.PartitionRequestClientFactory$ConnectingChannel.access$000(PartitionRequestClientFactory.java:134)
>> at
>> org.apache.flink.runtime.io.network.netty.PartitionRequestClientFactory.createPartitionRequestClient(PartitionRequestClientFactory.java:70)
>> at
>> org.apache.flink.runtime.io.network.netty.NettyConnectionManager.createPartitionRequestClient(NettyConnectionManager.java:68)
>> at
>> org.apache.flink.runtime.io.network.partition.consumer.RemoteInputChannel.requestSubpartition(RemoteInputChannel.java:165)
>> ... 7 more
>> Caused by:
>> org.apache.flink.runtime.io.network.netty.exception.RemoteTransportException:
>> Connecting to remote task manager + '/10.127.106.54:33564' has failed.
>> This might indicate that the remote task manager has been lost.
>> at
>> org.apache.flink.runtime.io.network.netty.PartitionRequestClientFactory$ConnectingChannel.operationComplete(PartitionRequestClientFactory.java:220)
>> at
>> org.apache.flink.runtime.io.network.netty.PartitionRequestClientFactory$ConnectingChannel.operationComplete(PartitionRequestClientFactory.java:134)
>> at
>> org.apache.flink.shaded.netty4.io.netty.util.concurrent.DefaultPromise.notifyListener0(DefaultPromise.java:511)
>> at
>> org.apache.flink.shaded.netty4.io.netty.util.concurrent.DefaultPromise.notifyListeners0(DefaultPromise.java:504)
>> at
>> org.apache.flink.shaded.netty4.io.netty.util.concurrent.DefaultPromise.notifyListenersNow(DefaultPromise.java:483)
>> at
>> org.apache.flink.shaded.netty4.io.netty.util.concurrent.DefaultPromise.notifyListeners(DefaultPromise.java:424)
>> at
>> org.apache.flink.shaded.netty4.io.netty.util.concurrent.DefaultPromise.tryFailure(DefaultPromise.java:121)
>> at
>> org.apache.flink.shaded.netty4.io.netty.channel.nio.AbstractNioChannel$AbstractNioUnsafe.fulfillConnectPromise(AbstractNioChannel.java:327)
>> at
>> org.apache.flink.shaded.netty4.io.netty.channel.nio.AbstractNioChannel$AbstractNioUnsafe.finishConnect(AbstractNioChannel.java:343)
>> at
>> org.apache.flink.shaded.netty4.io.netty.channel.nio.NioEventLoop.processSelectedKey(NioEventLoop.java:644)
>> at
>> org.apache.flink.shaded.netty4.io.netty.channel.nio.NioEventLoop.processSelectedKeysOptimized(NioEventLoop.java:591)
>> at
>> org.apache.flink.shaded.netty4.io.netty.channel.nio.NioEventLoop.processSelectedKeys(NioEventLoop.java:508)
>> at
>> org.apache.flink.shaded.netty4.io.netty.channel.nio.NioEventLoop.run(NioEventLoop.java:470)
>> at
>> org.apache.flink.shaded.netty4.io.netty.util.concurrent.SingleThreadEventExecutor$5.run(SingleThreadEventExecutor.java:909)
>> ... 1 more
>> --
>> at
>> org.apache.flink.runtime.io.network.netty.PartitionRequestClientFactory$ConnectingChannel.operationComplete(PartitionRequestClientFactory.java:220)
>> at
>> org.apache.flink.runtime.io.network.netty.PartitionRequestClientFactory$ConnectingChannel.operationComplete(PartitionRequestClientFactory.java:134)
>> at
>> org.apache.flink.shaded.netty4.io.netty.util.concurrent.DefaultPromise.notifyListener0(DefaultPromise.java:511)
>> at
>> org.apache.flink.shaded.netty4.io.netty.util.concurrent.DefaultPromise.notifyListeners0(DefaultPromise.java:504)
>> at
>> org.apache.flink.shaded.netty4.io.netty.util.concurrent.DefaultPromise.notifyListenersNow(DefaultPromise.java:483)
>> at
>> org.apache.flink.shaded.netty4.io.netty.util.concurrent.DefaultPromise.notifyListeners(DefaultPromise.java:424)
>> at
>> org.apache.flink.shaded.netty4.io.netty.util.concurrent.DefaultPromise.tryFailure(DefaultPromise.java:121)
>> at
>> org.apache.flink.shaded.netty4.io.netty.channel.nio.AbstractNioChannel$AbstractNioUnsafe.fulfillConnectPromise(AbstractNioChannel.java:327)
>> at
>> org.apache.flink.shaded.netty4.io.netty.channel.nio.AbstractNioChannel$AbstractNioUnsafe.finishConnect(AbstractNioChannel.java:343)
>> at
>> org.apache.flink.shaded.netty4.io.netty.channel.nio.NioEventLoop.processSelectedKey(NioEventLoop.java:644)
>> at
>> org.apache.flink.shaded.netty4.io.netty.channel.nio.NioEventLoop.processSelectedKeysOptimized(NioEventLoop.java:591)
>> at
>> org.apache.flink.shaded.netty4.io.netty.channel.nio.NioEventLoop.processSelectedKeys(NioEventLoop.java:508)
>> at
>> org.apache.flink.shaded.netty4.io.netty.channel.nio.NioEventLoop.run(NioEventLoop.java:470)
>> at
>> org.apache.flink.shaded.netty4.io.netty.util.concurrent.SingleThreadEventExecutor$5.run(SingleThreadEventExecutor.java:909)
>> ... 1 more
>> Caused by:
>> org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannel$AnnotatedConnectException:
>> Connection timed out: /10.127.106.54:33564
>>
>>
>> On Fri, May 29, 2020 at 12:43 PM Vijay Balakrishnan <bv...@gmail.com>
>> wrote:
>>
>>> Thx, Xintong for the detailed explanation of memory fraction. I
>>> increased the mem fraction now.
>>>
>>> As I increase the defaultParallelism, I keep getting this error:
>>>
>>> org.apache.flink.runtime.io.network.partition.consumer.
>>> PartitionConnectionException: Connection for partition
>>> e312b2db4d1d0c65224664f620d06f7d@c3a4fb1e56a0996d9f2ff86dac6e483f not
>>> reachable.
>>>     at org.apache.flink.runtime.io.network.partition.consumer.
>>> RemoteInputChannel.requestSubpartition(RemoteInputChannel.java:168)
>>>     at org.apache.flink.runtime.io.network.partition.consumer.
>>> SingleInputGate.requestPartitions(SingleInputGate.java:237)
>>>     at org.apache.flink.runtime.io.network.partition.consumer.
>>> SingleInputGate.setup(SingleInputGate.java:215)
>>>     at org.apache.flink.runtime.taskmanager.InputGateWithMetrics.setup(
>>> InputGateWithMetrics.java:65)
>>>     at org.apache.flink.runtime.taskmanager.Task
>>> .setupPartitionsAndGates(Task.java:866)
>>>     at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:621)
>>>     at org.apache.flink.runtime.taskmanager.Task.run(Task.java:530)
>>>     at java.lang.Thread.run(Thread.java:748)
>>> Caused by: java.io.IOException: Connecting the channel failed:
>>> Connecting to remote task manager + '/10.9.239.218:45544' has failed.
>>> This might indicate that the remote task manager has been lost.
>>>     at org.apache.flink.runtime.io.network.netty.
>>> PartitionRequestClientFactory$ConnectingChannel.waitForChannel(
>>> PartitionRequestClientFactory.java:197)
>>>     at org.apache.flink.runtime.io.network.netty.
>>> PartitionRequestClientFactory$ConnectingChannel.access$000(
>>> PartitionRequestClientFactory.java:134)
>>>     at org.apache.flink.runtime.io.network.netty.
>>> PartitionRequestClientFactory.createPartitionRequestClient(
>>> PartitionRequestClientFactory.java:70)
>>>     at org.apache.flink.runtime.io.network.netty.NettyConnectionManager
>>> .createPartitionRequestClient(NettyConnectionManager.java:68)
>>>     at org.apache.flink.runtime.io.network.partition.consumer.
>>> RemoteInputChannel.requestSubpartition(RemoteInputChannel.java:165)
>>>     ... 7 more
>>> Caused by: org.apache.flink.runtime.io.network.netty.exception.
>>> RemoteTransportException: Connecting to remote task manager + '/
>>> 10.9.239.218:45544' has failed. This might indicate that the remote
>>> task manager has been lost.
>>>     at org.apache.flink.runtime.io.network.netty.
>>> PartitionRequestClientFactory$ConnectingChannel.operationComplete(
>>> PartitionRequestClientFactory.java:220)
>>>     at org.apache.flink.runtime.io.network.netty.
>>> PartitionRequestClientFactory$ConnectingChannel.operationComplete(
>>> PartitionRequestClientFactory.java:134)
>>>     at org.apache.flink.shaded.netty4.io.netty.util.concurrent.
>>> DefaultPromise.notifyListener0(DefaultPromise.java:511)
>>>     at org.apache.flink.shaded.netty4.io.netty.util.concurrent.
>>> DefaultPromise.notifyListeners0(DefaultPromise.java:504)
>>>     at org.apache.flink.shaded.netty4.io.netty.util.concurrent.
>>> DefaultPromise.notifyListenersNow(DefaultPromise.java:483)
>>>     at org.apache.flink.shaded.netty4.io.netty.util.concurrent.
>>> DefaultPromise.notifyListeners(DefaultPromise.java:424)
>>>     at org.apache.flink.shaded.netty4.io.netty.util.concurrent.
>>> DefaultPromise.tryFailure(DefaultPromise.java:121)
>>>     at org.apache.flink.shaded.netty4.io.netty.channel.nio.
>>> AbstractNioChannel$AbstractNioUnsafe.fulfillConnectPromise(
>>> AbstractNioChannel.java:327)
>>>     at org.apache.flink.shaded.netty4.io.netty.channel.nio.
>>> AbstractNioChannel$AbstractNioUnsafe.finishConnect(AbstractNioChannel
>>> .java:343)
>>>     at org.apache.flink.shaded.netty4.io.netty.channel.nio.NioEventLoop
>>> .processSelectedKey(NioEventLoop.java:644)
>>>     at org.apache.flink.shaded.netty4.io.netty.channel.nio.NioEventLoop
>>> .processSelectedKeysOptimized(NioEventLoop.java:591)
>>>     at org.apache.flink.shaded.netty4.io.netty.channel.nio.NioEventLoop
>>> .processSelectedKeys(NioEventLoop.java:508)
>>>     at org.apache.flink.shaded.netty4.io.netty.channel.nio.NioEventLoop
>>> .run(NioEventLoop.java:470)
>>>     at org.apache.flink.shaded.netty4.io.netty.util.concurrent.
>>> SingleThreadEventExecutor$5.run(SingleThreadEventExecutor.java:909)
>>>     ... 1 more
>>> Caused by: org.apache.flink.shaded.netty4.io.netty.channel.
>>> AbstractChannel$AnnotatedConnectException: Connection timed out: /10.9.
>>> 239.218:45544
>>>     at sun.nio.ch.SocketChannelImpl.checkConnect(Native Method)
>>>     at sun.nio.ch.SocketChannelImpl.finishConnect(SocketChannelImpl
>>> .java:714)
>>>     at org.apache.flink.shaded.netty4.io.netty.channel.socket.nio.
>>> NioSocketChannel.doFinishConnect(NioSocketChannel.java:327)
>>>     at org.apache.flink.shaded.netty4.io.netty.channel.nio.
>>> AbstractNioChannel$AbstractNioUnsafe.finishConnect(AbstractNioChannel
>>> .java:340)
>>>     ... 6 more
>>> Caused by: java.net.ConnectException: Connection timed out
>>>     ... 10 more
>>>
>>>
>>> On Wed, May 27, 2020 at 7:14 PM Xintong Song <to...@gmail.com>
>>> wrote:
>>>
>>>> Ah, I guess I had misunderstood what you meant.
>>>>
>>>> Below 18000 tasks, the Flink Job is able to start up.
>>>>> Even though I increased the number of slots, it still works when 312
>>>>> slots are being used.
>>>>>
>>>> When you say "it still works", I thought you meant that even though you
>>>> increased the parallelism, the job was still executed as if the parallelism
>>>> had not been increased. From your latest reply, it seems the job's
>>>> parallelism is indeed increased, but then it runs into failures.
>>>>
>>>> The reason you run into the "Insufficient number of network buffers"
>>>> exception is that with more tasks in your job, more inter-task data
>>>> transmission channels, and thus more network buffer memory, are needed.
>>>>
>>>> To increase the network memory size, the following configuration
>>>> options, as you already found, are related.
>>>>
>>>>    - taskmanager.network.memory.fraction
>>>>    - taskmanager.network.memory.max
>>>>    - taskmanager.network.memory.min
>>>>
>>>> Please be aware that `taskmanager.memory.task.off-heap.size` is not
>>>> related to network memory, and is only available in Flink 1.10 and above
>>>> while you're using 1.9.1 as suggested by the screenshots.
>>>>
>>>> The network memory size is calculated as `min(max(some_total_value *
>>>> network_fraction, network_min), network_max)`. According to the error
>>>> message, your current network memory size is `85922 buffers *
>>>> 32KB/buffer = 2685MB`, smaller than your "max" (4gb). That means
>>>> increasing the "max" does not help in your case. It is the "fraction" that
>>>> you need to increase.
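
A quick worked example of that formula, assuming the fraction in effect when
the error occurred was 0.15 (as in the configuration quoted further down the
thread): 2685MB / 0.15 gives roughly 17.9GB as the total value being
fractioned, so raising the fraction to 0.3 would yield about 5.4GB, which
the 4gb "max" would then cap back at 4gb. A sketch of a larger network
memory budget therefore raises both knobs; the values below are illustrative
only, not a recommendation:

taskmanager.network.memory.fraction: 0.3
taskmanager.network.memory.min: 1gb
taskmanager.network.memory.max: 8gb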
>>>>
>>>> Thank you~
>>>>
>>>> Xintong Song
>>>>
>>>>
>>>>
>>>> On Thu, May 28, 2020 at 9:30 AM Vijay Balakrishnan <bv...@gmail.com>
>>>> wrote:
>>>>
>>>>> Hi Xintong,
>>>>> Looks like the issue is not fully resolved :( Attaching 2 screenshots
>>>>> of the memory consumption of 1 of the TaskManagers.
>>>>>
>>>>> To increase the used-up Direct memory off heap, do I change this:
>>>>>  taskmanager.memory.task.off-heap.size: 5gb
>>>>>
>>>>> I had increased the taskmanager.network.memory.max: 24gb
>>>>> which seems excessive.
>>>>>
>>>>> 1 of the errors I saw in the Flink logs:
>>>>>
>>>>> java.io.IOException: Insufficient number of network buffers: required
>>>>> 1, but only 0 available. The total number of network buffers is currently
>>>>> set to 85922 of 32768 bytes each. You can increase this number by setting
>>>>> the configuration keys 'taskmanager.network.memory.fraction',
>>>>> 'taskmanager.network.memory.min', and 'taskmanager.network.memory.max'.
>>>>> at
>>>>> org.apache.flink.runtime.io.network.buffer.NetworkBufferPool.createBufferPool(NetworkBufferPool.java:281)
>>>>> at
>>>>> org.apache.flink.runtime.io.network.partition.ResultPartitionFactory.lambda$createBufferPoolFactory$0(ResultPartitionFactory.java:191)
>>>>>
>>>>> TIA,
>>>>>
>>>>>
>>>>> On Wed, May 27, 2020 at 9:06 AM Vijay Balakrishnan <bv...@gmail.com>
>>>>> wrote:
>>>>>
>>>>>> Thanks so much, Xintong for guiding me through this. I looked at the
>>>>>> Flink logs to see the errors.
>>>>>> I had to change taskmanager.network.memory.max: 4gb
>>>>>> and akka.ask.timeout: 240s to increase the number of tasks.
>>>>>> Now, I am able to increase the number of Tasks/ aka Task vertices.
>>>>>>
>>>>>> taskmanager.network.memory.fraction: 0.15
>>>>>> taskmanager.network.memory.max: 4gb
>>>>>> taskmanager.network.memory.min: 500mb
>>>>>> akka.ask.timeout: 240s
>>>>>>
>>>>>> On Tue, May 26, 2020 at 8:42 PM Xintong Song <to...@gmail.com>
>>>>>> wrote:
>>>>>>
>>>>>>> Could you also explain how you set the parallelism when getting
>>>>>>> this execution plan?
>>>>>>> I'm asking because this json file itself only shows the resulting
>>>>>>> execution plan. It is not clear to me what is not working as expected in
>>>>>>> your case. E.g., you set the parallelism for an operator to 10 but the
>>>>>>> execution plan only shows 5.
>>>>>>>
>>>>>>> Thank you~
>>>>>>>
>>>>>>> Xintong Song
>>>>>>>
>>>>>>>
>>>>>>>
>>>>>>> On Wed, May 27, 2020 at 3:16 AM Vijay Balakrishnan <
>>>>>>> bvijaykr@gmail.com> wrote:
>>>>>>>
>>>>>>>> Hi Xintong,
>>>>>>>> Thanks for the excellent clarification for tasks.
>>>>>>>>
>>>>>>>> I attached a sample screenshot above, but it didn't reflect the slots
>>>>>>>> used and the tasks limit I was running into in that pic.
>>>>>>>>
>>>>>>>> I am attaching my Execution plan here. Please let me know how I can
>>>>>>>> increase the number of tasks, aka parallelism. As I increase the parallelism,
>>>>>>>> I run into this bottleneck with the tasks.
>>>>>>>>
>>>>>>>> BTW - The https://flink.apache.org/visualizer/ is a great start to
>>>>>>>> see this.
>>>>>>>> TIA,
>>>>>>>>
>>>>>>>> On Sun, May 24, 2020 at 7:52 PM Xintong Song <to...@gmail.com>
>>>>>>>> wrote:
>>>>>>>>
>>>>>>>>> Increasing network memory buffers (fraction, min, max) seems to
>>>>>>>>>> increase tasks slightly.
>>>>>>>>>
>>>>>>>>> That's weird. I don't think the number of network memory buffers
>>>>>>>>> has anything to do with the task amount.
>>>>>>>>>
>>>>>>>>> Let me try to clarify a few things.
>>>>>>>>>
>>>>>>>>> Please be aware that, how many tasks a Flink job has, and how many
>>>>>>>>> slots a Flink cluster has, are two different things.
>>>>>>>>> - The number of tasks is decided by your job's parallelism and
>>>>>>>>> topology. E.g., if your job graph has 3 vertices A, B and C, with
>>>>>>>>> parallelisms 2, 3 and 4 respectively, then you would have 9 (2+3+4)
>>>>>>>>> tasks in total.
>>>>>>>>> - The number of slots is decided by the number of TMs and
>>>>>>>>> slots-per-TM.
>>>>>>>>> - For streaming jobs, you have to make sure the number of slots is
>>>>>>>>> enough for executing all your tasks. The number of slots needed for
>>>>>>>>> executing your job is by default the max parallelism of your job graph
>>>>>>>>> vertices. Taking the above example, you would need 4 slots, because that's
>>>>>>>>> the max among all the vertices' parallelisms (2, 3, 4).
>>>>>>>>>
>>>>>>>>> In your case, the screenshot shows that your job has 9621 tasks in
>>>>>>>>> total (not around 18000, the dark box shows total tasks while the green box
>>>>>>>>> shows running tasks), and 600 slots are in use (658 - 58) suggesting that
>>>>>>>>> the max parallelism of your job graph vertices is 600.
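
As a concrete illustration of the slot arithmetic, using the numbers quoted
above (for illustration only): with taskmanager.numberOfTaskSlots: 15, a
largest vertex parallelism of 600 needs at least 600 / 15 = 40 TaskManagers'
worth of slots, and raising that vertex to, say, 900 would need 60,
regardless of how many tasks the job graph contains in total.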
>>>>>>>>>
>>>>>>>>> If you want to increase the number of tasks, you should increase
>>>>>>>>> your job parallelism. There are several ways to do that.
>>>>>>>>>
>>>>>>>>>    - In your job codes (assuming you are using DataStream API)
>>>>>>>>>       - Use `StreamExecutionEnvironment#setParallelism()` to set
>>>>>>>>>       parallelism for all operators.
>>>>>>>>>       - Use `SingleOutputStreamOperator#setParallelism()` to set
>>>>>>>>>       parallelism for a specific operator. (Only supported for subclasses of
>>>>>>>>>       `SingleOutputStreamOperator`.)
>>>>>>>>>    - When submitting your job, use `-p <parallelism>` as an
>>>>>>>>>    argument for the `flink run` command, to set parallelism for all operators.
>>>>>>>>>    - Set `parallelism.default` in your `flink-conf.yaml`, to set
>>>>>>>>>    a default parallelism for your jobs. This will be used for jobs that have
>>>>>>>>>    not set the parallelism with any of the above methods.
>>>>>>>>>
>>>>>>>>>
>>>>>>>>> Thank you~
>>>>>>>>>
>>>>>>>>> Xintong Song
>>>>>>>>>
>>>>>>>>>
>>>>>>>>>
>>>>>>>>> On Sat, May 23, 2020 at 1:11 AM Vijay Balakrishnan <
>>>>>>>>> bvijaykr@gmail.com> wrote:
>>>>>>>>>
>>>>>>>>>> Hi Xintong,
>>>>>>>>>> Thx for your reply.  Increasing network memory buffers
>>>>>>>>>> (fraction, min, max) seems to increase tasks slightly.
>>>>>>>>>>
>>>>>>>>>> Streaming job
>>>>>>>>>> Standalone
>>>>>>>>>>
>>>>>>>>>> Vijay
>>>>>>>>>>
>>>>>>>>>> On Fri, May 22, 2020 at 2:49 AM Xintong Song <
>>>>>>>>>> tonysong820@gmail.com> wrote:
>>>>>>>>>>
>>>>>>>>>>> Hi Vijay,
>>>>>>>>>>>
>>>>>>>>>>> I don't think your problem is related to the number of open
>>>>>>>>>>> files. The parallelism of your job is decided before it actually tries to
>>>>>>>>>>> open the files. And if the OS limit for open files is reached, you should
>>>>>>>>>>> see a job execution failure, instead of a successful execution with a lower
>>>>>>>>>>> parallelism.
>>>>>>>>>>>
>>>>>>>>>>> Could you share some more information about your use case?
>>>>>>>>>>>
>>>>>>>>>>>    - What kind of job are you executing? Is it a streaming or
>>>>>>>>>>>    batch processing job?
>>>>>>>>>>>    - Which Flink deployment do you use? Standalone? Yarn?
>>>>>>>>>>>    - It would be helpful if you can share the Flink logs.
>>>>>>>>>>>
>>>>>>>>>>>
>>>>>>>>>>> Thank you~
>>>>>>>>>>>
>>>>>>>>>>> Xintong Song
>>>>>>>>>>>
>>>>>>>>>>>
>>>>>>>>>>>
>>>>>>>>>>> On Wed, May 20, 2020 at 11:50 PM Vijay Balakrishnan <
>>>>>>>>>>> bvijaykr@gmail.com> wrote:
>>>>>>>>>>>
>>>>>>>>>>>> Hi,
>>>>>>>>>>>> I have increased the number of slots available, but the Job is
>>>>>>>>>>>> not using all the slots and runs into this approximate 18000 Tasks limit.
>>>>>>>>>>>> Looking into the source code, it seems to be opening a file -
>>>>>>>>>>>> https://github.com/apache/flink/blob/master/flink-core/src/main/java/org/apache/flink/api/common/io/FileOutputFormat.java#L203
>>>>>>>>>>>> So, do I have to tune the ulimit or something similar at the
>>>>>>>>>>>> Ubuntu O/S level to increase the number of tasks available? What I am
>>>>>>>>>>>> confused about is that the ulimit is per machine, but the ExecutionGraph is
>>>>>>>>>>>> spread across many machines? Please pardon my ignorance here. Does the
>>>>>>>>>>>> number of tasks equate to the number of open files? I am using 15 slots per
>>>>>>>>>>>> TaskManager on AWS m5.4xlarge, which has 16 vCPUs.
>>>>>>>>>>>>
>>>>>>>>>>>> TIA.
>>>>>>>>>>>>
>>>>>>>>>>>> On Tue, May 19, 2020 at 3:22 PM Vijay Balakrishnan <
>>>>>>>>>>>> bvijaykr@gmail.com> wrote:
>>>>>>>>>>>>
>>>>>>>>>>>>> Hi,
>>>>>>>>>>>>>
>>>>>>>>>>>>> The Flink Dashboard UI seems to show a hard limit of around
>>>>>>>>>>>>> 18000 for the Tasks column on a Ubuntu Linux box.
>>>>>>>>>>>>> I kept increasing the number of slots per task manager to 15,
>>>>>>>>>>>>> and the number of slots increased to 705, but the tasks
>>>>>>>>>>>>> stayed at around 18000. Below 18000 tasks, the Flink Job is
>>>>>>>>>>>>> able to start up.
>>>>>>>>>>>>> Even though I increased the number of slots, it still works
>>>>>>>>>>>>> when 312 slots are being used.
>>>>>>>>>>>>>
>>>>>>>>>>>>> taskmanager.numberOfTaskSlots: 15
>>>>>>>>>>>>>
>>>>>>>>>>>>> What knob can I tune to increase the number of Tasks ?
>>>>>>>>>>>>>
>>>>>>>>>>>>> Pls find attached the Flink Dashboard UI.
>>>>>>>>>>>>>
>>>>>>>>>>>>> TIA,
>>>>>>>>>>>>>
>>>>>>>>>>>>>