You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by eo...@apache.org on 2022/04/27 07:48:03 UTC

[pulsar] branch master updated: PIP-105: Fix error on recycled SubscriptionPropertiesList (#15335)

This is an automated email from the ASF dual-hosted git repository.

eolivelli pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git


The following commit(s) were added to refs/heads/master by this push:
     new e78d9f1ac54 PIP-105: Fix error on recycled SubscriptionPropertiesList (#15335)
e78d9f1ac54 is described below

commit e78d9f1ac546c150f4068c148e5ffe95c2ddf1f9
Author: Enrico Olivelli <eo...@apache.org>
AuthorDate: Wed Apr 27 09:47:54 2022 +0200

    PIP-105: Fix error on recycled SubscriptionPropertiesList (#15335)
    
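    Read the consumer epoch and the subscription properties from the CommandSubscribe object
    eagerly, before starting the asynchronous topic lookup. Otherwise the CommandSubscribe object
    has sometimes already been released (recycled) by the time the callback reads it, which
    triggers this error: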
    17:36:40.676 [bookkeeper-ml-scheduler-OrderedScheduler-3-0] WARN  org.apache.pulsar.broker.service.ServerCnx - [/192.168.1.111:50688][persistent://public/default/test-cb4105f6-f850-4bdf-9e79-66d4ac42658c][13b9ee68-4ee4-4845-b955-77420b8b6a29] Failed to create consumer: consumerId=0, refCnt: 0
    java.util.concurrent.CompletionException: io.netty.util.IllegalReferenceCountException: refCnt: 0
            at java.util.concurrent.CompletableFuture.encodeThrowable(CompletableFuture.java:314) ~[?:?]
            at java.util.concurrent.CompletableFuture.completeThrowable(CompletableFuture.java:319) ~[?:?]
            at java.util.concurrent.CompletableFuture.tryFire(CompletableFuture.java:1081) ~[?:?]
            at java.util.concurrent.CompletableFuture.postComplete(CompletableFuture.java:506) ~[?:?]
            at java.util.concurrent.CompletableFuture.complete(CompletableFuture.java:2073) ~[?:?]
            at org.apache.pulsar.broker.service.BrokerService.lambda(BrokerService.java:1419) ~[pulsar-broker-2.10.0.jar:2.10.0]
            at java.util.concurrent.CompletableFuture.uniRunNow(CompletableFuture.java:815) ~[?:?]
            at java.util.concurrent.CompletableFuture.uniRunStage(CompletableFuture.java:799) ~[?:?]
            at java.util.concurrent.CompletableFuture.thenRun(CompletableFuture.java:2121) ~[?:?]
            at org.apache.pulsar.broker.service.BrokerService.openLedgerComplete(BrokerService.java:1405) ~[pulsar-broker-2.10.0.jar:2.10.0]
            at org.apache.bookkeeper.mledger.impl.ManagedLedgerFactoryImpl.lambda(ManagedLedgerFactoryImpl.java:425) ~[managed-ledger-2.10.0.jar:2.10.0]
            at java.util.concurrent.CompletableFuture.tryFire2168(CompletableFuture.java:714) ~[?:?]
            at java.util.concurrent.CompletableFuture.tryFire(CompletableFuture.java) ~[?:?]
            at java.util.concurrent.CompletableFuture.postComplete(CompletableFuture.java:506) ~[?:?]
            at java.util.concurrent.CompletableFuture.complete(CompletableFuture.java:2073) ~[?:?]
            at org.apache.bookkeeper.mledger.impl.ManagedLedgerFactoryImpl.initializeComplete(ManagedLedgerFactoryImpl.java:392) ~[managed-ledger-2.10.0.jar:2.10.0]
            at org.apache.bookkeeper.mledger.impl.ManagedLedgerImpl.operationComplete(ManagedLedgerImpl.java:525) ~[managed-ledger-2.10.0.jar:2.10.0]
            at org.apache.bookkeeper.mledger.impl.ManagedLedgerImpl.operationComplete(ManagedLedgerImpl.java:515) ~[managed-ledger-2.10.0.jar:2.10.0]
            at org.apache.bookkeeper.mledger.impl.MetaStoreImpl.lambda(MetaStoreImpl.java:167) ~[managed-ledger-2.10.0.jar:2.10.0]
            at java.util.concurrent.CompletableFuture.tryFire2168(CompletableFuture.java:714) [?:?]
            at java.util.concurrent.CompletableFuture.tryFire(CompletableFuture.java) [?:?]
            at java.util.concurrent.CompletableFuture.run(CompletableFuture.java:478) [?:?]
            at org.apache.bookkeeper.common.util.OrderedExecutor.run(OrderedExecutor.java:203) [bookkeeper-common-4.14.4.jar:4.14.4]
            at java.util.concurrent.Executors.call(Executors.java:515) [?:?]
            at java.util.concurrent.FutureTask.run2168(FutureTask.java:264) [?:?]
            at java.util.concurrent.FutureTask.run(FutureTask.java) [?:?]
            at java.util.concurrent.ScheduledThreadPoolExecutor.run(ScheduledThreadPoolExecutor.java:304) [?:?]
            at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128) [?:?]
            at java.util.concurrent.ThreadPoolExecutor.run(ThreadPoolExecutor.java:628) [?:?]
            at io.netty.util.concurrent.FastThreadLocalRunnable.run(FastThreadLocalRunnable.java:30) [netty-common-4.1.74.Final.jar:4.1.74.Final]
            at java.lang.Thread.run(Thread.java:829) [?:?]
    Caused by: io.netty.util.IllegalReferenceCountException: refCnt: 0
            at io.netty.buffer.AbstractByteBuf.ensureAccessible(AbstractByteBuf.java:1454) ~[netty-buffer-4.1.74.Final.jar:4.1.74.Final]
            at io.netty.buffer.AbstractByteBuf.checkIndex(AbstractByteBuf.java:1383) ~[netty-buffer-4.1.74.Final.jar:4.1.74.Final]
            at io.netty.buffer.UnsafeByteBufUtil.getBytes(UnsafeByteBufUtil.java:481) ~[netty-buffer-4.1.74.Final.jar:4.1.74.Final]
            at io.netty.buffer.PooledUnsafeDirectByteBuf.getBytes(PooledUnsafeDirectByteBuf.java:130) ~[netty-buffer-4.1.74.Final.jar:4.1.74.Final]
            at io.netty.buffer.PooledSlicedByteBuf.getBytes(PooledSlicedByteBuf.java:235) ~[netty-buffer-4.1.74.Final.jar:4.1.74.Final]
            at io.netty.buffer.ByteBufUtil.decodeString(ByteBufUtil.java:1270) ~[netty-buffer-4.1.74.Final.jar:4.1.74.Final]
            at io.netty.buffer.AbstractByteBuf.toString(AbstractByteBuf.java:1246) ~[netty-buffer-4.1.74.Final.jar:4.1.74.Final]
            at org.apache.pulsar.common.api.proto.LightProtoCodec.readString(LightProtoCodec.java:250) ~[pulsar-common-2.10.0.jar:2.10.0]
            at org.apache.pulsar.common.api.proto.KeyValue.getKey(KeyValue.java:19) ~[pulsar-common-2.10.0.jar:2.10.0]
            at java.util.stream.Collectors.lambda(Collectors.java:1658) ~[?:?]
            at java.util.stream.ReduceOpsReducingSink.accept(ReduceOps.java:169) ~[?:?]
            at java.util.ArrayList.forEachRemaining(ArrayList.java:1511) ~[?:?]
            at java.util.stream.AbstractPipeline.copyInto(AbstractPipeline.java:484) ~[?:?]
            at java.util.stream.AbstractPipeline.wrapAndCopyInto(AbstractPipeline.java:474) ~[?:?]
            at java.util.stream.ReduceOps.evaluateSequential(ReduceOps.java:913) ~[?:?]
            at java.util.stream.AbstractPipeline.evaluate(AbstractPipeline.java:234) ~[?:?]
            at java.util.stream.ReferencePipeline.collect(ReferencePipeline.java:578) ~[?:?]
            at org.apache.pulsar.broker.service.SubscriptionOption.getPropertiesMap(SubscriptionOption.java:57) ~[pulsar-broker-2.10.0.jar:2.10.0]
            at org.apache.pulsar.broker.service.ServerCnx.lambda(ServerCnx.java:1047) ~[pulsar-broker-2.10.0.jar:2.10.0]
            at java.util.concurrent.CompletableFuture.tryFire(CompletableFuture.java:1072) ~[?:?]
            ... 28 more
---
 .../java/org/apache/pulsar/broker/service/ServerCnx.java | 16 ++++++++++------
 1 file changed, 10 insertions(+), 6 deletions(-)

diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/ServerCnx.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/ServerCnx.java
index 00246e40f7d..0963e181f35 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/ServerCnx.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/ServerCnx.java
@@ -1033,6 +1033,14 @@ public class ServerCnx extends PulsarHandler implements TransportCnx {
                 boolean createTopicIfDoesNotExist = forceTopicCreation
                         && service.isAllowAutoTopicCreation(topicName.toString());
 
+                final long consumerEpoch;
+                if (subscribe.hasConsumerEpoch()) {
+                    consumerEpoch = subscribe.getConsumerEpoch();
+                } else {
+                    consumerEpoch = DEFAULT_CONSUMER_EPOCH;
+                }
+                Optional<Map<String, String>> subscriptionProperties = SubscriptionOption.getPropertiesMap(
+                        subscribe.getSubscriptionPropertiesList());
                 service.getTopic(topicName.toString(), createTopicIfDoesNotExist)
                         .thenCompose(optTopic -> {
                             if (!optTopic.isPresent()) {
@@ -1054,10 +1062,7 @@ public class ServerCnx extends PulsarHandler implements TransportCnx {
                                                 new SubscriptionNotFoundException(
                                                         "Subscription does not exist"));
                             }
-                            long consumerEpoch = DEFAULT_CONSUMER_EPOCH;
-                            if (subscribe.hasConsumerEpoch()) {
-                                consumerEpoch = subscribe.getConsumerEpoch();
-                            }
+
                             SubscriptionOption option = SubscriptionOption.builder().cnx(ServerCnx.this)
                                     .subscriptionName(subscriptionName)
                                     .consumerId(consumerId).subType(subType).priorityLevel(priorityLevel)
@@ -1066,8 +1071,7 @@ public class ServerCnx extends PulsarHandler implements TransportCnx {
                                     .initialPosition(initialPosition)
                                     .startMessageRollbackDurationSec(startMessageRollbackDurationSec)
                                     .replicatedSubscriptionStateArg(isReplicated).keySharedMeta(keySharedMeta)
-                                    .subscriptionProperties(SubscriptionOption.getPropertiesMap(
-                                            subscribe.getSubscriptionPropertiesList()))
+                                    .subscriptionProperties(subscriptionProperties)
                                     .consumerEpoch(consumerEpoch)
                                     .build();
                             if (schema != null) {