Posted to issues@flink.apache.org by "Chesnay Schepler (Jira)" <ji...@apache.org> on 2022/03/14 09:35:00 UTC

[jira] (FLINK-26256) AWS SDK Async Event Loop Group Classloader Issue

    [ https://issues.apache.org/jira/browse/FLINK-26256 ]


    Chesnay Schepler deleted comment on FLINK-26256:
    ------------------------------------------

was (Author: zentol):
The ELG should already be scoped per job because it lives in the user classloader (CL), so the only missing piece should be shutting it down using {{RuntimeContext#registerUserCodeClassLoaderReleaseHookIfAbsent}}.
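A minimal sketch of the approach suggested in the comment, assuming the AWS SDK (and therefore its static ELG) is loaded by the job's user classloader, so a static field is effectively per job. {{ReleaseHookRegistry}} is a hypothetical stand-in assumed to mirror the shape of {{RuntimeContext#registerUserCodeClassLoaderReleaseHookIfAbsent}} (a hook name plus a {{Runnable}}), {{ToyRegistry}} only simulates the classloader being released, a plain {{ExecutorService}} stands in for the Netty ELG, and the hook name is made up.

```java
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

public class ReleaseHookSketch {

    /** Hypothetical stand-in for the release-hook method on Flink's RuntimeContext. */
    interface ReleaseHookRegistry {
        void registerUserCodeClassLoaderReleaseHookIfAbsent(String releaseHookName, Runnable releaseHook);
    }

    /** Toy registry: keeps the first hook per name and runs all hooks when the job's loader is released. */
    static final class ToyRegistry implements ReleaseHookRegistry {
        private final Map<String, Runnable> hooks = new LinkedHashMap<>();

        @Override
        public void registerUserCodeClassLoaderReleaseHookIfAbsent(String releaseHookName, Runnable releaseHook) {
            hooks.putIfAbsent(releaseHookName, releaseHook);
        }

        int hookCount() {
            return hooks.size();
        }

        void release() {
            hooks.values().forEach(Runnable::run);
            hooks.clear();
        }
    }

    // Stand-in for the SDK's static ELG; per job only if this class is loaded by the user classloader.
    private static ExecutorService pool;

    /** Called by every subtask: creates the pool once and registers its shutdown once. */
    static synchronized ExecutorService getOrCreatePool(ReleaseHookRegistry registry) {
        if (pool == null) {
            pool = Executors.newFixedThreadPool(2);
        }
        // Hypothetical hook name; "IfAbsent" means repeated calls do not add duplicate hooks.
        registry.registerUserCodeClassLoaderReleaseHookIfAbsent("shutdown-shared-pool", ReleaseHookSketch::shutdownPool);
        return pool;
    }

    static synchronized void shutdownPool() {
        if (pool != null) {
            pool.shutdownNow();
            pool = null;
        }
    }

    public static void main(String[] args) {
        ToyRegistry registry = new ToyRegistry();
        ExecutorService first = getOrCreatePool(registry);  // subtask 0
        ExecutorService second = getOrCreatePool(registry); // subtask 1 of the same job
        System.out.println(first == second);      // true
        System.out.println(registry.hookCount()); // 1
        registry.release();                       // job ends, user classloader released
        System.out.println(first.isShutdown());   // true
    }
}
```

Because the hook is registered "if absent", every subtask can call {{getOrCreatePool}} without stacking up duplicate shutdown hooks, and the pool is torn down exactly when the job's classloader goes away.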

> AWS SDK Async Event Loop Group Classloader Issue
> ------------------------------------------------
>
>                 Key: FLINK-26256
>                 URL: https://issues.apache.org/jira/browse/FLINK-26256
>             Project: Flink
>          Issue Type: Bug
>          Components: Connectors / Kinesis
>            Reporter: Danny Cranmer
>            Assignee: Zichen Liu
>            Priority: Major
>              Labels: pull-request-available
>             Fix For: 1.15.0
>
>
> h3. Background
> AWS SDK v2 async clients use a Netty async HTTP client for the Kinesis Data Streams/Firehose sinks and the Kinesis Data Streams EFO consumer. When no thread pool is configured, the SDK creates a shared one for Netty to use for network operations. This thread pool is managed by a shared ELG (event loop group), which is stored in a static field. We do not configure it for the AWS connectors in the Flink codebase.
> When threads are spawned within the ELG, they inherit the context classloader of the thread that spawns them. If the ELG is created from a shared classloader, for instance the Flink parent classloader or the MiniCluster parent classloader, multiple Flink jobs can share the same ELG. When an ELG thread is spawned from a Flink job, it inherits that job's Flink user classloader. When the job completes or fails, the classloader is closed; however, the Netty thread still references it, which leads to the exception below.
> h3. Impact
> This issue *does not* impact jobs deployed to a TaskManager (TM) when AWS SDK v2 is loaded via the Flink user classloader. This is expected to be the standard deployment configuration.
> This issue is known to impact:
> - Flink MiniCluster, for example in integration tests (FLINK-26064)
> - Flink clusters that load AWS SDK v2 via the parent classloader
> h3. Suggested solution
> There are a few possible solutions, as discussed in https://github.com/apache/flink/pull/18733:
> 1. Create a separate ELG per Flink job
> 2. Create a separate ELG per subtask
> 3. Attach the correct classloader to ELG-spawned threads
> h3. Error Stack
> (shortened stack trace, as the full trace is too large)
> {noformat}
> Feb 09 20:05:04 java.util.concurrent.ExecutionException: software.amazon.awssdk.core.exception.SdkClientException: Unable to execute HTTP request: Trying to access closed classloader. Please check if you store classloaders directly or indirectly in static fields. If the stacktrace suggests that the leak occurs in a third party library and cannot be fixed immediately, you can disable this check with the configuration 'classloader.check-leaked-classloader'.
> Feb 09 20:05:04 	at java.util.concurrent.CompletableFuture.reportGet(CompletableFuture.java:357)
> Feb 09 20:05:04 	at java.util.concurrent.CompletableFuture.get(CompletableFuture.java:1908)
> (...)
> Feb 09 20:05:04 Caused by: software.amazon.awssdk.core.exception.SdkClientException: Unable to execute HTTP request: Trying to access closed classloader. Please check if you store classloaders directly or indirectly in static fields. If the stacktrace suggests that the leak occurs in a third party library and cannot be fixed immediately, you can disable this check with the configuration 'classloader.check-leaked-classloader'.
> Feb 09 20:05:04 	at software.amazon.awssdk.core.exception.SdkClientException$BuilderImpl.build(SdkClientException.java:98)
> Feb 09 20:05:04 	at software.amazon.awssdk.core.exception.SdkClientException.create(SdkClientException.java:43)
> Feb 09 20:05:04 	at software.amazon.awssdk.core.internal.http.pipeline.stages.utils.RetryableStageHelper.setLastException(RetryableStageHelper.java:204)
> Feb 09 20:05:04 	at software.amazon.awssdk.core.internal.http.pipeline.stages.utils.RetryableStageHelper.setLastException(RetryableStageHelper.java:200)
> Feb 09 20:05:04 	at software.amazon.awssdk.core.internal.http.pipeline.stages.AsyncRetryableStage$RetryingExecutor.maybeRetryExecute(AsyncRetryableStage.java:179)
> Feb 09 20:05:04 	at software.amazon.awssdk.core.internal.http.pipeline.stages.AsyncRetryableStage$RetryingExecutor.lambda$attemptExecute$1(AsyncRetryableStage.java:159)
> (...)
> Feb 09 20:05:04 Caused by: java.lang.IllegalStateException: Trying to access closed classloader. Please check if you store classloaders directly or indirectly in static fields. If the stacktrace suggests that the leak occurs in a third party library and cannot be fixed immediately, you can disable this check with the configuration 'classloader.check-leaked-classloader'.
> Feb 09 20:05:04 	at org.apache.flink.runtime.execution.librarycache.FlinkUserCodeClassLoaders$SafetyNetWrapperClassLoader.ensureInner(FlinkUserCodeClassLoaders.java:164)
> Feb 09 20:05:04 	at org.apache.flink.runtime.execution.librarycache.FlinkUserCodeClassLoaders$SafetyNetWrapperClassLoader.getResources(FlinkUserCodeClassLoaders.java:188)
> Feb 09 20:05:04 	at java.util.ServiceLoader$LazyIterator.hasNextService(ServiceLoader.java:348)
> Feb 09 20:05:04 	at java.util.ServiceLoader$LazyIterator.hasNext(ServiceLoader.java:393)
> Feb 09 20:05:04 	at java.util.ServiceLoader$1.hasNext(ServiceLoader.java:474)
> Feb 09 20:05:04 	at javax.xml.stream.FactoryFinder$1.run(FactoryFinder.java:352)
> Feb 09 20:05:04 	at java.security.AccessController.doPrivileged(Native Method)
> Feb 09 20:05:04 	at javax.xml.stream.FactoryFinder.findServiceProvider(FactoryFinder.java:341)
> Feb 09 20:05:04 	at javax.xml.stream.FactoryFinder.find(FactoryFinder.java:313)
> Feb 09 20:05:04 	at javax.xml.stream.FactoryFinder.find(FactoryFinder.java:227)
> Feb 09 20:05:04 	at javax.xml.stream.XMLInputFactory.newInstance(XMLInputFactory.java:154)
> Feb 09 20:05:04 	at software.amazon.awssdk.protocols.query.unmarshall.XmlDomParser.createXmlInputFactory(XmlDomParser.java:124)
> Feb 09 20:05:04 	at java.lang.ThreadLocal$SuppliedThreadLocal.initialValue(ThreadLocal.java:284)
> Feb 09 20:05:04 	at java.lang.ThreadLocal.setInitialValue(ThreadLocal.java:180)
> Feb 09 20:05:04 	at java.lang.ThreadLocal.get(ThreadLocal.java:170)
> (...)
> {noformat}
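The classloader inheritance described under Background, and option 3 from the suggested solutions, can be illustrated with plain JDK executors. This is a sketch, not the connector code: a single-thread {{ExecutorService}} stands in for the shared Netty ELG, two empty {{URLClassLoader}}s stand in for two jobs' user classloaders, and {{pinnedFactory}} is a hypothetical {{ThreadFactory}} that sets each worker's context classloader explicitly instead of letting it inherit from whichever thread happened to trigger its creation.

```java
import java.net.URL;
import java.net.URLClassLoader;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.atomic.AtomicReference;

public class ElgClassLoaderDemo {

    /** Hypothetical option-3 factory: every new worker gets a fixed context classloader. */
    static ThreadFactory pinnedFactory(ClassLoader loader) {
        return runnable -> {
            Thread t = new Thread(runnable);
            t.setContextClassLoader(loader); // override the loader inherited from the creating thread
            t.setDaemon(true);
            return t;
        };
    }

    /** Submits a task from a thread whose context classloader is jobLoader; returns the worker's loader. */
    static ClassLoader observe(ExecutorService pool, ClassLoader jobLoader) throws InterruptedException {
        AtomicReference<ClassLoader> seen = new AtomicReference<>();
        Thread submitter = new Thread(() -> {
            try {
                seen.set(pool.submit(() -> Thread.currentThread().getContextClassLoader()).get());
            } catch (Exception e) {
                throw new IllegalStateException(e);
            }
        });
        submitter.setContextClassLoader(jobLoader); // plays the role of a job's task thread
        submitter.start();
        submitter.join();
        return seen.get();
    }

    public static void main(String[] args) throws InterruptedException {
        ClassLoader jobA = new URLClassLoader(new URL[0], null); // stand-in for job A's user classloader
        ClassLoader jobB = new URLClassLoader(new URL[0], null); // stand-in for job B's user classloader

        // Default factory: the single worker is created lazily by job A's submitting thread.
        ExecutorService shared = Executors.newSingleThreadExecutor();
        System.out.println(observe(shared, jobA) == jobA); // true
        System.out.println(observe(shared, jobB) == jobA); // true: job B's task still sees job A's loader
        shared.shutdown();

        // Pinned factory: the worker's loader no longer depends on which job submits first.
        ClassLoader poolLoader = ElgClassLoaderDemo.class.getClassLoader();
        ExecutorService pinned = Executors.newSingleThreadExecutor(pinnedFactory(poolLoader));
        System.out.println(observe(pinned, jobA) == poolLoader); // true
        System.out.println(observe(pinned, jobB) == poolLoader); // true
        pinned.shutdown();
    }
}
```

With the default factory the worker thread is created on the first submitting thread and then reused, so job B's task runs with job A's (possibly already closed) loader, which is the situation in the stack trace above. Which loader to pin, for example the one that loaded the SDK, is a design choice this sketch leaves open.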



--
This message was sent by Atlassian Jira
(v8.20.1#820001)