Posted to dev@pulsar.apache.org by "海洋饼干 (Jira)" <ji...@apache.org> on 2022/08/25 00:21:00 UTC

[jira] [Closed] (PULSAR-22) Error occurs when Flink consumes from Pulsar, but data can still be consumed

     [ https://issues.apache.org/jira/browse/PULSAR-22?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

海洋饼干 closed PULSAR-22.
----------------------
    Resolution: Cannot Reproduce

> Error occurs when Flink consumes from Pulsar, but data can still be consumed
> ----------------------------------------------------------------------------
>
>                 Key: PULSAR-22
>                 URL: https://issues.apache.org/jira/browse/PULSAR-22
>             Project: Pulsar
>          Issue Type: Bug
>         Environment: kafka(json) -> flink(stream) -> custom serializer -> kop -> pulsar -> flink(stream) Key_Shared consumer (where the problem occurs)
>            Reporter: 海洋饼干
>            Priority: Major
>
> [Source Data Fetcher for Source: kafkaSource (4/4)#0] ERROR org.apache.flink.connector.pulsar.source.reader.split.PulsarPartitionSplitReaderBase - Error in polling message from pulsar consumer.
> java.util.concurrent.ExecutionException: org.apache.pulsar.client.api.PulsarClientException$TransactionConflictException: {"errorMsg":"org.apache.pulsar.transaction.common.exception.TransactionConflictException: [persistent://public/default/HB0002_rdwBaseCtr_aaamore5005-test-pulsar-partition-0][flink-source] Transaction:(1,12) try to ack batch message:810:24 in pending ack status.","reqId":4357026755663389273, "remote":"tk01-bd-test-pulsar-7-139/192.168.7.139:6650", "local":"/192.168.34.54:9756"}
>     at java.util.concurrent.CompletableFuture.reportGet(CompletableFuture.java:357)
>     at java.util.concurrent.CompletableFuture.get(CompletableFuture.java:1908)
>     at org.apache.flink.connector.pulsar.source.reader.split.PulsarUnorderedPartitionSplitReader.pollMessage(PulsarUnorderedPartitionSplitReader.java:98)
>     at org.apache.flink.connector.pulsar.source.reader.split.PulsarPartitionSplitReaderBase.fetch(PulsarPartitionSplitReaderBase.java:110)
>     at org.apache.flink.connector.pulsar.source.reader.split.PulsarUnorderedPartitionSplitReader.fetch(PulsarUnorderedPartitionSplitReader.java:56)
>     at org.apache.flink.connector.base.source.reader.fetcher.FetchTask.run(FetchTask.java:58)
>     at org.apache.flink.connector.base.source.reader.fetcher.SplitFetcher.runOnce(SplitFetcher.java:142)
>     at org.apache.flink.connector.base.source.reader.fetcher.SplitFetcher.run(SplitFetcher.java:105)
>     at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
>     at java.util.concurrent.FutureTask.run(FutureTask.java:266)
>     at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
>     at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
>     at java.lang.Thread.run(Thread.java:750)
> Caused by: org.apache.pulsar.client.api.PulsarClientException$TransactionConflictException: {"errorMsg":"org.apache.pulsar.transaction.common.exception.TransactionConflictException: [persistent://public/default/HB0002_rdwBaseCtr_aaamore5005-test-pulsar-partition-0][flink-source] Transaction:(1,12) try to ack batch message:810:24 in pending ack status.","reqId":4357026755663389273, "remote":"tk01-bd-test-pulsar-7-139/192.168.7.139:6650", "local":"/192.168.34.54:9756"}
>     at org.apache.pulsar.client.impl.ClientCnx.getPulsarClientException(ClientCnx.java:1177)
>     at org.apache.pulsar.client.impl.ClientCnx.handleAckResponse(ClientCnx.java:431)
>     at org.apache.pulsar.common.protocol.PulsarDecoder.channelRead(PulsarDecoder.java:150)
>     at org.apache.pulsar.shade.io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:379)
>     at org.apache.pulsar.shade.io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:365)
>     at org.apache.pulsar.shade.io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:357)
>     at org.apache.pulsar.shade.io.netty.handler.codec.ByteToMessageDecoder.fireChannelRead(ByteToMessageDecoder.java:327)
>     at org.apache.pulsar.shade.io.netty.handler.codec.ByteToMessageDecoder.channelRead(ByteToMessageDecoder.java:299)
>     at org.apache.pulsar.shade.io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:379)
>     at org.apache.pulsar.shade.io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:365)
>     at org.apache.pulsar.shade.io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:357)
>     at org.apache.pulsar.shade.io.netty.channel.DefaultChannelPipeline$HeadContext.channelRead(DefaultChannelPipeline.java:1410)
>     at org.apache.pulsar.shade.io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:379)
>     at org.apache.pulsar.shade.io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:365)
>     at org.apache.pulsar.shade.io.netty.channel.DefaultChannelPipeline.fireChannelRead(DefaultChannelPipeline.java:919)
>     at org.apache.pulsar.shade.io.netty.channel.nio.AbstractNioByteChannel$NioByteUnsafe.read(AbstractNioByteChannel.java:166)
>     at org.apache.pulsar.shade.io.netty.channel.nio.NioEventLoop.processSelectedKey(NioEventLoop.java:722)
>     at org.apache.pulsar.shade.io.netty.channel.nio.NioEventLoop.processSelectedKeysOptimized(NioEventLoop.java:658)
>     at org.apache.pulsar.shade.io.netty.channel.nio.NioEventLoop.processSelectedKeys(NioEventLoop.java:584)
>     at org.apache.pulsar.shade.io.netty.channel.nio.NioEventLoop.run(NioEventLoop.java:496)
>     at org.apache.pulsar.shade.io.netty.util.concurrent.SingleThreadEventExecutor$4.run(SingleThreadEventExecutor.java:986)
>     at org.apache.pulsar.shade.io.netty.util.internal.ThreadExecutorMap$2.run(ThreadExecutorMap.java:74)
>     at org.apache.pulsar.shade.io.netty.util.concurrent.FastThreadLocalRunnable.run(FastThreadLocalRunnable.java:30)
>     ... 1 more



--
This message was sent by Atlassian Jira
(v8.20.10#820010)