You are viewing a plain text version of this content. The canonical link for it is here.
Posted to dev@pulsar.apache.org by "海洋饼干 (Jira)" <ji...@apache.org> on 2022/08/12 06:16:00 UTC
[jira] [Created] (PULSAR-22) flink消费pulsar时出现错误,但是仍可消费到数据
海洋饼干 created PULSAR-22:
--------------------------
Summary: flink消费pulsar时出现错误,但是仍可消费到数据
Key: PULSAR-22
URL: https://issues.apache.org/jira/browse/PULSAR-22
Project: Pulsar
Issue Type: Bug
Environment: kafka(json) -> flink(Stream) -> custom
Serializer -> kop -> pulsar -> flink(stream) keyshared consum(问题所在区域)
Reporter: 海洋饼干
[Source Data Fetcher for Source: kafkaSource (4/4)#0] ERROR org.apache.flink.connector.pulsar.source.reader.split.PulsarPartitionSplitReaderBase - Error in polling message from pulsar consumer.
java.util.concurrent.ExecutionException: org.apache.pulsar.client.api.PulsarClientException$TransactionConflictException: \{"errorMsg":"org.apache.pulsar.transaction.common.exception.TransactionConflictException: [persistent://public/default/HB0002_rdwBaseCtr_aaamore5005-test-pulsar-partition-0][flink-source] Transaction:(1,12) try to ack batch message:810:24 in pending ack status.","reqId":4357026755663389273, "remote":"tk01-bd-test-pulsar-7-139/192.168.7.139:6650", "local":"/192.168.34.54:9756"}
at java.util.concurrent.CompletableFuture.reportGet(CompletableFuture.java:357)
at java.util.concurrent.CompletableFuture.get(CompletableFuture.java:1908)
at org.apache.flink.connector.pulsar.source.reader.split.PulsarUnorderedPartitionSplitReader.pollMessage(PulsarUnorderedPartitionSplitReader.java:98)
at org.apache.flink.connector.pulsar.source.reader.split.PulsarPartitionSplitReaderBase.fetch(PulsarPartitionSplitReaderBase.java:110)
at org.apache.flink.connector.pulsar.source.reader.split.PulsarUnorderedPartitionSplitReader.fetch(PulsarUnorderedPartitionSplitReader.java:56)
at org.apache.flink.connector.base.source.reader.fetcher.FetchTask.run(FetchTask.java:58)
at org.apache.flink.connector.base.source.reader.fetcher.SplitFetcher.runOnce(SplitFetcher.java:142)
at org.apache.flink.connector.base.source.reader.fetcher.SplitFetcher.run(SplitFetcher.java:105)
at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
at java.util.concurrent.FutureTask.run(FutureTask.java:266)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
at java.lang.Thread.run(Thread.java:750)
Caused by: org.apache.pulsar.client.api.PulsarClientException$TransactionConflictException: \{"errorMsg":"org.apache.pulsar.transaction.common.exception.TransactionConflictException: [persistent://public/default/HB0002_rdwBaseCtr_aaamore5005-test-pulsar-partition-0][flink-source] Transaction:(1,12) try to ack batch message:810:24 in pending ack status.","reqId":4357026755663389273, "remote":"tk01-bd-test-pulsar-7-139/192.168.7.139:6650", "local":"/192.168.34.54:9756"}
at org.apache.pulsar.client.impl.ClientCnx.getPulsarClientException(ClientCnx.java:1177)
at org.apache.pulsar.client.impl.ClientCnx.handleAckResponse(ClientCnx.java:431)
at org.apache.pulsar.common.protocol.PulsarDecoder.channelRead(PulsarDecoder.java:150)
at org.apache.pulsar.shade.io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:379)
at org.apache.pulsar.shade.io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:365)
at org.apache.pulsar.shade.io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:357)
at org.apache.pulsar.shade.io.netty.handler.codec.ByteToMessageDecoder.fireChannelRead(ByteToMessageDecoder.java:327)
at org.apache.pulsar.shade.io.netty.handler.codec.ByteToMessageDecoder.channelRead(ByteToMessageDecoder.java:299)
at org.apache.pulsar.shade.io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:379)
at org.apache.pulsar.shade.io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:365)
at org.apache.pulsar.shade.io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:357)
at org.apache.pulsar.shade.io.netty.channel.DefaultChannelPipeline$HeadContext.channelRead(DefaultChannelPipeline.java:1410)
at org.apache.pulsar.shade.io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:379)
at org.apache.pulsar.shade.io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:365)
at org.apache.pulsar.shade.io.netty.channel.DefaultChannelPipeline.fireChannelRead(DefaultChannelPipeline.java:919)
at org.apache.pulsar.shade.io.netty.channel.nio.AbstractNioByteChannel$NioByteUnsafe.read(AbstractNioByteChannel.java:166)
at org.apache.pulsar.shade.io.netty.channel.nio.NioEventLoop.processSelectedKey(NioEventLoop.java:722)
at org.apache.pulsar.shade.io.netty.channel.nio.NioEventLoop.processSelectedKeysOptimized(NioEventLoop.java:658)
at org.apache.pulsar.shade.io.netty.channel.nio.NioEventLoop.processSelectedKeys(NioEventLoop.java:584)
at org.apache.pulsar.shade.io.netty.channel.nio.NioEventLoop.run(NioEventLoop.java:496)
at org.apache.pulsar.shade.io.netty.util.concurrent.SingleThreadEventExecutor$4.run(SingleThreadEventExecutor.java:986)
at org.apache.pulsar.shade.io.netty.util.internal.ThreadExecutorMap$2.run(ThreadExecutorMap.java:74)
at org.apache.pulsar.shade.io.netty.util.concurrent.FastThreadLocalRunnable.run(FastThreadLocalRunnable.java:30)
... 1 more
--
This message was sent by Atlassian Jira
(v8.20.10#820010)