You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by ch...@apache.org on 2021/08/25 04:47:29 UTC

[pulsar] branch branch-2.8 updated (bd5f75f -> 3456269)

This is an automated email from the ASF dual-hosted git repository.

chenhang pushed a change to branch branch-2.8
in repository https://gitbox.apache.org/repos/asf/pulsar.git.


    from bd5f75f  KeyShared dispatcher on non-persistent topics was not respecting consumer flow-control (#11692)
     new fe7dc86  [Broker Interceptor] Fix Pulsar didn't respond error messages when throw InterceptException (#11650)
     new 9789e3a  [Functions]Support protobuf schema for pulsar function (#11709)
     new 2fb2e59  [Broker] Remove subscription when closing Reader on non-persistent topics (#11731)
     new 1ed33f2  [Broker] Call .release() when discarding entry to prevent direct memory leak (#11748)
     new 7147d8a  [Issue 11689][Client] Fixed block forever bug in Consumer.batchReceive (#11691)
     new 9c2888f  Avoid duplicated disconnecting producer when after add entry failed. (#11741)
     new de813bf  [pulsar-client] clean up MultiTopicsConsumerImpl reference on consumer creation failure (#11754)
     new 9287e87  [Broker] Handle NPE when full key range isn't covered with active consumers (#11749)
     new 4490258  Upgrade bk version to resolve the BouncyCatle issue (#11759)
     new bce43cb  [pulsar-client] remove consumer reference from PulsarClient on subscription failure (#11758)
     new 3456269  Fix the topic in fenced state and can not recover. (#11737)

The 11 revisions listed above as "new" are entirely new to this
repository and will be described in separate emails.  The revisions
listed as "add" were already present in the repository and have only
been added to this reference.


Summary of changes:
 distribution/server/src/assemble/LICENSE.bin.txt   |  55 ++++---
 .../apache/bookkeeper/mledger/impl/OpAddEntry.java |   9 +-
 pom.xml                                            |   2 +-
 .../org/apache/pulsar/broker/service/Producer.java |   9 +-
 .../apache/pulsar/broker/service/ServerCnx.java    |   2 +-
 .../nonpersistent/NonPersistentSubscription.java   |   9 +-
 .../service/nonpersistent/NonPersistentTopic.java  |   4 +-
 .../PersistentDispatcherSingleActiveConsumer.java  |   2 +
 ...istentStickyKeyDispatcherMultipleConsumers.java |   8 +-
 .../broker/service/persistent/PersistentTopic.java |  12 +-
 .../apache/pulsar/broker/web/ExceptionHandler.java |  57 +++++++
 .../pulsar/broker/web/PreInterceptFilter.java      |   8 +-
 .../org/apache/pulsar/broker/web/WebService.java   |   3 +-
 .../pulsar/broker/admin/AdminApiSchemaTest.java    |   4 +-
 .../broker/intercept/InterceptFilterOutTest.java   |   4 +-
 .../pulsar/broker/service/PersistentTopicTest.java |  25 +++-
 .../pulsar/broker/web/ExceptionHandlerTest.java    |  63 ++++++++
 .../org/apache/pulsar/client/impl/ReaderTest.java  |  48 ++++++
 .../pulsar/client/admin/internal/BaseResource.java |   2 +-
 .../client/admin/internal/FunctionsImpl.java       |  16 +-
 .../pulsar/client/admin/internal/PackagesImpl.java |   2 +-
 .../pulsar/client/admin/internal/SinksImpl.java    |  10 +-
 .../pulsar/client/admin/internal/SourcesImpl.java  |  10 +-
 .../pulsar/client/admin/internal/WorkerImpl.java   |  10 +-
 .../admin/internal/http/AsyncHttpConnector.java    |  16 ++
 .../apache/pulsar/client/impl/ConsumerBase.java    | 165 ++++++++++++---------
 .../apache/pulsar/client/impl/ConsumerImpl.java    |  59 ++++----
 .../client/impl/MultiTopicsConsumerImpl.java       |  73 +++++----
 .../pulsar/client/impl/UnAckedMessageTracker.java  |   1 +
 .../pulsar/client/impl/ZeroQueueConsumerImpl.java  |   2 +-
 .../pulsar/client/impl/ClientTestFixtures.java     |  16 +-
 .../pulsar/client/impl/ConsumerImplTest.java       |  16 +-
 .../client/impl/MultiTopicsConsumerImplTest.java   |  52 +++++--
 .../apache/pulsar/client/impl/ReaderImplTest.java  |  41 ++---
 pulsar-functions/runtime-all/pom.xml               |  17 +++
 .../functions/instance/JavaInstanceDepsTest.java   |   3 +
 pulsar-sql/presto-distribution/LICENSE             |  24 +--
 37 files changed, 591 insertions(+), 268 deletions(-)
 create mode 100644 pulsar-broker/src/main/java/org/apache/pulsar/broker/web/ExceptionHandler.java
 create mode 100644 pulsar-broker/src/test/java/org/apache/pulsar/broker/web/ExceptionHandlerTest.java

[pulsar] 05/11: [Issue 11689][Client] Fixed block forever bug in Consumer.batchReceive (#11691)

Posted by ch...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

chenhang pushed a commit to branch branch-2.8
in repository https://gitbox.apache.org/repos/asf/pulsar.git

commit 7147d8a530dd8f5b7028ca3de55f041247af5bd7
Author: Jack Vanlightly <va...@gmail.com>
AuthorDate: Tue Aug 24 01:09:41 2021 +0200

    [Issue 11689][Client] Fixed block forever bug in Consumer.batchReceive (#11691)
    
    * Fixed block forever bug in Consumer.batchReceive
    
    Ensure that all poll() calls to pendingBatchReceives
    is done within the pinnedInternalExecutor to avoid a
    race condition where a peek and a subsequent poll get
    different pending receives.
    
    Moved the pinnedInternalExecutor into the ConsumerBase
    as both ConsumerImpl and MultiTopicsConsumerImpl require it.
    
    failingPendingReceive() now always submits its work to the
    internal executor returning a CompletableFuture and all callers
    treat it as an asynchronous operation.
    
    * Fix broken MultiTopicsConsumerImplTest
    
    Needed a real executor service to run the
    failPendingReceive() method.
    
    * Ensure all calls to messageReceived happen on internal executor
    
    * Readd missing return statement in ConsumerImpl.closeAsync()
    
    * Ensure correct usage of consumer internal executors
    
    Ensure that the externalPinnedExecutor is only called for user
    code and internalPinnedExecutor used for internal tasks.
    Some test refactoring to manage creation of executors.
    
    (cherry picked from commit bd942e1730780f89b78799ca768fc68277f7965b)
---
 .../apache/pulsar/client/impl/ConsumerBase.java    | 165 ++++++++++++---------
 .../apache/pulsar/client/impl/ConsumerImpl.java    |  59 ++++----
 .../client/impl/MultiTopicsConsumerImpl.java       |  61 ++++----
 .../pulsar/client/impl/ZeroQueueConsumerImpl.java  |   2 +-
 .../pulsar/client/impl/ClientTestFixtures.java     |  16 +-
 .../pulsar/client/impl/ConsumerImplTest.java       |  16 +-
 .../client/impl/MultiTopicsConsumerImplTest.java   |  52 +++++--
 .../apache/pulsar/client/impl/ReaderImplTest.java  |  41 ++---
 8 files changed, 231 insertions(+), 181 deletions(-)

diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerBase.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerBase.java
index 9e3757b..f4002c2 100644
--- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerBase.java
+++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerBase.java
@@ -19,9 +19,8 @@
 package org.apache.pulsar.client.impl;
 
 import static com.google.common.base.Preconditions.checkArgument;
-
 import com.google.common.collect.Queues;
-
+import io.netty.util.Timeout;
 import java.nio.charset.StandardCharsets;
 import java.util.ArrayList;
 import java.util.Collections;
@@ -37,8 +36,6 @@ import java.util.concurrent.ScheduledExecutorService;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicInteger;
 import java.util.concurrent.atomic.AtomicLongFieldUpdater;
-
-import io.netty.util.Timeout;
 import java.util.concurrent.locks.Lock;
 import java.util.concurrent.locks.ReentrantLock;
 import org.apache.pulsar.client.api.BatchReceivePolicy;
@@ -73,7 +70,8 @@ public abstract class ConsumerBase<T> extends HandlerState implements Consumer<T
     protected final MessageListener<T> listener;
     protected final ConsumerEventListener consumerEventListener;
     protected final ExecutorProvider executorProvider;
-    protected final ScheduledExecutorService pinnedExecutor;
+    protected final ScheduledExecutorService externalPinnedExecutor;
+    protected final ScheduledExecutorService internalPinnedExecutor;
     final BlockingQueue<Message<T>> incomingMessages;
     protected ConcurrentOpenHashMap<MessageIdImpl, MessageIdImpl[]> unAckedChunkedMessageIdSequenceMap;
     protected final ConcurrentLinkedQueue<CompletableFuture<Message<T>>> pendingReceives;
@@ -81,7 +79,7 @@ public abstract class ConsumerBase<T> extends HandlerState implements Consumer<T
     protected final Schema<T> schema;
     protected final ConsumerInterceptors<T> interceptors;
     protected final BatchReceivePolicy batchReceivePolicy;
-    protected ConcurrentLinkedQueue<OpBatchReceive<T>> pendingBatchReceives;
+    protected final ConcurrentLinkedQueue<OpBatchReceive<T>> pendingBatchReceives;
     private static final AtomicLongFieldUpdater<ConsumerBase> INCOMING_MESSAGES_SIZE_UPDATER = AtomicLongFieldUpdater
             .newUpdater(ConsumerBase.class, "incomingMessagesSize");
     protected volatile long incomingMessagesSize = 0;
@@ -91,7 +89,8 @@ public abstract class ConsumerBase<T> extends HandlerState implements Consumer<T
 
     protected ConsumerBase(PulsarClientImpl client, String topic, ConsumerConfigurationData<T> conf,
                            int receiverQueueSize, ExecutorProvider executorProvider,
-                           CompletableFuture<Consumer<T>> subscribeFuture, Schema<T> schema, ConsumerInterceptors interceptors) {
+                           CompletableFuture<Consumer<T>> subscribeFuture, Schema<T> schema,
+                           ConsumerInterceptors interceptors) {
         super(client, topic);
         this.maxReceiverQueueSize = receiverQueueSize;
         this.subscription = conf.getSubscriptionName();
@@ -104,8 +103,10 @@ public abstract class ConsumerBase<T> extends HandlerState implements Consumer<T
         this.incomingMessages = new GrowableArrayBlockingQueue<>();
         this.unAckedChunkedMessageIdSequenceMap = new ConcurrentOpenHashMap<>();
         this.executorProvider = executorProvider;
-        this.pinnedExecutor = (ScheduledExecutorService) executorProvider.getExecutor();
+        this.externalPinnedExecutor = (ScheduledExecutorService) executorProvider.getExecutor();
+        this.internalPinnedExecutor = (ScheduledExecutorService) client.getInternalExecutorService();
         this.pendingReceives = Queues.newConcurrentLinkedQueue();
+        this.pendingBatchReceives = Queues.newConcurrentLinkedQueue();
         this.schema = schema;
         this.interceptors = interceptors;
         if (conf.getBatchReceivePolicy() != null) {
@@ -204,26 +205,11 @@ public abstract class ConsumerBase<T> extends HandlerState implements Consumer<T
         }
     }
 
-    protected CompletableFuture<Message<T>> peekPendingReceive() {
-        CompletableFuture<Message<T>> receivedFuture = null;
-        while (receivedFuture == null) {
-            receivedFuture = pendingReceives.peek();
-            if (receivedFuture == null) {
-                break;
-            }
-            // skip done futures (cancelling a future could mark it done)
-            if (receivedFuture.isDone()) {
-                CompletableFuture<Message<T>> removed = pendingReceives.poll();
-                if (removed != receivedFuture) {
-                    log.error("Bug! Removed future wasn't the expected one. expected={} removed={}", receivedFuture, removed);
-                }
-                receivedFuture = null;
-            }
-        }
-        return receivedFuture;
+    protected boolean hasNextPendingReceive() {
+        return !pendingReceives.isEmpty();
     }
 
-    protected CompletableFuture<Message<T>> pollPendingReceive() {
+    protected CompletableFuture<Message<T>> nextPendingReceive() {
         CompletableFuture<Message<T>> receivedFuture;
         while (true) {
             receivedFuture = pendingReceives.poll();
@@ -236,7 +222,7 @@ public abstract class ConsumerBase<T> extends HandlerState implements Consumer<T
     }
 
     protected void completePendingReceive(CompletableFuture<Message<T>> receivedFuture, Message<T> message) {
-        getExecutor(message).execute(() -> {
+        getInternalExecutor(message).execute(() -> {
             if (!receivedFuture.complete(message)) {
                 log.warn("Race condition detected. receive future was already completed (cancelled={}) and message was dropped. message={}",
                         receivedFuture.isCancelled(), message);
@@ -244,7 +230,28 @@ public abstract class ConsumerBase<T> extends HandlerState implements Consumer<T
         });
     }
 
-    protected void failPendingReceives(ConcurrentLinkedQueue<CompletableFuture<Message<T>>> pendingReceives) {
+    protected CompletableFuture<Void> failPendingReceive() {
+        if (internalPinnedExecutor.isShutdown()) {
+            // we need to fail any pending receives no matter what,
+            // to avoid blocking user code
+            failPendingReceives();
+            failPendingBatchReceives();
+            return CompletableFuture.completedFuture(null);
+        } else {
+            CompletableFuture<Void> future = new CompletableFuture<>();
+            internalPinnedExecutor.execute(() -> {
+                try {
+                    failPendingReceives();
+                    failPendingBatchReceives();
+                } finally {
+                    future.complete(null);
+                }
+            });
+            return future;
+        }
+    }
+
+    private void failPendingReceives() {
         while (!pendingReceives.isEmpty()) {
             CompletableFuture<Message<T>> receiveFuture = pendingReceives.poll();
             if (receiveFuture == null) {
@@ -252,21 +259,23 @@ public abstract class ConsumerBase<T> extends HandlerState implements Consumer<T
             }
             if (!receiveFuture.isDone()) {
                 receiveFuture.completeExceptionally(
-                        new PulsarClientException.AlreadyClosedException(String.format("The consumer which subscribes the topic %s with subscription name %s " +
+                        new PulsarClientException.AlreadyClosedException(
+                                String.format("The consumer which subscribes the topic %s with subscription name %s " +
                                 "was already closed when cleaning and closing the consumers", topic, subscription)));
             }
         }
     }
 
-    protected void failPendingBatchReceives(ConcurrentLinkedQueue<OpBatchReceive<T>> pendingBatchReceives) {
-        while (!pendingBatchReceives.isEmpty()) {
-            OpBatchReceive<T> opBatchReceive = pendingBatchReceives.poll();
+    private void failPendingBatchReceives() {
+        while (hasNextBatchReceive()) {
+            OpBatchReceive<T> opBatchReceive = nextBatchReceive();
             if (opBatchReceive == null || opBatchReceive.future == null) {
                 break;
             }
             if (!opBatchReceive.future.isDone()) {
                 opBatchReceive.future.completeExceptionally(
-                        new PulsarClientException.AlreadyClosedException(String.format("The consumer which subscribes the topic %s with subscription name %s " +
+                        new PulsarClientException.AlreadyClosedException(
+                                String.format("The consumer which subscribes the topic %s with subscription name %s " +
                                 "was already closed when cleaning and closing the consumers", topic, subscription)));
             }
         }
@@ -777,7 +786,7 @@ public abstract class ConsumerBase<T> extends HandlerState implements Consumer<T
     }
 
     protected void notifyPendingBatchReceivedCallBack() {
-        OpBatchReceive<T> opBatchReceive = pollNextBatchReceive();
+        OpBatchReceive<T> opBatchReceive = nextBatchReceive();
         if (opBatchReceive == null) {
             return;
         }
@@ -790,31 +799,16 @@ public abstract class ConsumerBase<T> extends HandlerState implements Consumer<T
         }
     }
 
-    private OpBatchReceive<T> peekNextBatchReceive() {
-        OpBatchReceive<T> opBatchReceive = null;
-        while (opBatchReceive == null) {
-            opBatchReceive = pendingBatchReceives.peek();
-            // no entry available
-            if (opBatchReceive == null) {
-                return null;
-            }
-            // remove entries where future is null or has been completed (cancel / timeout)
-            if (opBatchReceive.future == null || opBatchReceive.future.isDone()) {
-                OpBatchReceive<T> removed = pendingBatchReceives.poll();
-                if (removed != opBatchReceive) {
-                    log.error("Bug: Removed entry wasn't the expected one. expected={}, removed={}", opBatchReceive, removed);
-                }
-                opBatchReceive = null;
-            }
-        }
-        return opBatchReceive;
+    private boolean hasNextBatchReceive() {
+        return !pendingBatchReceives.isEmpty();
     }
 
 
-    private OpBatchReceive<T> pollNextBatchReceive() {
+    private OpBatchReceive<T> nextBatchReceive() {
         OpBatchReceive<T> opBatchReceive = null;
         while (opBatchReceive == null) {
             opBatchReceive = pendingBatchReceives.poll();
+
             // no entry available
             if (opBatchReceive == null) {
                 return null;
@@ -853,7 +847,11 @@ public abstract class ConsumerBase<T> extends HandlerState implements Consumer<T
     protected abstract void messageProcessed(Message<?> msg);
 
 
-    private void pendingBatchReceiveTask(Timeout timeout) throws Exception {
+    private void pendingBatchReceiveTask(Timeout timeout) {
+        internalPinnedExecutor.execute(() -> doPendingBatchReceiveTask(timeout));
+    }
+
+    private void doPendingBatchReceiveTask(Timeout timeout) {
         if (timeout.isCancelled()) {
             return;
         }
@@ -865,32 +863,44 @@ public abstract class ConsumerBase<T> extends HandlerState implements Consumer<T
             if (getState() == State.Closing || getState() == State.Closed) {
                 return;
             }
-            if (pendingBatchReceives == null) {
-                pendingBatchReceives = Queues.newConcurrentLinkedQueue();
-            }
-            OpBatchReceive<T> firstOpBatchReceive = peekNextBatchReceive();
+
             timeToWaitMs = batchReceivePolicy.getTimeoutMs();
+            OpBatchReceive<T> opBatchReceive = pendingBatchReceives.peek();
 
-            while (firstOpBatchReceive != null) {
+            while (opBatchReceive != null) {
                 // If there is at least one batch receive, calculate the diff between the batch receive timeout
                 // and the elapsed time since the operation was created.
                 long diff = batchReceivePolicy.getTimeoutMs()
-                        - TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - firstOpBatchReceive.createdAt);
+                        - TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - opBatchReceive.createdAt);
+
                 if (diff <= 0) {
-                    // The diff is less than or equal to zero, meaning that the batch receive has been timed out.
-                    // complete the OpBatchReceive and continue to check the next OpBatchReceive in pendingBatchReceives.
-                    OpBatchReceive<T> op = pollNextBatchReceive();
-                    if (op != null) {
-                        completeOpBatchReceive(op);
+                    completeOpBatchReceive(opBatchReceive);
+
+                    // remove the peeked item from the queue
+                    OpBatchReceive<T> removed = pendingBatchReceives.poll();
+
+                    if (removed != opBatchReceive) {
+                        // regression check, if this were to happen due to incorrect code changes in the future,
+                        // (allowing multi-threaded calls to poll()), then ensure that the polled item is completed
+                        // to avoid blocking user code
+
+                        log.error("Race condition in consumer {} (should not cause data loss). "
+                                + " Concurrent operations on pendingBatchReceives is not safe", this.consumerName);
+                        if (removed != null && !removed.future.isDone()) {
+                            completeOpBatchReceive(removed);
+                        }
                     }
-                    firstOpBatchReceive = peekNextBatchReceive();
+
                 } else {
                     // The diff is greater than zero, set the timeout to the diff value
                     timeToWaitMs = diff;
                     break;
                 }
+
+                opBatchReceive = pendingBatchReceives.peek();
             }
-            batchReceiveTimeout = client.timer().newTimeout(this::pendingBatchReceiveTask, timeToWaitMs, TimeUnit.MILLISECONDS);
+            batchReceiveTimeout = client.timer().newTimeout(this::pendingBatchReceiveTask,
+                    timeToWaitMs, TimeUnit.MILLISECONDS);
         }
     }
 
@@ -907,7 +917,7 @@ public abstract class ConsumerBase<T> extends HandlerState implements Consumer<T
                         executorProvider.getExecutor(peekMessageKey(msg)).execute(() ->
                                 callMessageListener(msg));
                     } else {
-                        getExecutor(msg).execute(() -> {
+                        getExternalExecutor(msg).execute(() -> {
                             callMessageListener(msg);
                         });
                     }
@@ -957,7 +967,7 @@ public abstract class ConsumerBase<T> extends HandlerState implements Consumer<T
     }
 
     protected boolean hasPendingBatchReceive() {
-        return pendingBatchReceives != null && peekNextBatchReceive() != null;
+        return pendingBatchReceives != null && hasNextBatchReceive();
     }
 
     protected void increaseIncomingMessageSize(final Message<?> message) {
@@ -989,12 +999,21 @@ public abstract class ConsumerBase<T> extends HandlerState implements Consumer<T
 
     protected abstract void completeOpBatchReceive(OpBatchReceive<T> op);
 
-    private ExecutorService getExecutor(Message<T> msg) {
+    private ExecutorService getExternalExecutor(Message<T> msg) {
+        ConsumerImpl receivedConsumer = (msg instanceof TopicMessageImpl) ? ((TopicMessageImpl) msg).receivedByconsumer
+                : null;
+        ExecutorService executor = receivedConsumer != null && receivedConsumer.externalPinnedExecutor != null
+                ? receivedConsumer.externalPinnedExecutor
+                : externalPinnedExecutor;
+        return executor;
+    }
+
+    private ExecutorService getInternalExecutor(Message<T> msg) {
         ConsumerImpl receivedConsumer = (msg instanceof TopicMessageImpl) ? ((TopicMessageImpl) msg).receivedByconsumer
                 : null;
-        ExecutorService executor = receivedConsumer != null && receivedConsumer.pinnedExecutor != null
-                ? receivedConsumer.pinnedExecutor
-                : pinnedExecutor;
+        ExecutorService executor = receivedConsumer != null && receivedConsumer.internalPinnedExecutor != null
+                ? receivedConsumer.internalPinnedExecutor
+                : internalPinnedExecutor;
         return executor;
     }
 
diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java
index 829e62f..bc8f06b 100644
--- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java
+++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java
@@ -23,7 +23,6 @@ import static org.apache.pulsar.common.protocol.Commands.hasChecksum;
 import com.google.common.annotations.VisibleForTesting;
 import com.google.common.collect.ComparisonChain;
 import com.google.common.collect.Iterables;
-import com.google.common.collect.Queues;
 import com.scurrilous.circe.checksum.Crc32cIntChecksum;
 
 import io.netty.buffer.ByteBuf;
@@ -48,6 +47,7 @@ import java.util.TreeMap;
 import java.util.concurrent.BlockingQueue;
 import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.CopyOnWriteArrayList;
 import java.util.concurrent.ExecutionException;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.TimeUnit;
@@ -82,7 +82,6 @@ import org.apache.pulsar.client.impl.crypto.MessageCryptoBc;
 import org.apache.pulsar.client.impl.transaction.TransactionImpl;
 import org.apache.pulsar.client.util.ExecutorProvider;
 import org.apache.pulsar.client.util.RetryMessageUtil;
-import org.apache.pulsar.client.util.RetryUtil;
 import org.apache.pulsar.common.allocator.PulsarByteBufAllocator;
 import org.apache.pulsar.common.api.EncryptionContext;
 import org.apache.pulsar.common.api.EncryptionContext.EncryptionKey;
@@ -190,7 +189,8 @@ public class ConsumerImpl<T> extends ConsumerBase<T> implements ConnectionHandle
     private final boolean poolMessages;
 
     private final AtomicReference<ClientCnx> clientCnxUsedForConsumerRegistration = new AtomicReference<>();
-    private final ExecutorService internalPinnedExecutor;
+
+    private final List<Throwable> previousExceptions = new CopyOnWriteArrayList<Throwable>();
 
     static <T> ConsumerImpl<T> newConsumerImpl(PulsarClientImpl client,
                                                String topic,
@@ -258,7 +258,6 @@ public class ConsumerImpl<T> extends ConsumerBase<T> implements ConnectionHandle
         this.expireTimeOfIncompleteChunkedMessageMillis = conf.getExpireTimeOfIncompleteChunkedMessageMillis();
         this.autoAckOldestChunkedMessageOnQueueFull = conf.isAutoAckOldestChunkedMessageOnQueueFull();
         this.poolMessages = conf.isPoolMessages();
-        this.internalPinnedExecutor = client.getInternalExecutorService();
 
         if (client.getConfiguration().getStatsIntervalSeconds() > 0) {
             stats = new ConsumerStatsRecorderImpl(client, conf, this);
@@ -472,9 +471,6 @@ public class ConsumerImpl<T> extends ConsumerBase<T> implements ConnectionHandle
         CompletableFutureCancellationHandler cancellationHandler = new CompletableFutureCancellationHandler();
         CompletableFuture<Messages<T>> result = cancellationHandler.createFuture();
         internalPinnedExecutor.execute(() -> {
-            if (pendingBatchReceives == null) {
-                pendingBatchReceives = Queues.newConcurrentLinkedQueue();
-            }
             if (hasEnoughMessagesForBatchReceive()) {
                 MessagesImpl<T> messages = getNewMessagesImpl();
                 Message<T> msgPeeked = incomingMessages.peek();
@@ -693,7 +689,6 @@ public class ConsumerImpl<T> extends ConsumerBase<T> implements ConnectionHandle
             closeConsumerTasks();
             deregisterFromClientCnx();
             client.cleanupConsumer(this);
-            failPendingReceive();
             clearReceiverQueue();
             return;
         }
@@ -898,9 +893,12 @@ public class ConsumerImpl<T> extends ConsumerBase<T> implements ConnectionHandle
 
     @Override
     public CompletableFuture<Void> closeAsync() {
+        CompletableFuture<Void> closeFuture = new CompletableFuture<>();
+
         if (getState() == State.Closing || getState() == State.Closed) {
             closeConsumerTasks();
-            return CompletableFuture.completedFuture(null);
+            failPendingReceive().whenComplete((r, t) -> closeFuture.complete(null));
+            return closeFuture;
         }
 
         if (!isConnected()) {
@@ -909,7 +907,8 @@ public class ConsumerImpl<T> extends ConsumerBase<T> implements ConnectionHandle
             closeConsumerTasks();
             deregisterFromClientCnx();
             client.cleanupConsumer(this);
-            return CompletableFuture.completedFuture(null);
+            failPendingReceive().whenComplete((r, t) -> closeFuture.complete(null));
+            return closeFuture;
         }
 
         stats.getStatTimeout().ifPresent(Timeout::cancel);
@@ -920,7 +919,6 @@ public class ConsumerImpl<T> extends ConsumerBase<T> implements ConnectionHandle
 
         long requestId = client.newRequestId();
 
-        CompletableFuture<Void> closeFuture = new CompletableFuture<>();
         ClientCnx cnx = cnx();
         if (null == cnx) {
             cleanupAtClose(closeFuture, null);
@@ -944,15 +942,17 @@ public class ConsumerImpl<T> extends ConsumerBase<T> implements ConnectionHandle
         log.info("[{}] [{}] Closed consumer", topic, subscription);
         setState(State.Closed);
         closeConsumerTasks();
-        if (exception != null) {
-            closeFuture.completeExceptionally(exception);
-        } else {
-            closeFuture.complete(null);
-        }
         deregisterFromClientCnx();
         client.cleanupConsumer(this);
+
         // fail all pending-receive futures to notify application
-        failPendingReceive();
+        failPendingReceive().whenComplete((r, t) -> {
+            if (exception != null) {
+                closeFuture.completeExceptionally(exception);
+            } else {
+                closeFuture.complete(null);
+            }
+        });
     }
 
     private void closeConsumerTasks() {
@@ -967,21 +967,12 @@ public class ConsumerImpl<T> extends ConsumerBase<T> implements ConnectionHandle
         stats.getStatTimeout().ifPresent(Timeout::cancel);
     }
 
-    private void failPendingReceive() {
-        internalPinnedExecutor.execute(() -> {
-            if (pinnedExecutor != null && !pinnedExecutor.isShutdown()) {
-                failPendingReceives(this.pendingReceives);
-                failPendingBatchReceives(this.pendingBatchReceives);
-            }
-        });
-    }
-
     void activeConsumerChanged(boolean isActive) {
         if (consumerEventListener == null) {
             return;
         }
 
-        pinnedExecutor.execute(() -> {
+        externalPinnedExecutor.execute(() -> {
             if (isActive) {
                 consumerEventListener.becameActive(this, partitionIndex);
             } else {
@@ -1080,7 +1071,7 @@ public class ConsumerImpl<T> extends ConsumerBase<T> implements ConnectionHandle
                     possibleSendToDeadLetterTopicMessages.put((MessageIdImpl) message.getMessageId(),
                             Collections.singletonList(message));
                 }
-                if (peekPendingReceive() != null) {
+                if (hasNextPendingReceive()) {
                     notifyPendingReceivedCallback(message, null);
                 } else if (enqueueMessageAndCheckBatchReceive(message) && hasPendingBatchReceive()) {
                     notifyPendingBatchReceivedCallBack();
@@ -1112,7 +1103,7 @@ public class ConsumerImpl<T> extends ConsumerBase<T> implements ConnectionHandle
 
         // Lazy task scheduling to expire incomplete chunk message
         if (!expireChunkMessageTaskScheduled && expireTimeOfIncompleteChunkedMessageMillis > 0) {
-            pinnedExecutor.scheduleAtFixedRate(() -> {
+            internalPinnedExecutor.scheduleAtFixedRate(() -> {
                         removeExpireIncompleteChunkedMessages();
                     }, expireTimeOfIncompleteChunkedMessageMillis, expireTimeOfIncompleteChunkedMessageMillis,
                     TimeUnit.MILLISECONDS);
@@ -1200,19 +1191,19 @@ public class ConsumerImpl<T> extends ConsumerBase<T> implements ConnectionHandle
         }
 
         // fetch receivedCallback from queue
-        final CompletableFuture<Message<T>> receivedFuture = pollPendingReceive();
+        final CompletableFuture<Message<T>> receivedFuture = nextPendingReceive();
         if (receivedFuture == null) {
             return;
         }
 
         if (exception != null) {
-            pinnedExecutor.execute(() -> receivedFuture.completeExceptionally(exception));
+            internalPinnedExecutor.execute(() -> receivedFuture.completeExceptionally(exception));
             return;
         }
 
         if (message == null) {
             IllegalStateException e = new IllegalStateException("received message can't be null");
-            pinnedExecutor.execute(() -> receivedFuture.completeExceptionally(e));
+            internalPinnedExecutor.execute(() -> receivedFuture.completeExceptionally(e));
             return;
         }
 
@@ -1301,7 +1292,7 @@ public class ConsumerImpl<T> extends ConsumerBase<T> implements ConnectionHandle
                     possibleToDeadLetter.add(message);
                 }
                 internalPinnedExecutor.execute(() -> {
-                    if (peekPendingReceive() != null) {
+                    if (hasNextPendingReceive()) {
                         notifyPendingReceivedCallback(message, null);
                     } else if (enqueueMessageAndCheckBatchReceive(message) && hasPendingBatchReceive()) {
                         notifyPendingBatchReceivedCallBack();
@@ -2077,7 +2068,7 @@ public class ConsumerImpl<T> extends ConsumerBase<T> implements ConnectionHandle
                 return;
             }
 
-            pinnedExecutor.schedule(() -> {
+            internalPinnedExecutor.schedule(() -> {
                 log.warn("[{}] [{}] Could not get connection while getLastMessageId -- Will try again in {} ms",
                         topic, getHandlerName(), nextDelay);
                 remainingTime.addAndGet(-nextDelay);
diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/MultiTopicsConsumerImpl.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/MultiTopicsConsumerImpl.java
index 63027a5..ce84376 100644
--- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/MultiTopicsConsumerImpl.java
+++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/MultiTopicsConsumerImpl.java
@@ -22,7 +22,6 @@ import com.google.common.annotations.VisibleForTesting;
 import com.google.common.collect.ImmutableMap;
 import com.google.common.collect.ImmutableMap.Builder;
 import com.google.common.collect.Lists;
-import com.google.common.collect.Queues;
 import io.netty.util.Timeout;
 import io.netty.util.TimerTask;
 import org.apache.commons.lang3.tuple.Pair;
@@ -236,10 +235,11 @@ public class MultiTopicsConsumerImpl<T> extends ConsumerBase<T> {
             log.debug("[{}] startReceivingMessages for {} new consumers in topics consumer, state: {}",
                 topic, newConsumers.size(), getState());
         }
+
         if (getState() == State.Ready) {
             newConsumers.forEach(consumer -> {
                 consumer.increaseAvailablePermits(consumer.getConnectionHandler().cnx(), conf.getReceiverQueueSize());
-                receiveMessageFromConsumer(consumer);
+                internalPinnedExecutor.execute(() -> receiveMessageFromConsumer(consumer));
             });
         }
     }
@@ -262,7 +262,7 @@ public class MultiTopicsConsumerImpl<T> extends ConsumerBase<T> {
             } else {
                 // Schedule next receiveAsync() if the incoming queue is not full. Use a different thread to avoid
                 // recursion and stack overflow
-                client.getInternalExecutorService().execute(() -> receiveMessageFromConsumer(consumer));
+                internalPinnedExecutor.execute(() -> receiveMessageFromConsumer(consumer));
             }
         });
     }
@@ -278,7 +278,7 @@ public class MultiTopicsConsumerImpl<T> extends ConsumerBase<T> {
         }
 
         // if asyncReceive is waiting : return message to callback without adding to incomingMessages queue
-        CompletableFuture<Message<T>> receivedFuture = pollPendingReceive();
+        CompletableFuture<Message<T>> receivedFuture = nextPendingReceive();
         if (receivedFuture != null) {
             unAckedMessageTracker.add(topicMessage.getMessageId());
             completePendingReceive(receivedFuture, topicMessage);
@@ -305,7 +305,7 @@ public class MultiTopicsConsumerImpl<T> extends ConsumerBase<T> {
                     break;
                 }
 
-                client.getInternalExecutorService().execute(() -> {
+                internalPinnedExecutor.execute(() -> {
                     receiveMessageFromConsumer(consumer);
                 });
             }
@@ -365,9 +365,6 @@ public class MultiTopicsConsumerImpl<T> extends ConsumerBase<T> {
         CompletableFuture<Messages<T>> result = cancellationHandler.createFuture();
         try {
             lock.writeLock().lock();
-            if (pendingBatchReceives == null) {
-                pendingBatchReceives = Queues.newConcurrentLinkedQueue();
-            }
             if (hasEnoughMessagesForBatchReceive()) {
                 MessagesImpl<T> messages = getNewMessagesImpl();
                 Message<T> msgPeeked = incomingMessages.peek();
@@ -521,13 +518,17 @@ public class MultiTopicsConsumerImpl<T> extends ConsumerBase<T> {
             .map(c -> c.unsubscribeAsync()).collect(Collectors.toList());
 
         FutureUtil.waitForAll(futureList)
+            .thenCompose((r) -> {
+                setState(State.Closed);
+                cleanupMultiConsumer();
+                log.info("[{}] [{}] [{}] Unsubscribed Topics Consumer",
+                        topic, subscription, consumerName);
+                // fail all pending-receive futures to notify application
+                return failPendingReceive();
+            })
             .whenComplete((r, ex) -> {
                 if (ex == null) {
-                    setState(State.Closed);
-                    unAckedMessageTracker.close();
                     unsubscribeFuture.complete(null);
-                    log.info("[{}] [{}] [{}] Unsubscribed Topics Consumer",
-                        topic, subscription, consumerName);
                 } else {
                     setState(State.Failed);
                     unsubscribeFuture.completeExceptionally(ex);
@@ -557,15 +558,16 @@ public class MultiTopicsConsumerImpl<T> extends ConsumerBase<T> {
             .map(c -> c.closeAsync()).collect(Collectors.toList());
 
         FutureUtil.waitForAll(futureList)
+            .thenCompose((r) -> {
+                setState(State.Closed);
+                cleanupMultiConsumer();
+                log.info("[{}] [{}] Closed Topics Consumer", topic, subscription);
+                // fail all pending-receive futures to notify application
+                return failPendingReceive();
+            })
             .whenComplete((r, ex) -> {
                 if (ex == null) {
-                    setState(State.Closed);
-                    unAckedMessageTracker.close();
                     closeFuture.complete(null);
-                    log.info("[{}] [{}] Closed Topics Consumer", topic, subscription);
-                    client.cleanupConsumer(this);
-                    // fail all pending-receive futures to notify application
-                    failPendingReceive();
                 } else {
                     setState(State.Failed);
                     closeFuture.completeExceptionally(ex);
@@ -577,11 +579,9 @@ public class MultiTopicsConsumerImpl<T> extends ConsumerBase<T> {
         return closeFuture;
     }
 
-    private void failPendingReceive() {
-        if (pinnedExecutor != null && !pinnedExecutor.isShutdown()) {
-            failPendingReceives(pendingReceives);
-            failPendingBatchReceives(pendingBatchReceives);
-        }
+    private void cleanupMultiConsumer() {
+        unAckedMessageTracker.close();
+        client.cleanupConsumer(this);
     }
 
     @Override
@@ -996,14 +996,13 @@ public class MultiTopicsConsumerImpl<T> extends ConsumerBase<T> {
                 }
 
                 // We have successfully created new consumers, so we can start receiving messages for them
-                startReceivingMessages(
-                    consumers.values().stream()
-                        .filter(consumer1 -> {
-                            String consumerTopicName = consumer1.getTopic();
-                            return TopicName.get(consumerTopicName).getPartitionedTopicName().equals(
-                                    TopicName.get(topicName).getPartitionedTopicName());
-                        })
-                        .collect(Collectors.toList()));
+                startReceivingMessages(consumers.values().stream()
+                                .filter(consumer1 -> {
+                                    String consumerTopicName = consumer1.getTopic();
+                                    return TopicName.get(consumerTopicName).getPartitionedTopicName().equals(
+                                            TopicName.get(topicName).getPartitionedTopicName());
+                                })
+                                .collect(Collectors.toList()));
 
                 subscribeResult.complete(null);
                 log.info("[{}] [{}] Success subscribe new topic {} in topics consumer, partitions: {}, allTopicPartitionsNumber: {}",
diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ZeroQueueConsumerImpl.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ZeroQueueConsumerImpl.java
index 98d5b48..f6302e3 100644
--- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ZeroQueueConsumerImpl.java
+++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ZeroQueueConsumerImpl.java
@@ -153,7 +153,7 @@ public class ZeroQueueConsumerImpl<T> extends ConsumerImpl<T> {
         checkNotNull(listener, "listener can't be null");
         checkNotNull(message, "unqueued message can't be null");
 
-        pinnedExecutor.execute(() -> {
+        externalPinnedExecutor.execute(() -> {
             stats.updateNumMsgsReceived(message);
             try {
                 if (log.isDebugEnabled()) {
diff --git a/pulsar-client/src/test/java/org/apache/pulsar/client/impl/ClientTestFixtures.java b/pulsar-client/src/test/java/org/apache/pulsar/client/impl/ClientTestFixtures.java
index 0adb165..9e47e52 100644
--- a/pulsar-client/src/test/java/org/apache/pulsar/client/impl/ClientTestFixtures.java
+++ b/pulsar-client/src/test/java/org/apache/pulsar/client/impl/ClientTestFixtures.java
@@ -39,21 +39,29 @@ import static org.mockito.Mockito.*;
 class ClientTestFixtures {
     public static ScheduledExecutorService SCHEDULER = Executors.newSingleThreadScheduledExecutor();
 
-    static <T> PulsarClientImpl createPulsarClientMock() {
+//    static <T> PulsarClientImpl createPulsarClientMock() {
+//        return createPulsarClientMock(mock(ExecutorService.class));
+//    }
+
+    static <T> PulsarClientImpl createPulsarClientMock(ExecutorProvider executorProvider,
+                                                       ExecutorService internalExecutorService) {
         PulsarClientImpl clientMock = mock(PulsarClientImpl.class, Mockito.RETURNS_DEEP_STUBS);
 
         ClientConfigurationData clientConf = new ClientConfigurationData();
         when(clientMock.getConfiguration()).thenReturn(clientConf);
         when(clientMock.timer()).thenReturn(mock(Timer.class));
 
-        when(clientMock.externalExecutorProvider()).thenReturn(mock(ExecutorProvider.class));
+        when(clientMock.getInternalExecutorService()).thenReturn(internalExecutorService);
+        when(clientMock.externalExecutorProvider()).thenReturn(executorProvider);
         when(clientMock.eventLoopGroup().next()).thenReturn(mock(EventLoop.class));
 
         return clientMock;
     }
 
-    static <T> PulsarClientImpl createPulsarClientMockWithMockedClientCnx() {
-        return mockClientCnx(createPulsarClientMock());
+    static <T> PulsarClientImpl createPulsarClientMockWithMockedClientCnx(
+            ExecutorProvider executorProvider,
+            ExecutorService internalExecutorService) {
+        return mockClientCnx(createPulsarClientMock(executorProvider, internalExecutorService));
     }
 
     static PulsarClientImpl mockClientCnx(PulsarClientImpl clientMock) {
diff --git a/pulsar-client/src/test/java/org/apache/pulsar/client/impl/ConsumerImplTest.java b/pulsar-client/src/test/java/org/apache/pulsar/client/impl/ConsumerImplTest.java
index 2702439..37c9e0c 100644
--- a/pulsar-client/src/test/java/org/apache/pulsar/client/impl/ConsumerImplTest.java
+++ b/pulsar-client/src/test/java/org/apache/pulsar/client/impl/ConsumerImplTest.java
@@ -48,19 +48,17 @@ import org.testng.annotations.Test;
 
 public class ConsumerImplTest {
 
-
     private ExecutorProvider executorProvider;
+    private ExecutorService internalExecutor;
     private ConsumerImpl<byte[]> consumer;
     private ConsumerConfigurationData consumerConf;
-    private ExecutorService executorService;
 
     @BeforeMethod(alwaysRun = true)
     public void setUp() {
         executorProvider = new ExecutorProvider(1, "ConsumerImplTest");
+        internalExecutor = Executors.newSingleThreadScheduledExecutor();
         consumerConf = new ConsumerConfigurationData<>();
-        PulsarClientImpl client = ClientTestFixtures.createPulsarClientMock();
-        executorService = Executors.newSingleThreadExecutor();
-        when(client.getInternalExecutorService()).thenReturn(executorService);
+        PulsarClientImpl client = ClientTestFixtures.createPulsarClientMock(executorProvider, internalExecutor);
         ClientConfigurationData clientConf = client.getConfiguration();
         clientConf.setOperationTimeoutMs(100);
         clientConf.setStatsIntervalSeconds(0);
@@ -80,9 +78,9 @@ public class ConsumerImplTest {
             executorProvider.shutdownNow();
             executorProvider = null;
         }
-        if (executorService != null) {
-            executorService.shutdownNow();
-            executorService = null;
+        if (internalExecutor != null) {
+            internalExecutor.shutdownNow();
+            internalExecutor = null;
         }
     }
 
@@ -172,7 +170,7 @@ public class ConsumerImplTest {
     public void testReceiveAsyncCanBeCancelled() {
         // given
         CompletableFuture<Message<byte[]>> future = consumer.receiveAsync();
-        Awaitility.await().untilAsserted(() -> Assert.assertEquals(consumer.peekPendingReceive(), future));
+        Awaitility.await().untilAsserted(() -> Assert.assertTrue(consumer.hasNextPendingReceive()));
         // when
         future.cancel(true);
         // then
diff --git a/pulsar-client/src/test/java/org/apache/pulsar/client/impl/MultiTopicsConsumerImplTest.java b/pulsar-client/src/test/java/org/apache/pulsar/client/impl/MultiTopicsConsumerImplTest.java
index 745aa1c..157237c 100644
--- a/pulsar-client/src/test/java/org/apache/pulsar/client/impl/MultiTopicsConsumerImplTest.java
+++ b/pulsar-client/src/test/java/org/apache/pulsar/client/impl/MultiTopicsConsumerImplTest.java
@@ -24,6 +24,8 @@ import io.netty.channel.nio.NioEventLoopGroup;
 import io.netty.util.HashedWheelTimer;
 import io.netty.util.Timer;
 import io.netty.util.concurrent.DefaultThreadFactory;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
 import lombok.Cleanup;
 import org.apache.pulsar.client.api.Consumer;
 import org.apache.pulsar.client.api.Message;
@@ -35,7 +37,10 @@ import org.apache.pulsar.client.impl.conf.ConsumerConfigurationData;
 import org.apache.pulsar.client.util.ExecutorProvider;
 import org.apache.pulsar.common.partition.PartitionedTopicMetadata;
 import org.apache.pulsar.common.util.netty.EventLoopUtil;
-import org.mockito.internal.verification.VerificationModeFactory;
+import org.junit.After;
+import org.junit.Before;
+import org.testng.annotations.AfterMethod;
+import org.testng.annotations.BeforeMethod;
 import org.testng.annotations.Test;
 
 import java.util.Arrays;
@@ -60,6 +65,27 @@ import static org.testng.Assert.expectThrows;
  */
 public class MultiTopicsConsumerImplTest {
 
+    private ExecutorProvider executorProvider;
+    private ExecutorService internalExecutor;
+
+    @BeforeMethod(alwaysRun = true)
+    public void setUp() {
+        executorProvider = new ExecutorProvider(1, "MultiTopicsConsumerImplTest");
+        internalExecutor = Executors.newSingleThreadScheduledExecutor();
+    }
+
+    @AfterMethod(alwaysRun = true)
+    public void cleanUp() {
+        if (executorProvider != null) {
+            executorProvider.shutdownNow();
+            executorProvider = null;
+        }
+        if (internalExecutor != null) {
+            internalExecutor.shutdownNow();
+            internalExecutor = null;
+        }
+    }
+
     @Test
     public void testGetStats() throws Exception {
         String topicName = "test-stats";
@@ -112,18 +138,23 @@ public class MultiTopicsConsumerImplTest {
     }
 
     private MultiTopicsConsumerImpl<byte[]> createMultiTopicsConsumer() {
-        ExecutorProvider executorProvider = mock(ExecutorProvider.class);
         ConsumerConfigurationData<byte[]> consumerConfData = new ConsumerConfigurationData<>();
         consumerConfData.setSubscriptionName("subscriptionName");
+        return createMultiTopicsConsumer(consumerConfData);
+    }
+
+    private MultiTopicsConsumerImpl<byte[]> createMultiTopicsConsumer(
+            ConsumerConfigurationData<byte[]> consumerConfData) {
         int completionDelayMillis = 100;
         Schema<byte[]> schema = Schema.BYTES;
-        PulsarClientImpl clientMock = createPulsarClientMockWithMockedClientCnx();
+        PulsarClientImpl clientMock = createPulsarClientMockWithMockedClientCnx(executorProvider, internalExecutor);
         when(clientMock.getPartitionedTopicMetadata(any())).thenAnswer(invocation -> createDelayedCompletedFuture(
                 new PartitionedTopicMetadata(), completionDelayMillis));
         when(clientMock.<byte[]>preProcessSchemaBeforeSubscribe(any(), any(), any()))
                 .thenReturn(CompletableFuture.completedFuture(schema));
-        MultiTopicsConsumerImpl<byte[]> impl = new MultiTopicsConsumerImpl<byte[]>(clientMock, consumerConfData, executorProvider,
-            new CompletableFuture<>(), schema, null, true);
+        MultiTopicsConsumerImpl<byte[]> impl = new MultiTopicsConsumerImpl<byte[]>(
+                clientMock, consumerConfData, executorProvider,
+                new CompletableFuture<>(), schema, null, true);
         return impl;
     }
 
@@ -132,7 +163,7 @@ public class MultiTopicsConsumerImplTest {
         // given
         MultiTopicsConsumerImpl<byte[]> consumer = createMultiTopicsConsumer();
         CompletableFuture<Message<byte[]>> future = consumer.receiveAsync();
-        assertEquals(consumer.peekPendingReceive(), future);
+        assertTrue(consumer.hasNextPendingReceive());
         // when
         future.cancel(true);
         // then
@@ -153,18 +184,17 @@ public class MultiTopicsConsumerImplTest {
 
     @Test
     public void testConsumerCleanupOnSubscribeFailure() {
-        ExecutorProvider executorProvider = mock(ExecutorProvider.class);
         ConsumerConfigurationData<byte[]> consumerConfData = new ConsumerConfigurationData<>();
         consumerConfData.setSubscriptionName("subscriptionName");
         consumerConfData.setTopicNames(new HashSet<>(Arrays.asList("a", "b", "c")));
         int completionDelayMillis = 10;
         Schema<byte[]> schema = Schema.BYTES;
-        PulsarClientImpl clientMock = createPulsarClientMockWithMockedClientCnx();
+        PulsarClientImpl clientMock = createPulsarClientMockWithMockedClientCnx(executorProvider, internalExecutor);
         when(clientMock.getPartitionedTopicMetadata(any())).thenAnswer(invocation -> createExceptionFuture(
                 new PulsarClientException.InvalidConfigurationException("a mock exception"), completionDelayMillis));
         CompletableFuture<Consumer<byte[]>> completeFuture = new CompletableFuture<>();
-        MultiTopicsConsumerImpl<byte[]> impl = new MultiTopicsConsumerImpl<byte[]>(clientMock, consumerConfData, executorProvider,
-                completeFuture, schema, null, true);
+        MultiTopicsConsumerImpl<byte[]> impl = new MultiTopicsConsumerImpl<byte[]>(clientMock, consumerConfData,
+                executorProvider, completeFuture, schema, null, true);
         // assert that we don't start in closed, then we move to closed and get an exception
         // indicating that closeAsync was called
         assertEquals(impl.getState(), HandlerState.State.Uninitialized);
@@ -191,7 +221,7 @@ public class MultiTopicsConsumerImplTest {
         @Cleanup("stop")
         Timer timer = new HashedWheelTimer();
 
-        PulsarClientImpl clientMock = createPulsarClientMockWithMockedClientCnx();
+        PulsarClientImpl clientMock = createPulsarClientMockWithMockedClientCnx(executorProvider, internalExecutor);
         when(clientMock.timer()).thenReturn(timer);
         when(clientMock.preProcessSchemaBeforeSubscribe(any(), any(), any()))
                 .thenReturn(CompletableFuture.completedFuture(null));
diff --git a/pulsar-client/src/test/java/org/apache/pulsar/client/impl/ReaderImplTest.java b/pulsar-client/src/test/java/org/apache/pulsar/client/impl/ReaderImplTest.java
index d0c4023..f6b8095 100644
--- a/pulsar-client/src/test/java/org/apache/pulsar/client/impl/ReaderImplTest.java
+++ b/pulsar-client/src/test/java/org/apache/pulsar/client/impl/ReaderImplTest.java
@@ -18,34 +18,35 @@
  */
 package org.apache.pulsar.client.impl;
 
+import static org.mockito.Mockito.when;
+import static org.testng.AssertJUnit.assertFalse;
+import static org.testng.AssertJUnit.assertTrue;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
 import org.apache.pulsar.client.api.Consumer;
 import org.apache.pulsar.client.api.Message;
 import org.apache.pulsar.client.api.Schema;
 import org.apache.pulsar.client.impl.conf.ReaderConfigurationData;
+import org.apache.pulsar.client.util.ExecutorProvider;
 import org.awaitility.Awaitility;
 import org.testng.annotations.AfterMethod;
 import org.testng.annotations.BeforeMethod;
 import org.testng.annotations.Test;
 
-import java.util.concurrent.CompletableFuture;
-import java.util.concurrent.ExecutorService;
-import java.util.concurrent.Executors;
-
-import static org.mockito.Mockito.when;
-import static org.testng.Assert.assertNotNull;
-import static org.testng.Assert.assertNull;
-
 public class ReaderImplTest {
-    ReaderImpl<byte[]> reader;
-    private ExecutorService executorService;
+    private ReaderImpl<byte[]> reader;
+    private ExecutorProvider executorProvider;
+    private ExecutorService internalExecutor;
 
     @BeforeMethod
     void setupReader() {
-        PulsarClientImpl mockedClient = ClientTestFixtures.createPulsarClientMockWithMockedClientCnx();
+        executorProvider = new ExecutorProvider(1, "ReaderImplTest");
+        internalExecutor = Executors.newSingleThreadScheduledExecutor();
+        PulsarClientImpl mockedClient = ClientTestFixtures.createPulsarClientMockWithMockedClientCnx(
+                executorProvider, internalExecutor);
         ReaderConfigurationData<byte[]> readerConfiguration = new ReaderConfigurationData<>();
         readerConfiguration.setTopicName("topicName");
-        executorService = Executors.newSingleThreadExecutor();
-        when(mockedClient.getInternalExecutorService()).thenReturn(executorService);
         CompletableFuture<Consumer<byte[]>> consumerFuture = new CompletableFuture<>();
         reader = new ReaderImpl<>(mockedClient, readerConfiguration, ClientTestFixtures.createMockedExecutorProvider(),
                 consumerFuture, Schema.BYTES);
@@ -53,9 +54,13 @@ public class ReaderImplTest {
 
     @AfterMethod
     public void clean() {
-        if (executorService != null) {
-            executorService.shutdownNow();
-            executorService = null;
+        if (executorProvider != null) {
+            executorProvider.shutdownNow();
+            executorProvider = null;
+        }
+        if (internalExecutor != null) {
+            internalExecutor.shutdownNow();
+            internalExecutor = null;
         }
     }
 
@@ -64,13 +69,13 @@ public class ReaderImplTest {
         // given
         CompletableFuture<Message<byte[]>> future = reader.readNextAsync();
         Awaitility.await().untilAsserted(() -> {
-            assertNotNull(reader.getConsumer().peekPendingReceive());
+            assertTrue(reader.getConsumer().hasNextPendingReceive());
         });
 
         // when
         future.cancel(false);
 
         // then
-        assertNull(reader.getConsumer().peekPendingReceive());
+        assertFalse(reader.getConsumer().hasNextPendingReceive());
     }
 }

[pulsar] 04/11: [Broker] Call .release() when discarding entry to prevent direct memory leak (#11748)

Posted by ch...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

chenhang pushed a commit to branch branch-2.8
in repository https://gitbox.apache.org/repos/asf/pulsar.git

commit 1ed33f207dc12fa81f680c9eda212f8e9c3d0f4e
Author: Lari Hotari <lh...@users.noreply.github.com>
AuthorDate: Mon Aug 23 21:15:51 2021 +0300

    [Broker] Call .release() when discarding entry to prevent direct memory leak (#11748)
    
    (cherry picked from commit 7906bb5990a918adfc47ed0ab52063caabac2801)
---
 .../service/persistent/PersistentDispatcherSingleActiveConsumer.java    | 2 ++
 1 file changed, 2 insertions(+)

diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentDispatcherSingleActiveConsumer.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentDispatcherSingleActiveConsumer.java
index 4bae5b0..c4bea81 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentDispatcherSingleActiveConsumer.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentDispatcherSingleActiveConsumer.java
@@ -175,7 +175,9 @@ public class PersistentDispatcherSingleActiveConsumer extends AbstractDispatcher
                 Entry entry = iterator.next();
                 byte[] key = peekStickyKey(entry.getDataBuffer());
                 Consumer consumer = stickyKeyConsumerSelector.select(key);
+                // Skip the entry if it's not for current active consumer.
                 if (consumer == null || currentConsumer != consumer) {
+                    entry.release();
                     iterator.remove();
                 }
             }

[pulsar] 09/11: Upgrade bk version to resolve the BouncyCatle issue (#11759)

Posted by ch...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

chenhang pushed a commit to branch branch-2.8
in repository https://gitbox.apache.org/repos/asf/pulsar.git

commit 449025845f0fdbadd682a74247986124309ed709
Author: Yong Zhang <zh...@gmail.com>
AuthorDate: Tue Aug 24 17:52:58 2021 +0800

    Upgrade bk version to resolve the BouncyCatle issue (#11759)
    
    Fixes #10937
    
    *Motivation*
    
    The previours bk version was introduce an incompatible issue
    with BouncyCastle FIPS. BookKeeper already fix this and we
    should upgrade the version to resolve the issue.
    
    For more information: #10937
    
    *Modifications*
    
    Upgrade bk to 4.14.2
    
    (cherry picked from commit e9292b31eaad6f9765f518c05c3912501f9a2983)
---
 distribution/server/src/assemble/LICENSE.bin.txt | 55 ++++++++++++------------
 pom.xml                                          |  2 +-
 pulsar-sql/presto-distribution/LICENSE           | 24 +++++------
 3 files changed, 40 insertions(+), 41 deletions(-)

diff --git a/distribution/server/src/assemble/LICENSE.bin.txt b/distribution/server/src/assemble/LICENSE.bin.txt
index 36f3e78..3d1299c 100644
--- a/distribution/server/src/assemble/LICENSE.bin.txt
+++ b/distribution/server/src/assemble/LICENSE.bin.txt
@@ -390,31 +390,31 @@ The Apache Software License, Version 2.0
     - org.apache.logging.log4j-log4j-1.2-api-2.14.0.jar
  * Java Native Access JNA -- net.java.dev.jna-jna-4.2.0.jar
  * BookKeeper
-    - org.apache.bookkeeper-bookkeeper-common-4.14.1.jar
-    - org.apache.bookkeeper-bookkeeper-common-allocator-4.14.1.jar
-    - org.apache.bookkeeper-bookkeeper-proto-4.14.1.jar
-    - org.apache.bookkeeper-bookkeeper-server-4.14.1.jar
-    - org.apache.bookkeeper-bookkeeper-tools-framework-4.14.1.jar
-    - org.apache.bookkeeper-circe-checksum-4.14.1.jar
-    - org.apache.bookkeeper-cpu-affinity-4.14.1.jar
-    - org.apache.bookkeeper-statelib-4.14.1.jar
-    - org.apache.bookkeeper-stream-storage-api-4.14.1.jar
-    - org.apache.bookkeeper-stream-storage-common-4.14.1.jar
-    - org.apache.bookkeeper-stream-storage-java-client-4.14.1.jar
-    - org.apache.bookkeeper-stream-storage-java-client-base-4.14.1.jar
-    - org.apache.bookkeeper-stream-storage-proto-4.14.1.jar
-    - org.apache.bookkeeper-stream-storage-server-4.14.1.jar
-    - org.apache.bookkeeper-stream-storage-service-api-4.14.1.jar
-    - org.apache.bookkeeper-stream-storage-service-impl-4.14.1.jar
-    - org.apache.bookkeeper.http-http-server-4.14.1.jar
-    - org.apache.bookkeeper.http-vertx-http-server-4.14.1.jar
-    - org.apache.bookkeeper.stats-bookkeeper-stats-api-4.14.1.jar
-    - org.apache.bookkeeper.stats-prometheus-metrics-provider-4.14.1.jar
-    - org.apache.distributedlog-distributedlog-common-4.14.1.jar
-    - org.apache.distributedlog-distributedlog-core-4.14.1-tests.jar
-    - org.apache.distributedlog-distributedlog-core-4.14.1.jar
-    - org.apache.distributedlog-distributedlog-protocol-4.14.1.jar
-    - org.apache.bookkeeper.stats-codahale-metrics-provider-4.14.1.jar
+    - org.apache.bookkeeper-bookkeeper-common-4.14.2.jar
+    - org.apache.bookkeeper-bookkeeper-common-allocator-4.14.2.jar
+    - org.apache.bookkeeper-bookkeeper-proto-4.14.2.jar
+    - org.apache.bookkeeper-bookkeeper-server-4.14.2.jar
+    - org.apache.bookkeeper-bookkeeper-tools-framework-4.14.2.jar
+    - org.apache.bookkeeper-circe-checksum-4.14.2.jar
+    - org.apache.bookkeeper-cpu-affinity-4.14.2.jar
+    - org.apache.bookkeeper-statelib-4.14.2.jar
+    - org.apache.bookkeeper-stream-storage-api-4.14.2.jar
+    - org.apache.bookkeeper-stream-storage-common-4.14.2.jar
+    - org.apache.bookkeeper-stream-storage-java-client-4.14.2.jar
+    - org.apache.bookkeeper-stream-storage-java-client-base-4.14.2.jar
+    - org.apache.bookkeeper-stream-storage-proto-4.14.2.jar
+    - org.apache.bookkeeper-stream-storage-server-4.14.2.jar
+    - org.apache.bookkeeper-stream-storage-service-api-4.14.2.jar
+    - org.apache.bookkeeper-stream-storage-service-impl-4.14.2.jar
+    - org.apache.bookkeeper.http-http-server-4.14.2.jar
+    - org.apache.bookkeeper.http-vertx-http-server-4.14.2.jar
+    - org.apache.bookkeeper.stats-bookkeeper-stats-api-4.14.2.jar
+    - org.apache.bookkeeper.stats-prometheus-metrics-provider-4.14.2.jar
+    - org.apache.distributedlog-distributedlog-common-4.14.2.jar
+    - org.apache.distributedlog-distributedlog-core-4.14.2-tests.jar
+    - org.apache.distributedlog-distributedlog-core-4.14.2.jar
+    - org.apache.distributedlog-distributedlog-protocol-4.14.2.jar
+    - org.apache.bookkeeper.stats-codahale-metrics-provider-4.14.2.jar
   * Apache HTTP Client
     - org.apache.httpcomponents-httpclient-4.5.13.jar
     - org.apache.httpcomponents-httpcore-4.4.13.jar
@@ -446,9 +446,8 @@ The Apache Software License, Version 2.0
  * SnakeYaml -- org.yaml-snakeyaml-1.27.jar
  * RocksDB - org.rocksdb-rocksdbjni-6.10.2.jar
  * Google Error Prone Annotations - com.google.errorprone-error_prone_annotations-2.5.1.jar
- * Apache Thrifth - org.apache.thrift-libthrift-0.12.0.jar
- * OkHttp and OkHttp3
-     - com.squareup.okhttp-okhttp-2.7.4.jar
+ * Apache Thrifth - org.apache.thrift-libthrift-0.14.2.jar
+ * OkHttp3
      - com.squareup.okhttp3-logging-interceptor-3.14.9.jar
      - com.squareup.okhttp3-okhttp-3.14.9.jar
  * Okio - com.squareup.okio-okio-1.13.0.jar
diff --git a/pom.xml b/pom.xml
index 0d00453..9277e97 100644
--- a/pom.xml
+++ b/pom.xml
@@ -103,7 +103,7 @@ flexible messaging model and an intuitive client API.</description>
     <!-- apache commons -->
     <commons-compress.version>1.21</commons-compress.version>
 
-    <bookkeeper.version>4.14.1</bookkeeper.version>
+    <bookkeeper.version>4.14.2</bookkeeper.version>
     <zookeeper.version>3.6.3</zookeeper.version>
     <snappy.version>1.1.7</snappy.version> <!-- ZooKeeper server -->
     <dropwizardmetrics.version>3.2.5</dropwizardmetrics.version> <!-- ZooKeeper server -->
diff --git a/pulsar-sql/presto-distribution/LICENSE b/pulsar-sql/presto-distribution/LICENSE
index 53ee344..d513849 100644
--- a/pulsar-sql/presto-distribution/LICENSE
+++ b/pulsar-sql/presto-distribution/LICENSE
@@ -412,18 +412,18 @@ The Apache Software License, Version 2.0
     - async-http-client-2.12.1.jar
     - async-http-client-netty-utils-2.12.1.jar
   * Apache Bookkeeper
-    - bookkeeper-common-4.14.1.jar
-    - bookkeeper-common-allocator-4.14.1.jar
-    - bookkeeper-proto-4.14.1.jar
-    - bookkeeper-server-4.14.1.jar
-    - bookkeeper-stats-api-4.14.1.jar
-    - bookkeeper-tools-framework-4.14.1.jar
-    - circe-checksum-4.14.1.jar
-    - codahale-metrics-provider-4.14.1.jar
-    - cpu-affinity-4.14.1.jar
-    - http-server-4.14.1.jar
-    - prometheus-metrics-provider-4.14.1.jar
-    - codahale-metrics-provider-4.14.1.jar
+    - bookkeeper-common-4.14.2.jar
+    - bookkeeper-common-allocator-4.14.2.jar
+    - bookkeeper-proto-4.14.2.jar
+    - bookkeeper-server-4.14.2.jar
+    - bookkeeper-stats-api-4.14.2.jar
+    - bookkeeper-tools-framework-4.14.2.jar
+    - circe-checksum-4.14.2.jar
+    - codahale-metrics-provider-4.14.2.jar
+    - cpu-affinity-4.14.2.jar
+    - http-server-4.14.2.jar
+    - prometheus-metrics-provider-4.14.2.jar
+    - codahale-metrics-provider-4.14.2.jar
   * Apache Commons
     - commons-cli-1.2.jar
     - commons-codec-1.15.jar

[pulsar] 02/11: [Functions]Support protobuf schema for pulsar function (#11709)

Posted by ch...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

chenhang pushed a commit to branch branch-2.8
in repository https://gitbox.apache.org/repos/asf/pulsar.git

commit 9789e3a42ae115f4e99a22f8b2ff361d091de475
Author: Guangning E <gu...@apache.org>
AuthorDate: Fri Aug 20 09:07:19 2021 +0800

    [Functions]Support protobuf schema for pulsar function (#11709)
    
    ### Motivation
    
    Some users have encountered the following exception when using the protobuf schema, so add the relevant dependencies to the instance:
    
    ```
    ERROR org.apache.pulsar.functions.instance.JavaInstanceRunnable - Sink open produced uncaught exception:
    java.lang.IllegalArgumentException: com.google.protobuf.GeneratedMessageV3 is not assignable from bold.proto.PersonOuterClass$Person
    	at org.apache.pulsar.client.impl.schema.ProtobufSchema.of(ProtobufSchema.java:110) ~
    ```
    
    ### Modifications
    
    * Add the relevant dependencies to the instance
    
    ### Verifying this change
    
    - [x] Make sure that the change passes the CI checks.
    
    (cherry picked from commit f893c08ae5e356115cdaaa931982807174929f73)
---
 pulsar-functions/runtime-all/pom.xml                    | 17 +++++++++++++++++
 .../pulsar/functions/instance/JavaInstanceDepsTest.java |  3 +++
 2 files changed, 20 insertions(+)

diff --git a/pulsar-functions/runtime-all/pom.xml b/pulsar-functions/runtime-all/pom.xml
index d50b0cb..ed5ff8c 100644
--- a/pulsar-functions/runtime-all/pom.xml
+++ b/pulsar-functions/runtime-all/pom.xml
@@ -79,6 +79,23 @@
       <version>${jackson.databind.version}</version>
     </dependency>
 
+    <!--In order to support protobuf schema, this dependency needs to be added-->
+    <dependency>
+      <groupId>com.google.protobuf</groupId>
+      <artifactId>protobuf-java</artifactId>
+      <version>${protobuf3.version}</version>
+    </dependency>
+    <dependency>
+      <groupId>com.google.protobuf</groupId>
+      <artifactId>protobuf-java-util</artifactId>
+      <version>${protobuf3.version}</version>
+    </dependency>
+    <dependency>
+      <groupId>com.google.code.gson</groupId>
+      <artifactId>gson</artifactId>
+      <version>${gson.version}</version>
+    </dependency>
+
     <!-- logging -->
 
     <dependency>
diff --git a/pulsar-functions/runtime-all/src/test/java/org/apache/pulsar/functions/instance/JavaInstanceDepsTest.java b/pulsar-functions/runtime-all/src/test/java/org/apache/pulsar/functions/instance/JavaInstanceDepsTest.java
index 859be4e..6f4ee68 100644
--- a/pulsar-functions/runtime-all/src/test/java/org/apache/pulsar/functions/instance/JavaInstanceDepsTest.java
+++ b/pulsar-functions/runtime-all/src/test/java/org/apache/pulsar/functions/instance/JavaInstanceDepsTest.java
@@ -72,6 +72,9 @@ public class JavaInstanceDepsTest {
                         && !name.startsWith("org/apache/avro")
                         && !name.startsWith("com/fasterxml/jackson")
                         && !name.startsWith("org/apache/commons/compress")
+                        && !name.startsWith("com/google")
+                        && !name.startsWith("org/checkerframework")
+                        && !name.startsWith("javax/annotation")
                         && !name.startsWith("org/apache/logging/slf4j")
                         && !name.startsWith("org/apache/logging/log4j")) {
                     notAllowedClasses.add(name);

[pulsar] 08/11: [Broker] Handle NPE when full key range isn't covered with active consumers (#11749)

Posted by ch...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

chenhang pushed a commit to branch branch-2.8
in repository https://gitbox.apache.org/repos/asf/pulsar.git

commit 9287e873df8f13f9ac88846eaa49dd411a8ebc4a
Author: Lari Hotari <lh...@users.noreply.github.com>
AuthorDate: Mon Aug 23 21:15:14 2021 +0300

    [Broker] Handle NPE when full key range isn't covered with active consumers (#11749)
    
    (cherry picked from commit 8027ab4e8763486de16d4b2f850b234b70a16b27)
---
 .../PersistentStickyKeyDispatcherMultipleConsumers.java           | 8 ++++++--
 1 file changed, 6 insertions(+), 2 deletions(-)

diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentStickyKeyDispatcherMultipleConsumers.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentStickyKeyDispatcherMultipleConsumers.java
index c23b360..d4d64e2 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentStickyKeyDispatcherMultipleConsumers.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentStickyKeyDispatcherMultipleConsumers.java
@@ -175,8 +175,12 @@ public class PersistentStickyKeyDispatcherMultipleConsumers extends PersistentDi
         for (Entry entry : entries) {
             int stickyKeyHash = getStickyKeyHash(entry);
             Consumer c = selector.select(stickyKeyHash);
-            groupedEntries.computeIfAbsent(c, k -> new ArrayList<>()).add(entry);
-            consumerStickyKeyHashesMap.computeIfAbsent(c, k -> new HashSet<>()).add(stickyKeyHash);
+            if (c != null) {
+                groupedEntries.computeIfAbsent(c, k -> new ArrayList<>()).add(entry);
+                consumerStickyKeyHashesMap.computeIfAbsent(c, k -> new HashSet<>()).add(stickyKeyHash);
+            } else {
+                entry.release();
+            }
         }
 
         AtomicInteger keyNumbers = new AtomicInteger(groupedEntries.size());

[pulsar] 01/11: [Broker Interceptor] Fix Pulsar didn't respond error messages when throw InterceptException (#11650)

Posted by ch...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

chenhang pushed a commit to branch branch-2.8
in repository https://gitbox.apache.org/repos/asf/pulsar.git

commit fe7dc8652162e15ab9a5a55164c46450c7f50fc9
Author: GuoJiwei <te...@apache.org>
AuthorDate: Mon Aug 16 10:11:20 2021 +0800

    [Broker Interceptor] Fix Pulsar didn't respond error messages when throw InterceptException (#11650)
    
    (cherry picked from commit 4793ff7bc0aa5ea442878efc85e0bac011cba7ee)
---
 .../apache/pulsar/broker/web/ExceptionHandler.java | 57 ++++++++++++++++++++
 .../pulsar/broker/web/PreInterceptFilter.java      |  8 +--
 .../org/apache/pulsar/broker/web/WebService.java   |  3 +-
 .../pulsar/broker/admin/AdminApiSchemaTest.java    |  4 +-
 .../broker/intercept/InterceptFilterOutTest.java   |  4 +-
 .../pulsar/broker/web/ExceptionHandlerTest.java    | 63 ++++++++++++++++++++++
 .../pulsar/client/admin/internal/BaseResource.java |  2 +-
 .../client/admin/internal/FunctionsImpl.java       | 16 +++---
 .../pulsar/client/admin/internal/PackagesImpl.java |  2 +-
 .../pulsar/client/admin/internal/SinksImpl.java    | 10 ++--
 .../pulsar/client/admin/internal/SourcesImpl.java  | 10 ++--
 .../pulsar/client/admin/internal/WorkerImpl.java   | 10 ++--
 .../admin/internal/http/AsyncHttpConnector.java    | 16 ++++++
 13 files changed, 173 insertions(+), 32 deletions(-)

diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/web/ExceptionHandler.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/web/ExceptionHandler.java
new file mode 100644
index 0000000..8b200a8
--- /dev/null
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/web/ExceptionHandler.java
@@ -0,0 +1,57 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.broker.web;
+
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.nio.charset.StandardCharsets;
+import javax.servlet.ServletResponse;
+import javax.servlet.http.HttpServletResponse;
+import javax.ws.rs.core.Response;
+import org.apache.pulsar.common.intercept.InterceptException;
+import org.eclipse.jetty.http.HttpVersion;
+import org.eclipse.jetty.http.MetaData;
+
+/**
+ *  Exception handler for handle exception.
+ */
+public class ExceptionHandler {
+
+    public void handle(ServletResponse response, Exception ex) throws IOException {
+        if (ex instanceof InterceptException) {
+            String reason = ex.getMessage();
+            byte[] content = reason.getBytes(StandardCharsets.UTF_8);
+            MetaData.Response info = new MetaData.Response();
+            info.setHttpVersion(HttpVersion.HTTP_1_1);
+            info.setReason(reason);
+            info.setStatus(((InterceptException) ex).getErrorCode());
+            info.setContentLength(content.length);
+            if (response instanceof org.eclipse.jetty.server.Response) {
+                ((org.eclipse.jetty.server.Response) response).getHttpChannel().sendResponse(info,
+                        ByteBuffer.wrap(content), true);
+            } else {
+                ((HttpServletResponse) response).sendError(((InterceptException) ex).getErrorCode(),
+                        ex.getMessage());
+            }
+        } else {
+            ((HttpServletResponse) response).sendError(Response.Status.INTERNAL_SERVER_ERROR.getStatusCode(),
+                    ex.getMessage());
+        }
+    }
+}
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/web/PreInterceptFilter.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/web/PreInterceptFilter.java
index 201d9ad..e4e7bbc 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/web/PreInterceptFilter.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/web/PreInterceptFilter.java
@@ -26,7 +26,6 @@ import javax.servlet.ServletException;
 import javax.servlet.ServletRequest;
 import javax.servlet.ServletResponse;
 import javax.servlet.http.HttpServletRequest;
-import javax.servlet.http.HttpServletResponse;
 import javax.ws.rs.core.MediaType;
 import lombok.extern.slf4j.Slf4j;
 import org.apache.commons.lang3.StringUtils;
@@ -38,8 +37,11 @@ public class PreInterceptFilter implements Filter {
 
     private final BrokerInterceptor interceptor;
 
-    public PreInterceptFilter(BrokerInterceptor interceptor) {
+    private final ExceptionHandler exceptionHandler;
+
+    public PreInterceptFilter(BrokerInterceptor interceptor, ExceptionHandler exceptionHandler) {
         this.interceptor = interceptor;
+        this.exceptionHandler = exceptionHandler;
     }
 
     @Override
@@ -67,7 +69,7 @@ public class PreInterceptFilter implements Filter {
             interceptor.onWebserviceRequest(requestWrapper);
             filterChain.doFilter(requestWrapper, servletResponse);
         } catch (InterceptException e) {
-            ((HttpServletResponse) servletResponse).sendError(e.getErrorCode(), e.getMessage());
+            exceptionHandler.handle(servletResponse, e);
         }
     }
 
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/web/WebService.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/web/WebService.java
index 6efb7c6..4d9ff83 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/web/WebService.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/web/WebService.java
@@ -162,8 +162,9 @@ public class WebService implements AutoCloseable {
 
         if (!pulsar.getConfig().getBrokerInterceptors().isEmpty()
                 || !pulsar.getConfig().isDisableBrokerInterceptors()) {
+            ExceptionHandler handler = new ExceptionHandler();
             // Enable PreInterceptFilter only when interceptors are enabled
-            context.addFilter(new FilterHolder(new PreInterceptFilter(pulsar.getBrokerInterceptor())),
+            context.addFilter(new FilterHolder(new PreInterceptFilter(pulsar.getBrokerInterceptor(), handler)),
                     MATCH_ALL, EnumSet.allOf(DispatcherType.class));
             context.addFilter(new FilterHolder(new ProcessHandlerFilter(pulsar)),
                     MATCH_ALL, EnumSet.allOf(DispatcherType.class));
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/AdminApiSchemaTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/AdminApiSchemaTest.java
index 38d0ab7..dce60e0 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/AdminApiSchemaTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/AdminApiSchemaTest.java
@@ -193,7 +193,7 @@ public class AdminApiSchemaTest extends MockedPulsarServiceBaseTest {
             admin.schemas().createSchema(topicName, foo1SchemaInfo);
             fail("Should have failed");
         } catch (PulsarAdminException.ConflictException e) {
-            assertTrue(e.getMessage().contains("HTTP 409 Conflict"));
+            assertTrue(e.getMessage().contains("HTTP 409"));
         }
 
         namespace = "schematest/testnotfound";
@@ -203,7 +203,7 @@ public class AdminApiSchemaTest extends MockedPulsarServiceBaseTest {
             admin.schemas().createSchema(topicName, fooSchemaInfo);
             fail("Should have failed");
         } catch (PulsarAdminException.NotFoundException e) {
-            assertTrue(e.getMessage().contains("HTTP 404 Not Found"));
+            assertTrue(e.getMessage().contains("HTTP 404"));
         }
     }
 
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/intercept/InterceptFilterOutTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/intercept/InterceptFilterOutTest.java
index 1219420..3d8c473 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/intercept/InterceptFilterOutTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/intercept/InterceptFilterOutTest.java
@@ -20,6 +20,7 @@ package org.apache.pulsar.broker.intercept;
 
 import org.apache.pulsar.broker.PulsarService;
 import org.apache.pulsar.broker.ServiceConfiguration;
+import org.apache.pulsar.broker.web.ExceptionHandler;
 import org.apache.pulsar.broker.web.PreInterceptFilter;
 import org.apache.pulsar.broker.web.ProcessHandlerFilter;
 import org.apache.pulsar.broker.web.ResponseHandlerFilter;
@@ -61,7 +62,8 @@ public class InterceptFilterOutTest {
     @Test
     public void testFilterOutForPreInterceptFilter() throws Exception {
         CounterBrokerInterceptor interceptor = new CounterBrokerInterceptor();
-        PreInterceptFilter filter = new PreInterceptFilter(interceptor);
+        ExceptionHandler handler = new ExceptionHandler();
+        PreInterceptFilter filter = new PreInterceptFilter(interceptor, handler);
 
         HttpServletRequest request = Mockito.mock(HttpServletRequest.class);
         HttpServletResponse response = Mockito.mock(HttpServletResponse.class);
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/web/ExceptionHandlerTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/web/ExceptionHandlerTest.java
new file mode 100644
index 0000000..ed70869
--- /dev/null
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/web/ExceptionHandlerTest.java
@@ -0,0 +1,63 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.broker.web;
+
+import lombok.SneakyThrows;
+import org.apache.pulsar.common.intercept.InterceptException;
+import org.eclipse.jetty.server.HttpChannel;
+import org.eclipse.jetty.server.Response;
+import org.mockito.Mockito;
+import org.testng.annotations.Test;
+
+import javax.servlet.http.HttpServletResponse;
+
+import static org.eclipse.jetty.http.HttpStatus.INTERNAL_SERVER_ERROR_500;
+import static org.eclipse.jetty.http.HttpStatus.PRECONDITION_FAILED_412;
+
+/**
+ * Unit test for ExceptionHandler.
+ */
+@Test(groups = "broker")
+public class ExceptionHandlerTest {
+
+    @Test
+    @SneakyThrows
+    public void testHandle() {
+        String restriction = "Reach the max tenants [5] restriction";
+        String internal = "internal exception";
+        String illegal = "illegal argument exception ";
+        ExceptionHandler handler = new ExceptionHandler();
+        HttpServletResponse response = Mockito.mock(HttpServletResponse.class);
+        handler.handle(response, new InterceptException(PRECONDITION_FAILED_412, restriction));
+        Mockito.verify(response).sendError(PRECONDITION_FAILED_412, restriction);
+
+        handler.handle(response, new InterceptException(INTERNAL_SERVER_ERROR_500, internal));
+        Mockito.verify(response).sendError(INTERNAL_SERVER_ERROR_500, internal);
+
+        handler.handle(response, new IllegalArgumentException(illegal));
+        Mockito.verify(response).sendError(INTERNAL_SERVER_ERROR_500, illegal);
+
+        Response response2 = Mockito.mock(Response.class);
+        HttpChannel httpChannel = Mockito.mock(HttpChannel.class);
+        Mockito.when(response2.getHttpChannel()).thenReturn(httpChannel);
+        handler.handle(response2, new InterceptException(PRECONDITION_FAILED_412, restriction));
+        Mockito.verify(httpChannel).sendResponse(Mockito.any(), Mockito.any(), Mockito.anyBoolean());
+    }
+
+}
diff --git a/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/BaseResource.java b/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/BaseResource.java
index ae9b0c3..6838fd8 100644
--- a/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/BaseResource.java
+++ b/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/BaseResource.java
@@ -249,7 +249,7 @@ public abstract class BaseResource {
     }
 
     public PulsarAdminException getApiException(Response response) {
-        if (response.getStatusInfo().equals(Response.Status.OK)) {
+        if (response.getStatus() == Response.Status.OK.getStatusCode()) {
             return null;
         }
         try {
diff --git a/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/FunctionsImpl.java b/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/FunctionsImpl.java
index 806ef79..a53da9d 100644
--- a/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/FunctionsImpl.java
+++ b/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/FunctionsImpl.java
@@ -103,7 +103,7 @@ public class FunctionsImpl extends ComponentResource implements Functions {
                 new InvocationCallback<Response>() {
                     @Override
                     public void completed(Response response) {
-                        if (!response.getStatusInfo().equals(Response.Status.OK)) {
+                        if (response.getStatus() != Response.Status.OK.getStatusCode()) {
                             future.completeExceptionally(getApiException(response));
                         } else {
                             List<String> functions = response.readEntity(new GenericType<List<String>>() {});
@@ -142,7 +142,7 @@ public class FunctionsImpl extends ComponentResource implements Functions {
                 new InvocationCallback<Response>() {
                     @Override
                     public void completed(Response response) {
-                        if (!response.getStatusInfo().equals(Response.Status.OK)) {
+                        if (response.getStatus() != Response.Status.OK.getStatusCode()) {
                             future.completeExceptionally(getApiException(response));
                         } else {
                             future.complete(response.readEntity(FunctionConfig.class));
@@ -181,7 +181,7 @@ public class FunctionsImpl extends ComponentResource implements Functions {
                 new InvocationCallback<Response>() {
                     @Override
                     public void completed(Response response) {
-                        if (!response.getStatusInfo().equals(Response.Status.OK)) {
+                        if (response.getStatus() != Response.Status.OK.getStatusCode()) {
                             future.completeExceptionally(getApiException(response));
                         } else {
                             future.complete(response.readEntity(FunctionStatus.class));
@@ -224,7 +224,7 @@ public class FunctionsImpl extends ComponentResource implements Functions {
                 new InvocationCallback<Response>() {
                     @Override
                     public void completed(Response response) {
-                        if (!response.getStatusInfo().equals(Response.Status.OK)) {
+                        if (response.getStatus() != Response.Status.OK.getStatusCode()) {
                             future.completeExceptionally(getApiException(response));
                         } else {
                             future.complete(response.readEntity(
@@ -267,7 +267,7 @@ public class FunctionsImpl extends ComponentResource implements Functions {
                 new InvocationCallback<Response>() {
                     @Override
                     public void completed(Response response) {
-                        if (!response.getStatusInfo().equals(Response.Status.OK)) {
+                        if (response.getStatus() != Response.Status.OK.getStatusCode()) {
                             future.completeExceptionally(getApiException(response));
                         } else {
                             future.complete(response.readEntity(FunctionInstanceStatsDataImpl.class));
@@ -307,7 +307,7 @@ public class FunctionsImpl extends ComponentResource implements Functions {
                 new InvocationCallback<Response>() {
                     @Override
                     public void completed(Response response) {
-                        if (!response.getStatusInfo().equals(Response.Status.OK)) {
+                        if (response.getStatus() != Response.Status.OK.getStatusCode()) {
                             future.completeExceptionally(getApiException(response));
                         } else {
                             future.complete(response.readEntity(FunctionStatsImpl.class));
@@ -920,7 +920,7 @@ public class FunctionsImpl extends ComponentResource implements Functions {
     public List<ConnectorDefinition> getConnectorsList() throws PulsarAdminException {
         try {
             Response response = request(functions.path("connectors")).get();
-            if (!response.getStatusInfo().equals(Response.Status.OK)) {
+            if (response.getStatus() != Response.Status.OK.getStatusCode()) {
                 throw getApiException(response);
             }
             return response.readEntity(new GenericType<List<ConnectorDefinition>>() {
@@ -977,7 +977,7 @@ public class FunctionsImpl extends ComponentResource implements Functions {
                 new InvocationCallback<Response>() {
                     @Override
                     public void completed(Response response) {
-                        if (!response.getStatusInfo().equals(Response.Status.OK)) {
+                        if (response.getStatus() != Response.Status.OK.getStatusCode()) {
                             future.completeExceptionally(getApiException(response));
                         } else {
                             future.complete(response.readEntity(FunctionState.class));
diff --git a/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/PackagesImpl.java b/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/PackagesImpl.java
index ae84649..4c7fc4c 100644
--- a/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/PackagesImpl.java
+++ b/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/PackagesImpl.java
@@ -176,7 +176,7 @@ public class PackagesImpl extends ComponentResource implements Packages {
         asyncGetRequest(webTarget, new InvocationCallback<Response>(){
             @Override
             public void completed(Response response) {
-                if (response.getStatusInfo().equals(Response.Status.OK)) {
+                if (response.getStatus() == Response.Status.OK.getStatusCode()) {
                     try (InputStream inputStream = response.readEntity(InputStream.class)) {
                         Path destinyPath = Paths.get(path);
                         if (destinyPath.getParent() != null) {
diff --git a/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/SinksImpl.java b/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/SinksImpl.java
index 6a31823..215c893 100644
--- a/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/SinksImpl.java
+++ b/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/SinksImpl.java
@@ -89,7 +89,7 @@ public class SinksImpl extends ComponentResource implements Sinks, Sink {
                 new InvocationCallback<Response>() {
                     @Override
                     public void completed(Response response) {
-                        if (!response.getStatusInfo().equals(Response.Status.OK)) {
+                        if (response.getStatus() != Response.Status.OK.getStatusCode()) {
                             future.completeExceptionally(getApiException(response));
                         } else {
                             future.complete(response.readEntity(new GenericType<List<String>>() {}));
@@ -129,7 +129,7 @@ public class SinksImpl extends ComponentResource implements Sinks, Sink {
                 new InvocationCallback<Response>() {
                     @Override
                     public void completed(Response response) {
-                        if (!response.getStatusInfo().equals(Response.Status.OK)) {
+                        if (response.getStatus() != Response.Status.OK.getStatusCode()) {
                             future.completeExceptionally(getApiException(response));
                         } else {
                             future.complete(response.readEntity(SinkConfig.class));
@@ -170,7 +170,7 @@ public class SinksImpl extends ComponentResource implements Sinks, Sink {
                 new InvocationCallback<Response>() {
                     @Override
                     public void completed(Response response) {
-                        if (!response.getStatusInfo().equals(Response.Status.OK)) {
+                        if (response.getStatus() != Response.Status.OK.getStatusCode()) {
                             future.completeExceptionally(getApiException(response));
                         } else {
                             future.complete(response.readEntity(SinkStatus.class));
@@ -213,7 +213,7 @@ public class SinksImpl extends ComponentResource implements Sinks, Sink {
                 new InvocationCallback<Response>() {
                     @Override
                     public void completed(Response response) {
-                        if (!response.getStatusInfo().equals(Response.Status.OK)) {
+                        if (response.getStatus() != Response.Status.OK.getStatusCode()) {
                             future.completeExceptionally(getApiException(response));
                         } else {
                             future.complete(response.readEntity(
@@ -637,7 +637,7 @@ public class SinksImpl extends ComponentResource implements Sinks, Sink {
                 new InvocationCallback<Response>() {
                     @Override
                     public void completed(Response response) {
-                        if (!response.getStatusInfo().equals(Response.Status.OK)) {
+                        if (response.getStatus() != Response.Status.OK.getStatusCode()) {
                             future.completeExceptionally(getApiException(response));
                         } else {
                             future.complete(response.readEntity(
diff --git a/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/SourcesImpl.java b/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/SourcesImpl.java
index 5139202..f52845a 100644
--- a/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/SourcesImpl.java
+++ b/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/SourcesImpl.java
@@ -85,7 +85,7 @@ public class SourcesImpl extends ComponentResource implements Sources, Source {
                 new InvocationCallback<Response>() {
                     @Override
                     public void completed(Response response) {
-                        if (!response.getStatusInfo().equals(Response.Status.OK)) {
+                        if (response.getStatus() != Response.Status.OK.getStatusCode()) {
                             future.completeExceptionally(getApiException(response));
                         } else {
                             future.complete(response.readEntity(new GenericType<List<String>>() {}));
@@ -122,7 +122,7 @@ public class SourcesImpl extends ComponentResource implements Sources, Source {
                 new InvocationCallback<Response>() {
                     @Override
                     public void completed(Response response) {
-                        if (!response.getStatusInfo().equals(Response.Status.OK)) {
+                        if (response.getStatus() != Response.Status.OK.getStatusCode()) {
                             future.completeExceptionally(getApiException(response));
                         } else {
                             future.complete(response.readEntity(SourceConfig.class));
@@ -160,7 +160,7 @@ public class SourcesImpl extends ComponentResource implements Sources, Source {
                 new InvocationCallback<Response>() {
                     @Override
                     public void completed(Response response) {
-                        if (!response.getStatusInfo().equals(Response.Status.OK)) {
+                        if (response.getStatus() != Response.Status.OK.getStatusCode()) {
                             future.completeExceptionally(getApiException(response));
                         } else {
                             future.complete(response.readEntity(SourceStatus.class));
@@ -201,7 +201,7 @@ public class SourcesImpl extends ComponentResource implements Sources, Source {
                 new InvocationCallback<Response>() {
                     @Override
                     public void completed(Response response) {
-                        if (!response.getStatusInfo().equals(Response.Status.OK)) {
+                        if (response.getStatus() != Response.Status.OK.getStatusCode()) {
                             future.completeExceptionally(getApiException(response));
                         } else {
                             future.complete(response.readEntity(
@@ -586,7 +586,7 @@ public class SourcesImpl extends ComponentResource implements Sources, Source {
                 new InvocationCallback<Response>() {
                     @Override
                     public void completed(Response response) {
-                        if (!response.getStatusInfo().equals(Response.Status.OK)) {
+                        if (response.getStatus() != Response.Status.OK.getStatusCode()) {
                             future.completeExceptionally(getApiException(response));
                         } else {
                             future.complete(response.readEntity(
diff --git a/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/WorkerImpl.java b/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/WorkerImpl.java
index 0b4d9da..a8ebcf1 100644
--- a/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/WorkerImpl.java
+++ b/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/WorkerImpl.java
@@ -72,7 +72,7 @@ public class WorkerImpl extends BaseResource implements Worker {
                 new InvocationCallback<Response>() {
                     @Override
                     public void completed(Response response) {
-                        if (!response.getStatusInfo().equals(Response.Status.OK)) {
+                        if (response.getStatus() != Response.Status.OK.getStatusCode()) {
                             future.completeExceptionally(new ClientErrorException(response));
                         } else {
                             List<WorkerFunctionInstanceStats> metricsList =
@@ -111,7 +111,7 @@ public class WorkerImpl extends BaseResource implements Worker {
                 new InvocationCallback<Response>() {
                     @Override
                     public void completed(Response response) {
-                        if (!response.getStatusInfo().equals(Response.Status.OK)) {
+                        if (response.getStatus() != Response.Status.OK.getStatusCode()) {
                             future.completeExceptionally(new ClientErrorException(response));
                         } else {
                             future.complete(response.readEntity(
@@ -150,7 +150,7 @@ public class WorkerImpl extends BaseResource implements Worker {
 
                     @Override
                     public void completed(Response response) {
-                        if (!response.getStatusInfo().equals(Response.Status.OK)) {
+                        if (response.getStatus() != Response.Status.OK.getStatusCode()) {
                             future.completeExceptionally(new ClientErrorException(response));
                         } else {
                             future.complete(response.readEntity(new GenericType<List<WorkerInfo>>() {}));
@@ -187,7 +187,7 @@ public class WorkerImpl extends BaseResource implements Worker {
                 new InvocationCallback<Response>() {
                     @Override
                     public void completed(Response response) {
-                        if (!response.getStatusInfo().equals(Response.Status.OK)) {
+                        if (response.getStatus() != Response.Status.OK.getStatusCode()) {
                             future.completeExceptionally(new ClientErrorException(response));
                         } else {
                             future.complete(response.readEntity(new GenericType<WorkerInfo>(){}));
@@ -224,7 +224,7 @@ public class WorkerImpl extends BaseResource implements Worker {
                 new InvocationCallback<Response>() {
                     @Override
                     public void completed(Response response) {
-                        if (!response.getStatusInfo().equals(Response.Status.OK)) {
+                        if (response.getStatus() != Response.Status.OK.getStatusCode()) {
                             future.completeExceptionally(new ClientErrorException(response));
                         } else {
                             future.complete(response.readEntity(
diff --git a/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/http/AsyncHttpConnector.java b/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/http/AsyncHttpConnector.java
index bf8492c..3e17a38 100644
--- a/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/http/AsyncHttpConnector.java
+++ b/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/http/AsyncHttpConnector.java
@@ -205,6 +205,22 @@ public class AsyncHttpConnector implements Connector {
             } else {
                 ClientResponse jerseyResponse =
                         new ClientResponse(Status.fromStatusCode(response.getStatusCode()), jerseyRequest);
+                jerseyResponse.setStatusInfo(new javax.ws.rs.core.Response.StatusType() {
+                    @Override
+                    public int getStatusCode() {
+                        return response.getStatusCode();
+                    }
+
+                    @Override
+                    public Status.Family getFamily() {
+                        return Status.Family.familyOf(response.getStatusCode());
+                    }
+
+                    @Override
+                    public String getReasonPhrase() {
+                        return response.getStatusText();
+                    }
+                });
                 response.getHeaders().forEach(e -> jerseyResponse.header(e.getKey(), e.getValue()));
                 if (response.hasResponseBody()) {
                     jerseyResponse.setEntityStream(response.getResponseBodyAsStream());

[pulsar] 03/11: [Broker] Remove subscription when closing Reader on non-persistent topics (#11731)

Posted by ch...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

chenhang pushed a commit to branch branch-2.8
in repository https://gitbox.apache.org/repos/asf/pulsar.git

commit 2fb2e59cde34b00b28a946cab6625be8e93fe729
Author: ran <ga...@126.com>
AuthorDate: Sat Aug 21 02:51:41 2021 +0800

    [Broker] Remove subscription when closing Reader on non-persistent topics (#11731)
    
    * Remove the subscription from the topic when closing Reader subscription.
    
    * remove useless code
    
    (cherry picked from commit 5ac38f1434dd9822daa4bcd1ed2f3c5e4580463b)
---
 .../nonpersistent/NonPersistentSubscription.java   |  9 +++-
 .../service/nonpersistent/NonPersistentTopic.java  |  4 +-
 .../org/apache/pulsar/client/impl/ReaderTest.java  | 48 ++++++++++++++++++++++
 3 files changed, 58 insertions(+), 3 deletions(-)

diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/nonpersistent/NonPersistentSubscription.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/nonpersistent/NonPersistentSubscription.java
index c605f23..a400fae 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/nonpersistent/NonPersistentSubscription.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/nonpersistent/NonPersistentSubscription.java
@@ -72,13 +72,17 @@ public class NonPersistentSubscription implements Subscription {
     private final LongAdder bytesOutFromRemovedConsumers = new LongAdder();
     private final LongAdder msgOutFromRemovedConsumer = new LongAdder();
 
-    public NonPersistentSubscription(NonPersistentTopic topic, String subscriptionName) {
+    // If isDurable is false(such as a Reader), remove subscription from topic when closing this subscription.
+    private final boolean isDurable;
+
+    public NonPersistentSubscription(NonPersistentTopic topic, String subscriptionName, boolean isDurable) {
         this.topic = topic;
         this.topicName = topic.getName();
         this.subName = subscriptionName;
         this.fullName = MoreObjects.toStringHelper(this).add("topic", topicName).add("name", subName).toString();
         IS_FENCED_UPDATER.set(this, FALSE);
         this.lastActive = System.currentTimeMillis();
+        this.isDurable = isDurable;
     }
 
     @Override
@@ -200,6 +204,9 @@ public class NonPersistentSubscription implements Subscription {
         ConsumerStatsImpl stats = consumer.getStats();
         bytesOutFromRemovedConsumers.add(stats.bytesOutCounter);
         msgOutFromRemovedConsumer.add(stats.msgOutCounter);
+        if (!isDurable) {
+            topic.unsubscribe(subName);
+        }
 
         // invalid consumer remove will throw an exception
         // decrement usage is triggered only for valid consumer close
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/nonpersistent/NonPersistentTopic.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/nonpersistent/NonPersistentTopic.java
index 37680d4..7ae87f3 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/nonpersistent/NonPersistentTopic.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/nonpersistent/NonPersistentTopic.java
@@ -270,7 +270,7 @@ public class NonPersistentTopic extends AbstractTopic implements Topic {
         }
 
         NonPersistentSubscription subscription = subscriptions.computeIfAbsent(subscriptionName,
-                name -> new NonPersistentSubscription(this, subscriptionName));
+                name -> new NonPersistentSubscription(this, subscriptionName, isDurable));
         Consumer consumer = new Consumer(subscription, subType, topic, consumerId, priorityLevel, consumerName, 0,
                 cnx, cnx.getAuthRole(), metadata, readCompacted, initialPosition, keySharedMeta, MessageId.latest);
         addConsumerToSubscription(subscription, consumer).thenRun(() -> {
@@ -319,7 +319,7 @@ public class NonPersistentTopic extends AbstractTopic implements Topic {
     @Override
     public CompletableFuture<Subscription> createSubscription(String subscriptionName, InitialPosition initialPosition,
             boolean replicateSubscriptionState) {
-        return CompletableFuture.completedFuture(new NonPersistentSubscription(this, subscriptionName));
+        return CompletableFuture.completedFuture(new NonPersistentSubscription(this, subscriptionName, true));
     }
 
     @Override
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/ReaderTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/ReaderTest.java
index 4a2466f..2052646 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/ReaderTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/ReaderTest.java
@@ -42,6 +42,7 @@ import lombok.extern.slf4j.Slf4j;
 import org.apache.pulsar.broker.auth.MockedPulsarServiceBaseTest;
 import org.apache.pulsar.broker.service.StickyKeyConsumerSelector;
 import org.apache.pulsar.client.admin.PulsarAdminException;
+import org.apache.pulsar.client.api.Consumer;
 import org.apache.pulsar.client.api.Message;
 import org.apache.pulsar.client.api.MessageId;
 import org.apache.pulsar.client.api.MessageRoutingMode;
@@ -56,6 +57,7 @@ import org.apache.pulsar.common.naming.TopicName;
 import org.apache.pulsar.common.policies.data.ClusterData;
 import org.apache.pulsar.common.policies.data.RetentionPolicies;
 import org.apache.pulsar.common.policies.data.TenantInfoImpl;
+import org.apache.pulsar.common.policies.data.TopicStats;
 import org.apache.pulsar.common.util.Murmur3_32Hash;
 import org.apache.pulsar.schema.Schemas;
 import org.awaitility.Awaitility;
@@ -554,4 +556,50 @@ public class ReaderTest extends MockedPulsarServiceBaseTest {
         latch.await();
         Assert.assertEquals(received.size(), 1);
     }
+
+    @Test(timeOut = 1000 * 10)
+    public void removeNonPersistentTopicReaderTest() throws Exception {
+        final String topic = "non-persistent://my-property/my-ns/non-topic";
+
+        Reader<byte[]> reader = pulsarClient.newReader()
+                .topic(topic)
+                .startMessageId(MessageId.earliest)
+                .create();
+        Reader<byte[]> reader2 = pulsarClient.newReader()
+                .topic(topic)
+                .startMessageId(MessageId.earliest)
+                .create();
+
+        Awaitility.await()
+                .pollDelay(3, TimeUnit.SECONDS)
+                .until(() -> {
+                    TopicStats topicStats = admin.topics().getStats(topic);
+                    System.out.println("subscriptions size: " + topicStats.getSubscriptions().size());
+                    return topicStats.getSubscriptions().size() == 2;
+                });
+
+        reader.close();
+        reader2.close();
+
+        Awaitility.await().until(() -> {
+            TopicStats topicStats = admin.topics().getStats(topic);
+            System.out.println("subscriptions size: " + topicStats.getSubscriptions().size());
+            return topicStats.getSubscriptions().size() == 0;
+        });
+
+        Consumer<byte[]> consumer = pulsarClient.newConsumer()
+                .topic(topic)
+                .subscriptionName("sub")
+                .subscribe();
+        consumer.close();
+
+        Awaitility.await()
+                .pollDelay(3, TimeUnit.SECONDS)
+                .until(() -> {
+            TopicStats topicStats = admin.topics().getStats(topic);
+            System.out.println("subscriptions size: " + topicStats.getSubscriptions().size());
+            return topicStats.getSubscriptions().size() == 1;
+        });
+    }
+
 }

[pulsar] 10/11: [pulsar-client] remove consumer reference from PulsarClient on subscription failure (#11758)

Posted by ch...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

chenhang pushed a commit to branch branch-2.8
in repository https://gitbox.apache.org/repos/asf/pulsar.git

commit bce43cbb4dbb4826c9c0ee9664cdc04ef6f291c5
Author: Rajan Dhabalia <rd...@apache.org>
AuthorDate: Tue Aug 24 06:01:35 2021 -0700

    [pulsar-client] remove consumer reference from PulsarClient on subscription failure (#11758)
    
    (cherry picked from commit c2bd23d1676e17135eb9a5044a939069c89588e9)
---
 .../main/java/org/apache/pulsar/client/impl/MultiTopicsConsumerImpl.java | 1 +
 1 file changed, 1 insertion(+)

diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/MultiTopicsConsumerImpl.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/MultiTopicsConsumerImpl.java
index c9e2067..727f2d6 100644
--- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/MultiTopicsConsumerImpl.java
+++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/MultiTopicsConsumerImpl.java
@@ -877,6 +877,7 @@ public class MultiTopicsConsumerImpl<T> extends ConsumerBase<T> {
             .exceptionally(e -> {
                 log.warn("Failed subscription for createPartitionedConsumer: {} {}, e:{}",
                     topicName, numPartitions,  e);
+                consumer.cleanupMultiConsumer();
                 subscribeFuture.completeExceptionally(
                     PulsarClientException.wrap(((Throwable) e).getCause(), String.format("Failed to subscribe %s with %d partitions", topicName, numPartitions)));
                 return null;

[pulsar] 06/11: Avoid duplicated disconnecting producer when after add entry failed. (#11741)

Posted by ch...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

chenhang pushed a commit to branch branch-2.8
in repository https://gitbox.apache.org/repos/asf/pulsar.git

commit 9c2888f7df738f579a9f295b79297ebe325c85ae
Author: lipenghui <pe...@apache.org>
AuthorDate: Tue Aug 24 02:16:00 2021 +0800

    Avoid duplicated disconnecting producer when after add entry failed. (#11741)
    
    Currently, if encounter the add entry failure, will call producer.disconnect() multiple times during the disconnecting the producer
    which will add many disconnect producer tasks to the EventLoop.
    
    1. Added isDisconnecting state for the producer, if the producer in isDisconnecting state, skip the disconnect operation
    2. Create new future list only the topic have producers to reduce the heap allocation
    
    Added test to cover disconnecting the producer multiple times, but the EventLoop only execute one time.
    
    (cherry picked from commit 49c0796e8279442cc9162387a9db3e24415f9bbc)
---
 .../org/apache/pulsar/broker/service/Producer.java |  9 +++++++-
 .../apache/pulsar/broker/service/ServerCnx.java    |  2 +-
 .../broker/service/persistent/PersistentTopic.java | 12 ++++++++---
 .../pulsar/broker/service/PersistentTopicTest.java | 25 ++++++++++++++++++++--
 4 files changed, 41 insertions(+), 7 deletions(-)

diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/Producer.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/Producer.java
index 15076f9..8c35e66 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/Producer.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/Producer.java
@@ -34,6 +34,7 @@ import java.util.Objects;
 import java.util.Optional;
 import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.concurrent.atomic.AtomicLongFieldUpdater;
 import org.apache.pulsar.broker.service.BrokerServiceException.TopicClosedException;
 import org.apache.pulsar.broker.service.BrokerServiceException.TopicTerminatedException;
@@ -90,6 +91,7 @@ public class Producer {
 
     private final SchemaVersion schemaVersion;
     private final String clientAddress; // IP address only, no port number included
+    private final AtomicBoolean isDisconnecting = new AtomicBoolean(false);
 
     public Producer(Topic topic, TransportCnx cnx, long producerId, String producerName, String appId,
             boolean isEncrypted, Map<String, String> metadata, SchemaVersion schemaVersion, long epoch,
@@ -552,6 +554,7 @@ public class Producer {
             log.debug("Removed producer: {}", this);
         }
         closeFuture.complete(null);
+        isDisconnecting.set(false);
     }
 
     /**
@@ -561,7 +564,7 @@ public class Producer {
      * @return Completable future indicating completion of producer close
      */
     public CompletableFuture<Void> disconnect() {
-        if (!closeFuture.isDone()) {
+        if (!closeFuture.isDone() && isDisconnecting.compareAndSet(false, true)) {
             log.info("Disconnecting producer: {}", this);
             cnx.execute(() -> {
                 cnx.closeProducer(this);
@@ -669,6 +672,10 @@ public class Producer {
         return clientAddress;
     }
 
+    public boolean isDisconnecting() {
+        return isDisconnecting.get();
+    }
+
     private static final Logger log = LoggerFactory.getLogger(Producer.class);
 
 }
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/ServerCnx.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/ServerCnx.java
index 511d5a2..6a5ee25 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/ServerCnx.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/ServerCnx.java
@@ -2572,7 +2572,7 @@ public class ServerCnx extends PulsarHandler implements TransportCnx {
 
     @Override
     public void execute(Runnable runnable) {
-        ctx.channel().eventLoop().execute(runnable);
+        ctx().channel().eventLoop().execute(runnable);
     }
 
     @Override
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java
index 285dff4..2808021 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java
@@ -503,9 +503,15 @@ public class PersistentTopic extends AbstractTopic
             // fence topic when failed to write a message to BK
             fence();
             // close all producers
-            List<CompletableFuture<Void>> futures = Lists.newArrayList();
-            producers.values().forEach(producer -> futures.add(producer.disconnect()));
-            FutureUtil.waitForAll(futures).handle((BiFunction<Void, Throwable, Void>) (aVoid, throwable) -> {
+            CompletableFuture<Void> disconnectProducersFuture;
+            if (producers.size() > 0) {
+                List<CompletableFuture<Void>> futures = Lists.newArrayList();
+                producers.forEach((__, producer) -> futures.add(producer.disconnect()));
+                disconnectProducersFuture = FutureUtil.waitForAll(futures);
+            } else {
+                disconnectProducersFuture = CompletableFuture.completedFuture(null);
+            }
+            disconnectProducersFuture.handle((BiFunction<Void, Throwable, Void>) (aVoid, throwable) -> {
                 decrementPendingWriteOpsAndCheck();
                 return null;
             });
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/PersistentTopicTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/PersistentTopicTest.java
index b53404a..2998345 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/PersistentTopicTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/PersistentTopicTest.java
@@ -56,7 +56,6 @@ import java.util.concurrent.CyclicBarrier;
 import java.util.concurrent.ExecutionException;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Executors;
-import java.util.concurrent.ForkJoinPool;
 import java.util.concurrent.Future;
 import java.util.concurrent.ScheduledFuture;
 import java.util.concurrent.TimeUnit;
@@ -64,6 +63,9 @@ import java.util.concurrent.TimeoutException;
 import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.function.Supplier;
 
+import io.netty.channel.Channel;
+import io.netty.channel.ChannelHandlerContext;
+import io.netty.channel.DefaultEventLoop;
 import io.netty.channel.EventLoopGroup;
 import io.netty.channel.nio.NioEventLoopGroup;
 import lombok.Cleanup;
@@ -93,7 +95,7 @@ import org.apache.pulsar.broker.admin.AdminResource;
 import org.apache.pulsar.broker.cache.ConfigurationCacheService;
 import org.apache.pulsar.broker.cache.LocalZooKeeperCacheService;
 import org.apache.pulsar.broker.namespace.NamespaceService;
-import org.apache.pulsar.broker.service.nonpersistent.NonPersistentReplicator;
+import org.apache.pulsar.broker.resources.PulsarResources;
 import org.apache.pulsar.broker.service.persistent.CompactorSubscription;
 import org.apache.pulsar.broker.service.persistent.PersistentDispatcherMultipleConsumers;
 import org.apache.pulsar.broker.service.persistent.PersistentDispatcherSingleActiveConsumer;
@@ -216,6 +218,11 @@ public class PersistentTopicTest extends MockedBookKeeperTestCase {
         doReturn(new InetSocketAddress("localhost", 1234)).when(serverCnx).clientAddress();
         doReturn(new PulsarCommandSenderImpl(null, serverCnx))
                 .when(serverCnx).getCommandSender();
+        ChannelHandlerContext ctx = mock(ChannelHandlerContext.class);
+        Channel channel = mock(Channel.class);
+        doReturn(spy(DefaultEventLoop.class)).when(channel).eventLoop();
+        doReturn(channel).when(ctx).channel();
+        doReturn(ctx).when(serverCnx).ctx();
 
         NamespaceService nsSvc = mock(NamespaceService.class);
         NamespaceBundle bundle = mock(NamespaceBundle.class);
@@ -2166,6 +2173,20 @@ public class PersistentTopicTest extends MockedBookKeeperTestCase {
         f2.get();
     }
 
+    @Test
+    public void testDisconnectProducer() throws Exception {
+        PersistentTopic topic = new PersistentTopic(successTopicName, ledgerMock, brokerService);
+        String role = "appid1";
+        Producer producer = new Producer(topic, serverCnx, 1 /* producer id */, "prod-name",
+                role, false, null, SchemaVersion.Latest, 0, false,
+                ProducerAccessMode.Shared, Optional.empty());
+        assertFalse(producer.isDisconnecting());
+        // Disconnect the producer multiple times.
+        producer.disconnect();
+        producer.disconnect();
+        verify(serverCnx).execute(any());
+    };
+
     private ByteBuf getMessageWithMetadata(byte[] data) {
         MessageMetadata messageData = new MessageMetadata()
                 .setPublishTime(System.currentTimeMillis())

[pulsar] 07/11: [pulsar-client] clean up MultiTopicsConsumerImpl reference on consumer creation failure (#11754)

Posted by ch...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

chenhang pushed a commit to branch branch-2.8
in repository https://gitbox.apache.org/repos/asf/pulsar.git

commit de813bf35afc7038465ecdf61921bf554293aa1b
Author: Rajan Dhabalia <rd...@apache.org>
AuthorDate: Mon Aug 23 19:34:17 2021 -0700

    [pulsar-client] clean up MultiTopicsConsumerImpl reference on consumer creation failure (#11754)
    
    (cherry picked from commit f154de74830ca2eaca67d01322fcb9a557d649ce)
---
 .../pulsar/client/impl/MultiTopicsConsumerImpl.java       | 15 ++++++++++++---
 .../apache/pulsar/client/impl/UnAckedMessageTracker.java  |  1 +
 2 files changed, 13 insertions(+), 3 deletions(-)

diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/MultiTopicsConsumerImpl.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/MultiTopicsConsumerImpl.java
index ce84376..c9e2067 100644
--- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/MultiTopicsConsumerImpl.java
+++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/MultiTopicsConsumerImpl.java
@@ -103,7 +103,7 @@ public class MultiTopicsConsumerImpl<T> extends ConsumerBase<T> {
     private final ReadWriteLock lock = new ReentrantReadWriteLock();
 
     private final ConsumerStatsRecorder stats;
-    private final UnAckedMessageTracker unAckedMessageTracker;
+    private UnAckedMessageTracker unAckedMessageTracker;
     private final ConsumerConfigurationData<T> internalConfig;
 
     private volatile BatchMessageIdImpl startMessageId = null;
@@ -543,7 +543,9 @@ public class MultiTopicsConsumerImpl<T> extends ConsumerBase<T> {
     @Override
     public CompletableFuture<Void> closeAsync() {
         if (getState() == State.Closing || getState() == State.Closed) {
-            unAckedMessageTracker.close();
+            if (unAckedMessageTracker != null) {
+                unAckedMessageTracker.close();
+            }
             return CompletableFuture.completedFuture(null);
         }
         setState(State.Closing);
@@ -580,7 +582,14 @@ public class MultiTopicsConsumerImpl<T> extends ConsumerBase<T> {
     }
 
     private void cleanupMultiConsumer() {
-        unAckedMessageTracker.close();
+        if (unAckedMessageTracker != null) {
+            unAckedMessageTracker.close();
+            unAckedMessageTracker = null;
+        }
+        if (partitionsAutoUpdateTimeout != null) {
+            partitionsAutoUpdateTimeout.cancel();
+            partitionsAutoUpdateTimeout = null;
+        }
         client.cleanupConsumer(this);
     }
 
diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/UnAckedMessageTracker.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/UnAckedMessageTracker.java
index a244366..db616f2 100644
--- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/UnAckedMessageTracker.java
+++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/UnAckedMessageTracker.java
@@ -248,6 +248,7 @@ public class UnAckedMessageTracker implements Closeable {
         try {
             if (timeout != null && !timeout.isCancelled()) {
                 timeout.cancel();
+                timeout = null;
             }
             this.clear();
         } finally {

[pulsar] 11/11: Fix the topic in fenced state and can not recover. (#11737)

Posted by ch...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

chenhang pushed a commit to branch branch-2.8
in repository https://gitbox.apache.org/repos/asf/pulsar.git

commit 345626947e4583d58cc0ceb9f6491a7339d6aadf
Author: lipenghui <pe...@apache.org>
AuthorDate: Wed Aug 25 11:27:12 2021 +0800

    Fix the topic in fenced state and can not recover. (#11737)
    
    * Fix the topic in fenced state and can not recover.
    
    Here is the log when the issue happens. The producer continues to reconnect to the broker, but the fenced state of the topic is always true.
    ```
    19:01:42.351 [pulsar-io-4-1] INFO  org.apache.pulsar.broker.service.ServerCnx - [/10.24.34.151:48052][persistent://public/default/test-8] Creating producer. producerId=8
    19:01:42.352 [Thread-174681] INFO  org.apache.pulsar.broker.service.ServerCnx - [/10.24.34.151:48052] persistent://public/default/test-8 configured with schema false
    19:01:42.352 [Thread-174681] WARN  org.apache.pulsar.broker.service.AbstractTopic - [persistent://public/default/test-8] Attempting to add producer to a fenced topic
    19:01:42.352 [Thread-174681] ERROR org.apache.pulsar.broker.service.ServerCnx - [/10.24.34.151:48052] Failed to add producer to topic persistent://public/default/test-8: producerId=8, org.apache.pulsar.broker.service.BrokerServiceException$TopicFencedException: Topic is temporarily unavailable
    ```
    
    After check the heap dump of the broker, the `pendingWriteOps` is 5, this is the reason why the topic can not recover from the fenced state.
    
    The topic will change to unfenced only the `pendingWriteOps` is 0, details can find at [PersistentTopic.decrementPendingWriteOpsAndCheck()](https://github.com/apache/pulsar/blob/794aa20d9f2a4e668cc36465362d22e042e6e536/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java#L463)
    
    But after checking the ML state of the topic, it shows the `pendingAddEntries` is 0 which not equals to `pendingWriteOps` of the topic.
    The root cause is we are polling add entry op from the `pendingAddEntries` in multiple threads, one is the the ZK callback thread when complete the ledger creating (https://github.com/apache/pulsar/blob/794aa20d9f2a4e668cc36465362d22e042e6e536/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerImpl.java#L1406, https://github.com/apache/pulsar/blob/794aa20d9f2a4e668cc36465362d22e042e6e536/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedL [...]
    another one is the ML worker thread when complete the add entry op (https://github.com/apache/pulsar/blob/794aa20d9f2a4e668cc36465362d22e042e6e536/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/OpAddEntry.java#L181)
    
    After the ongoing add entry op complete, but the corresponding op might been polled by the `clearPendingAddEntries` method. So it will poll another one, but due to
    not equals to the current op, the polled op will not get a chance to be failed, so that the `pendingWriteOps` will not change to 0.
    
    I have attached the complete logs for the topic:
    
    The fix is to complete the add entry op with ManagedLedgerException if the polled op is not equals to the current op.
    
    * Release buffer.
    
    * Revert
    
    (cherry picked from commit 1bcbab04df4055714036afa3ce3bf6cf370869c9)
---
 .../main/java/org/apache/bookkeeper/mledger/impl/OpAddEntry.java | 9 ++++++++-
 1 file changed, 8 insertions(+), 1 deletion(-)

diff --git a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/OpAddEntry.java b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/OpAddEntry.java
index c76d532..665b138 100644
--- a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/OpAddEntry.java
+++ b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/OpAddEntry.java
@@ -137,6 +137,7 @@ public class OpAddEntry extends SafeRunnable implements AddCallback, CloseCallba
             ReferenceCountUtil.release(data);
             cb.addFailed(e, ctx);
             ml.mbean.recordAddEntryError();
+            this.recycle();
         }
     }
 
@@ -179,7 +180,13 @@ public class OpAddEntry extends SafeRunnable implements AddCallback, CloseCallba
     public void safeRun() {
         // Remove this entry from the head of the pending queue
         OpAddEntry firstInQueue = ml.pendingAddEntries.poll();
-        checkArgument(this == firstInQueue);
+        if (firstInQueue == null) {
+            return;
+        }
+        if (this != firstInQueue) {
+            firstInQueue.failed(new ManagedLedgerException("Unexpected add entry op when complete the add entry op."));
+            return;
+        }
 
         ManagedLedgerImpl.NUMBER_OF_ENTRIES_UPDATER.incrementAndGet(ml);
         ManagedLedgerImpl.TOTAL_SIZE_UPDATER.addAndGet(ml, dataLength);