Posted to commits@pulsar.apache.org by mm...@apache.org on 2022/02/23 19:20:27 UTC

[pulsar] 02/02: Fix Field 'consumer_epoch' is not set in ServerCnx (#14410)

This is an automated email from the ASF dual-hosted git repository.

mmerli pushed a commit to branch branch-2.10
in repository https://gitbox.apache.org/repos/asf/pulsar.git

commit e2778b26eff9134bcde1fbe2e84aaca86edfceb5
Author: JiangHaiting <ji...@apache.org>
AuthorDate: Wed Feb 23 12:03:07 2022 +0800

    Fix Field 'consumer_epoch' is not set in ServerCnx (#14410)
    
    ### Motivation
    
    Test case `SimpleProducerConsumerTest#testBlockUnackedConsumerRedeliverySpecificMessagesProduceWithPause` fails with debug log enabled.
    
    The root cause is that `redeliver.getConsumerEpoch()` is called in the debug log without first checking whether the field is set.
    
    ```
    2022-02-22T13:13:09,216+0800 [pulsar-io-6-1] WARN  ServerCnx - [/127.0.0.1:64428] Got exception java.lang.IllegalStateException: Field 'consumer_epoch' is not set
    	at org.apache.pulsar.common.api.proto.CommandRedeliverUnacknowledgedMessages.getConsumerEpoch(CommandRedeliverUnacknowledgedMessages.java:87)
    	at org.apache.pulsar.broker.service.ServerCnx.handleRedeliverUnacknowledged(ServerCnx.java:1559)
    	at org.apache.pulsar.common.protocol.PulsarDecoder.channelRead(PulsarDecoder.java:274)
    	at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:379)
    	at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:365)
    	at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:357)
    	at io.netty.handler.flow.FlowControlHandler.dequeue(FlowControlHandler.java:200)
    	at io.netty.handler.flow.FlowControlHandler.channelRead(FlowControlHandler.java:162)
    	at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:379)
    	at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:365)
    	at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:357)
    	at io.netty.handler.codec.ByteToMessageDecoder.fireChannelRead(ByteToMessageDecoder.java:327)
    	at io.netty.handler.codec.ByteToMessageDecoder.channelRead(ByteToMessageDecoder.java:299)
    	at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:379)
    	at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:365)
    	at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:357)
    	at io.netty.channel.DefaultChannelPipeline$HeadContext.channelRead(DefaultChannelPipeline.java:1410)
    	at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:379)
    	at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:365)
    	at io.netty.channel.DefaultChannelPipeline.fireChannelRead(DefaultChannelPipeline.java:919)
    	at io.netty.channel.nio.AbstractNioByteChannel$NioByteUnsafe.read(AbstractNioByteChannel.java:166)
    	at io.netty.channel.nio.NioEventLoop.processSelectedKey(NioEventLoop.java:722)
    	at io.netty.channel.nio.NioEventLoop.processSelectedKeysOptimized(NioEventLoop.java:658)
    	at io.netty.channel.nio.NioEventLoop.processSelectedKeys(NioEventLoop.java:584)
    	at io.netty.channel.nio.NioEventLoop.run(NioEventLoop.java:496)
    	at io.netty.util.concurrent.SingleThreadEventExecutor$4.run(SingleThreadEventExecutor.java:986)
    	at io.netty.util.internal.ThreadExecutorMap$2.run(ThreadExecutorMap.java:74)
    	at io.netty.util.concurrent.FastThreadLocalRunnable.run(FastThreadLocalRunnable.java:30)
    	at java.base/java.lang.Thread.run(Thread.java:829)
    ```
    
    ### Modifications
    
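    Check `redeliver.hasConsumerEpoch()` before calling `redeliver.getConsumerEpoch()` in the debug log, and log `null` when the field is not set.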
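    
    As an illustration only, here is a minimal, self-contained sketch of the has-before-get pattern. `RedeliverCommand`, `describe` and `ConsumerEpochGuardSketch` are hypothetical stand-ins, not Pulsar classes; only the `hasConsumerEpoch()` / `getConsumerEpoch()` names and the exception message are taken from this commit.
    
    ```java
    // Hypothetical stand-in for a generated message with an optional field.
    // It is NOT the real CommandRedeliverUnacknowledgedMessages class.
    final class ConsumerEpochGuardSketch {

        static final class RedeliverCommand {
            private long consumerEpoch;
            private boolean consumerEpochSet;

            RedeliverCommand setConsumerEpoch(long epoch) {
                this.consumerEpoch = epoch;
                this.consumerEpochSet = true;
                return this;
            }

            boolean hasConsumerEpoch() {
                return consumerEpochSet;
            }

            // Mirrors the behaviour seen in the stack trace: reading an unset field throws.
            long getConsumerEpoch() {
                if (!consumerEpochSet) {
                    throw new IllegalStateException("Field 'consumer_epoch' is not set");
                }
                return consumerEpoch;
            }
        }

        // Builds a log fragment; an unset field is rendered as null instead of throwing.
        static String describe(RedeliverCommand cmd) {
            Object epoch = cmd.hasConsumerEpoch() ? cmd.getConsumerEpoch() : null;
            return "consumerEpoch " + epoch;
        }

        public static void main(String[] args) {
            System.out.println(describe(new RedeliverCommand()));
            System.out.println(describe(new RedeliverCommand().setConsumerEpoch(7L)));
        }
    }
    ```
    
    The conditional expression is boxed to `Long`, so passing `null` for the unset case is safe as long as the result is not assigned to a primitive `long`.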
    
    ### Verifying this change
    
    - [ ] Make sure that the change passes the CI checks.
    
    This change is a trivial rework / code cleanup without any test coverage.
    
    ### Does this pull request potentially affect one of the following parts:
    
    *If `yes` was chosen, please highlight the changes*
    
      - Dependencies (does it add or upgrade a dependency): (no)
      - The public API: (no)
      - The schema: (no)
      - The default values of configurations: (no)
      - The wire protocol: (no)
      - The rest endpoints: (no)
      - The admin cli options: (no)
      - Anything that affects deployment: (no)
    
    ### Documentation
    
    Check the box below and label this PR (if you have committer privilege).
    
    Need to update docs?
    
    - [x] `no-need-doc`
---
 .../src/main/java/org/apache/pulsar/broker/service/ServerCnx.java      | 3 ++-
 1 file changed, 2 insertions(+), 1 deletion(-)

diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/ServerCnx.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/ServerCnx.java
index 4ad42c0..f216902 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/ServerCnx.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/ServerCnx.java
@@ -1556,7 +1556,8 @@ public class ServerCnx extends PulsarHandler implements TransportCnx {
         checkArgument(state == State.Connected);
         if (log.isDebugEnabled()) {
             log.debug("[{}] redeliverUnacknowledged from consumer {}, consumerEpoch {}",
-                    remoteAddress, redeliver.getConsumerId(), redeliver.getConsumerEpoch());
+                    remoteAddress, redeliver.getConsumerId(),
+                    redeliver.hasConsumerEpoch() ? redeliver.getConsumerEpoch() : null);
         }
 
         CompletableFuture<Consumer> consumerFuture = consumers.get(redeliver.getConsumerId());