You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by eo...@apache.org on 2022/04/27 07:48:03 UTC
[pulsar] branch master updated: PIP-105: Fix error on recycled SubscriptionPropertiesList (#15335)
This is an automated email from the ASF dual-hosted git repository.
eolivelli pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git
The following commit(s) were added to refs/heads/master by this push:
new e78d9f1ac54 PIP-105: Fix error on recycled SubscriptionPropertiesList (#15335)
e78d9f1ac54 is described below
commit e78d9f1ac546c150f4068c148e5ffe95c2ddf1f9
Author: Enrico Olivelli <eo...@apache.org>
AuthorDate: Wed Apr 27 09:47:54 2022 +0200
PIP-105: Fix error on recycled SubscriptionPropertiesList (#15335)
Sometimes the CommandSubscribe object has already been released and it triggers this error:
17:36:40.676 [bookkeeper-ml-scheduler-OrderedScheduler-3-0] WARN org.apache.pulsar.broker.service.ServerCnx - [/192.168.1.111:50688][persistent://public/default/test-cb4105f6-f850-4bdf-9e79-66d4ac42658c][13b9ee68-4ee4-4845-b955-77420b8b6a29] Failed to create consumer: consumerId=0, refCnt: 0
java.util.concurrent.CompletionException: io.netty.util.IllegalReferenceCountException: refCnt: 0
at java.util.concurrent.CompletableFuture.encodeThrowable(CompletableFuture.java:314) ~[?:?]
at java.util.concurrent.CompletableFuture.completeThrowable(CompletableFuture.java:319) ~[?:?]
at java.util.concurrent.CompletableFuture.tryFire(CompletableFuture.java:1081) ~[?:?]
at java.util.concurrent.CompletableFuture.postComplete(CompletableFuture.java:506) ~[?:?]
at java.util.concurrent.CompletableFuture.complete(CompletableFuture.java:2073) ~[?:?]
at org.apache.pulsar.broker.service.BrokerService.lambda(BrokerService.java:1419) ~[pulsar-broker-2.10.0.jar:2.10.0]
at java.util.concurrent.CompletableFuture.uniRunNow(CompletableFuture.java:815) ~[?:?]
at java.util.concurrent.CompletableFuture.uniRunStage(CompletableFuture.java:799) ~[?:?]
at java.util.concurrent.CompletableFuture.thenRun(CompletableFuture.java:2121) ~[?:?]
at org.apache.pulsar.broker.service.BrokerService.openLedgerComplete(BrokerService.java:1405) ~[pulsar-broker-2.10.0.jar:2.10.0]
at org.apache.bookkeeper.mledger.impl.ManagedLedgerFactoryImpl.lambda(ManagedLedgerFactoryImpl.java:425) ~[managed-ledger-2.10.0.jar:2.10.0]
at java.util.concurrent.CompletableFuture.tryFire2168(CompletableFuture.java:714) ~[?:?]
at java.util.concurrent.CompletableFuture.tryFire(CompletableFuture.java) ~[?:?]
at java.util.concurrent.CompletableFuture.postComplete(CompletableFuture.java:506) ~[?:?]
at java.util.concurrent.CompletableFuture.complete(CompletableFuture.java:2073) ~[?:?]
at org.apache.bookkeeper.mledger.impl.ManagedLedgerFactoryImpl.initializeComplete(ManagedLedgerFactoryImpl.java:392) ~[managed-ledger-2.10.0.jar:2.10.0]
at org.apache.bookkeeper.mledger.impl.ManagedLedgerImpl.operationComplete(ManagedLedgerImpl.java:525) ~[managed-ledger-2.10.0.jar:2.10.0]
at org.apache.bookkeeper.mledger.impl.ManagedLedgerImpl.operationComplete(ManagedLedgerImpl.java:515) ~[managed-ledger-2.10.0.jar:2.10.0]
at org.apache.bookkeeper.mledger.impl.MetaStoreImpl.lambda(MetaStoreImpl.java:167) ~[managed-ledger-2.10.0.jar:2.10.0]
at java.util.concurrent.CompletableFuture.tryFire2168(CompletableFuture.java:714) [?:?]
at java.util.concurrent.CompletableFuture.tryFire(CompletableFuture.java) [?:?]
at java.util.concurrent.CompletableFuture.run(CompletableFuture.java:478) [?:?]
at org.apache.bookkeeper.common.util.OrderedExecutor.run(OrderedExecutor.java:203) [bookkeeper-common-4.14.4.jar:4.14.4]
at java.util.concurrent.Executors.call(Executors.java:515) [?:?]
at java.util.concurrent.FutureTask.run2168(FutureTask.java:264) [?:?]
at java.util.concurrent.FutureTask.run(FutureTask.java) [?:?]
at java.util.concurrent.ScheduledThreadPoolExecutor.run(ScheduledThreadPoolExecutor.java:304) [?:?]
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128) [?:?]
at java.util.concurrent.ThreadPoolExecutor.run(ThreadPoolExecutor.java:628) [?:?]
at io.netty.util.concurrent.FastThreadLocalRunnable.run(FastThreadLocalRunnable.java:30) [netty-common-4.1.74.Final.jar:4.1.74.Final]
at java.lang.Thread.run(Thread.java:829) [?:?]
Caused by: io.netty.util.IllegalReferenceCountException: refCnt: 0
at io.netty.buffer.AbstractByteBuf.ensureAccessible(AbstractByteBuf.java:1454) ~[netty-buffer-4.1.74.Final.jar:4.1.74.Final]
at io.netty.buffer.AbstractByteBuf.checkIndex(AbstractByteBuf.java:1383) ~[netty-buffer-4.1.74.Final.jar:4.1.74.Final]
at io.netty.buffer.UnsafeByteBufUtil.getBytes(UnsafeByteBufUtil.java:481) ~[netty-buffer-4.1.74.Final.jar:4.1.74.Final]
at io.netty.buffer.PooledUnsafeDirectByteBuf.getBytes(PooledUnsafeDirectByteBuf.java:130) ~[netty-buffer-4.1.74.Final.jar:4.1.74.Final]
at io.netty.buffer.PooledSlicedByteBuf.getBytes(PooledSlicedByteBuf.java:235) ~[netty-buffer-4.1.74.Final.jar:4.1.74.Final]
at io.netty.buffer.ByteBufUtil.decodeString(ByteBufUtil.java:1270) ~[netty-buffer-4.1.74.Final.jar:4.1.74.Final]
at io.netty.buffer.AbstractByteBuf.toString(AbstractByteBuf.java:1246) ~[netty-buffer-4.1.74.Final.jar:4.1.74.Final]
at org.apache.pulsar.common.api.proto.LightProtoCodec.readString(LightProtoCodec.java:250) ~[pulsar-common-2.10.0.jar:2.10.0]
at org.apache.pulsar.common.api.proto.KeyValue.getKey(KeyValue.java:19) ~[pulsar-common-2.10.0.jar:2.10.0]
at java.util.stream.Collectors.lambda(Collectors.java:1658) ~[?:?]
at java.util.stream.ReduceOpsReducingSink.accept(ReduceOps.java:169) ~[?:?]
at java.util.ArrayList.forEachRemaining(ArrayList.java:1511) ~[?:?]
at java.util.stream.AbstractPipeline.copyInto(AbstractPipeline.java:484) ~[?:?]
at java.util.stream.AbstractPipeline.wrapAndCopyInto(AbstractPipeline.java:474) ~[?:?]
at java.util.stream.ReduceOps.evaluateSequential(ReduceOps.java:913) ~[?:?]
at java.util.stream.AbstractPipeline.evaluate(AbstractPipeline.java:234) ~[?:?]
at java.util.stream.ReferencePipeline.collect(ReferencePipeline.java:578) ~[?:?]
at org.apache.pulsar.broker.service.SubscriptionOption.getPropertiesMap(SubscriptionOption.java:57) ~[pulsar-broker-2.10.0.jar:2.10.0]
at org.apache.pulsar.broker.service.ServerCnx.lambda(ServerCnx.java:1047) ~[pulsar-broker-2.10.0.jar:2.10.0]
at java.util.concurrent.CompletableFuture.tryFire(CompletableFuture.java:1072) ~[?:?]
... 28 more
---
.../java/org/apache/pulsar/broker/service/ServerCnx.java | 16 ++++++++++------
1 file changed, 10 insertions(+), 6 deletions(-)
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/ServerCnx.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/ServerCnx.java
index 00246e40f7d..0963e181f35 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/ServerCnx.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/ServerCnx.java
@@ -1033,6 +1033,14 @@ public class ServerCnx extends PulsarHandler implements TransportCnx {
boolean createTopicIfDoesNotExist = forceTopicCreation
&& service.isAllowAutoTopicCreation(topicName.toString());
+ final long consumerEpoch;
+ if (subscribe.hasConsumerEpoch()) {
+ consumerEpoch = subscribe.getConsumerEpoch();
+ } else {
+ consumerEpoch = DEFAULT_CONSUMER_EPOCH;
+ }
+ Optional<Map<String, String>> subscriptionProperties = SubscriptionOption.getPropertiesMap(
+ subscribe.getSubscriptionPropertiesList());
service.getTopic(topicName.toString(), createTopicIfDoesNotExist)
.thenCompose(optTopic -> {
if (!optTopic.isPresent()) {
@@ -1054,10 +1062,7 @@ public class ServerCnx extends PulsarHandler implements TransportCnx {
new SubscriptionNotFoundException(
"Subscription does not exist"));
}
- long consumerEpoch = DEFAULT_CONSUMER_EPOCH;
- if (subscribe.hasConsumerEpoch()) {
- consumerEpoch = subscribe.getConsumerEpoch();
- }
+
SubscriptionOption option = SubscriptionOption.builder().cnx(ServerCnx.this)
.subscriptionName(subscriptionName)
.consumerId(consumerId).subType(subType).priorityLevel(priorityLevel)
@@ -1066,8 +1071,7 @@ public class ServerCnx extends PulsarHandler implements TransportCnx {
.initialPosition(initialPosition)
.startMessageRollbackDurationSec(startMessageRollbackDurationSec)
.replicatedSubscriptionStateArg(isReplicated).keySharedMeta(keySharedMeta)
- .subscriptionProperties(SubscriptionOption.getPropertiesMap(
- subscribe.getSubscriptionPropertiesList()))
+ .subscriptionProperties(subscriptionProperties)
.consumerEpoch(consumerEpoch)
.build();
if (schema != null) {