Posted to issues@flink.apache.org by "Aleksandr Pilipenko (Jira)" <ji...@apache.org> on 2024/04/29 16:08:00 UTC

[jira] [Commented] (FLINK-34071) Deadlock in AWS Kinesis Data Streams AsyncSink connector

    [ https://issues.apache.org/jira/browse/FLINK-34071?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17842055#comment-17842055 ] 

Aleksandr Pilipenko commented on FLINK-34071:
---------------------------------------------

We found that one cause of this is the continuous retry of records that consistently fail on write.

Visibility into such issues is limited due to https://issues.apache.org/jira/browse/FLINK-35269
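
To illustrate the retry behaviour described above, here is a minimal sketch. It is not the actual AsyncSinkWriter code. It assumes a simplified model in which failed entries are put back at the head of the buffer and flush keeps submitting batches until the buffer is empty. The class RetryLoopSketch, the method flush, the predicate writeFails and the maxAttempts bound are all hypothetical names introduced for this example; maxAttempts exists only so the demo terminates.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Deque;
import java.util.List;
import java.util.function.Predicate;

public class RetryLoopSketch {

    /** Outcome of a bounded flush: whether the buffer drained and how many batches were submitted. */
    public static final class FlushResult {
        public final boolean drained;
        public final int attempts;

        FlushResult(boolean drained, int attempts) {
            this.drained = drained;
            this.attempts = attempts;
        }
    }

    /**
     * Simplified model of a flush loop (an assumption, not the connector's implementation):
     * submit batches until the buffer is empty, putting failed entries back at the head.
     * maxAttempts is a demo-only bound; without it a record that always fails
     * would keep this loop running forever.
     */
    public static FlushResult flush(
            List<String> records, Predicate<String> writeFails, int batchSize, int maxAttempts) {
        Deque<String> buffer = new ArrayDeque<>(records);
        int attempts = 0;
        while (!buffer.isEmpty() && attempts < maxAttempts) {
            attempts++;
            // Take up to batchSize entries from the head of the buffer.
            List<String> batch = new ArrayList<>();
            while (batch.size() < batchSize && !buffer.isEmpty()) {
                batch.add(buffer.pollFirst());
            }
            // Re-queue failed entries at the head, iterating backwards to keep their order.
            for (int i = batch.size() - 1; i >= 0; i--) {
                if (writeFails.test(batch.get(i))) {
                    buffer.addFirst(batch.get(i));
                }
            }
        }
        return new FlushResult(buffer.isEmpty(), attempts);
    }

    public static void main(String[] args) {
        List<String> records = List.of("ok-1", "poison", "ok-2");

        // No failures: the buffer drains after two batches.
        FlushResult healthy = flush(records, r -> false, 2, 1000);
        System.out.println("healthy: drained=" + healthy.drained + " attempts=" + healthy.attempts);

        // One record always fails: the loop only stops because of the demo bound.
        FlushResult stuck = flush(records, r -> r.equals("poison"), 2, 1000);
        System.out.println("stuck: drained=" + stuck.drained + " attempts=" + stuck.attempts);
    }
}
```

In this model a single record that can never be written keeps the buffer non-empty, so an unbounded flush would not return. From the outside that looks like a hang rather than an error.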

> Deadlock in AWS Kinesis Data Streams AsyncSink connector
> --------------------------------------------------------
>
>                 Key: FLINK-34071
>                 URL: https://issues.apache.org/jira/browse/FLINK-34071
>             Project: Flink
>          Issue Type: Bug
>          Components: Connectors / AWS, Connectors / Kinesis
>    Affects Versions: aws-connector-3.0.0, 1.15.4, aws-connector-4.2.0
>            Reporter: Aleksandr Pilipenko
>            Priority: Major
>
> The sink operator hangs while flushing records, similarly to FLINK-32230. The error is observed even when using an AWS SDK version that contains the fix for async client error handling [https://github.com/aws/aws-sdk-java-v2/pull/4402]
> Thread dump of the stuck thread:
> {code:java}
> "sdk-async-response-1-6236" Id=11213 RUNNABLE
>     at app//org.apache.flink.connector.base.sink.writer.AsyncSinkWriter.lambda$flush$5(AsyncSinkWriter.java:385)
>     at app//org.apache.flink.connector.base.sink.writer.AsyncSinkWriter$$Lambda$1778/0x0000000801141040.accept(Unknown Source)
>     at org.apache.flink.connector.kinesis.sink.KinesisStreamsSinkWriter.handleFullyFailedRequest(KinesisStreamsSinkWriter.java:210)
>     at org.apache.flink.connector.kinesis.sink.KinesisStreamsSinkWriter.lambda$submitRequestEntries$1(KinesisStreamsSinkWriter.java:184)
>     at org.apache.flink.connector.kinesis.sink.KinesisStreamsSinkWriter$$Lambda$1965/0x00000008011a0c40.accept(Unknown Source)
>     at java.base@11.0.18/java.util.concurrent.CompletableFuture.uniWhenComplete(CompletableFuture.java:859)
>     at java.base@11.0.18/java.util.concurrent.CompletableFuture$UniWhenComplete.tryFire(CompletableFuture.java:837)
>     at java.base@11.0.18/java.util.concurrent.CompletableFuture.postComplete(CompletableFuture.java:506)
>     at java.base@11.0.18/java.util.concurrent.CompletableFuture.completeExceptionally(CompletableFuture.java:2088)
>     at org.apache.flink.kinesis.shaded.software.amazon.awssdk.utils.CompletableFutureUtils.lambda$forwardExceptionTo$0(CompletableFutureUtils.java:79)
>     at org.apache.flink.kinesis.shaded.software.amazon.awssdk.utils.CompletableFutureUtils$$Lambda$1925/0x0000000801181840.accept(Unknown Source)
>     at java.base@11.0.18/java.util.concurrent.CompletableFuture.uniWhenComplete(CompletableFuture.java:859)
>     at java.base@11.0.18/java.util.concurrent.CompletableFuture$UniWhenComplete.tryFire(CompletableFuture.java:837)
>     at java.base@11.0.18/java.util.concurrent.CompletableFuture.postComplete(CompletableFuture.java:506)
>     at java.base@11.0.18/java.util.concurrent.CompletableFuture.completeExceptionally(CompletableFuture.java:2088)
>     at org.apache.flink.kinesis.shaded.software.amazon.awssdk.core.internal.http.pipeline.stages.AsyncApiCallMetricCollectionStage.lambda$execute$0(AsyncApiCallMetricCollectionStage.java:56)
>     at org.apache.flink.kinesis.shaded.software.amazon.awssdk.core.internal.http.pipeline.stages.AsyncApiCallMetricCollectionStage$$Lambda$1961/0x0000000801191c40.accept(Unknown Source)
>     at java.base@11.0.18/java.util.concurrent.CompletableFuture.uniWhenComplete(CompletableFuture.java:859)
>     at java.base@11.0.18/java.util.concurrent.CompletableFuture$UniWhenComplete.tryFire(CompletableFuture.java:837)
>     at java.base@11.0.18/java.util.concurrent.CompletableFuture.postComplete(CompletableFuture.java:506)
>     at java.base@11.0.18/java.util.concurrent.CompletableFuture.completeExceptionally(CompletableFuture.java:2088)
>     at org.apache.flink.kinesis.shaded.software.amazon.awssdk.core.internal.http.pipeline.stages.AsyncApiCallTimeoutTrackingStage.lambda$execute$2(AsyncApiCallTimeoutTrackingStage.java:67)
>     at org.apache.flink.kinesis.shaded.software.amazon.awssdk.core.internal.http.pipeline.stages.AsyncApiCallTimeoutTrackingStage$$Lambda$1960/0x0000000801191840.accept(Unknown Source)
>     at java.base@11.0.18/java.util.concurrent.CompletableFuture.uniWhenComplete(CompletableFuture.java:859)
>     at java.base@11.0.18/java.util.concurrent.CompletableFuture$UniWhenComplete.tryFire(CompletableFuture.java:837)
>     at java.base@11.0.18/java.util.concurrent.CompletableFuture.postComplete(CompletableFuture.java:506)
>     at java.base@11.0.18/java.util.concurrent.CompletableFuture.completeExceptionally(CompletableFuture.java:2088)
>     at org.apache.flink.kinesis.shaded.software.amazon.awssdk.utils.CompletableFutureUtils.lambda$forwardExceptionTo$0(CompletableFutureUtils.java:79)
>     at org.apache.flink.kinesis.shaded.software.amazon.awssdk.utils.CompletableFutureUtils$$Lambda$1925/0x0000000801181840.accept(Unknown Source)
>     at java.base@11.0.18/java.util.concurrent.CompletableFuture.uniWhenComplete(CompletableFuture.java:859)
>     at java.base@11.0.18/java.util.concurrent.CompletableFuture$UniWhenComplete.tryFire(CompletableFuture.java:837)
>     at java.base@11.0.18/java.util.concurrent.CompletableFuture.postComplete(CompletableFuture.java:506)
>     at java.base@11.0.18/java.util.concurrent.CompletableFuture.completeExceptionally(CompletableFuture.java:2088)
>     at org.apache.flink.kinesis.shaded.software.amazon.awssdk.core.internal.http.pipeline.stages.AsyncRetryableStage$RetryingExecutor.maybeAttemptExecute(AsyncRetryableStage.java:103)
>     at org.apache.flink.kinesis.shaded.software.amazon.awssdk.core.internal.http.pipeline.stages.AsyncRetryableStage$RetryingExecutor.maybeRetryExecute(AsyncRetryableStage.java:184)
>     at org.apache.flink.kinesis.shaded.software.amazon.awssdk.core.internal.http.pipeline.stages.AsyncRetryableStage$RetryingExecutor.lambda$attemptExecute$1(AsyncRetryableStage.java:170)
>     at org.apache.flink.kinesis.shaded.software.amazon.awssdk.core.internal.http.pipeline.stages.AsyncRetryableStage$RetryingExecutor$$Lambda$1956/0x0000000801192840.accept(Unknown Source)
>     at java.base@11.0.18/java.util.concurrent.CompletableFuture.uniWhenComplete(CompletableFuture.java:859)
>     at java.base@11.0.18/java.util.concurrent.CompletableFuture$UniWhenComplete.tryFire(CompletableFuture.java:837)
>     at java.base@11.0.18/java.util.concurrent.CompletableFuture.postComplete(CompletableFuture.java:506)
>     at java.base@11.0.18/java.util.concurrent.CompletableFuture.complete(CompletableFuture.java:2073)
>     at org.apache.flink.kinesis.shaded.software.amazon.awssdk.core.internal.http.pipeline.stages.MakeAsyncHttpRequestStage.lambda$null$0(MakeAsyncHttpRequestStage.java:105)
>     at org.apache.flink.kinesis.shaded.software.amazon.awssdk.core.internal.http.pipeline.stages.MakeAsyncHttpRequestStage$$Lambda$1954/0x0000000801193040.accept(Unknown Source)
>     at java.base@11.0.18/java.util.concurrent.CompletableFuture.uniWhenComplete(CompletableFuture.java:859)
>     at java.base@11.0.18/java.util.concurrent.CompletableFuture$UniWhenComplete.tryFire(CompletableFuture.java:837)
>     at java.base@11.0.18/java.util.concurrent.CompletableFuture.postComplete(CompletableFuture.java:506)
>     at java.base@11.0.18/java.util.concurrent.CompletableFuture.complete(CompletableFuture.java:2073)
>     at org.apache.flink.kinesis.shaded.software.amazon.awssdk.core.internal.http.pipeline.stages.MakeAsyncHttpRequestStage.completeResponseFuture(MakeAsyncHttpRequestStage.java:238)
>     at org.apache.flink.kinesis.shaded.software.amazon.awssdk.core.internal.http.pipeline.stages.MakeAsyncHttpRequestStage.lambda$executeHttpRequest$3(MakeAsyncHttpRequestStage.java:163)
>     at org.apache.flink.kinesis.shaded.software.amazon.awssdk.core.internal.http.pipeline.stages.MakeAsyncHttpRequestStage$$Lambda$1952/0x0000000801193840.apply(Unknown Source)
>     ... {code}
> Alongside this issue, the following exception can be observed:
> {code:java}
> java.io.IOException: An error occurred on the connection: java.nio.channels.ClosedChannelException, [channel: 159aa20c]. All streams will be closed
>     at org.apache.flink.kinesis.shaded.software.amazon.awssdk.http.nio.netty.internal.http2.MultiplexedChannelRecord.decorateConnectionException(MultiplexedChannelRecord.java:213)
>     at org.apache.flink.kinesis.shaded.software.amazon.awssdk.http.nio.netty.internal.http2.MultiplexedChannelRecord.lambda$closeChildChannels$10(MultiplexedChannelRecord.java:205)
>     at org.apache.flink.kinesis.shaded.software.amazon.awssdk.http.nio.netty.internal.http2.MultiplexedChannelRecord.lambda$closeAndExecuteOnChildChannels$11(MultiplexedChannelRecord.java:229)
>     at org.apache.flink.kinesis.shaded.software.amazon.awssdk.http.nio.netty.internal.utils.NettyUtils.doInEventLoop(NettyUtils.java:248)
>     at org.apache.flink.kinesis.shaded.software.amazon.awssdk.http.nio.netty.internal.http2.MultiplexedChannelRecord.closeAndExecuteOnChildChannels(MultiplexedChannelRecord.java:220)
>     at org.apache.flink.kinesis.shaded.software.amazon.awssdk.http.nio.netty.internal.http2.MultiplexedChannelRecord.closeChildChannels(MultiplexedChannelRecord.java:205)
>     at org.apache.flink.kinesis.shaded.software.amazon.awssdk.http.nio.netty.internal.http2.Http2MultiplexedChannelPool.closeAndReleaseParent(Http2MultiplexedChannelPool.java:353)
>     at org.apache.flink.kinesis.shaded.software.amazon.awssdk.http.nio.netty.internal.http2.Http2MultiplexedChannelPool.closeAndReleaseParent(Http2MultiplexedChannelPool.java:333)
>     at org.apache.flink.kinesis.shaded.software.amazon.awssdk.http.nio.netty.internal.http2.Http2MultiplexedChannelPool.access$200(Http2MultiplexedChannelPool.java:76)
>     at org.apache.flink.kinesis.shaded.software.amazon.awssdk.http.nio.netty.internal.http2.Http2MultiplexedChannelPool$ReleaseOnExceptionHandler.closeAndReleaseParent(Http2MultiplexedChannelPool.java:509)
>     at org.apache.flink.kinesis.shaded.software.amazon.awssdk.http.nio.netty.internal.http2.Http2MultiplexedChannelPool$ReleaseOnExceptionHandler.channelInactive(Http2MultiplexedChannelPool.java:486)
>     at org.apache.flink.kinesis.shaded.io.netty.channel.AbstractChannelHandlerContext.invokeChannelInactive(AbstractChannelHandlerContext.java:303)
>     at org.apache.flink.kinesis.shaded.io.netty.channel.AbstractChannelHandlerContext.invokeChannelInactive(AbstractChannelHandlerContext.java:281)
>     at org.apache.flink.kinesis.shaded.io.netty.channel.AbstractChannelHandlerContext.fireChannelInactive(AbstractChannelHandlerContext.java:274)
>     at org.apache.flink.kinesis.shaded.io.netty.handler.logging.LoggingHandler.channelInactive(LoggingHandler.java:206)
>     at org.apache.flink.kinesis.shaded.io.netty.channel.AbstractChannelHandlerContext.invokeChannelInactive(AbstractChannelHandlerContext.java:303)
>     at org.apache.flink.kinesis.shaded.io.netty.channel.AbstractChannelHandlerContext.invokeChannelInactive(AbstractChannelHandlerContext.java:281)
>     at org.apache.flink.kinesis.shaded.io.netty.channel.AbstractChannelHandlerContext.fireChannelInactive(AbstractChannelHandlerContext.java:274)
>     at org.apache.flink.kinesis.shaded.io.netty.channel.ChannelInboundHandlerAdapter.channelInactive(ChannelInboundHandlerAdapter.java:81)
>     at org.apache.flink.kinesis.shaded.io.netty.handler.timeout.IdleStateHandler.channelInactive(IdleStateHandler.java:277)
>     at org.apache.flink.kinesis.shaded.io.netty.channel.AbstractChannelHandlerContext.invokeChannelInactive(AbstractChannelHandlerContext.java:303)
>     at org.apache.flink.kinesis.shaded.io.netty.channel.AbstractChannelHandlerContext.invokeChannelInactive(AbstractChannelHandlerContext.java:281)
>     at org.apache.flink.kinesis.shaded.io.netty.channel.AbstractChannelHandlerContext.fireChannelInactive(AbstractChannelHandlerContext.java:274)
>     at org.apache.flink.kinesis.shaded.software.amazon.awssdk.http.nio.netty.internal.http2.Http2PingHandler.channelInactive(Http2PingHandler.java:77)
>     at org.apache.flink.kinesis.shaded.io.netty.channel.AbstractChannelHandlerContext.invokeChannelInactive(AbstractChannelHandlerContext.java:305)
>     at org.apache.flink.kinesis.shaded.io.netty.channel.AbstractChannelHandlerContext.invokeChannelInactive(AbstractChannelHandlerContext.java:281)
>     at org.apache.flink.kinesis.shaded.io.netty.channel.AbstractChannelHandlerContext.fireChannelInactive(AbstractChannelHandlerContext.java:274)
>     at org.apache.flink.kinesis.shaded.io.netty.handler.codec.ByteToMessageDecoder.channelInputClosed(ByteToMessageDecoder.java:411)
>     at org.apache.flink.kinesis.shaded.io.netty.handler.codec.ByteToMessageDecoder.channelInactive(ByteToMessageDecoder.java:376)
>     at org.apache.flink.kinesis.shaded.io.netty.handler.codec.http2.Http2ConnectionHandler.channelInactive(Http2ConnectionHandler.java:430)
>     at org.apache.flink.kinesis.shaded.io.netty.channel.AbstractChannelHandlerContext.invokeChannelInactive(AbstractChannelHandlerContext.java:305)
>     at org.apache.flink.kinesis.shaded.io.netty.channel.AbstractChannelHandlerContext.invokeChannelInactive(AbstractChannelHandlerContext.java:281)
>     at org.apache.flink.kinesis.shaded.io.netty.channel.AbstractChannelHandlerContext.fireChannelInactive(AbstractChannelHandlerContext.java:274)
>     at org.apache.flink.kinesis.shaded.io.netty.handler.codec.ByteToMessageDecoder.channelInputClosed(ByteToMessageDecoder.java:411)
>     at org.apache.flink.kinesis.shaded.io.netty.handler.codec.ByteToMessageDecoder.channelInactive(ByteToMessageDecoder.java:376)
>     at org.apache.flink.kinesis.shaded.io.netty.handler.ssl.SslHandler.channelInactive(SslHandler.java:1085)
>     at org.apache.flink.kinesis.shaded.io.netty.channel.AbstractChannelHandlerContext.invokeChannelInactive(AbstractChannelHandlerContext.java:305)
>     at org.apache.flink.kinesis.shaded.io.netty.channel.AbstractChannelHandlerContext.invokeChannelInactive(AbstractChannelHandlerContext.java:281)
>     at org.apache.flink.kinesis.shaded.io.netty.channel.AbstractChannelHandlerContext.fireChannelInactive(AbstractChannelHandlerContext.java:274)
>     at org.apache.flink.kinesis.shaded.io.netty.channel.DefaultChannelPipeline$HeadContext.channelInactive(DefaultChannelPipeline.java:1405)
>     at org.apache.flink.kinesis.shaded.io.netty.channel.AbstractChannelHandlerContext.invokeChannelInactive(AbstractChannelHandlerContext.java:301)
>     at org.apache.flink.kinesis.shaded.io.netty.channel.AbstractChannelHandlerContext.invokeChannelInactive(AbstractChannelHandlerContext.java:281)
>     at org.apache.flink.kinesis.shaded.io.netty.channel.DefaultChannelPipeline.fireChannelInactive(DefaultChannelPipeline.java:901)
>     at org.apache.flink.kinesis.shaded.io.netty.channel.AbstractChannel$AbstractUnsafe$7.run(AbstractChannel.java:813)
>     at org.apache.flink.kinesis.shaded.io.netty.util.concurrent.AbstractEventExecutor.runTask(AbstractEventExecutor.java:174)
>     at org.apache.flink.kinesis.shaded.io.netty.util.concurrent.AbstractEventExecutor.safeExecute(AbstractEventExecutor.java:167)
>     at org.apache.flink.kinesis.shaded.io.netty.util.concurrent.SingleThreadEventExecutor.runAllTasks(SingleThreadEventExecutor.java:470)
>     at org.apache.flink.kinesis.shaded.io.netty.channel.nio.NioEventLoop.run(NioEventLoop.java:566)
>     at org.apache.flink.kinesis.shaded.io.netty.util.concurrent.SingleThreadEventExecutor$4.run(SingleThreadEventExecutor.java:997)
>     at org.apache.flink.kinesis.shaded.io.netty.util.internal.ThreadExecutorMap$2.run(ThreadExecutorMap.java:74)
>     at java.base/java.lang.Thread.run(Thread.java:829)
> Caused by: java.nio.channels.ClosedChannelException
>     ... 41 more
> {code}
>  


