Posted to user@flink.apache.org by Kevin Lam <ke...@shopify.com> on 2021/08/17 18:55:17 UTC

Task Managers having trouble registering after restart

Hi all,

I'm intermittently observing an issue, and it's been hard to reproduce, where
task managers are not able to register with the Flink cluster. We provision
only the number of task managers required to run a given application, and
so the absence of any of the task managers causes the job to enter a crash
loop where it fails to get the required task slots.

The failure occurs after a job has been running for a while, and when there
have been job and task manager restarts. We run in Kubernetes, so pod
disruptions occur from time to time; however, we're running with the high
availability setup [0].

Has anyone encountered this before? Any suggestions?

Below are some error messages pulled from the task managers failing to
re-register.

```
] - Starting DefaultLeaderRetrievalService with
KubernetesLeaderRetrievalDriver{configMapName='streaming-sales-model-staging-resourcemanager-leader'}.
2021-08-16 13:15:10,112 INFO
 org.apache.flink.runtime.leaderelection.DefaultLeaderElectionService [] -
Starting DefaultLeaderElectionService with
KubernetesLeaderElectionDriver{configMapName='streaming-sales-model-staging-resourcemanager-leader'}.
2021-08-16 13:15:10,205 INFO
 org.apache.flink.kubernetes.kubeclient.resources.KubernetesLeaderElector
[] - New leader elected 7f504af8-5f00-494e-8ecb-c3e67688bae8 for
streaming-sales-model-staging-restserver-leader.
2021-08-16 13:15:10,205 INFO
 org.apache.flink.kubernetes.kubeclient.resources.KubernetesLeaderElector
[] - New leader elected 7f504af8-5f00-494e-8ecb-c3e67688bae8 for
streaming-sales-model-staging-resourcemanager-leader.
2021-08-16 13:15:10,205 INFO
 org.apache.flink.kubernetes.kubeclient.resources.KubernetesLeaderElector
[] - New leader elected 7f504af8-5f00-494e-8ecb-c3e67688bae8 for
streaming-sales-model-staging-dispatcher-leader.
2021-08-16 13:15:10,211 INFO
 org.apache.flink.runtime.leaderretrieval.DefaultLeaderRetrievalService []
- Starting DefaultLeaderRetrievalService with
KubernetesLeaderRetrievalDriver{configMapName='streaming-sales-model-staging-dispatcher-leader'}.
2021-08-16 13:16:26,103 WARN
 org.apache.flink.runtime.webmonitor.retriever.impl.RpcGatewayRetriever []
- Error while retrieving the leader gateway. Retrying to connect to
akka.tcp://flink@flink-jobmanager:6123/user/rpc/dispatcher_1.
2021-08-16 13:16:30,978 WARN
 org.apache.flink.runtime.webmonitor.retriever.impl.RpcGatewayRetriever []
- Error while retrieving the leader gateway. Retrying to connect to
akka.tcp://flink@flink-jobmanager:6123/user/rpc/dispatcher_1.
```

```
2021-08-15 14:02:21,078 ERROR org.apache.kafka.common.utils.KafkaThread
               [] - Uncaught exception in thread
'kafka-producer-network-thread |
trickle-producer-monorail_sales_facts_non_recent_v0_1-1629035259075':
java.lang.NoClassDefFoundError: org/apache/kafka/clients/NetworkClient$1
        at
org.apache.kafka.clients.NetworkClient.processDisconnection(NetworkClient.java:748)
~[?:?]
        at
org.apache.kafka.clients.NetworkClient.handleDisconnections(NetworkClient.java:899)
~[?:?]
        at
org.apache.kafka.clients.NetworkClient.poll(NetworkClient.java:560) ~[?:?]
        at
org.apache.kafka.clients.producer.internals.Sender.runOnce(Sender.java:324)
~[?:?]
        at
org.apache.kafka.clients.producer.internals.Sender.run(Sender.java:239)
~[?:?]
        at java.lang.Thread.run(Unknown Source) [?:?]
Caused by: java.lang.ClassNotFoundException:
org.apache.kafka.clients.NetworkClient$1
        at java.net.URLClassLoader.findClass(Unknown Source) ~[?:?]
        at java.lang.ClassLoader.loadClass(Unknown Source) ~[?:?]
        at
org.apache.flink.util.FlinkUserCodeClassLoader.loadClassWithoutExceptionHandling(FlinkUserCodeClassLoader.java:64)
~[flink-dist_2.12-1.13.1-shopify-3491d59.jar:1.13.1-shopify-3491d59]
        at
org.apache.flink.util.ChildFirstClassLoader.loadClassWithoutExceptionHandling(ChildFirstClassLoader.java:74)
~[flink-dist_2.12-1.13.1-shopify-3491d59.jar:1.13.1-shopify-3491d59]
        at
org.apache.flink.util.FlinkUserCodeClassLoader.loadClass(FlinkUserCodeClassLoader.java:48)
~[flink-dist_2.12-1.13.1-shopify-3491d59.jar:1.13.1-shopify-3491d59]
        at java.lang.ClassLoader.loadClass(Unknown Source) ~[?:?]
        ... 6 more
```

```
connection to [null] failed with java.net.ConnectException: Connection
refused: flink-jobmanager/10.28.65.100:6123
2021-08-16 13:14:59,668 WARN  akka.remote.ReliableDeliverySupervisor
                [] - Association with remote system
[akka.tcp://flink@flink-jobmanager:6123] has failed, address is now gated
for [50] ms. Reason: [Association failed with
[akka.tcp://flink@flink-jobmanager:6123]] Caused by:
[java.net.ConnectException: Connection refused: flink-jobmanager/
10.28.65.100:6123]
2021-08-16 13:14:59,669 INFO
 org.apache.flink.runtime.taskexecutor.TaskExecutor           [] - Could
not resolve ResourceManager address
akka.tcp://flink@flink-jobmanager:6123/user/rpc/resourcemanager_0,
retrying in 10000 ms: Could not connect to rpc endpoint under address
akka.tcp://flink@flink-jobmanager:6123/user/rpc/resourcemanager_0.
```

```
2021-08-15 16:55:13,222 ERROR
org.apache.flink.shaded.netty4.io.netty.util.concurrent.DefaultPromise.rejectedExecution
[] - Failed to submit a listener notification task. Event loop shut down?
java.lang.OutOfMemoryError: Metaspace. The metaspace out-of-memory error
has occurred. This can mean two things: either the job requires a larger
size of JVM metaspace to load classes or there is a class loading leak. In
the first case 'taskmanager.memory.jvm-metaspace.size' configuration option
should be increased. If the error persists (usually in cluster after
several job (re-)submissions) then there is probably a class loading leak
in user code or some of its dependencies which has to be investigated and
fixed. The task executor has to be shutdown...
```


[0]
https://ci.apache.org/projects/flink/flink-docs-release-1.13/docs/deployment/resource-providers/standalone/kubernetes/#high-availability-with-standalone-kubernetes

Re: Task Managers having trouble registering after restart

Posted by Kevin Lam <ke...@shopify.com>.
I was able to bypass the above error, and am now hitting:

```
The main method caused an error: The implementation of the
FlinkKafkaConsumer is not serializable. The object probably contains or
references non serializable fields.
```

This is surprising, since we wouldn't expect the way the FlinkKafkaConsumer
class is loaded to affect whether it can be serialized. Any ideas?

Separately, is there any ongoing work to fix the underlying class loading
leak in the Kafka metrics code?
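
For reference, here is a minimal sketch of the pattern involved (class, topic,
and broker names are placeholders, not our actual job). Flink checks the source
for serializability when it is added to the job graph, so the error points at a
non-serializable field captured by the consumer or by its DeserializationSchema:

```
import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.util.Properties;

import org.apache.flink.api.common.serialization.DeserializationSchema;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.common.typeinfo.Types;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer;

public class ConsumerSerializabilitySketch {

    /** Hypothetical schema: non-serializable members must be transient and
     *  re-created in open(), otherwise the whole consumer fails the
     *  serializability check with the error quoted above. */
    public static class SalesEventSchema implements DeserializationSchema<String> {

        // Stand-in for any non-serializable helper (schema registry client,
        // HTTP client, ...). Created in open(), never in the constructor.
        private transient Object runtimeHelper;

        @Override
        public void open(InitializationContext context) {
            runtimeHelper = new Object();
        }

        @Override
        public String deserialize(byte[] message) throws IOException {
            return new String(message, StandardCharsets.UTF_8);
        }

        @Override
        public boolean isEndOfStream(String nextElement) {
            return false;
        }

        @Override
        public TypeInformation<String> getProducedType() {
            return Types.STRING;
        }
    }

    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        Properties props = new Properties();
        props.setProperty("bootstrap.servers", "kafka:9092"); // placeholder
        props.setProperty("group.id", "example-group");       // placeholder

        FlinkKafkaConsumer<String> consumer =
                new FlinkKafkaConsumer<>("example-topic", new SalesEventSchema(), props);

        env.addSource(consumer).print();
        env.execute("consumer-serializability-sketch");
    }
}
```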

On Tue, Aug 24, 2021 at 10:38 AM Kevin Lam <ke...@shopify.com> wrote:

> Thanks Chesnay! I'll take a look.
>
> I do see kafka metrics in a heapdump:
>
> [image: image.png]
>
> i was trying to avoid dynamic class-loading altogether, since we run
> single job clusters, by putting the job jar into /lib, but I hit this error:
>
> ```
> Could not find the provided job class (<our job class>) in the user lib
> directory (/opt/flink/usrlib).
> ```
>
> Any ideas on Avoiding Dynamic Classloading for User Code
> <https://ci.apache.org/projects/flink/flink-docs-release-1.13/docs/ops/debugging/debugging_classloading/#avoiding-dynamic-classloading-for-user-code>?
> Am I missing some configuration?
>
>
> On Tue, Aug 24, 2021 at 9:50 AM Chesnay Schepler <ch...@apache.org>
> wrote:
>
>> There's a super rough guide in the wiki:
>> https://cwiki.apache.org/confluence/display/FLINK/Debugging+ClassLoader+leaks
>>
>> The gist of it is that you first want to verify that a
>> ChildFirstClassLoader is being leaked (i.e., run a few jobs, cancel them,
>> trigger garbage collection, get heap dump, check that
>> ChildFirstClassLoaders are still there), then investigate which GC roots
>> (aka threads) are referencing that classloader, then figure out where those
>> threads came from.
>>
>> On 24/08/2021 15:43, Kevin Lam wrote:
>>
>> Thank you for pulling in Chesnay.
>>
>> I haven't been able to confirm the issue doesn't happen yet, as I've
>> found it difficult to reproduce easily. I did have follow-up questions:
>>
>> 1/ If Kafka metrics are indeed the cause of the leak, is there a
>> workaround? We'd be interested in having these metrics available for
>> monitoring and alerting purposes.
>>
>> 2/ Do you have any tips on identifying/confirming where the leak is
>> coming from?
>>
>>
>>
>> On Tue, Aug 24, 2021 at 3:48 AM Arvid Heise <ar...@apache.org> wrote:
>>
>>> Hi Kevin,
>>>
>>> The metrics are exposed similarly, so I expect the same issues as they
>>> come from Kafka's Consumer API itself.
>>>
>>> I'll pull in @Chesnay Schepler <ch...@apache.org> who afaik debugged
>>> the leak a while ago.
>>>
>>> On Mon, Aug 23, 2021 at 9:24 PM Kevin Lam <ke...@shopify.com> wrote:
>>>
>>>> Actually, we are using the `FlinkKafkaConsumer` [0] rather than
>>>> `KafkaSource`. Is there a way to disable the consumer metrics using
>>>> `FlinkKafkaConsumer`? Do you expect that to have the same Metaspace issue?
>>>>
>>>>
>>>> [0]
>>>> https://ci.apache.org/projects/flink/flink-docs-release-1.13/api/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaConsumer.html
>>>>
>>>> On Mon, Aug 23, 2021 at 2:55 PM Kevin Lam <ke...@shopify.com>
>>>> wrote:
>>>>
>>>>> Thanks Arvid! I will give this a try and report back.
>>>>>
>>>>> On Mon, Aug 23, 2021 at 11:07 AM Arvid Heise <ar...@apache.org> wrote:
>>>>>
>>>>>> Hi Kevin,
>>>>>>
>>>>>> "java.lang.OutOfMemoryError: Metaspace" indicates that too many
>>>>>> classes have been loaded. [1]
>>>>>> If you only see that after a while, it's indicating that there is a
>>>>>> classloader leak. I suspect that this is because of Kafka metrics. There
>>>>>> have been some reports in the past.
>>>>>> You can try to see what happens when you disable the forwarding of
>>>>>> the Kafka metrics with register.consumer.metrics: false [2].
>>>>>>
>>>>>> [1]
>>>>>> https://ci.apache.org/projects/flink/flink-docs-release-1.13/docs/ops/debugging/debugging_classloading/#unloading-of-dynamically-loaded-classes-in-user-code
>>>>>> [2]
>>>>>> https://ci.apache.org/projects/flink/flink-docs-release-1.13/docs/connectors/datastream/kafka/#additional-properties
>>>>>>
>>>>>> On Tue, Aug 17, 2021 at 8:55 PM Kevin Lam <ke...@shopify.com>
>>>>>> wrote:
>>>>>>
>>>>>>> Hi all,
>>>>>>>
>>>>>>> I'm observing an issue sometimes, and it's been hard to reproduce,
>>>>>>> where task managers are not able to register with the Flink cluster. We
>>>>>>> provision only the number of task managers required to run a given
>>>>>>> application, and so the absence of any of the task managers causes the job
>>>>>>> to enter a crash loop where it fails to get the required task slots.
>>>>>>>
>>>>>>> The failure occurs after a job has been running for a while, and
>>>>>>> when there have been job and task manager restarts. We run in kubernetes so
>>>>>>> pod disruptions occur from time to time, however we're running using the
>>>>>>> high availability setup [0]
>>>>>>>
>>>>>>> Has anyone encountered this before? Any suggestions?
>>>>>>>
>>>>>>> Below are some error messages pulled from the task managers failing
>>>>>>> to re-register.
>>>>>>>
>>>>>>> ```
>>>>>>> ] - Starting DefaultLeaderRetrievalService with
>>>>>>> KubernetesLeaderRetrievalDriver{configMapName='streaming-sales-model-staging-resourcemanager-leader'}.
>>>>>>> 2021-08-16 13:15:10,112 INFO
>>>>>>>  org.apache.flink.runtime.leaderelection.DefaultLeaderElectionService [] -
>>>>>>> Starting DefaultLeaderElectionService with
>>>>>>> KubernetesLeaderElectionDriver{configMapName='streaming-sales-model-staging-resourcemanager-leader'}.
>>>>>>> 2021-08-16 13:15:10,205 INFO
>>>>>>>  org.apache.flink.kubernetes.kubeclient.resources.KubernetesLeaderElector
>>>>>>> [] - New leader elected 7f504af8-5f00-494e-8ecb-c3e67688bae8 for
>>>>>>> streaming-sales-model-staging-restserver-leader.
>>>>>>> 2021-08-16 13:15:10,205 INFO
>>>>>>>  org.apache.flink.kubernetes.kubeclient.resources.KubernetesLeaderElector
>>>>>>> [] - New leader elected 7f504af8-5f00-494e-8ecb-c3e67688bae8 for
>>>>>>> streaming-sales-model-staging-resourcemanager-leader.
>>>>>>> 2021-08-16 13:15:10,205 INFO
>>>>>>>  org.apache.flink.kubernetes.kubeclient.resources.KubernetesLeaderElector
>>>>>>> [] - New leader elected 7f504af8-5f00-494e-8ecb-c3e67688bae8 for
>>>>>>> streaming-sales-model-staging-dispatcher-leader.
>>>>>>> 2021-08-16 13:15:10,211 INFO
>>>>>>>  org.apache.flink.runtime.leaderretrieval.DefaultLeaderRetrievalService []
>>>>>>> - Starting DefaultLeaderRetrievalService with
>>>>>>> KubernetesLeaderRetrievalDriver{configMapName='streaming-sales-model-staging-dispatcher-leader'}.
>>>>>>> 2021-08-16 13:16:26,103 WARN
>>>>>>>  org.apache.flink.runtime.webmonitor.retriever.impl.RpcGatewayRetriever []
>>>>>>> - Error while retrieving the leader gateway. Retrying to connect to
>>>>>>> akka.tcp://flink@flink-jobmanager:6123/user/rpc/dispatcher_1.
>>>>>>> 2021-08-16 13:16:30,978 WARN
>>>>>>>  org.apache.flink.runtime.webmonitor.retriever.impl.RpcGatewayRetriever []
>>>>>>> - Error while retrieving the leader gateway. Retrying to connect to
>>>>>>> akka.tcp://flink@flink-jobmanager:6123/user/rpc/dispatcher_1.
>>>>>>> ```
>>>>>>>
>>>>>>> ```
>>>>>>> 2021-08-15 14:02:21,078 ERROR
>>>>>>> org.apache.kafka.common.utils.KafkaThread                    [] - Uncaught
>>>>>>> exception in thread 'kafka-producer-network-thread |
>>>>>>> trickle-producer-monorail_sales_facts_non_recent_v0_1-1629035259075':
>>>>>>> java.lang.NoClassDefFoundError:
>>>>>>> org/apache/kafka/clients/NetworkClient$1
>>>>>>>         at
>>>>>>> org.apache.kafka.clients.NetworkClient.processDisconnection(NetworkClient.java:748)
>>>>>>> ~[?:?]
>>>>>>>         at
>>>>>>> org.apache.kafka.clients.NetworkClient.handleDisconnections(NetworkClient.java:899)
>>>>>>> ~[?:?]
>>>>>>>         at
>>>>>>> org.apache.kafka.clients.NetworkClient.poll(NetworkClient.java:560) ~[?:?]
>>>>>>>         at
>>>>>>> org.apache.kafka.clients.producer.internals.Sender.runOnce(Sender.java:324)
>>>>>>> ~[?:?]
>>>>>>>         at
>>>>>>> org.apache.kafka.clients.producer.internals.Sender.run(Sender.java:239)
>>>>>>> ~[?:?]
>>>>>>>         at java.lang.Thread.run(Unknown Source) [?:?]
>>>>>>> Caused by: java.lang.ClassNotFoundException:
>>>>>>> org.apache.kafka.clients.NetworkClient$1
>>>>>>>         at java.net.URLClassLoader.findClass(Unknown Source) ~[?:?]
>>>>>>>         at java.lang.ClassLoader.loadClass(Unknown Source) ~[?:?]
>>>>>>>         at
>>>>>>> org.apache.flink.util.FlinkUserCodeClassLoader.loadClassWithoutExceptionHandling(FlinkUserCodeClassLoader.java:64)
>>>>>>> ~[flink-dist_2.12-1.13.1-shopify-3491d59.jar:1.13.1-shopify-3491d59]
>>>>>>>         at
>>>>>>> org.apache.flink.util.ChildFirstClassLoader.loadClassWithoutExceptionHandling(ChildFirstClassLoader.java:74)
>>>>>>> ~[flink-dist_2.12-1.13.1-shopify-3491d59.jar:1.13.1-shopify-3491d59]
>>>>>>>         at
>>>>>>> org.apache.flink.util.FlinkUserCodeClassLoader.loadClass(FlinkUserCodeClassLoader.java:48)
>>>>>>> ~[flink-dist_2.12-1.13.1-shopify-3491d59.jar:1.13.1-shopify-3491d59]
>>>>>>>         at java.lang.ClassLoader.loadClass(Unknown Source) ~[?:?]
>>>>>>>         ... 6 more
>>>>>>> ```
>>>>>>>
>>>>>>> ```
>>>>>>> connection to [null] failed with java.net.ConnectException:
>>>>>>> Connection refused: flink-jobmanager/10.28.65.100:6123
>>>>>>> 2021-08-16 13:14:59,668 WARN  akka.remote.ReliableDeliverySupervisor
>>>>>>>                       [] - Association with remote system
>>>>>>> [akka.tcp://flink@flink-jobmanager:6123] has failed, address is now
>>>>>>> gated for [50] ms. Reason: [Association failed with
>>>>>>> [akka.tcp://flink@flink-jobmanager:6123]] Caused by:
>>>>>>> [java.net.ConnectException: Connection refused: flink-jobmanager/
>>>>>>> 10.28.65.100:6123]
>>>>>>> 2021-08-16 13:14:59,669 INFO
>>>>>>>  org.apache.flink.runtime.taskexecutor.TaskExecutor           [] - Could
>>>>>>> not resolve ResourceManager address akka.tcp://flink@flink-jobmanager:6123/user/rpc/resourcemanager_0,
>>>>>>> retrying in 10000 ms: Could not connect to rpc endpoint under address
>>>>>>> akka.tcp://flink@flink-jobmanager:6123/user/rpc/resourcemanager_0.
>>>>>>> ```
>>>>>>>
>>>>>>> ```
>>>>>>> 2021-08-15 16:55:13,222 ERROR
>>>>>>> org.apache.flink.shaded.netty4.io.netty.util.concurrent.DefaultPromise.rejectedExecution
>>>>>>> [] - Failed to submit a listener notification task. Event loop shut down?
>>>>>>> java.lang.OutOfMemoryError: Metaspace. The metaspace out-of-memory
>>>>>>> error has occurred. This can mean two things: either the job requires a
>>>>>>> larger size of JVM metaspace to load classes or there is a class loading
>>>>>>> leak. In the first case 'taskmanager.memory.jvm-metaspace.size'
>>>>>>> configuration option should be increased. If the error persists (usually in
>>>>>>> cluster after several job (re-)submissions) then there is probably a class
>>>>>>> loading leak in user code or some of its dependencies which has to be
>>>>>>> investigated and fixed. The task executor has to be shutdown...
>>>>>>> ```
>>>>>>>
>>>>>>>
>>>>>>> [0]
>>>>>>> https://ci.apache.org/projects/flink/flink-docs-release-1.13/docs/deployment/resource-providers/standalone/kubernetes/#high-availability-with-standalone-kubernetes
>>>>>>>
>>>>>>
>>

Re: Task Managers having trouble registering after restart

Posted by Kevin Lam <ke...@shopify.com>.
Thanks Chesnay! I'll take a look.

I do see Kafka metrics in a heap dump:

[image: image.png]

I was trying to avoid dynamic class-loading altogether, since we run
single-job clusters, by putting the job jar into /lib, but I hit this error:

```
Could not find the provided job class (<our job class>) in the user lib
directory (/opt/flink/usrlib).
```

Any ideas on Avoiding Dynamic Classloading for User Code
<https://ci.apache.org/projects/flink/flink-docs-release-1.13/docs/ops/debugging/debugging_classloading/#avoiding-dynamic-classloading-for-user-code>?
Am I missing some configuration?


On Tue, Aug 24, 2021 at 9:50 AM Chesnay Schepler <ch...@apache.org> wrote:

> There's a super rough guide in the wiki:
> https://cwiki.apache.org/confluence/display/FLINK/Debugging+ClassLoader+leaks
>
> The gist of it is that you first want to verify that a
> ChildFirstClassLoader is being leaked (i.e., run a few jobs, cancel them,
> trigger garbage collection, get heap dump, check that
> ChildFirstClassLoaders are still there), then investigate which GC roots
> (aka threads) are referencing that classloader, then figure out where those
> threads came from.
>
> On 24/08/2021 15:43, Kevin Lam wrote:
>
> Thank you for pulling in Chesnay.
>
> I haven't been able to confirm the issue doesn't happen yet, as I've found
> it difficult to reproduce easily. I did have follow-up questions:
>
> 1/ If Kafka metrics are indeed the cause of the leak, is there a
> workaround? We'd be interested in having these metrics available for
> monitoring and alerting purposes.
>
> 2/ Do you have any tips on identifying/confirming where the leak is coming
> from?
>
>
>
> On Tue, Aug 24, 2021 at 3:48 AM Arvid Heise <ar...@apache.org> wrote:
>
>> Hi Kevin,
>>
>> The metrics are exposed similarly, so I expect the same issues as they
>> come from Kafka's Consumer API itself.
>>
>> I'll pull in @Chesnay Schepler <ch...@apache.org> who afaik debugged
>> the leak a while ago.
>>
>> On Mon, Aug 23, 2021 at 9:24 PM Kevin Lam <ke...@shopify.com> wrote:
>>
>>> Actually, we are using the `FlinkKafkaConsumer` [0] rather than
>>> `KafkaSource`. Is there a way to disable the consumer metrics using
>>> `FlinkKafkaConsumer`? Do you expect that to have the same Metaspace issue?
>>>
>>>
>>> [0]
>>> https://ci.apache.org/projects/flink/flink-docs-release-1.13/api/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaConsumer.html
>>>
>>> On Mon, Aug 23, 2021 at 2:55 PM Kevin Lam <ke...@shopify.com> wrote:
>>>
>>>> Thanks Arvid! I will give this a try and report back.
>>>>
>>>> On Mon, Aug 23, 2021 at 11:07 AM Arvid Heise <ar...@apache.org> wrote:
>>>>
>>>>> Hi Kevin,
>>>>>
>>>>> "java.lang.OutOfMemoryError: Metaspace" indicates that too many
>>>>> classes have been loaded. [1]
>>>>> If you only see that after a while, it's indicating that there is a
>>>>> classloader leak. I suspect that this is because of Kafka metrics. There
>>>>> have been some reports in the past.
>>>>> You can try to see what happens when you disable the forwarding of the
>>>>> Kafka metrics with register.consumer.metrics: false [2].
>>>>>
>>>>> [1]
>>>>> https://ci.apache.org/projects/flink/flink-docs-release-1.13/docs/ops/debugging/debugging_classloading/#unloading-of-dynamically-loaded-classes-in-user-code
>>>>> [2]
>>>>> https://ci.apache.org/projects/flink/flink-docs-release-1.13/docs/connectors/datastream/kafka/#additional-properties
>>>>>
>>>>> On Tue, Aug 17, 2021 at 8:55 PM Kevin Lam <ke...@shopify.com>
>>>>> wrote:
>>>>>
>>>>>> Hi all,
>>>>>>
>>>>>> I'm observing an issue sometimes, and it's been hard to reproduce,
>>>>>> where task managers are not able to register with the Flink cluster. We
>>>>>> provision only the number of task managers required to run a given
>>>>>> application, and so the absence of any of the task managers causes the job
>>>>>> to enter a crash loop where it fails to get the required task slots.
>>>>>>
>>>>>> The failure occurs after a job has been running for a while, and when
>>>>>> there have been job and task manager restarts. We run in kubernetes so pod
>>>>>> disruptions occur from time to time, however we're running using the high
>>>>>> availability setup [0]
>>>>>>
>>>>>> Has anyone encountered this before? Any suggestions?
>>>>>>
>>>>>> Below are some error messages pulled from the task managers failing
>>>>>> to re-register.
>>>>>>
>>>>>> ```
>>>>>> ] - Starting DefaultLeaderRetrievalService with
>>>>>> KubernetesLeaderRetrievalDriver{configMapName='streaming-sales-model-staging-resourcemanager-leader'}.
>>>>>> 2021-08-16 13:15:10,112 INFO
>>>>>>  org.apache.flink.runtime.leaderelection.DefaultLeaderElectionService [] -
>>>>>> Starting DefaultLeaderElectionService with
>>>>>> KubernetesLeaderElectionDriver{configMapName='streaming-sales-model-staging-resourcemanager-leader'}.
>>>>>> 2021-08-16 13:15:10,205 INFO
>>>>>>  org.apache.flink.kubernetes.kubeclient.resources.KubernetesLeaderElector
>>>>>> [] - New leader elected 7f504af8-5f00-494e-8ecb-c3e67688bae8 for
>>>>>> streaming-sales-model-staging-restserver-leader.
>>>>>> 2021-08-16 13:15:10,205 INFO
>>>>>>  org.apache.flink.kubernetes.kubeclient.resources.KubernetesLeaderElector
>>>>>> [] - New leader elected 7f504af8-5f00-494e-8ecb-c3e67688bae8 for
>>>>>> streaming-sales-model-staging-resourcemanager-leader.
>>>>>> 2021-08-16 13:15:10,205 INFO
>>>>>>  org.apache.flink.kubernetes.kubeclient.resources.KubernetesLeaderElector
>>>>>> [] - New leader elected 7f504af8-5f00-494e-8ecb-c3e67688bae8 for
>>>>>> streaming-sales-model-staging-dispatcher-leader.
>>>>>> 2021-08-16 13:15:10,211 INFO
>>>>>>  org.apache.flink.runtime.leaderretrieval.DefaultLeaderRetrievalService []
>>>>>> - Starting DefaultLeaderRetrievalService with
>>>>>> KubernetesLeaderRetrievalDriver{configMapName='streaming-sales-model-staging-dispatcher-leader'}.
>>>>>> 2021-08-16 13:16:26,103 WARN
>>>>>>  org.apache.flink.runtime.webmonitor.retriever.impl.RpcGatewayRetriever []
>>>>>> - Error while retrieving the leader gateway. Retrying to connect to
>>>>>> akka.tcp://flink@flink-jobmanager:6123/user/rpc/dispatcher_1.
>>>>>> 2021-08-16 13:16:30,978 WARN
>>>>>>  org.apache.flink.runtime.webmonitor.retriever.impl.RpcGatewayRetriever []
>>>>>> - Error while retrieving the leader gateway. Retrying to connect to
>>>>>> akka.tcp://flink@flink-jobmanager:6123/user/rpc/dispatcher_1.
>>>>>> ```
>>>>>>
>>>>>> ```
>>>>>> 2021-08-15 14:02:21,078 ERROR
>>>>>> org.apache.kafka.common.utils.KafkaThread                    [] - Uncaught
>>>>>> exception in thread 'kafka-producer-network-thread |
>>>>>> trickle-producer-monorail_sales_facts_non_recent_v0_1-1629035259075':
>>>>>> java.lang.NoClassDefFoundError:
>>>>>> org/apache/kafka/clients/NetworkClient$1
>>>>>>         at
>>>>>> org.apache.kafka.clients.NetworkClient.processDisconnection(NetworkClient.java:748)
>>>>>> ~[?:?]
>>>>>>         at
>>>>>> org.apache.kafka.clients.NetworkClient.handleDisconnections(NetworkClient.java:899)
>>>>>> ~[?:?]
>>>>>>         at
>>>>>> org.apache.kafka.clients.NetworkClient.poll(NetworkClient.java:560) ~[?:?]
>>>>>>         at
>>>>>> org.apache.kafka.clients.producer.internals.Sender.runOnce(Sender.java:324)
>>>>>> ~[?:?]
>>>>>>         at
>>>>>> org.apache.kafka.clients.producer.internals.Sender.run(Sender.java:239)
>>>>>> ~[?:?]
>>>>>>         at java.lang.Thread.run(Unknown Source) [?:?]
>>>>>> Caused by: java.lang.ClassNotFoundException:
>>>>>> org.apache.kafka.clients.NetworkClient$1
>>>>>>         at java.net.URLClassLoader.findClass(Unknown Source) ~[?:?]
>>>>>>         at java.lang.ClassLoader.loadClass(Unknown Source) ~[?:?]
>>>>>>         at
>>>>>> org.apache.flink.util.FlinkUserCodeClassLoader.loadClassWithoutExceptionHandling(FlinkUserCodeClassLoader.java:64)
>>>>>> ~[flink-dist_2.12-1.13.1-shopify-3491d59.jar:1.13.1-shopify-3491d59]
>>>>>>         at
>>>>>> org.apache.flink.util.ChildFirstClassLoader.loadClassWithoutExceptionHandling(ChildFirstClassLoader.java:74)
>>>>>> ~[flink-dist_2.12-1.13.1-shopify-3491d59.jar:1.13.1-shopify-3491d59]
>>>>>>         at
>>>>>> org.apache.flink.util.FlinkUserCodeClassLoader.loadClass(FlinkUserCodeClassLoader.java:48)
>>>>>> ~[flink-dist_2.12-1.13.1-shopify-3491d59.jar:1.13.1-shopify-3491d59]
>>>>>>         at java.lang.ClassLoader.loadClass(Unknown Source) ~[?:?]
>>>>>>         ... 6 more
>>>>>> ```
>>>>>>
>>>>>> ```
>>>>>> connection to [null] failed with java.net.ConnectException:
>>>>>> Connection refused: flink-jobmanager/10.28.65.100:6123
>>>>>> 2021-08-16 13:14:59,668 WARN  akka.remote.ReliableDeliverySupervisor
>>>>>>                       [] - Association with remote system
>>>>>> [akka.tcp://flink@flink-jobmanager:6123] has failed, address is now
>>>>>> gated for [50] ms. Reason: [Association failed with
>>>>>> [akka.tcp://flink@flink-jobmanager:6123]] Caused by:
>>>>>> [java.net.ConnectException: Connection refused: flink-jobmanager/
>>>>>> 10.28.65.100:6123]
>>>>>> 2021-08-16 13:14:59,669 INFO
>>>>>>  org.apache.flink.runtime.taskexecutor.TaskExecutor           [] - Could
>>>>>> not resolve ResourceManager address akka.tcp://flink@flink-jobmanager:6123/user/rpc/resourcemanager_0,
>>>>>> retrying in 10000 ms: Could not connect to rpc endpoint under address
>>>>>> akka.tcp://flink@flink-jobmanager:6123/user/rpc/resourcemanager_0.
>>>>>> ```
>>>>>>
>>>>>> ```
>>>>>> 2021-08-15 16:55:13,222 ERROR
>>>>>> org.apache.flink.shaded.netty4.io.netty.util.concurrent.DefaultPromise.rejectedExecution
>>>>>> [] - Failed to submit a listener notification task. Event loop shut down?
>>>>>> java.lang.OutOfMemoryError: Metaspace. The metaspace out-of-memory
>>>>>> error has occurred. This can mean two things: either the job requires a
>>>>>> larger size of JVM metaspace to load classes or there is a class loading
>>>>>> leak. In the first case 'taskmanager.memory.jvm-metaspace.size'
>>>>>> configuration option should be increased. If the error persists (usually in
>>>>>> cluster after several job (re-)submissions) then there is probably a class
>>>>>> loading leak in user code or some of its dependencies which has to be
>>>>>> investigated and fixed. The task executor has to be shutdown...
>>>>>> ```
>>>>>>
>>>>>>
>>>>>> [0]
>>>>>> https://ci.apache.org/projects/flink/flink-docs-release-1.13/docs/deployment/resource-providers/standalone/kubernetes/#high-availability-with-standalone-kubernetes
>>>>>>
>>>>>
>

Re: Task Managers having trouble registering after restart

Posted by Chesnay Schepler <ch...@apache.org>.
There's a super rough guide in the wiki: 
https://cwiki.apache.org/confluence/display/FLINK/Debugging+ClassLoader+leaks

The gist of it is that you first want to verify that a 
ChildFirstClassLoader is being leaked (i.e., run a few jobs, cancel 
them, trigger garbage collection, take a heap dump, and check that 
ChildFirstClassLoaders are still there), then investigate which GC roots 
(i.e., threads) are referencing that classloader, and then figure out where 
those threads came from.
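
In case it's useful, here's a minimal, hypothetical sketch of the GC-plus-heap-dump
step done from inside the JVM (the dump path is a placeholder; jmap or jcmd from
the command line work just as well):

```
import java.lang.management.ManagementFactory;

import com.sun.management.HotSpotDiagnosticMXBean;

public class HeapDumpSketch {

    public static void main(String[] args) throws Exception {
        // Request a full GC first, so ChildFirstClassLoaders that are actually
        // collectable are gone before the dump is taken.
        System.gc();

        HotSpotDiagnosticMXBean diagnostics = ManagementFactory.newPlatformMXBeanProxy(
                ManagementFactory.getPlatformMBeanServer(),
                "com.sun.management:type=HotSpotDiagnostic",
                HotSpotDiagnosticMXBean.class);

        // live=true keeps only reachable objects: any
        // org.apache.flink.util.ChildFirstClassLoader still present in the dump
        // is being held by some GC root, typically a lingering thread.
        diagnostics.dumpHeap("/tmp/taskmanager.hprof", true);
    }
}
```

You can then open the .hprof in a heap analyzer such as Eclipse MAT, list the
ChildFirstClassLoader instances, and follow "Path to GC Roots" to see which
threads still hold them.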

On 24/08/2021 15:43, Kevin Lam wrote:
> Thank you for pulling in Chesnay.
>
> I haven't been able to confirm the issue doesn't happen yet, as I've 
> found it difficult to reproduce easily. I did have follow-up questions:
>
> 1/ If Kafka metrics are indeed the cause of the leak, is there a 
> workaround? We'd be interested in having these metrics available for 
> monitoring and alerting purposes.
>
> 2/ Do you have any tips on identifying/confirming where the leak is 
> coming from?
>
>
>
> On Tue, Aug 24, 2021 at 3:48 AM Arvid Heise <arvid@apache.org 
> <ma...@apache.org>> wrote:
>
>     Hi Kevin,
>
>     The metrics are exposed similarly, so I expect the same issues as
>     they come from Kafka's Consumer API itself.
>
>     I'll pull in @Chesnay Schepler <ma...@apache.org> who
>     afaik debugged the leak a while ago.
>
>     On Mon, Aug 23, 2021 at 9:24 PM Kevin Lam <kevin.lam@shopify.com
>     <ma...@shopify.com>> wrote:
>
>         Actually, we are using the `FlinkKafkaConsumer` [0] rather
>         than `KafkaSource`. Is there a way to disable the consumer
>         metrics using `FlinkKafkaConsumer`? Do you expect that to have
>         the same Metaspace issue?
>
>
>         [0]
>         https://ci.apache.org/projects/flink/flink-docs-release-1.13/api/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaConsumer.html
>         <https://ci.apache.org/projects/flink/flink-docs-release-1.13/api/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaConsumer.html>
>
>         On Mon, Aug 23, 2021 at 2:55 PM Kevin Lam
>         <kevin.lam@shopify.com <ma...@shopify.com>> wrote:
>
>             Thanks Arvid! I will give this a try and report back.
>
>             On Mon, Aug 23, 2021 at 11:07 AM Arvid Heise
>             <arvid@apache.org <ma...@apache.org>> wrote:
>
>                 Hi Kevin,
>
>                 "java.lang.OutOfMemoryError: Metaspace" indicates that
>                 too many classes have been loaded. [1]
>                 If you only see that after a while, it's indicating
>                 that there is a classloader leak. I suspect that this
>                 is because of Kafka metrics. There have been some
>                 reports in the past.
>                 You can try to see what happens when you disable the
>                 forwarding of the Kafka metrics with
>                 |register.consumer.metrics: false [2].|
>
>                 [1]
>                 https://ci.apache.org/projects/flink/flink-docs-release-1.13/docs/ops/debugging/debugging_classloading/#unloading-of-dynamically-loaded-classes-in-user-code
>                 <https://ci.apache.org/projects/flink/flink-docs-release-1.13/docs/ops/debugging/debugging_classloading/#unloading-of-dynamically-loaded-classes-in-user-code>
>                 [2]
>                 https://ci.apache.org/projects/flink/flink-docs-release-1.13/docs/connectors/datastream/kafka/#additional-properties
>                 <https://ci.apache.org/projects/flink/flink-docs-release-1.13/docs/connectors/datastream/kafka/#additional-properties>
>
>                 On Tue, Aug 17, 2021 at 8:55 PM Kevin Lam
>                 <kevin.lam@shopify.com <ma...@shopify.com>>
>                 wrote:
>
>                     Hi all,
>
>                     I'm observing an issue sometimes, and it's been
>                     hard to reproduce, where task managers are not
>                     able to register with the Flink cluster. We
>                     provision only the number of task managers
>                     required to run a given application, and so the
>                     absence of any of the task managers causes the job
>                     to enter a crash loop where it fails to get the
>                     required task slots.
>
>                     The failure occurs after a job has been running
>                     for a while, and when there have been job and task
>                     manager restarts. We run in kubernetes so pod
>                     disruptions occur from time to time, however we're
>                     running using the high availability setup [0]
>
>                     Has anyone encountered this before? Any suggestions?
>
>                     Below are some error messages pulled from the task
>                     managers failing to re-register.
>
>                     ```
>                     ] - Starting DefaultLeaderRetrievalService with
>                     KubernetesLeaderRetrievalDriver{configMapName='streaming-sales-model-staging-resourcemanager-leader'}.
>                     2021-08-16 13:15:10,112 INFO
>                      org.apache.flink.runtime.leaderelection.DefaultLeaderElectionService
>                     [] - Starting DefaultLeaderElectionService with
>                     KubernetesLeaderElectionDriver{configMapName='streaming-sales-model-staging-resourcemanager-leader'}.
>                     2021-08-16 13:15:10,205 INFO
>                      org.apache.flink.kubernetes.kubeclient.resources.KubernetesLeaderElector
>                     [] - New leader elected
>                     7f504af8-5f00-494e-8ecb-c3e67688bae8 for
>                     streaming-sales-model-staging-restserver-leader.
>                     2021-08-16 13:15:10,205 INFO
>                      org.apache.flink.kubernetes.kubeclient.resources.KubernetesLeaderElector
>                     [] - New leader elected
>                     7f504af8-5f00-494e-8ecb-c3e67688bae8 for
>                     streaming-sales-model-staging-resourcemanager-leader.
>                     2021-08-16 13:15:10,205 INFO
>                      org.apache.flink.kubernetes.kubeclient.resources.KubernetesLeaderElector
>                     [] - New leader elected
>                     7f504af8-5f00-494e-8ecb-c3e67688bae8 for
>                     streaming-sales-model-staging-dispatcher-leader.
>                     2021-08-16 13:15:10,211 INFO
>                      org.apache.flink.runtime.leaderretrieval.DefaultLeaderRetrievalService
>                     [] - Starting DefaultLeaderRetrievalService with
>                     KubernetesLeaderRetrievalDriver{configMapName='streaming-sales-model-staging-dispatcher-leader'}.
>                     2021-08-16 13:16:26,103 WARN
>                      org.apache.flink.runtime.webmonitor.retriever.impl.RpcGatewayRetriever
>                     [] - Error while retrieving the leader gateway.
>                     Retrying to connect to
>                     akka.tcp://flink@flink-jobmanager:6123/user/rpc/dispatcher_1.
>                     2021-08-16 13:16:30,978 WARN
>                      org.apache.flink.runtime.webmonitor.retriever.impl.RpcGatewayRetriever
>                     [] - Error while retrieving the leader gateway.
>                     Retrying to connect to
>                     akka.tcp://flink@flink-jobmanager:6123/user/rpc/dispatcher_1.
>                     ```
>
>                     ```
>                     2021-08-15 14:02:21,078 ERROR
>                     org.apache.kafka.common.utils.KafkaThread        
>                                [] - Uncaught exception in thread
>                     'kafka-producer-network-thread |
>                     trickle-producer-monorail_sales_facts_non_recent_v0_1-1629035259075':
>                     java.lang.NoClassDefFoundError:
>                     org/apache/kafka/clients/NetworkClient$1
>                             at
>                     org.apache.kafka.clients.NetworkClient.processDisconnection(NetworkClient.java:748)
>                     ~[?:?]
>                             at
>                     org.apache.kafka.clients.NetworkClient.handleDisconnections(NetworkClient.java:899)
>                     ~[?:?]
>                             at
>                     org.apache.kafka.clients.NetworkClient.poll(NetworkClient.java:560)
>                     ~[?:?]
>                             at
>                     org.apache.kafka.clients.producer.internals.Sender.runOnce(Sender.java:324)
>                     ~[?:?]
>                             at
>                     org.apache.kafka.clients.producer.internals.Sender.run(Sender.java:239)
>                     ~[?:?]
>                             at java.lang.Thread.run(Unknown Source) [?:?]
>                     Caused by: java.lang.ClassNotFoundException:
>                     org.apache.kafka.clients.NetworkClient$1
>                             at
>                     java.net.URLClassLoader.findClass(Unknown Source)
>                     ~[?:?]
>                             at java.lang.ClassLoader.loadClass(Unknown
>                     Source) ~[?:?]
>                             at
>                     org.apache.flink.util.FlinkUserCodeClassLoader.loadClassWithoutExceptionHandling(FlinkUserCodeClassLoader.java:64)
>                     ~[flink-dist_2.12-1.13.1-shopify-3491d59.jar:1.13.1-shopify-3491d59]
>                             at
>                     org.apache.flink.util.ChildFirstClassLoader.loadClassWithoutExceptionHandling(ChildFirstClassLoader.java:74)
>                     ~[flink-dist_2.12-1.13.1-shopify-3491d59.jar:1.13.1-shopify-3491d59]
>                             at
>                     org.apache.flink.util.FlinkUserCodeClassLoader.loadClass(FlinkUserCodeClassLoader.java:48)
>                     ~[flink-dist_2.12-1.13.1-shopify-3491d59.jar:1.13.1-shopify-3491d59]
>                             at java.lang.ClassLoader.loadClass(Unknown
>                     Source) ~[?:?]
>                             ... 6 more
>                     ```
>
>                     ```
>                     connection to [null] failed with
>                     java.net.ConnectException: Connection refused:
>                     flink-jobmanager/10.28.65.100:6123
>                     <http://10.28.65.100:6123>
>                     2021-08-16 13:14:59,668 WARN
>                      akka.remote.ReliableDeliverySupervisor          
>                               [] - Association with remote system
>                     [akka.tcp://flink@flink-jobmanager:6123] has
>                     failed, address is now gated for [50] ms. Reason:
>                     [Association failed with
>                     [akka.tcp://flink@flink-jobmanager:6123]] Caused
>                     by: [java.net.ConnectException: Connection
>                     refused: flink-jobmanager/10.28.65.100:6123
>                     <http://10.28.65.100:6123>]
>                     2021-08-16 13:14:59,669 INFO
>                      org.apache.flink.runtime.taskexecutor.TaskExecutor
>                               [] - Could not resolve ResourceManager
>                     address
>                     akka.tcp://flink@flink-jobmanager:6123/user/rpc/resourcemanager_0,
>                     retrying in 10000 ms: Could not connect to rpc
>                     endpoint under address
>                     akka.tcp://flink@flink-jobmanager:6123/user/rpc/resourcemanager_0.
>                     ```
>
>                     ```
>                     2021-08-15 16:55:13,222 ERROR
>                     org.apache.flink.shaded.netty4.io.netty.util.concurrent.DefaultPromise.rejectedExecution
>                     [] - Failed to submit a listener notification
>                     task. Event loop shut down?
>                     java.lang.OutOfMemoryError: Metaspace. The
>                     metaspace out-of-memory error has occurred. This
>                     can mean two things: either the job requires a
>                     larger size of JVM metaspace to load classes or
>                     there is a class loading leak. In the first case
>                     'taskmanager.memory.jvm-metaspace.size'
>                     configuration option should be increased. If the
>                     error persists (usually in cluster after several
>                     job (re-)submissions) then there is probably a
>                     class loading leak in user code or some of its
>                     dependencies which has to be investigated and
>                     fixed. The task executor has to be shutdown...
>                     ```
>
>
>                     [0]
>                     https://ci.apache.org/projects/flink/flink-docs-release-1.13/docs/deployment/resource-providers/standalone/kubernetes/#high-availability-with-standalone-kubernetes
>                     <https://ci.apache.org/projects/flink/flink-docs-release-1.13/docs/deployment/resource-providers/standalone/kubernetes/#high-availability-with-standalone-kubernetes>
>


Re: Task Managers having trouble registering after restart

Posted by Kevin Lam <ke...@shopify.com>.
Thank you for pulling in Chesnay.

I haven't yet been able to confirm whether the issue is resolved, as I've
found it difficult to reproduce. I did have follow-up questions:

1/ If Kafka metrics are indeed the cause of the leak, is there a
workaround? We'd be interested in having these metrics available for
monitoring and alerting purposes.

2/ Do you have any tips on identifying/confirming where the leak is coming
from?



On Tue, Aug 24, 2021 at 3:48 AM Arvid Heise <ar...@apache.org> wrote:

> Hi Kevin,
>
> The metrics are exposed similarly, so I expect the same issues as they
> come from Kafka's Consumer API itself.
>
> I'll pull in @Chesnay Schepler <ch...@apache.org> who afaik debugged
> the leak a while ago.
>
> On Mon, Aug 23, 2021 at 9:24 PM Kevin Lam <ke...@shopify.com> wrote:
>
>> Actually, we are using the `FlinkKafkaConsumer` [0] rather than
>> `KafkaSource`. Is there a way to disable the consumer metrics using
>> `FlinkKafkaConsumer`? Do you expect that to have the same Metaspace issue?
>>
>>
>> [0]
>> https://ci.apache.org/projects/flink/flink-docs-release-1.13/api/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaConsumer.html
>>
>> On Mon, Aug 23, 2021 at 2:55 PM Kevin Lam <ke...@shopify.com> wrote:
>>
>>> Thanks Arvid! I will give this a try and report back.
>>>
>>> On Mon, Aug 23, 2021 at 11:07 AM Arvid Heise <ar...@apache.org> wrote:
>>>
>>>> Hi Kevin,
>>>>
>>>> "java.lang.OutOfMemoryError: Metaspace" indicates that too many classes
>>>> have been loaded. [1]
>>>> If you only see that after a while, it's indicating that there is a
>>>> classloader leak. I suspect that this is because of Kafka metrics. There
>>>> have been some reports in the past.
>>>> You can try to see what happens when you disable the forwarding of the
>>>> Kafka metrics with register.consumer.metrics: false [2].
>>>>
>>>> [1]
>>>> https://ci.apache.org/projects/flink/flink-docs-release-1.13/docs/ops/debugging/debugging_classloading/#unloading-of-dynamically-loaded-classes-in-user-code
>>>> [2]
>>>> https://ci.apache.org/projects/flink/flink-docs-release-1.13/docs/connectors/datastream/kafka/#additional-properties
>>>>
>>>> On Tue, Aug 17, 2021 at 8:55 PM Kevin Lam <ke...@shopify.com>
>>>> wrote:
>>>>
>>>>> Hi all,
>>>>>
>>>>> I'm observing an issue sometimes, and it's been hard to reproduce,
>>>>> where task managers are not able to register with the Flink cluster. We
>>>>> provision only the number of task managers required to run a given
>>>>> application, and so the absence of any of the task managers causes the job
>>>>> to enter a crash loop where it fails to get the required task slots.
>>>>>
>>>>> The failure occurs after a job has been running for a while, and when
>>>>> there have been job and task manager restarts. We run in kubernetes so pod
>>>>> disruptions occur from time to time, however we're running using the high
>>>>> availability setup [0]
>>>>>
>>>>> Has anyone encountered this before? Any suggestions?
>>>>>
>>>>> Below are some error messages pulled from the task managers failing to
>>>>> re-register.
>>>>>
>>>>> ```
>>>>> ] - Starting DefaultLeaderRetrievalService with
>>>>> KubernetesLeaderRetrievalDriver{configMapName='streaming-sales-model-staging-resourcemanager-leader'}.
>>>>> 2021-08-16 13:15:10,112 INFO
>>>>>  org.apache.flink.runtime.leaderelection.DefaultLeaderElectionService [] -
>>>>> Starting DefaultLeaderElectionService with
>>>>> KubernetesLeaderElectionDriver{configMapName='streaming-sales-model-staging-resourcemanager-leader'}.
>>>>> 2021-08-16 13:15:10,205 INFO
>>>>>  org.apache.flink.kubernetes.kubeclient.resources.KubernetesLeaderElector
>>>>> [] - New leader elected 7f504af8-5f00-494e-8ecb-c3e67688bae8 for
>>>>> streaming-sales-model-staging-restserver-leader.
>>>>> 2021-08-16 13:15:10,205 INFO
>>>>>  org.apache.flink.kubernetes.kubeclient.resources.KubernetesLeaderElector
>>>>> [] - New leader elected 7f504af8-5f00-494e-8ecb-c3e67688bae8 for
>>>>> streaming-sales-model-staging-resourcemanager-leader.
>>>>> 2021-08-16 13:15:10,205 INFO
>>>>>  org.apache.flink.kubernetes.kubeclient.resources.KubernetesLeaderElector
>>>>> [] - New leader elected 7f504af8-5f00-494e-8ecb-c3e67688bae8 for
>>>>> streaming-sales-model-staging-dispatcher-leader.
>>>>> 2021-08-16 13:15:10,211 INFO
>>>>>  org.apache.flink.runtime.leaderretrieval.DefaultLeaderRetrievalService []
>>>>> - Starting DefaultLeaderRetrievalService with
>>>>> KubernetesLeaderRetrievalDriver{configMapName='streaming-sales-model-staging-dispatcher-leader'}.
>>>>> 2021-08-16 13:16:26,103 WARN
>>>>>  org.apache.flink.runtime.webmonitor.retriever.impl.RpcGatewayRetriever []
>>>>> - Error while retrieving the leader gateway. Retrying to connect to
>>>>> akka.tcp://flink@flink-jobmanager:6123/user/rpc/dispatcher_1.
>>>>> 2021-08-16 13:16:30,978 WARN
>>>>>  org.apache.flink.runtime.webmonitor.retriever.impl.RpcGatewayRetriever []
>>>>> - Error while retrieving the leader gateway. Retrying to connect to
>>>>> akka.tcp://flink@flink-jobmanager:6123/user/rpc/dispatcher_1.
>>>>> ```
>>>>>
>>>>> ```
>>>>> 2021-08-15 14:02:21,078 ERROR
>>>>> org.apache.kafka.common.utils.KafkaThread                    [] - Uncaught
>>>>> exception in thread 'kafka-producer-network-thread |
>>>>> trickle-producer-monorail_sales_facts_non_recent_v0_1-1629035259075':
>>>>> java.lang.NoClassDefFoundError:
>>>>> org/apache/kafka/clients/NetworkClient$1
>>>>>         at
>>>>> org.apache.kafka.clients.NetworkClient.processDisconnection(NetworkClient.java:748)
>>>>> ~[?:?]
>>>>>         at
>>>>> org.apache.kafka.clients.NetworkClient.handleDisconnections(NetworkClient.java:899)
>>>>> ~[?:?]
>>>>>         at
>>>>> org.apache.kafka.clients.NetworkClient.poll(NetworkClient.java:560) ~[?:?]
>>>>>         at
>>>>> org.apache.kafka.clients.producer.internals.Sender.runOnce(Sender.java:324)
>>>>> ~[?:?]
>>>>>         at
>>>>> org.apache.kafka.clients.producer.internals.Sender.run(Sender.java:239)
>>>>> ~[?:?]
>>>>>         at java.lang.Thread.run(Unknown Source) [?:?]
>>>>> Caused by: java.lang.ClassNotFoundException:
>>>>> org.apache.kafka.clients.NetworkClient$1
>>>>>         at java.net.URLClassLoader.findClass(Unknown Source) ~[?:?]
>>>>>         at java.lang.ClassLoader.loadClass(Unknown Source) ~[?:?]
>>>>>         at
>>>>> org.apache.flink.util.FlinkUserCodeClassLoader.loadClassWithoutExceptionHandling(FlinkUserCodeClassLoader.java:64)
>>>>> ~[flink-dist_2.12-1.13.1-shopify-3491d59.jar:1.13.1-shopify-3491d59]
>>>>>         at
>>>>> org.apache.flink.util.ChildFirstClassLoader.loadClassWithoutExceptionHandling(ChildFirstClassLoader.java:74)
>>>>> ~[flink-dist_2.12-1.13.1-shopify-3491d59.jar:1.13.1-shopify-3491d59]
>>>>>         at
>>>>> org.apache.flink.util.FlinkUserCodeClassLoader.loadClass(FlinkUserCodeClassLoader.java:48)
>>>>> ~[flink-dist_2.12-1.13.1-shopify-3491d59.jar:1.13.1-shopify-3491d59]
>>>>>         at java.lang.ClassLoader.loadClass(Unknown Source) ~[?:?]
>>>>>         ... 6 more
>>>>> ```
>>>>>
>>>>> ```
>>>>> connection to [null] failed with java.net.ConnectException: Connection
>>>>> refused: flink-jobmanager/10.28.65.100:6123
>>>>> 2021-08-16 13:14:59,668 WARN  akka.remote.ReliableDeliverySupervisor
>>>>>                     [] - Association with remote system
>>>>> [akka.tcp://flink@flink-jobmanager:6123] has failed, address is now
>>>>> gated for [50] ms. Reason: [Association failed with
>>>>> [akka.tcp://flink@flink-jobmanager:6123]] Caused by:
>>>>> [java.net.ConnectException: Connection refused: flink-jobmanager/
>>>>> 10.28.65.100:6123]
>>>>> 2021-08-16 13:14:59,669 INFO
>>>>>  org.apache.flink.runtime.taskexecutor.TaskExecutor           [] - Could
>>>>> not resolve ResourceManager address akka.tcp://flink@flink-jobmanager:6123/user/rpc/resourcemanager_0,
>>>>> retrying in 10000 ms: Could not connect to rpc endpoint under address
>>>>> akka.tcp://flink@flink-jobmanager:6123/user/rpc/resourcemanager_0.
>>>>> ```
>>>>>
>>>>> ```
>>>>> 2021-08-15 16:55:13,222 ERROR
>>>>> org.apache.flink.shaded.netty4.io.netty.util.concurrent.DefaultPromise.rejectedExecution
>>>>> [] - Failed to submit a listener notification task. Event loop shut down?
>>>>> java.lang.OutOfMemoryError: Metaspace. The metaspace out-of-memory
>>>>> error has occurred. This can mean two things: either the job requires a
>>>>> larger size of JVM metaspace to load classes or there is a class loading
>>>>> leak. In the first case 'taskmanager.memory.jvm-metaspace.size'
>>>>> configuration option should be increased. If the error persists (usually in
>>>>> cluster after several job (re-)submissions) then there is probably a class
>>>>> loading leak in user code or some of its dependencies which has to be
>>>>> investigated and fixed. The task executor has to be shutdown...
>>>>> ```
>>>>>
>>>>>
>>>>> [0]
>>>>> https://ci.apache.org/projects/flink/flink-docs-release-1.13/docs/deployment/resource-providers/standalone/kubernetes/#high-availability-with-standalone-kubernetes
>>>>>
>>>>

Re: Task Managers having trouble registering after restart

Posted by Arvid Heise <ar...@apache.org>.
Hi Kevin,

The metrics are exposed similarly, so I expect the same issues as they come
from Kafka's Consumer API itself.

I'll pull in @Chesnay Schepler <ch...@apache.org> who afaik debugged the
leak a while ago.

On Mon, Aug 23, 2021 at 9:24 PM Kevin Lam <ke...@shopify.com> wrote:

> Actually, we are using the `FlinkKafkaConsumer` [0] rather than
> `KafkaSource`. Is there a way to disable the consumer metrics using
> `FlinkKafkaConsumer`? Do you expect that to have the same Metaspace issue?
>
>
> [0]
> https://ci.apache.org/projects/flink/flink-docs-release-1.13/api/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaConsumer.html
>
> On Mon, Aug 23, 2021 at 2:55 PM Kevin Lam <ke...@shopify.com> wrote:
>
>> Thanks Arvid! I will give this a try and report back.
>>
>> On Mon, Aug 23, 2021 at 11:07 AM Arvid Heise <ar...@apache.org> wrote:
>>
>>> Hi Kevin,
>>>
>>> "java.lang.OutOfMemoryError: Metaspace" indicates that too many classes
>>> have been loaded. [1]
>>> If you only see that after a while, it's indicating that there is a
>>> classloader leak. I suspect that this is because of Kafka metrics. There
>>> have been some reports in the past.
>>> You can try to see what happens when you disable the forwarding of the
>>> Kafka metrics with register.consumer.metrics: false [2].
>>>
>>> [1]
>>> https://ci.apache.org/projects/flink/flink-docs-release-1.13/docs/ops/debugging/debugging_classloading/#unloading-of-dynamically-loaded-classes-in-user-code
>>> [2]
>>> https://ci.apache.org/projects/flink/flink-docs-release-1.13/docs/connectors/datastream/kafka/#additional-properties
>>>
>>> On Tue, Aug 17, 2021 at 8:55 PM Kevin Lam <ke...@shopify.com> wrote:
>>>
>>>> Hi all,
>>>>
>>>> I'm observing an issue sometimes, and it's been hard to reproduce,
>>>> where task managers are not able to register with the Flink cluster. We
>>>> provision only the number of task managers required to run a given
>>>> application, and so the absence of any of the task managers causes the job
>>>> to enter a crash loop where it fails to get the required task slots.
>>>>
>>>> The failure occurs after a job has been running for a while, and when
>>>> there have been job and task manager restarts. We run in kubernetes so pod
>>>> disruptions occur from time to time, however we're running using the high
>>>> availability setup [0]
>>>>
>>>> Has anyone encountered this before? Any suggestions?
>>>>
>>>> Below are some error messages pulled from the task managers failing to
>>>> re-register.
>>>>
>>>> ```
>>>> ] - Starting DefaultLeaderRetrievalService with
>>>> KubernetesLeaderRetrievalDriver{configMapName='streaming-sales-model-staging-resourcemanager-leader'}.
>>>> 2021-08-16 13:15:10,112 INFO
>>>>  org.apache.flink.runtime.leaderelection.DefaultLeaderElectionService [] -
>>>> Starting DefaultLeaderElectionService with
>>>> KubernetesLeaderElectionDriver{configMapName='streaming-sales-model-staging-resourcemanager-leader'}.
>>>> 2021-08-16 13:15:10,205 INFO
>>>>  org.apache.flink.kubernetes.kubeclient.resources.KubernetesLeaderElector
>>>> [] - New leader elected 7f504af8-5f00-494e-8ecb-c3e67688bae8 for
>>>> streaming-sales-model-staging-restserver-leader.
>>>> 2021-08-16 13:15:10,205 INFO
>>>>  org.apache.flink.kubernetes.kubeclient.resources.KubernetesLeaderElector
>>>> [] - New leader elected 7f504af8-5f00-494e-8ecb-c3e67688bae8 for
>>>> streaming-sales-model-staging-resourcemanager-leader.
>>>> 2021-08-16 13:15:10,205 INFO
>>>>  org.apache.flink.kubernetes.kubeclient.resources.KubernetesLeaderElector
>>>> [] - New leader elected 7f504af8-5f00-494e-8ecb-c3e67688bae8 for
>>>> streaming-sales-model-staging-dispatcher-leader.
>>>> 2021-08-16 13:15:10,211 INFO
>>>>  org.apache.flink.runtime.leaderretrieval.DefaultLeaderRetrievalService []
>>>> - Starting DefaultLeaderRetrievalService with
>>>> KubernetesLeaderRetrievalDriver{configMapName='streaming-sales-model-staging-dispatcher-leader'}.
>>>> 2021-08-16 13:16:26,103 WARN
>>>>  org.apache.flink.runtime.webmonitor.retriever.impl.RpcGatewayRetriever []
>>>> - Error while retrieving the leader gateway. Retrying to connect to
>>>> akka.tcp://flink@flink-jobmanager:6123/user/rpc/dispatcher_1.
>>>> 2021-08-16 13:16:30,978 WARN
>>>>  org.apache.flink.runtime.webmonitor.retriever.impl.RpcGatewayRetriever []
>>>> - Error while retrieving the leader gateway. Retrying to connect to
>>>> akka.tcp://flink@flink-jobmanager:6123/user/rpc/dispatcher_1.
>>>> ```
>>>>
>>>> ```
>>>> 2021-08-15 14:02:21,078 ERROR org.apache.kafka.common.utils.KafkaThread
>>>>                    [] - Uncaught exception in thread
>>>> 'kafka-producer-network-thread |
>>>> trickle-producer-monorail_sales_facts_non_recent_v0_1-1629035259075':
>>>> java.lang.NoClassDefFoundError: org/apache/kafka/clients/NetworkClient$1
>>>>         at
>>>> org.apache.kafka.clients.NetworkClient.processDisconnection(NetworkClient.java:748)
>>>> ~[?:?]
>>>>         at
>>>> org.apache.kafka.clients.NetworkClient.handleDisconnections(NetworkClient.java:899)
>>>> ~[?:?]
>>>>         at
>>>> org.apache.kafka.clients.NetworkClient.poll(NetworkClient.java:560) ~[?:?]
>>>>         at
>>>> org.apache.kafka.clients.producer.internals.Sender.runOnce(Sender.java:324)
>>>> ~[?:?]
>>>>         at
>>>> org.apache.kafka.clients.producer.internals.Sender.run(Sender.java:239)
>>>> ~[?:?]
>>>>         at java.lang.Thread.run(Unknown Source) [?:?]
>>>> Caused by: java.lang.ClassNotFoundException:
>>>> org.apache.kafka.clients.NetworkClient$1
>>>>         at java.net.URLClassLoader.findClass(Unknown Source) ~[?:?]
>>>>         at java.lang.ClassLoader.loadClass(Unknown Source) ~[?:?]
>>>>         at
>>>> org.apache.flink.util.FlinkUserCodeClassLoader.loadClassWithoutExceptionHandling(FlinkUserCodeClassLoader.java:64)
>>>> ~[flink-dist_2.12-1.13.1-shopify-3491d59.jar:1.13.1-shopify-3491d59]
>>>>         at
>>>> org.apache.flink.util.ChildFirstClassLoader.loadClassWithoutExceptionHandling(ChildFirstClassLoader.java:74)
>>>> ~[flink-dist_2.12-1.13.1-shopify-3491d59.jar:1.13.1-shopify-3491d59]
>>>>         at
>>>> org.apache.flink.util.FlinkUserCodeClassLoader.loadClass(FlinkUserCodeClassLoader.java:48)
>>>> ~[flink-dist_2.12-1.13.1-shopify-3491d59.jar:1.13.1-shopify-3491d59]
>>>>         at java.lang.ClassLoader.loadClass(Unknown Source) ~[?:?]
>>>>         ... 6 more
>>>> ```
>>>>
>>>> ```
>>>> connection to [null] failed with java.net.ConnectException: Connection
>>>> refused: flink-jobmanager/10.28.65.100:6123
>>>> 2021-08-16 13:14:59,668 WARN  akka.remote.ReliableDeliverySupervisor
>>>>                     [] - Association with remote system
>>>> [akka.tcp://flink@flink-jobmanager:6123] has failed, address is now
>>>> gated for [50] ms. Reason: [Association failed with
>>>> [akka.tcp://flink@flink-jobmanager:6123]] Caused by:
>>>> [java.net.ConnectException: Connection refused: flink-jobmanager/
>>>> 10.28.65.100:6123]
>>>> 2021-08-16 13:14:59,669 INFO
>>>>  org.apache.flink.runtime.taskexecutor.TaskExecutor           [] - Could
>>>> not resolve ResourceManager address akka.tcp://flink@flink-jobmanager:6123/user/rpc/resourcemanager_0,
>>>> retrying in 10000 ms: Could not connect to rpc endpoint under address
>>>> akka.tcp://flink@flink-jobmanager:6123/user/rpc/resourcemanager_0.
>>>> ```
>>>>
>>>> ```
>>>> 2021-08-15 16:55:13,222 ERROR
>>>> org.apache.flink.shaded.netty4.io.netty.util.concurrent.DefaultPromise.rejectedExecution
>>>> [] - Failed to submit a listener notification task. Event loop shut down?
>>>> java.lang.OutOfMemoryError: Metaspace. The metaspace out-of-memory
>>>> error has occurred. This can mean two things: either the job requires a
>>>> larger size of JVM metaspace to load classes or there is a class loading
>>>> leak. In the first case 'taskmanager.memory.jvm-metaspace.size'
>>>> configuration option should be increased. If the error persists (usually in
>>>> cluster after several job (re-)submissions) then there is probably a class
>>>> loading leak in user code or some of its dependencies which has to be
>>>> investigated and fixed. The task executor has to be shutdown...
>>>> ```
>>>>
>>>>
>>>> [0]
>>>> https://ci.apache.org/projects/flink/flink-docs-release-1.13/docs/deployment/resource-providers/standalone/kubernetes/#high-availability-with-standalone-kubernetes
>>>>
>>>

Re: Task Managers having trouble registering after restart

Posted by Kevin Lam <ke...@shopify.com>.
Actually, we are using the `FlinkKafkaConsumer` [0] rather than
`KafkaSource`. Is there a way to disable the consumer metrics when using
`FlinkKafkaConsumer`? Do you expect it to be affected by the same Metaspace issue?


[0]
https://ci.apache.org/projects/flink/flink-docs-release-1.13/api/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaConsumer.html
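
For reference, this is roughly where we would set the property on our side --
a minimal sketch only, assuming the legacy consumer honours the same
`register.consumer.metrics` key that the KafkaSource docs describe (which is
exactly the open question); the broker address, group id, and topic below are
placeholders, not our actual job:

```
// Minimal sketch, NOT our production job: assumes FlinkKafkaConsumer reads the
// same "register.consumer.metrics" key that is documented for KafkaSource.
import java.util.Properties;

import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer;

public class LegacyConsumerMetricsSketch {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env =
                StreamExecutionEnvironment.getExecutionEnvironment();

        Properties props = new Properties();
        props.setProperty("bootstrap.servers", "kafka:9092");  // placeholder
        props.setProperty("group.id", "sales-model-staging");  // placeholder
        // The property in question, forwarded via the consumer Properties.
        props.setProperty("register.consumer.metrics", "false");

        FlinkKafkaConsumer<String> consumer =
                new FlinkKafkaConsumer<>("some-topic", new SimpleStringSchema(), props);

        env.addSource(consumer).print();
        env.execute("metrics-registration sketch");
    }
}
```

If the legacy consumer simply ignores that key, is there an equivalent switch
for it?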

On Mon, Aug 23, 2021 at 2:55 PM Kevin Lam <ke...@shopify.com> wrote:

> Thanks Arvid! I will give this a try and report back.
>
> On Mon, Aug 23, 2021 at 11:07 AM Arvid Heise <ar...@apache.org> wrote:
>
>> Hi Kevin,
>>
>> "java.lang.OutOfMemoryError: Metaspace" indicates that too many classes
>> have been loaded. [1]
>> If you only see that after a while, it's indicating that there is a
>> classloader leak. I suspect that this is because of Kafka metrics. There
>> have been some reports in the past.
>> You can try to see what happens when you disable the forwarding of the
>> Kafka metrics with register.consumer.metrics: false [2].
>>
>> [1]
>> https://ci.apache.org/projects/flink/flink-docs-release-1.13/docs/ops/debugging/debugging_classloading/#unloading-of-dynamically-loaded-classes-in-user-code
>> [2]
>> https://ci.apache.org/projects/flink/flink-docs-release-1.13/docs/connectors/datastream/kafka/#additional-properties
>>
>> On Tue, Aug 17, 2021 at 8:55 PM Kevin Lam <ke...@shopify.com> wrote:
>>
>>> Hi all,
>>>
>>> I'm observing an issue sometimes, and it's been hard to reproduce, where
>>> task managers are not able to register with the Flink cluster. We provision
>>> only the number of task managers required to run a given application, and
>>> so the absence of any of the task managers causes the job to enter a crash
>>> loop where it fails to get the required task slots.
>>>
>>> The failure occurs after a job has been running for a while, and when
>>> there have been job and task manager restarts. We run in kubernetes so pod
>>> disruptions occur from time to time, however we're running using the high
>>> availability setup [0]
>>>
>>> Has anyone encountered this before? Any suggestions?
>>>
>>> Below are some error messages pulled from the task managers failing to
>>> re-register.
>>>
>>> ```
>>> ] - Starting DefaultLeaderRetrievalService with
>>> KubernetesLeaderRetrievalDriver{configMapName='streaming-sales-model-staging-resourcemanager-leader'}.
>>> 2021-08-16 13:15:10,112 INFO
>>>  org.apache.flink.runtime.leaderelection.DefaultLeaderElectionService [] -
>>> Starting DefaultLeaderElectionService with
>>> KubernetesLeaderElectionDriver{configMapName='streaming-sales-model-staging-resourcemanager-leader'}.
>>> 2021-08-16 13:15:10,205 INFO
>>>  org.apache.flink.kubernetes.kubeclient.resources.KubernetesLeaderElector
>>> [] - New leader elected 7f504af8-5f00-494e-8ecb-c3e67688bae8 for
>>> streaming-sales-model-staging-restserver-leader.
>>> 2021-08-16 13:15:10,205 INFO
>>>  org.apache.flink.kubernetes.kubeclient.resources.KubernetesLeaderElector
>>> [] - New leader elected 7f504af8-5f00-494e-8ecb-c3e67688bae8 for
>>> streaming-sales-model-staging-resourcemanager-leader.
>>> 2021-08-16 13:15:10,205 INFO
>>>  org.apache.flink.kubernetes.kubeclient.resources.KubernetesLeaderElector
>>> [] - New leader elected 7f504af8-5f00-494e-8ecb-c3e67688bae8 for
>>> streaming-sales-model-staging-dispatcher-leader.
>>> 2021-08-16 13:15:10,211 INFO
>>>  org.apache.flink.runtime.leaderretrieval.DefaultLeaderRetrievalService []
>>> - Starting DefaultLeaderRetrievalService with
>>> KubernetesLeaderRetrievalDriver{configMapName='streaming-sales-model-staging-dispatcher-leader'}.
>>> 2021-08-16 13:16:26,103 WARN
>>>  org.apache.flink.runtime.webmonitor.retriever.impl.RpcGatewayRetriever []
>>> - Error while retrieving the leader gateway. Retrying to connect to
>>> akka.tcp://flink@flink-jobmanager:6123/user/rpc/dispatcher_1.
>>> 2021-08-16 13:16:30,978 WARN
>>>  org.apache.flink.runtime.webmonitor.retriever.impl.RpcGatewayRetriever []
>>> - Error while retrieving the leader gateway. Retrying to connect to
>>> akka.tcp://flink@flink-jobmanager:6123/user/rpc/dispatcher_1.
>>> ```
>>>
>>> ```
>>> 2021-08-15 14:02:21,078 ERROR org.apache.kafka.common.utils.KafkaThread
>>>                    [] - Uncaught exception in thread
>>> 'kafka-producer-network-thread |
>>> trickle-producer-monorail_sales_facts_non_recent_v0_1-1629035259075':
>>> java.lang.NoClassDefFoundError: org/apache/kafka/clients/NetworkClient$1
>>>         at
>>> org.apache.kafka.clients.NetworkClient.processDisconnection(NetworkClient.java:748)
>>> ~[?:?]
>>>         at
>>> org.apache.kafka.clients.NetworkClient.handleDisconnections(NetworkClient.java:899)
>>> ~[?:?]
>>>         at
>>> org.apache.kafka.clients.NetworkClient.poll(NetworkClient.java:560) ~[?:?]
>>>         at
>>> org.apache.kafka.clients.producer.internals.Sender.runOnce(Sender.java:324)
>>> ~[?:?]
>>>         at
>>> org.apache.kafka.clients.producer.internals.Sender.run(Sender.java:239)
>>> ~[?:?]
>>>         at java.lang.Thread.run(Unknown Source) [?:?]
>>> Caused by: java.lang.ClassNotFoundException:
>>> org.apache.kafka.clients.NetworkClient$1
>>>         at java.net.URLClassLoader.findClass(Unknown Source) ~[?:?]
>>>         at java.lang.ClassLoader.loadClass(Unknown Source) ~[?:?]
>>>         at
>>> org.apache.flink.util.FlinkUserCodeClassLoader.loadClassWithoutExceptionHandling(FlinkUserCodeClassLoader.java:64)
>>> ~[flink-dist_2.12-1.13.1-shopify-3491d59.jar:1.13.1-shopify-3491d59]
>>>         at
>>> org.apache.flink.util.ChildFirstClassLoader.loadClassWithoutExceptionHandling(ChildFirstClassLoader.java:74)
>>> ~[flink-dist_2.12-1.13.1-shopify-3491d59.jar:1.13.1-shopify-3491d59]
>>>         at
>>> org.apache.flink.util.FlinkUserCodeClassLoader.loadClass(FlinkUserCodeClassLoader.java:48)
>>> ~[flink-dist_2.12-1.13.1-shopify-3491d59.jar:1.13.1-shopify-3491d59]
>>>         at java.lang.ClassLoader.loadClass(Unknown Source) ~[?:?]
>>>         ... 6 more
>>> ```
>>>
>>> ```
>>> connection to [null] failed with java.net.ConnectException: Connection
>>> refused: flink-jobmanager/10.28.65.100:6123
>>> 2021-08-16 13:14:59,668 WARN  akka.remote.ReliableDeliverySupervisor
>>>                   [] - Association with remote system
>>> [akka.tcp://flink@flink-jobmanager:6123] has failed, address is now
>>> gated for [50] ms. Reason: [Association failed with
>>> [akka.tcp://flink@flink-jobmanager:6123]] Caused by:
>>> [java.net.ConnectException: Connection refused: flink-jobmanager/
>>> 10.28.65.100:6123]
>>> 2021-08-16 13:14:59,669 INFO
>>>  org.apache.flink.runtime.taskexecutor.TaskExecutor           [] - Could
>>> not resolve ResourceManager address akka.tcp://flink@flink-jobmanager:6123/user/rpc/resourcemanager_0,
>>> retrying in 10000 ms: Could not connect to rpc endpoint under address
>>> akka.tcp://flink@flink-jobmanager:6123/user/rpc/resourcemanager_0.
>>> ```
>>>
>>> ```
>>> 2021-08-15 16:55:13,222 ERROR
>>> org.apache.flink.shaded.netty4.io.netty.util.concurrent.DefaultPromise.rejectedExecution
>>> [] - Failed to submit a listener notification task. Event loop shut down?
>>> java.lang.OutOfMemoryError: Metaspace. The metaspace out-of-memory error
>>> has occurred. This can mean two things: either the job requires a larger
>>> size of JVM metaspace to load classes or there is a class loading leak. In
>>> the first case 'taskmanager.memory.jvm-metaspace.size' configuration option
>>> should be increased. If the error persists (usually in cluster after
>>> several job (re-)submissions) then there is probably a class loading leak
>>> in user code or some of its dependencies which has to be investigated and
>>> fixed. The task executor has to be shutdown...
>>> ```
>>>
>>>
>>> [0]
>>> https://ci.apache.org/projects/flink/flink-docs-release-1.13/docs/deployment/resource-providers/standalone/kubernetes/#high-availability-with-standalone-kubernetes
>>>
>>

Re: Task Managers having trouble registering after restart

Posted by Kevin Lam <ke...@shopify.com>.
Thanks Arvid! I will give this a try and report back.

On Mon, Aug 23, 2021 at 11:07 AM Arvid Heise <ar...@apache.org> wrote:

> Hi Kevin,
>
> "java.lang.OutOfMemoryError: Metaspace" indicates that too many classes
> have been loaded. [1]
> If you only see that after a while, it's indicating that there is a
> classloader leak. I suspect that this is because of Kafka metrics. There
> have been some reports in the past.
> You can try to see what happens when you disable the forwarding of the
> Kafka metrics with register.consumer.metrics: false [2].
>
> [1]
> https://ci.apache.org/projects/flink/flink-docs-release-1.13/docs/ops/debugging/debugging_classloading/#unloading-of-dynamically-loaded-classes-in-user-code
> [2]
> https://ci.apache.org/projects/flink/flink-docs-release-1.13/docs/connectors/datastream/kafka/#additional-properties
>
> On Tue, Aug 17, 2021 at 8:55 PM Kevin Lam <ke...@shopify.com> wrote:
>
>> Hi all,
>>
>> I'm observing an issue sometimes, and it's been hard to reproduce, where
>> task managers are not able to register with the Flink cluster. We provision
>> only the number of task managers required to run a given application, and
>> so the absence of any of the task managers causes the job to enter a crash
>> loop where it fails to get the required task slots.
>>
>> The failure occurs after a job has been running for a while, and when
>> there have been job and task manager restarts. We run in kubernetes so pod
>> disruptions occur from time to time, however we're running using the high
>> availability setup [0]
>>
>> Has anyone encountered this before? Any suggestions?
>>
>> Below are some error messages pulled from the task managers failing to
>> re-register.
>>
>> ```
>> ] - Starting DefaultLeaderRetrievalService with
>> KubernetesLeaderRetrievalDriver{configMapName='streaming-sales-model-staging-resourcemanager-leader'}.
>> 2021-08-16 13:15:10,112 INFO
>>  org.apache.flink.runtime.leaderelection.DefaultLeaderElectionService [] -
>> Starting DefaultLeaderElectionService with
>> KubernetesLeaderElectionDriver{configMapName='streaming-sales-model-staging-resourcemanager-leader'}.
>> 2021-08-16 13:15:10,205 INFO
>>  org.apache.flink.kubernetes.kubeclient.resources.KubernetesLeaderElector
>> [] - New leader elected 7f504af8-5f00-494e-8ecb-c3e67688bae8 for
>> streaming-sales-model-staging-restserver-leader.
>> 2021-08-16 13:15:10,205 INFO
>>  org.apache.flink.kubernetes.kubeclient.resources.KubernetesLeaderElector
>> [] - New leader elected 7f504af8-5f00-494e-8ecb-c3e67688bae8 for
>> streaming-sales-model-staging-resourcemanager-leader.
>> 2021-08-16 13:15:10,205 INFO
>>  org.apache.flink.kubernetes.kubeclient.resources.KubernetesLeaderElector
>> [] - New leader elected 7f504af8-5f00-494e-8ecb-c3e67688bae8 for
>> streaming-sales-model-staging-dispatcher-leader.
>> 2021-08-16 13:15:10,211 INFO
>>  org.apache.flink.runtime.leaderretrieval.DefaultLeaderRetrievalService []
>> - Starting DefaultLeaderRetrievalService with
>> KubernetesLeaderRetrievalDriver{configMapName='streaming-sales-model-staging-dispatcher-leader'}.
>> 2021-08-16 13:16:26,103 WARN
>>  org.apache.flink.runtime.webmonitor.retriever.impl.RpcGatewayRetriever []
>> - Error while retrieving the leader gateway. Retrying to connect to
>> akka.tcp://flink@flink-jobmanager:6123/user/rpc/dispatcher_1.
>> 2021-08-16 13:16:30,978 WARN
>>  org.apache.flink.runtime.webmonitor.retriever.impl.RpcGatewayRetriever []
>> - Error while retrieving the leader gateway. Retrying to connect to
>> akka.tcp://flink@flink-jobmanager:6123/user/rpc/dispatcher_1.
>> ```
>>
>> ```
>> 2021-08-15 14:02:21,078 ERROR org.apache.kafka.common.utils.KafkaThread
>>                  [] - Uncaught exception in thread
>> 'kafka-producer-network-thread |
>> trickle-producer-monorail_sales_facts_non_recent_v0_1-1629035259075':
>> java.lang.NoClassDefFoundError: org/apache/kafka/clients/NetworkClient$1
>>         at
>> org.apache.kafka.clients.NetworkClient.processDisconnection(NetworkClient.java:748)
>> ~[?:?]
>>         at
>> org.apache.kafka.clients.NetworkClient.handleDisconnections(NetworkClient.java:899)
>> ~[?:?]
>>         at
>> org.apache.kafka.clients.NetworkClient.poll(NetworkClient.java:560) ~[?:?]
>>         at
>> org.apache.kafka.clients.producer.internals.Sender.runOnce(Sender.java:324)
>> ~[?:?]
>>         at
>> org.apache.kafka.clients.producer.internals.Sender.run(Sender.java:239)
>> ~[?:?]
>>         at java.lang.Thread.run(Unknown Source) [?:?]
>> Caused by: java.lang.ClassNotFoundException:
>> org.apache.kafka.clients.NetworkClient$1
>>         at java.net.URLClassLoader.findClass(Unknown Source) ~[?:?]
>>         at java.lang.ClassLoader.loadClass(Unknown Source) ~[?:?]
>>         at
>> org.apache.flink.util.FlinkUserCodeClassLoader.loadClassWithoutExceptionHandling(FlinkUserCodeClassLoader.java:64)
>> ~[flink-dist_2.12-1.13.1-shopify-3491d59.jar:1.13.1-shopify-3491d59]
>>         at
>> org.apache.flink.util.ChildFirstClassLoader.loadClassWithoutExceptionHandling(ChildFirstClassLoader.java:74)
>> ~[flink-dist_2.12-1.13.1-shopify-3491d59.jar:1.13.1-shopify-3491d59]
>>         at
>> org.apache.flink.util.FlinkUserCodeClassLoader.loadClass(FlinkUserCodeClassLoader.java:48)
>> ~[flink-dist_2.12-1.13.1-shopify-3491d59.jar:1.13.1-shopify-3491d59]
>>         at java.lang.ClassLoader.loadClass(Unknown Source) ~[?:?]
>>         ... 6 more
>> ```
>>
>> ```
>> connection to [null] failed with java.net.ConnectException: Connection
>> refused: flink-jobmanager/10.28.65.100:6123
>> 2021-08-16 13:14:59,668 WARN  akka.remote.ReliableDeliverySupervisor
>>                   [] - Association with remote system
>> [akka.tcp://flink@flink-jobmanager:6123] has failed, address is now
>> gated for [50] ms. Reason: [Association failed with
>> [akka.tcp://flink@flink-jobmanager:6123]] Caused by:
>> [java.net.ConnectException: Connection refused: flink-jobmanager/
>> 10.28.65.100:6123]
>> 2021-08-16 13:14:59,669 INFO
>>  org.apache.flink.runtime.taskexecutor.TaskExecutor           [] - Could
>> not resolve ResourceManager address akka.tcp://flink@flink-jobmanager:6123/user/rpc/resourcemanager_0,
>> retrying in 10000 ms: Could not connect to rpc endpoint under address
>> akka.tcp://flink@flink-jobmanager:6123/user/rpc/resourcemanager_0.
>> ```
>>
>> ```
>> 2021-08-15 16:55:13,222 ERROR
>> org.apache.flink.shaded.netty4.io.netty.util.concurrent.DefaultPromise.rejectedExecution
>> [] - Failed to submit a listener notification task. Event loop shut down?
>> java.lang.OutOfMemoryError: Metaspace. The metaspace out-of-memory error
>> has occurred. This can mean two things: either the job requires a larger
>> size of JVM metaspace to load classes or there is a class loading leak. In
>> the first case 'taskmanager.memory.jvm-metaspace.size' configuration option
>> should be increased. If the error persists (usually in cluster after
>> several job (re-)submissions) then there is probably a class loading leak
>> in user code or some of its dependencies which has to be investigated and
>> fixed. The task executor has to be shutdown...
>> ```
>>
>>
>> [0]
>> https://ci.apache.org/projects/flink/flink-docs-release-1.13/docs/deployment/resource-providers/standalone/kubernetes/#high-availability-with-standalone-kubernetes
>>
>

Re: Task Managers having trouble registering after restart

Posted by Arvid Heise <ar...@apache.org>.
Hi Kevin,

"java.lang.OutOfMemoryError: Metaspace" indicates that too many classes
have been loaded. [1]
If it only appears after the job has been running (and restarting) for a
while, that points to a classloader leak. I suspect the Kafka metrics are
responsible; there have been similar reports in the past.
You can check whether the problem goes away when you disable the forwarding
of the Kafka metrics with register.consumer.metrics: false [2].

[1]
https://ci.apache.org/projects/flink/flink-docs-release-1.13/docs/ops/debugging/debugging_classloading/#unloading-of-dynamically-loaded-classes-in-user-code
[2]
https://ci.apache.org/projects/flink/flink-docs-release-1.13/docs/connectors/datastream/kafka/#additional-properties
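
To make the suggestion concrete, here is a minimal sketch of where that
property goes on the KafkaSource builder described in [2]; the broker address,
topic, and group id are placeholders:

```
// Minimal sketch for the KafkaSource path from [2]; broker address, topic,
// and group id are placeholders.
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.connector.kafka.source.KafkaSource;
import org.apache.flink.connector.kafka.source.enumerator.initializer.OffsetsInitializer;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class DisableKafkaMetricsSketch {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env =
                StreamExecutionEnvironment.getExecutionEnvironment();

        KafkaSource<String> source = KafkaSource.<String>builder()
                .setBootstrapServers("kafka:9092")                  // placeholder
                .setTopics("some-topic")                            // placeholder
                .setGroupId("sales-model-staging")                  // placeholder
                .setStartingOffsets(OffsetsInitializer.earliest())
                .setValueOnlyDeserializer(new SimpleStringSchema())
                // Stop forwarding Kafka client metrics into Flink's metric system.
                .setProperty("register.consumer.metrics", "false")
                .build();

        env.fromSource(source, WatermarkStrategy.noWatermarks(), "kafka-source")
                .print();
        env.execute("disable-kafka-metrics sketch");
    }
}
```

As a stop-gap you can also raise taskmanager.memory.jvm-metaspace.size in the
Flink configuration, but if there is a genuine classloading leak that only
delays the error.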

On Tue, Aug 17, 2021 at 8:55 PM Kevin Lam <ke...@shopify.com> wrote:

> Hi all,
>
> I'm observing an issue sometimes, and it's been hard to reproduce, where
> task managers are not able to register with the Flink cluster. We provision
> only the number of task managers required to run a given application, and
> so the absence of any of the task managers causes the job to enter a crash
> loop where it fails to get the required task slots.
>
> The failure occurs after a job has been running for a while, and when
> there have been job and task manager restarts. We run in kubernetes so pod
> disruptions occur from time to time, however we're running using the high
> availability setup [0]
>
> Has anyone encountered this before? Any suggestions?
>
> Below are some error messages pulled from the task managers failing to
> re-register.
>
> ```
> ] - Starting DefaultLeaderRetrievalService with
> KubernetesLeaderRetrievalDriver{configMapName='streaming-sales-model-staging-resourcemanager-leader'}.
> 2021-08-16 13:15:10,112 INFO
>  org.apache.flink.runtime.leaderelection.DefaultLeaderElectionService [] -
> Starting DefaultLeaderElectionService with
> KubernetesLeaderElectionDriver{configMapName='streaming-sales-model-staging-resourcemanager-leader'}.
> 2021-08-16 13:15:10,205 INFO
>  org.apache.flink.kubernetes.kubeclient.resources.KubernetesLeaderElector
> [] - New leader elected 7f504af8-5f00-494e-8ecb-c3e67688bae8 for
> streaming-sales-model-staging-restserver-leader.
> 2021-08-16 13:15:10,205 INFO
>  org.apache.flink.kubernetes.kubeclient.resources.KubernetesLeaderElector
> [] - New leader elected 7f504af8-5f00-494e-8ecb-c3e67688bae8 for
> streaming-sales-model-staging-resourcemanager-leader.
> 2021-08-16 13:15:10,205 INFO
>  org.apache.flink.kubernetes.kubeclient.resources.KubernetesLeaderElector
> [] - New leader elected 7f504af8-5f00-494e-8ecb-c3e67688bae8 for
> streaming-sales-model-staging-dispatcher-leader.
> 2021-08-16 13:15:10,211 INFO
>  org.apache.flink.runtime.leaderretrieval.DefaultLeaderRetrievalService []
> - Starting DefaultLeaderRetrievalService with
> KubernetesLeaderRetrievalDriver{configMapName='streaming-sales-model-staging-dispatcher-leader'}.
> 2021-08-16 13:16:26,103 WARN
>  org.apache.flink.runtime.webmonitor.retriever.impl.RpcGatewayRetriever []
> - Error while retrieving the leader gateway. Retrying to connect to
> akka.tcp://flink@flink-jobmanager:6123/user/rpc/dispatcher_1.
> 2021-08-16 13:16:30,978 WARN
>  org.apache.flink.runtime.webmonitor.retriever.impl.RpcGatewayRetriever []
> - Error while retrieving the leader gateway. Retrying to connect to
> akka.tcp://flink@flink-jobmanager:6123/user/rpc/dispatcher_1.
> ```
>
> ```
> 2021-08-15 14:02:21,078 ERROR org.apache.kafka.common.utils.KafkaThread
>                  [] - Uncaught exception in thread
> 'kafka-producer-network-thread |
> trickle-producer-monorail_sales_facts_non_recent_v0_1-1629035259075':
> java.lang.NoClassDefFoundError: org/apache/kafka/clients/NetworkClient$1
>         at
> org.apache.kafka.clients.NetworkClient.processDisconnection(NetworkClient.java:748)
> ~[?:?]
>         at
> org.apache.kafka.clients.NetworkClient.handleDisconnections(NetworkClient.java:899)
> ~[?:?]
>         at
> org.apache.kafka.clients.NetworkClient.poll(NetworkClient.java:560) ~[?:?]
>         at
> org.apache.kafka.clients.producer.internals.Sender.runOnce(Sender.java:324)
> ~[?:?]
>         at
> org.apache.kafka.clients.producer.internals.Sender.run(Sender.java:239)
> ~[?:?]
>         at java.lang.Thread.run(Unknown Source) [?:?]
> Caused by: java.lang.ClassNotFoundException:
> org.apache.kafka.clients.NetworkClient$1
>         at java.net.URLClassLoader.findClass(Unknown Source) ~[?:?]
>         at java.lang.ClassLoader.loadClass(Unknown Source) ~[?:?]
>         at
> org.apache.flink.util.FlinkUserCodeClassLoader.loadClassWithoutExceptionHandling(FlinkUserCodeClassLoader.java:64)
> ~[flink-dist_2.12-1.13.1-shopify-3491d59.jar:1.13.1-shopify-3491d59]
>         at
> org.apache.flink.util.ChildFirstClassLoader.loadClassWithoutExceptionHandling(ChildFirstClassLoader.java:74)
> ~[flink-dist_2.12-1.13.1-shopify-3491d59.jar:1.13.1-shopify-3491d59]
>         at
> org.apache.flink.util.FlinkUserCodeClassLoader.loadClass(FlinkUserCodeClassLoader.java:48)
> ~[flink-dist_2.12-1.13.1-shopify-3491d59.jar:1.13.1-shopify-3491d59]
>         at java.lang.ClassLoader.loadClass(Unknown Source) ~[?:?]
>         ... 6 more
> ```
>
> ```
> connection to [null] failed with java.net.ConnectException: Connection
> refused: flink-jobmanager/10.28.65.100:6123
> 2021-08-16 13:14:59,668 WARN  akka.remote.ReliableDeliverySupervisor
>                 [] - Association with remote system
> [akka.tcp://flink@flink-jobmanager:6123] has failed, address is now gated
> for [50] ms. Reason: [Association failed with
> [akka.tcp://flink@flink-jobmanager:6123]] Caused by:
> [java.net.ConnectException: Connection refused: flink-jobmanager/
> 10.28.65.100:6123]
> 2021-08-16 13:14:59,669 INFO
>  org.apache.flink.runtime.taskexecutor.TaskExecutor           [] - Could
> not resolve ResourceManager address akka.tcp://flink@flink-jobmanager:6123/user/rpc/resourcemanager_0,
> retrying in 10000 ms: Could not connect to rpc endpoint under address
> akka.tcp://flink@flink-jobmanager:6123/user/rpc/resourcemanager_0.
> ```
>
> ```
> 2021-08-15 16:55:13,222 ERROR
> org.apache.flink.shaded.netty4.io.netty.util.concurrent.DefaultPromise.rejectedExecution
> [] - Failed to submit a listener notification task. Event loop shut down?
> java.lang.OutOfMemoryError: Metaspace. The metaspace out-of-memory error
> has occurred. This can mean two things: either the job requires a larger
> size of JVM metaspace to load classes or there is a class loading leak. In
> the first case 'taskmanager.memory.jvm-metaspace.size' configuration option
> should be increased. If the error persists (usually in cluster after
> several job (re-)submissions) then there is probably a class loading leak
> in user code or some of its dependencies which has to be investigated and
> fixed. The task executor has to be shutdown...
> ```
>
>
> [0]
> https://ci.apache.org/projects/flink/flink-docs-release-1.13/docs/deployment/resource-providers/standalone/kubernetes/#high-availability-with-standalone-kubernetes
>