Posted to issues@flink.apache.org by "Yun Gao (Jira)" <ji...@apache.org> on 2022/04/13 06:28:06 UTC

[jira] [Updated] (FLINK-26256) AWS SDK Async Event Loop Group Classloader Issue

     [ https://issues.apache.org/jira/browse/FLINK-26256?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Yun Gao updated FLINK-26256:
----------------------------
    Fix Version/s: 1.16.0

> AWS SDK Async Event Loop Group Classloader Issue
> ------------------------------------------------
>
>                 Key: FLINK-26256
>                 URL: https://issues.apache.org/jira/browse/FLINK-26256
>             Project: Flink
>          Issue Type: Bug
>          Components: Connectors / Kinesis
>            Reporter: Danny Cranmer
>            Assignee: Zichen Liu
>            Priority: Major
>              Labels: pull-request-available
>             Fix For: 1.15.0, 1.16.0
>
>
> h3. Background
> AWS SDK v2 async clients use a Netty-based async HTTP client. In Flink, this applies to the Kinesis Data Streams and Firehose sinks and to the Kinesis Data Streams EFO consumer. When no thread pool is configured, the SDK creates a shared one for Netty to use for network operations. This pool is managed by a shared ELG (event loop group) that is stored in a static field. The AWS connectors in the Flink codebase do not configure an ELG.
> When threads are spawned within the ELG, they inherit the context classloader of the thread that triggers their creation. If the ELG is created from a shared classloader, for instance the Flink parent classloader or the MiniCluster parent classloader, multiple Flink jobs can share the same ELG. An ELG thread spawned from a Flink job inherits that job's user classloader. When the job completes or fails, its classloader is closed, but the Netty thread still references it, which leads to the exception below.
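The inheritance problem can be reproduced with plain JDK classes. This is a minimal sketch, not SDK or Flink code: `SharedPoolLeakDemo` is a hypothetical stand-in for the statically shared ELG, and each "job" is simulated by setting the caller's context classloader before submitting work. The pool's worker thread is created lazily by the first job that submits, so it copies that job's loader and keeps it for every later job.

```java
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

// Hypothetical stand-in for the SDK's statically shared event loop group:
// created once per defining classloader and reused by every job that uses it.
class SharedPoolLeakDemo {
    static final ExecutorService SHARED_POOL = Executors.newSingleThreadExecutor(task -> {
        // new Thread(...) copies the context classloader of the thread calling newThread,
        // i.e. whichever job's task thread happens to trigger creation of the worker.
        Thread worker = new Thread(task, "shared-elg-worker");
        worker.setDaemon(true);
        return worker;
    });

    // Submits a task while jobLoader is the caller's context classloader (as on a task
    // thread of that job) and returns the context classloader seen by the pool thread.
    static ClassLoader submitFromJob(ClassLoader jobLoader) {
        Thread caller = Thread.currentThread();
        ClassLoader previous = caller.getContextClassLoader();
        caller.setContextClassLoader(jobLoader);
        try {
            Callable<ClassLoader> probe = () -> Thread.currentThread().getContextClassLoader();
            return SHARED_POOL.submit(probe).get();
        } catch (InterruptedException e) {
            caller.interrupt();
            throw new IllegalStateException(e);
        } catch (ExecutionException e) {
            throw new IllegalStateException(e);
        } finally {
            caller.setContextClassLoader(previous);
        }
    }
}
```

Submitting first from "job A" and then from "job B" returns job A's loader both times: job B's work runs on a thread that still holds job A's classloader, which is exactly the state that fails once job A's loader is closed.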
> h3. Impact
> This issue *does not* impact jobs deployed to a TaskManager (TM) when AWS SDK v2 is loaded via the Flink user classloader, which is expected to be the standard deployment configuration.
> This issue is known to impact:
> - Flink MiniCluster, for example in integration tests (FLINK-26064)
> - Flink clusters loading AWS SDK v2 via the parent classloader
> h3. Suggested solution
> There are a few possible solutions, as discussed in https://github.com/apache/flink/pull/18733:
> 1. Create a separate ELG per Flink job
> 2. Create a separate ELG per subtask
> 3. Attach the correct classloader to ELG spawned threads
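Option 3 can be sketched with a plain `java.util.concurrent.ThreadFactory` that overwrites the inherited context classloader before each thread starts. This is a minimal sketch under stated assumptions, not the fix adopted for this issue: `FixedClassLoaderThreadFactory` is a hypothetical name, the loader to pin (for example the loader that defined the connector classes) is an assumption, and wiring the factory into the SDK's event loop group is not shown. Options 1 and 2 take the other route and avoid the static shared pool by giving each job or subtask its own pool, shut down when it closes.

```java
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical ThreadFactory: every thread it creates uses a fixed context classloader
// instead of inheriting whichever loader the creating thread happens to hold.
class FixedClassLoaderThreadFactory implements ThreadFactory {
    private final ClassLoader contextLoader;
    private final String namePrefix;
    private final AtomicInteger counter = new AtomicInteger();

    FixedClassLoaderThreadFactory(ClassLoader contextLoader, String namePrefix) {
        this.contextLoader = contextLoader;
        this.namePrefix = namePrefix;
    }

    @Override
    public Thread newThread(Runnable task) {
        Thread thread = new Thread(task, namePrefix + counter.incrementAndGet());
        // Replace the inherited (possibly job-specific) loader with the pinned one.
        thread.setContextClassLoader(contextLoader);
        // Daemon threads do not keep the JVM alive on their own.
        thread.setDaemon(true);
        return thread;
    }
}
```

Even when the calling thread holds a job's user classloader, threads produced by this factory report the pinned loader, so closing the job's loader no longer affects them.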
> h3. Error Stack
> (shortened stack trace, as the full trace is too large)
> {noformat}
> Feb 09 20:05:04 java.util.concurrent.ExecutionException: software.amazon.awssdk.core.exception.SdkClientException: Unable to execute HTTP request: Trying to access closed classloader. Please check if you store classloaders directly or indirectly in static fields. If the stacktrace suggests that the leak occurs in a third party library and cannot be fixed immediately, you can disable this check with the configuration 'classloader.check-leaked-classloader'.
> Feb 09 20:05:04 	at java.util.concurrent.CompletableFuture.reportGet(CompletableFuture.java:357)
> Feb 09 20:05:04 	at java.util.concurrent.CompletableFuture.get(CompletableFuture.java:1908)
> (...)
> Feb 09 20:05:04 Caused by: software.amazon.awssdk.core.exception.SdkClientException: Unable to execute HTTP request: Trying to access closed classloader. Please check if you store classloaders directly or indirectly in static fields. If the stacktrace suggests that the leak occurs in a third party library and cannot be fixed immediately, you can disable this check with the configuration 'classloader.check-leaked-classloader'.
> Feb 09 20:05:04 	at software.amazon.awssdk.core.exception.SdkClientException$BuilderImpl.build(SdkClientException.java:98)
> Feb 09 20:05:04 	at software.amazon.awssdk.core.exception.SdkClientException.create(SdkClientException.java:43)
> Feb 09 20:05:04 	at software.amazon.awssdk.core.internal.http.pipeline.stages.utils.RetryableStageHelper.setLastException(RetryableStageHelper.java:204)
> Feb 09 20:05:04 	at software.amazon.awssdk.core.internal.http.pipeline.stages.utils.RetryableStageHelper.setLastException(RetryableStageHelper.java:200)
> Feb 09 20:05:04 	at software.amazon.awssdk.core.internal.http.pipeline.stages.AsyncRetryableStage$RetryingExecutor.maybeRetryExecute(AsyncRetryableStage.java:179)
> Feb 09 20:05:04 	at software.amazon.awssdk.core.internal.http.pipeline.stages.AsyncRetryableStage$RetryingExecutor.lambda$attemptExecute$1(AsyncRetryableStage.java:159)
> (...)
> Feb 09 20:05:04 Caused by: java.lang.IllegalStateException: Trying to access closed classloader. Please check if you store classloaders directly or indirectly in static fields. If the stacktrace suggests that the leak occurs in a third party library and cannot be fixed immediately, you can disable this check with the configuration 'classloader.check-leaked-classloader'.
> Feb 09 20:05:04 	at org.apache.flink.runtime.execution.librarycache.FlinkUserCodeClassLoaders$SafetyNetWrapperClassLoader.ensureInner(FlinkUserCodeClassLoaders.java:164)
> Feb 09 20:05:04 	at org.apache.flink.runtime.execution.librarycache.FlinkUserCodeClassLoaders$SafetyNetWrapperClassLoader.getResources(FlinkUserCodeClassLoaders.java:188)
> Feb 09 20:05:04 	at java.util.ServiceLoader$LazyIterator.hasNextService(ServiceLoader.java:348)
> Feb 09 20:05:04 	at java.util.ServiceLoader$LazyIterator.hasNext(ServiceLoader.java:393)
> Feb 09 20:05:04 	at java.util.ServiceLoader$1.hasNext(ServiceLoader.java:474)
> Feb 09 20:05:04 	at javax.xml.stream.FactoryFinder$1.run(FactoryFinder.java:352)
> Feb 09 20:05:04 	at java.security.AccessController.doPrivileged(Native Method)
> Feb 09 20:05:04 	at javax.xml.stream.FactoryFinder.findServiceProvider(FactoryFinder.java:341)
> Feb 09 20:05:04 	at javax.xml.stream.FactoryFinder.find(FactoryFinder.java:313)
> Feb 09 20:05:04 	at javax.xml.stream.FactoryFinder.find(FactoryFinder.java:227)
> Feb 09 20:05:04 	at javax.xml.stream.XMLInputFactory.newInstance(XMLInputFactory.java:154)
> Feb 09 20:05:04 	at software.amazon.awssdk.protocols.query.unmarshall.XmlDomParser.createXmlInputFactory(XmlDomParser.java:124)
> Feb 09 20:05:04 	at java.lang.ThreadLocal$SuppliedThreadLocal.initialValue(ThreadLocal.java:284)
> Feb 09 20:05:04 	at java.lang.ThreadLocal.setInitialValue(ThreadLocal.java:180)
> Feb 09 20:05:04 	at java.lang.ThreadLocal.get(ThreadLocal.java:170)
> (...)
> {noformat}
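The bottom of the trace shows the failing path: `XMLInputFactory.newInstance` reaches `java.util.ServiceLoader`, and `ServiceLoader.load(Class)` performs its lookup through the current thread's context classloader. A Netty thread still holding a closed Flink user classloader therefore fails in `getResources`. The sketch below reproduces that pattern with JDK classes only; `ClosableLoader` is a hypothetical stand-in for Flink's `SafetyNetWrapperClassLoader`, and `DemoService` is a hypothetical service interface with no registered providers.

```java
import java.io.IOException;
import java.net.URL;
import java.util.Enumeration;
import java.util.ServiceLoader;

class ClosedLoaderLookupDemo {
    // Hypothetical service interface; no providers are registered for it.
    interface DemoService {}

    // Hypothetical stand-in for a safety-net classloader: once closed,
    // resource lookups fail with IllegalStateException.
    static final class ClosableLoader extends ClassLoader {
        private volatile boolean closed;

        ClosableLoader(ClassLoader parent) {
            super(parent);
        }

        void close() {
            closed = true;
        }

        @Override
        public Enumeration<URL> getResources(String name) throws IOException {
            if (closed) {
                throw new IllegalStateException("Trying to access closed classloader.");
            }
            return super.getResources(name);
        }
    }

    // ServiceLoader.load(Class) uses the calling thread's context classloader,
    // so the provider lookup goes through whatever loader this thread holds.
    static String lookupWith(ClassLoader contextLoader) {
        Thread current = Thread.currentThread();
        ClassLoader previous = current.getContextClassLoader();
        current.setContextClassLoader(contextLoader);
        try {
            boolean found = ServiceLoader.load(DemoService.class).iterator().hasNext();
            return found ? "provider found" : "no provider";
        } catch (IllegalStateException e) {
            return "failed: " + e.getMessage();
        } finally {
            current.setContextClassLoader(previous);
        }
    }
}
```

With the loader open, the lookup simply finds no provider; after `close()`, the same lookup fails with the closed-classloader error, mirroring the `SafetyNetWrapperClassLoader.getResources` frame above.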



--
This message was sent by Atlassian Jira
(v8.20.1#820001)