Posted to user@flink.apache.org by Kye Bae <ky...@capitalone.com> on 2020/12/07 22:08:55 UTC

ERROR org.apache.flink.runtime.io.network.netty.PartitionRequestQueue

Hello!

We have a real-time streaming workflow that has been running for about 2.5
weeks.

Then, starting yesterday, we began to get the exception below from random
taskmanagers, and the job began to fail and restart every hour or so.

The job does recover after each restart, but sometimes it takes more time
to recover than allowed in our environment. On a few occasions, it took
more than a few restarts to fully recover.

Can you provide some insight into what this error means and also what we
can do to prevent this in future?

Thank you!

+++
ERROR org.apache.flink.runtime.io.network.netty.PartitionRequestQueue  - Encountered error while consuming partitions
java.io.IOException: Connection reset by peer
    at sun.nio.ch.FileDispatcherImpl.read0(Native Method)
    at sun.nio.ch.SocketDispatcher.read(SocketDispatcher.java:39)
    at sun.nio.ch.IOUtil.readIntoNativeBuffer(IOUtil.java:223)
    at sun.nio.ch.IOUtil.read(IOUtil.java:192)
    at sun.nio.ch.SocketChannelImpl.read(SocketChannelImpl.java:379)
    at org.apache.flink.shaded.netty4.io.netty.buffer.PooledByteBuf.setBytes(PooledByteBuf.java:247)
    at org.apache.flink.shaded.netty4.io.netty.buffer.AbstractByteBuf.writeBytes(AbstractByteBuf.java:1140)
    at org.apache.flink.shaded.netty4.io.netty.channel.socket.nio.NioSocketChannel.doReadBytes(NioSocketChannel.java:347)
    at org.apache.flink.shaded.netty4.io.netty.channel.nio.AbstractNioByteChannel$NioByteUnsafe.read(AbstractNioByteChannel.java:148)
    at org.apache.flink.shaded.netty4.io.netty.channel.nio.NioEventLoop.processSelectedKey(NioEventLoop.java:697)
    at org.apache.flink.shaded.netty4.io.netty.channel.nio.NioEventLoop.processSelectedKeysOptimized(NioEventLoop.java:632)
    at org.apache.flink.shaded.netty4.io.netty.channel.nio.NioEventLoop.processSelectedKeys(NioEventLoop.java:549)
    at org.apache.flink.shaded.netty4.io.netty.channel.nio.NioEventLoop.run(NioEventLoop.java:511)
    at org.apache.flink.shaded.netty4.io.netty.util.concurrent.SingleThreadEventExecutor$5.run(SingleThreadEventExecutor.java:918)
    at org.apache.flink.shaded.netty4.io.netty.util.internal.ThreadExecutorMap$2.run(ThreadExecutorMap.java:74)
    at java.lang.Thread.run(Thread.java:748)

______________________________________________________________________



The information contained in this e-mail is confidential and/or proprietary to Capital One and/or its affiliates and may only be used solely in performance of work or services for Capital One. The information transmitted herewith is intended only for use by the individual or entity to which it is addressed. If the reader of this message is not the intended recipient, you are hereby notified that any review, retransmission, dissemination, distribution, copying or other use of, or taking of any action in reliance upon this information is strictly prohibited. If you have received this communication in error, please contact the sender and delete the material from your computer.




Re: [External Sender] Re: ERROR org.apache.flink.runtime.io.network.netty.PartitionRequestQueue

Posted by Piotr Nowojski <pn...@apache.org>.
Hi,

At first glance I cannot find anything wrong with those settings. If a
memory configuration problem had caused this error, I would expect it to
show up as an exception somewhere. A GC issue is also unlikely: if a
machine froze and stopped responding for a long period of time, I don't
think that would cause "connection reset by peer" on the other end. Still,
have you looked into it?

*Have you enabled GC stats logging and checked that there are no problems?
Also, have you looked at the stdout/stderr of the TaskManagers and the
system logs?*

Are you using a standalone cluster to which you are submitting a job (1)?
Or are you using a per-job cluster (spawning TaskManagers on demand when
submitting a job) (2)? In both cases you can also go carefully through the
JobManager log and look for some other exception, error, or connection-lost
message from some OTHER TaskManager (different from the one that threw
"connection reset by peer"). But in case (2), this kind of connection loss
might not be detected by the JobManager in time, before the cluster is shut
down (due to detection of the "connection reset by peer" error).

If you cannot find anything suspicious (errors, longer periods of
inactivity) in the JobManager logs or in the other TM logs around the
timestamp when "connection reset by peer" occurred, and all of the other
TMs were still working correctly after it happened, that would mean a
couple of things:
- it's probably not a problem with Flink itself, but rather with the
environment (unstable network). You could try searching for ways to deal
with "connection reset by peer" in general, not necessarily just in the
Flink context
- it wasn't the loss of a whole machine/TaskManager, but rather the loss of
a single network connection between just two TaskManagers
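
The log check described above can be partly automated. The following is a minimal sketch, not a Flink tool: it assumes each log line starts with a "yyyy-MM-dd HH:mm:ss,SSS" timestamp followed by the level, which is how the default Flink log pattern usually looks but may differ in your setup. The function name, the sample lines, and the logger name org.example.Hypothetical are made up for illustration.

```python
from datetime import datetime, timedelta

# ASSUMPTION: lines start with "yyyy-MM-dd HH:mm:ss,SSS" and then the level.
STAMP_FORMAT = "%Y-%m-%d %H:%M:%S,%f"
STAMP_LENGTH = len("2020-12-07 10:00:00,000")


def suspicious_lines(lines, incident, window_seconds=120, levels=("WARN", "ERROR")):
    """Return (timestamp, line) pairs at the given levels near the incident time."""
    window = timedelta(seconds=window_seconds)
    hits = []
    for line in lines:
        try:
            stamp = datetime.strptime(line[:STAMP_LENGTH], STAMP_FORMAT)
        except ValueError:
            continue  # stack-trace continuation or other line without a timestamp
        fields = line[STAMP_LENGTH:].split()
        if fields and fields[0] in levels and abs(stamp - incident) <= window:
            hits.append((stamp, line.rstrip("\n")))
    return hits


# Hypothetical sample input, not taken from the real logs.
sample = [
    "2020-12-07 09:58:30,001 INFO  org.example.Hypothetical - heartbeat ok",
    "2020-12-07 09:59:10,500 WARN  org.example.Hypothetical - slow response",
    "java.io.IOException: Connection reset by peer",
    "2020-12-07 10:00:00,123 ERROR org.example.Hypothetical - connection lost",
    "2020-12-07 11:30:00,000 ERROR org.example.Hypothetical - outside the window",
]
hits = suspicious_lines(sample, datetime(2020, 12, 7, 10, 0, 0))
for stamp, line in hits:
    print(line)
```

Running the same function over the JobManager log and over each of the other TaskManager logs, with the timestamp of the "connection reset by peer" error as the incident time, gives a short list of entries to read by hand.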

Best,
Piotrek


Re: [External Sender] Re: ERROR org.apache.flink.runtime.io.network.netty.PartitionRequestQueue

Posted by Kye Bae <ky...@capitalone.com>.
Hello, Piotr.

Thanks again for your continued support.

We did look through the log files from the other taskmanagers for any
additional errors at or around the time of the said exception but haven't
found any other errors.

However, we do see INFO-level "Direct memory stats" like this one (Used
Memory is 2 bytes greater than Total Capacity, but there are no errors
around it):

Direct memory stats: Count: 32968, Total Capacity: 1104834342, Used Memory: 1104834344



Could this be an indication of a problem? Or is this normal? We see no
other errors, memory-related or otherwise around these entries.


Based on the Flink 1.10 memory model attached below, we have these Flink
taskmanager memory options (16 GB total physical memory for each
taskmanager). I am not aware of a Flink option to set the "direct memory"
for Flink taskmanagers, and I don't know how Flink derives the total
capacity for it or if ~1 GB is appropriate.



        taskmanager.memory.task.off-heap.size: 1536m
        taskmanager.memory.managed.size: 3g
        taskmanager.memory.task.heap.size: 6g
        taskmanager.memory.jvm-metaspace.size: 1536m
        taskmanager.memory.jvm-overhead.max: 2816m
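
For readers trying to place the ~1 GB figure, here is a rough sketch of the arithmetic behind these options. It rests on several assumptions that are not stated in the thread: that the framework heap and framework off-heap sizes are left at what are believed to be the Flink 1.10 defaults (128m each), that network memory ends up at its default cap of 1g, and that the JVM direct memory limit in the 1.10 model is the sum of framework off-heap, task off-heap, and network memory. The "Total Capacity" value in the log line appears to be the capacity of the direct buffers currently allocated in the JVM rather than a configured limit, which would explain why no option sets it directly.

```python
MIB = 1024 * 1024


def parse_size(text):
    """Convert a size such as '1536m' or '3g' into bytes."""
    units = {"m": MIB, "g": 1024 * MIB}
    return int(text[:-1]) * units[text[-1]]


# Options quoted in the message above.
configured = {
    "task.off-heap": parse_size("1536m"),
    "managed": parse_size("3g"),
    "task.heap": parse_size("6g"),
    "jvm-metaspace": parse_size("1536m"),
    "jvm-overhead (at its max)": parse_size("2816m"),
}

# ASSUMPTIONS, not stated in the thread: default framework sizes and
# network memory at its default 1g cap.
assumed = {
    "framework.heap": parse_size("128m"),
    "framework.off-heap": parse_size("128m"),
    "network": parse_size("1g"),
}

total_process = sum(configured.values()) + sum(assumed.values())

# ASSUMPTION: direct memory limit = framework off-heap + task off-heap + network.
direct_limit = (
    assumed["framework.off-heap"] + configured["task.off-heap"] + assumed["network"]
)

observed_used = 1104834344  # "Used Memory" from the log line above

print(total_process // MIB)  # 16384, i.e. 16 GiB
print(direct_limit // MIB)   # 2688
print(observed_used < direct_limit)
```

Under these assumptions the components add up to exactly 16 GiB, and the reported usage sits well below the derived direct memory limit. As a further, unconfirmed inference: 1 GiB of 32 KiB network buffers would be 32768 buffers, close to the reported Count of 32968, so network buffers may account for most of the figure.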





Re: [External Sender] Re: ERROR org.apache.flink.runtime.io.network.netty.PartitionRequestQueue

Posted by Piotr Nowojski <pn...@apache.org>.
Hi,

This exception looks like it was thrown by a downstream Task/TaskManager
when trying to read a message/packet from some upstream Task/TaskManager,
and the connection between the two TaskManagers was reset (closed abruptly).
So it is this case from your question:
> involves communicating with other non-collocated tasks running on other
> taskmanagers
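
To see the symptom in isolation, here is a minimal sketch using plain Python sockets, with nothing Flink-specific in it. One side accepts a connection and closes it abruptly, and the side that is still healthy is the one that reports "connection reset by peer". The function names are made up, and the SO_LINGER struct layout used here is an assumption that holds on Linux and macOS but not on Windows.

```python
import socket
import struct
import threading


def reset_on_accept(server):
    """Accept one connection and close it abruptly, like a crashing peer."""
    conn, _ = server.accept()
    # SO_LINGER enabled with a zero timeout makes close() send a TCP RST
    # instead of the normal FIN handshake.
    conn.setsockopt(socket.SOL_SOCKET, socket.SO_LINGER, struct.pack("ii", 1, 0))
    conn.close()


def demo():
    server = socket.socket()
    server.bind(("127.0.0.1", 0))  # port 0: let the OS pick a free port
    server.listen(1)
    peer = threading.Thread(target=reset_on_accept, args=(server,))
    peer.start()
    client = socket.create_connection(server.getsockname())
    try:
        client.recv(1024)  # the healthy side is simply reading
        outcome = "closed cleanly"
    except ConnectionResetError:
        outcome = "connection reset by peer"
    finally:
        client.close()
        peer.join()
        server.close()
    return outcome


print(demo())
```

The reader did nothing wrong here; it only observed that the other end went away. That is why the TaskManager logging this exception is usually not the place where the original failure happened.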

Piotrek


Re: [External Sender] Re: ERROR org.apache.flink.runtime.io.network.netty.PartitionRequestQueue

Posted by Kye Bae <ky...@capitalone.com>.
Hello, Piotr.

Thank you.

This is an error logged to the taskmanager just before it became "lost" to
the jobmanager (i.e., reported as "lost" in the jobmanager log just before
the job restart). In what context would this particular error (not the
root-root cause you referred to) be thrown from a taskmanager? E.g., any
point in the pipeline that involves communicating with other non-collocated
tasks running on other taskmanagers? Or with the jobmanager?

-K

On Tue, Dec 8, 2020 at 3:19 AM Piotr Nowojski <pn...@apache.org> wrote:

> Hi Kye,
>
> This error is almost certainly not the primary cause of the failure. It
> means that the node reporting it has detected some fatal failure on the
> other side of the wire (connection reset by peer), but the original error
> was too slow or unable to propagate to the JobManager before this
> secondary exception. Something else must have failed or crashed, so you
> should look for that something. It can be:
> 1. The TaskManager on the other end has crashed with some error - please
> look for errors or warnings in the other TaskManagers' logs
> 2. OOM or some other JVM failure - again, please look at the logs on the
> other machines (maybe system logs)
> 3. Some OS failure - please look at the system logs on the other machines
> 4. Some hardware failure (restart / crash)
> 5. Network problems
>
> Piotrek
>


Re: ERROR org.apache.flink.runtime.io.network.netty.PartitionRequestQueue

Posted by Piotr Nowojski <pn...@apache.org>.
Hi Kye,

Almost certainly, this error is not the primary cause of the failure. It
means that the node reporting it has detected a fatal failure on the other
side of the wire (connection reset by peer), but the original error was too
slow, or unable, to propagate to the JobManager before this secondary
exception. Something else must have failed or crashed first, so you should
look for that. It could be:
1. The TaskManager on the other end has crashed with some error - please
look for errors or warnings in the other TaskManagers' logs
2. OOM or some other JVM failure - again, please look at the logs on the
other machines (maybe the system logs)
3. Some OS failure - please look at the system logs on the other machines
4. Some hardware failure (restart / crash)
5. Network problems
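
As a minimal illustration of what "connection reset by peer" means at the
socket level, here is a small Java sketch. It is not Flink code, and the
class and method names are hypothetical. It opens a loopback TCP connection,
closes one end abortively, and reports what the surviving end sees on its
next read. It assumes that setting SO_LINGER to 0 makes close() send a TCP
RST instead of a normal FIN, which is the usual behavior on Linux; the exact
exception class and message depend on the JDK and the OS.

```java
import java.io.IOException;
import java.io.UncheckedIOException;
import java.net.InetAddress;
import java.net.InetSocketAddress;
import java.net.StandardSocketOptions;
import java.nio.ByteBuffer;
import java.nio.channels.ServerSocketChannel;
import java.nio.channels.SocketChannel;

// Hypothetical demo class; nothing here is part of Flink or Netty.
class ResetByPeerDemo {

    /** Closes one end of a loopback connection abortively and describes the other end's read. */
    static String readAfterAbortiveClose() {
        try (ServerSocketChannel server = ServerSocketChannel.open()) {
            // Port 0 lets the OS pick a free port on the loopback interface.
            server.bind(new InetSocketAddress(InetAddress.getLoopbackAddress(), 0));
            try (SocketChannel peer = SocketChannel.open(server.getLocalAddress());
                 SocketChannel survivor = server.accept()) {
                // Assumption: linger interval 0 turns close() into an abortive close (RST).
                peer.setOption(StandardSocketOptions.SO_LINGER, 0);
                peer.close();
                return describeRead(survivor);
            }
        } catch (IOException e) {
            throw new UncheckedIOException("socket setup failed", e);
        }
    }

    /** Performs one blocking read and returns either the byte count or the exception seen. */
    static String describeRead(SocketChannel channel) {
        try {
            int n = channel.read(ByteBuffer.allocate(16));
            return "read returned " + n; // a graceful close would yield -1 here
        } catch (IOException e) {
            return e.getClass().getSimpleName() + ": " + e.getMessage();
        }
    }

    public static void main(String[] args) {
        System.out.println(readAfterAbortiveClose());
    }
}
```

The reading side learns only that the connection was torn down, not why.
That is the reason the logs of the machine on the other end are the place
to look for the original failure.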

Piotrek


Re: ERROR org.apache.flink.runtime.io.network.netty.PartitionRequestQueue

Posted by Kye Bae <ky...@capitalone.com>.
I forgot to mention: this is Flink 1.10.

-K

