You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by mm...@apache.org on 2022/02/23 19:20:27 UTC
[pulsar] 02/02: Fix Field 'consumer_epoch' is not set in ServerCnx (#14410)
This is an automated email from the ASF dual-hosted git repository.
mmerli pushed a commit to branch branch-2.10
in repository https://gitbox.apache.org/repos/asf/pulsar.git
commit e2778b26eff9134bcde1fbe2e84aaca86edfceb5
Author: JiangHaiting <ji...@apache.org>
AuthorDate: Wed Feb 23 12:03:07 2022 +0800
Fix Field 'consumer_epoch' is not set in ServerCnx (#14410)
### Motivation
Test case `SimpleProducerConsumerTest#testBlockUnackedConsumerRedeliverySpecificMessagesProduceWithPause` fails with debug log enabled.
Root cause is that `redeliver.getConsumerEpoch()` used in debug log without check if it's set.
```
2022-02-22T13:13:09,216+0800 [pulsar-io-6-1] WARN ServerCnx - [/127.0.0.1:64428] Got exception java.lang.IllegalStateException: Field 'consumer_epoch' is not set
at org.apache.pulsar.common.api.proto.CommandRedeliverUnacknowledgedMessages.getConsumerEpoch(CommandRedeliverUnacknowledgedMessages.java:87)
at org.apache.pulsar.broker.service.ServerCnx.handleRedeliverUnacknowledged(ServerCnx.java:1559)
at org.apache.pulsar.common.protocol.PulsarDecoder.channelRead(PulsarDecoder.java:274)
at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:379)
at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:365)
at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:357)
at io.netty.handler.flow.FlowControlHandler.dequeue(FlowControlHandler.java:200)
at io.netty.handler.flow.FlowControlHandler.channelRead(FlowControlHandler.java:162)
at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:379)
at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:365)
at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:357)
at io.netty.handler.codec.ByteToMessageDecoder.fireChannelRead(ByteToMessageDecoder.java:327)
at io.netty.handler.codec.ByteToMessageDecoder.channelRead(ByteToMessageDecoder.java:299)
at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:379)
at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:365)
at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:357)
at io.netty.channel.DefaultChannelPipeline$HeadContext.channelRead(DefaultChannelPipeline.java:1410)
at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:379)
at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:365)
at io.netty.channel.DefaultChannelPipeline.fireChannelRead(DefaultChannelPipeline.java:919)
at io.netty.channel.nio.AbstractNioByteChannel$NioByteUnsafe.read(AbstractNioByteChannel.java:166)
at io.netty.channel.nio.NioEventLoop.processSelectedKey(NioEventLoop.java:722)
at io.netty.channel.nio.NioEventLoop.processSelectedKeysOptimized(NioEventLoop.java:658)
at io.netty.channel.nio.NioEventLoop.processSelectedKeys(NioEventLoop.java:584)
at io.netty.channel.nio.NioEventLoop.run(NioEventLoop.java:496)
at io.netty.util.concurrent.SingleThreadEventExecutor$4.run(SingleThreadEventExecutor.java:986)
at io.netty.util.internal.ThreadExecutorMap$2.run(ThreadExecutorMap.java:74)
at io.netty.util.concurrent.FastThreadLocalRunnable.run(FastThreadLocalRunnable.java:30)
at java.base/java.lang.Thread.run(Thread.java:829)
```
### Modifications
Add check before get.
### Verifying this change
- [ ] Make sure that the change passes the CI checks.
This change is a trivial rework / code cleanup without any test coverage.
### Does this pull request potentially affect one of the following parts:
*If `yes` was chosen, please highlight the changes*
- Dependencies (does it add or upgrade a dependency): (no)
- The public API: (no)
- The schema: (no)
- The default values of configurations: (no)
- The wire protocol: (no)
- The rest endpoints: (no)
- The admin cli options: (no)
- Anything that affects deployment: (no)
### Documentation
Check the box below and label this PR (if you have committer privilege).
Need to update docs?
- [x] `no-need-doc`
---
.../src/main/java/org/apache/pulsar/broker/service/ServerCnx.java | 3 ++-
1 file changed, 2 insertions(+), 1 deletion(-)
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/ServerCnx.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/ServerCnx.java
index 4ad42c0..f216902 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/ServerCnx.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/ServerCnx.java
@@ -1556,7 +1556,8 @@ public class ServerCnx extends PulsarHandler implements TransportCnx {
checkArgument(state == State.Connected);
if (log.isDebugEnabled()) {
log.debug("[{}] redeliverUnacknowledged from consumer {}, consumerEpoch {}",
- remoteAddress, redeliver.getConsumerId(), redeliver.getConsumerEpoch());
+ remoteAddress, redeliver.getConsumerId(),
+ redeliver.hasConsumerEpoch() ? redeliver.getConsumerEpoch() : null);
}
CompletableFuture<Consumer> consumerFuture = consumers.get(redeliver.getConsumerId());