You are viewing a plain text version of this content. The canonical link for it is here.
Posted to dev@pulsar.apache.org by "海洋饼干 (Jira)" <ji...@apache.org> on 2022/08/12 06:16:00 UTC

[jira] [Created] (PULSAR-22) flink消费pulsar时出现错误,但是仍可消费到数据

海洋饼干 created PULSAR-22:
--------------------------

             Summary: flink消费pulsar时出现错误,但是仍可消费到数据
                 Key: PULSAR-22
                 URL: https://issues.apache.org/jira/browse/PULSAR-22
             Project: Pulsar
          Issue Type: Bug
         Environment: kafka(json) -> flink(Stream) -> custom 

Serializer -> kop -> pulsar -> flink(stream) keyshared consum(问题所在区域)
            Reporter: 海洋饼干


[Source Data Fetcher for Source: kafkaSource (4/4)#0] ERROR org.apache.flink.connector.pulsar.source.reader.split.PulsarPartitionSplitReaderBase - Error in polling message from pulsar consumer.
java.util.concurrent.ExecutionException: org.apache.pulsar.client.api.PulsarClientException$TransactionConflictException: \{"errorMsg":"org.apache.pulsar.transaction.common.exception.TransactionConflictException: [persistent://public/default/HB0002_rdwBaseCtr_aaamore5005-test-pulsar-partition-0][flink-source] Transaction:(1,12) try to ack batch message:810:24 in pending ack status.","reqId":4357026755663389273, "remote":"tk01-bd-test-pulsar-7-139/192.168.7.139:6650", "local":"/192.168.34.54:9756"}
    at java.util.concurrent.CompletableFuture.reportGet(CompletableFuture.java:357)
    at java.util.concurrent.CompletableFuture.get(CompletableFuture.java:1908)
    at org.apache.flink.connector.pulsar.source.reader.split.PulsarUnorderedPartitionSplitReader.pollMessage(PulsarUnorderedPartitionSplitReader.java:98)
    at org.apache.flink.connector.pulsar.source.reader.split.PulsarPartitionSplitReaderBase.fetch(PulsarPartitionSplitReaderBase.java:110)
    at org.apache.flink.connector.pulsar.source.reader.split.PulsarUnorderedPartitionSplitReader.fetch(PulsarUnorderedPartitionSplitReader.java:56)
    at org.apache.flink.connector.base.source.reader.fetcher.FetchTask.run(FetchTask.java:58)
    at org.apache.flink.connector.base.source.reader.fetcher.SplitFetcher.runOnce(SplitFetcher.java:142)
    at org.apache.flink.connector.base.source.reader.fetcher.SplitFetcher.run(SplitFetcher.java:105)
    at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
    at java.util.concurrent.FutureTask.run(FutureTask.java:266)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
    at java.lang.Thread.run(Thread.java:750)
Caused by: org.apache.pulsar.client.api.PulsarClientException$TransactionConflictException: \{"errorMsg":"org.apache.pulsar.transaction.common.exception.TransactionConflictException: [persistent://public/default/HB0002_rdwBaseCtr_aaamore5005-test-pulsar-partition-0][flink-source] Transaction:(1,12) try to ack batch message:810:24 in pending ack status.","reqId":4357026755663389273, "remote":"tk01-bd-test-pulsar-7-139/192.168.7.139:6650", "local":"/192.168.34.54:9756"}
    at org.apache.pulsar.client.impl.ClientCnx.getPulsarClientException(ClientCnx.java:1177)
    at org.apache.pulsar.client.impl.ClientCnx.handleAckResponse(ClientCnx.java:431)
    at org.apache.pulsar.common.protocol.PulsarDecoder.channelRead(PulsarDecoder.java:150)
    at org.apache.pulsar.shade.io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:379)
    at org.apache.pulsar.shade.io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:365)
    at org.apache.pulsar.shade.io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:357)
    at org.apache.pulsar.shade.io.netty.handler.codec.ByteToMessageDecoder.fireChannelRead(ByteToMessageDecoder.java:327)
    at org.apache.pulsar.shade.io.netty.handler.codec.ByteToMessageDecoder.channelRead(ByteToMessageDecoder.java:299)
    at org.apache.pulsar.shade.io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:379)
    at org.apache.pulsar.shade.io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:365)
    at org.apache.pulsar.shade.io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:357)
    at org.apache.pulsar.shade.io.netty.channel.DefaultChannelPipeline$HeadContext.channelRead(DefaultChannelPipeline.java:1410)
    at org.apache.pulsar.shade.io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:379)
    at org.apache.pulsar.shade.io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:365)
    at org.apache.pulsar.shade.io.netty.channel.DefaultChannelPipeline.fireChannelRead(DefaultChannelPipeline.java:919)
    at org.apache.pulsar.shade.io.netty.channel.nio.AbstractNioByteChannel$NioByteUnsafe.read(AbstractNioByteChannel.java:166)
    at org.apache.pulsar.shade.io.netty.channel.nio.NioEventLoop.processSelectedKey(NioEventLoop.java:722)
    at org.apache.pulsar.shade.io.netty.channel.nio.NioEventLoop.processSelectedKeysOptimized(NioEventLoop.java:658)
    at org.apache.pulsar.shade.io.netty.channel.nio.NioEventLoop.processSelectedKeys(NioEventLoop.java:584)
    at org.apache.pulsar.shade.io.netty.channel.nio.NioEventLoop.run(NioEventLoop.java:496)
    at org.apache.pulsar.shade.io.netty.util.concurrent.SingleThreadEventExecutor$4.run(SingleThreadEventExecutor.java:986)
    at org.apache.pulsar.shade.io.netty.util.internal.ThreadExecutorMap$2.run(ThreadExecutorMap.java:74)
    at org.apache.pulsar.shade.io.netty.util.concurrent.FastThreadLocalRunnable.run(FastThreadLocalRunnable.java:30)
    ... 1 more



--
This message was sent by Atlassian Jira
(v8.20.10#820010)