Posted to issues@flink.apache.org by "Danny Cranmer (Jira)" <ji...@apache.org> on 2022/12/05 22:38:00 UTC

[jira] [Commented] (FLINK-30304) Possible Deadlock in Kinesis/Firehose/DynamoDB Connector

    [ https://issues.apache.org/jira/browse/FLINK-30304?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17643583#comment-17643583 ] 

Danny Cranmer commented on FLINK-30304:
---------------------------------------

I do not think we can do anything about this besides waiting for the fix in the AWS SDK. Essentially, an error in the async client results in the Future never being completed. This [pull request](https://github.com/aws/aws-sdk-java-v2/pull/3574) should fix it once merged.

One way around this is to track how long each request has been in flight and to fail the job (or retry) once a request exceeds some timeout.
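A minimal sketch of that workaround in Java follows. It assumes a hypothetical wrapper around the response future. `InFlightTimeoutSketch` and `submitWithTimeout` are illustrative names, not Flink or AWS SDK APIs. It relies on the JDK's `CompletableFuture.orTimeout` (Java 9+), so a future the client never completes fails with a `TimeoutException` and the in-flight counter is released on every outcome:

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

/**
 * Hypothetical helper, not part of Flink's AsyncSinkWriter: bounds how long a
 * request may stay in flight so that a future the AWS client never completes
 * fails with a TimeoutException instead of blocking the sink indefinitely.
 */
final class InFlightTimeoutSketch {

    // Stand-in for the sink's inFlightRequestCount.
    private final AtomicInteger inFlightRequestCount = new AtomicInteger();

    /**
     * Registers an in-flight request and returns a future that completes
     * (normally or exceptionally) no later than about the given timeout,
     * releasing the counter in either case.
     */
    <T> CompletableFuture<T> submitWithTimeout(
            CompletableFuture<T> response, long timeout, TimeUnit unit) {
        inFlightRequestCount.incrementAndGet();
        // orTimeout completes `response` exceptionally with a TimeoutException
        // if it is still pending when the timeout elapses.
        return response
                .orTimeout(timeout, unit)
                .whenComplete((result, error) -> inFlightRequestCount.decrementAndGet());
    }

    int inFlight() {
        return inFlightRequestCount.get();
    }
}
```

When a timeout occurs, the caller still has to choose between retrying the batch and failing the job. The sketch leaves that decision open and makes no attempt to cancel the underlying HTTP request. The timeout must also exceed the longest legitimate request latency. Otherwise, healthy requests will be failed as well.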

> Possible Deadlock in Kinesis/Firehose/DynamoDB Connector
> --------------------------------------------------------
>
>                 Key: FLINK-30304
>                 URL: https://issues.apache.org/jira/browse/FLINK-30304
>             Project: Flink
>          Issue Type: Bug
>          Components: Connectors / DynamoDB, Connectors / Firehose, Connectors / Kinesis
>    Affects Versions: 1.16.0, 1.15.3, aws-connector-3.0.0
>            Reporter: Danny Cranmer
>            Assignee: Danny Cranmer
>            Priority: Critical
>             Fix For: 1.17.0, 1.16.1, 1.15.4, aws-connector-4.0.0, aws-connector-3.1.0
>
>
> AWS sinks based on the Async Sink can deadlock if the AWS async client throws an error outside of the future. We observed this with a local application:
> {code:java}
> java.lang.NullPointerException
> at org.apache.flink.kinesis.shaded.software.amazon.awssdk.http.nio.netty.internal.utils.NettyUtils.closedChannelMessage(NettyUtils.java:135)
> at org.apache.flink.kinesis.shaded.software.amazon.awssdk.http.nio.netty.internal.utils.NettyUtils.decorateException(NettyUtils.java:71)
> at org.apache.flink.kinesis.shaded.software.amazon.awssdk.http.nio.netty.internal.NettyRequestExecutor.handleFailure(NettyRequestExecutor.java:310)
> at org.apache.flink.kinesis.shaded.software.amazon.awssdk.http.nio.netty.internal.NettyRequestExecutor.makeRequestListener(NettyRequestExecutor.java:189)
> at org.apache.flink.kinesis.shaded.io.netty.util.concurrent.DefaultPromise.notifyListener0(DefaultPromise.java:578)
> at org.apache.flink.kinesis.shaded.io.netty.util.concurrent.DefaultPromise.notifyListenersNow(DefaultPromise.java:552)
> at org.apache.flink.kinesis.shaded.io.netty.util.concurrent.DefaultPromise.notifyListeners(DefaultPromise.java:491)
> at org.apache.flink.kinesis.shaded.io.netty.util.concurrent.DefaultPromise.setValue0(DefaultPromise.java:616)
> at org.apache.flink.kinesis.shaded.io.netty.util.concurrent.DefaultPromise.setFailure0(DefaultPromise.java:609)
> at org.apache.flink.kinesis.shaded.io.netty.util.concurrent.DefaultPromise.tryFailure(DefaultPromise.java:117)
> at org.apache.flink.kinesis.shaded.software.amazon.awssdk.http.nio.netty.internal.CancellableAcquireChannelPool.lambda$acquire$1(CancellableAcquireChannelPool.java:58)
> at org.apache.flink.kinesis.shaded.io.netty.util.concurrent.DefaultPromise.notifyListener0(DefaultPromise.java:578)
> at org.apache.flink.kinesis.shaded.io.netty.util.concurrent.DefaultPromise.notifyListenersNow(DefaultPromise.java:552)
> at org.apache.flink.kinesis.shaded.io.netty.util.concurrent.DefaultPromise.notifyListeners(DefaultPromise.java:491)
> at org.apache.flink.kinesis.shaded.io.netty.util.concurrent.DefaultPromise.setValue0(DefaultPromise.java:616)
> at org.apache.flink.kinesis.shaded.io.netty.util.concurrent.DefaultPromise.setFailure0(DefaultPromise.java:609)
> at org.apache.flink.kinesis.shaded.io.netty.util.concurrent.DefaultPromise.tryFailure(DefaultPromise.java:117)
> at org.apache.flink.kinesis.shaded.software.amazon.awssdk.http.nio.netty.internal.HealthCheckedChannelPool.ensureAcquiredChannelIsHealthy(HealthCheckedChannelPool.java:114)
> at org.apache.flink.kinesis.shaded.software.amazon.awssdk.http.nio.netty.internal.HealthCheckedChannelPool.lambda$tryAcquire$1(HealthCheckedChannelPool.java:97)
> at org.apache.flink.kinesis.shaded.io.netty.util.concurrent.DefaultPromise.notifyListener0(DefaultPromise.java:578)
> at org.apache.flink.kinesis.shaded.io.netty.util.concurrent.DefaultPromise.notifyListeners0(DefaultPromise.java:571)
> at org.apache.flink.kinesis.shaded.io.netty.util.concurrent.DefaultPromise.notifyListenersNow(DefaultPromise.java:550)
> at org.apache.flink.kinesis.shaded.io.netty.util.concurrent.DefaultPromise.access$200(DefaultPromise.java:35)
> at org.apache.flink.kinesis.shaded.io.netty.util.concurrent.DefaultPromise$1.run(DefaultPromise.java:502)
> at org.apache.flink.kinesis.shaded.io.netty.util.concurrent.AbstractEventExecutor.safeExecute(AbstractEventExecutor.java:164)
> at org.apache.flink.kinesis.shaded.io.netty.util.concurrent.SingleThreadEventExecutor.runAllTasks(SingleThreadEventExecutor.java:469)
> at org.apache.flink.kinesis.shaded.io.netty.channel.nio.NioEventLoop.run(NioEventLoop.java:500)
> at org.apache.flink.kinesis.shaded.io.netty.util.concurrent.SingleThreadEventExecutor$4.run(SingleThreadEventExecutor.java:986)
> at org.apache.flink.kinesis.shaded.io.netty.util.internal.ThreadExecutorMap$2.run(ThreadExecutorMap.java:74)
> at java.base/java.lang.Thread.run(Thread.java:829){code}
> Related AWS SDK issues that can cause this:
>  * [https://github.com/aws/aws-sdk-java-v2/issues/3435]
>  * [https://github.com/aws/aws-sdk-java-v2/issues/1812]
> If an error is thrown and not handled by the future, the AsyncSink never decrements {{inFlightRequestCount}}, and the job hangs while trying to flush for a checkpoint.
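The following simplified Java model illustrates this failure mode. `StuckFlushModel` and its methods are hypothetical and are not Flink's actual `AsyncSinkWriter` code. The model assumes only that `inFlightRequestCount` is decremented from the response future's completion callback. A future that is never completed stands in for a response whose completion was skipped when the SDK threw on the Netty event-loop thread. It keeps the counter above zero, so a flush that waits for the counter to reach zero cannot return:

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

/**
 * Simplified, hypothetical model of the hang (not Flink's AsyncSinkWriter):
 * the counter is released only from the response future's callback, so a
 * future that is never completed keeps the counter above zero forever.
 */
final class StuckFlushModel {

    private final AtomicInteger inFlightRequestCount = new AtomicInteger();

    void submit(CompletableFuture<?> response) {
        inFlightRequestCount.incrementAndGet();
        // The only place the counter is decremented.
        response.whenComplete((result, error) -> inFlightRequestCount.decrementAndGet());
    }

    /**
     * Waits for all in-flight requests to drain, as a checkpoint flush would.
     * A real flush has no deadline; this one gives up after timeoutMillis and
     * returns false so that the model terminates.
     */
    boolean flush(long timeoutMillis) {
        long deadline = System.nanoTime() + TimeUnit.MILLISECONDS.toNanos(timeoutMillis);
        while (inFlightRequestCount.get() > 0) {
            if (System.nanoTime() - deadline >= 0) {
                return false;
            }
            try {
                Thread.sleep(5);
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                return false;
            }
        }
        return true;
    }

    public static void main(String[] args) {
        StuckFlushModel healthy = new StuckFlushModel();
        healthy.submit(CompletableFuture.completedFuture("ok"));
        System.out.println("healthy flush drained: " + healthy.flush(100));

        StuckFlushModel stuck = new StuckFlushModel();
        // Never completed: stands in for the response lost to the SDK error.
        stuck.submit(new CompletableFuture<String>());
        System.out.println("stuck flush drained: " + stuck.flush(100));
    }
}
```

Running `main` should report that the healthy flush drained and the stuck one did not. The in-flight timeout from the comment above is one way to bound this wait.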



--
This message was sent by Atlassian Jira
(v8.20.10#820010)