You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by pe...@apache.org on 2021/12/11 12:23:38 UTC

[pulsar] 06/10: [Issue 11689][Client] Fixed block forever bug in Consumer.batchReceive (#11691)

This is an automated email from the ASF dual-hosted git repository.

penghui pushed a commit to branch branch-2.7
in repository https://gitbox.apache.org/repos/asf/pulsar.git

commit 19c152b794d2533a2883727cbd0d7ebd2f15b0a8
Author: Jack Vanlightly <va...@gmail.com>
AuthorDate: Tue Aug 24 01:09:41 2021 +0200

    [Issue 11689][Client] Fixed block forever bug in Consumer.batchReceive (#11691)
    
    * Fixed block forever bug in Consumer.batchReceive
    
    Ensure that all poll() calls to pendingBatchReceives
    are done within the internalPinnedExecutor to avoid a
    race condition where a peek and a subsequent poll get
    different pending receives.
    
    Moved the internalPinnedExecutor into ConsumerBase,
    as both ConsumerImpl and MultiTopicsConsumerImpl require it.
    
    failPendingReceive() now always submits its work to the
    internal executor, returning a CompletableFuture, and all callers
    treat it as an asynchronous operation.
    
    * Fix broken MultiTopicsConsumerImplTest
    
    Needed a real executor service to run the
    failPendingReceive() method.
    
    * Ensure all calls to messageReceived happen on internal executor
    
    * Re-add missing return statement in ConsumerImpl.closeAsync()
    
    * Ensure correct usage of consumer internal executors
    
    Ensure that the externalPinnedExecutor is only used for user
    code and the internalPinnedExecutor is used for internal tasks.
    Some test refactoring to manage creation of executors.
    
    (cherry picked from commit bd942e1730780f89b78799ca768fc68277f7965b)
---
 .../apache/pulsar/client/impl/ConsumerBase.java    | 163 ++++++++++++---------
 .../apache/pulsar/client/impl/ConsumerImpl.java    |  55 +++----
 .../client/impl/MultiTopicsConsumerImpl.java       |  61 ++++----
 .../pulsar/client/impl/ZeroQueueConsumerImpl.java  |   2 +-
 .../pulsar/client/impl/ClientTestFixtures.java     |  16 +-
 .../pulsar/client/impl/ConsumerImplTest.java       |  17 +--
 .../client/impl/MultiTopicsConsumerImplTest.java   |  53 +++++--
 .../apache/pulsar/client/impl/ReaderImplTest.java  |  42 +++---
 8 files changed, 230 insertions(+), 179 deletions(-)

diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerBase.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerBase.java
index 05cfc9d..07cc3f0 100644
--- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerBase.java
+++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerBase.java
@@ -19,9 +19,8 @@
 package org.apache.pulsar.client.impl;
 
 import static com.google.common.base.Preconditions.checkArgument;
-
 import com.google.common.collect.Queues;
-
+import io.netty.util.Timeout;
 import java.nio.charset.StandardCharsets;
 import java.util.Collections;
 import java.util.List;
@@ -75,7 +74,8 @@ public abstract class ConsumerBase<T> extends HandlerState implements Consumer<T
     protected final MessageListener<T> listener;
     protected final ConsumerEventListener consumerEventListener;
     protected final ExecutorProvider executorProvider;
-    protected final ScheduledExecutorService pinnedExecutor;
+    protected final ScheduledExecutorService externalPinnedExecutor;
+    protected final ScheduledExecutorService internalPinnedExecutor;
     final BlockingQueue<Message<T>> incomingMessages;
     protected ConcurrentOpenHashMap<MessageIdImpl, MessageIdImpl[]> unAckedChunckedMessageIdSequenceMap;
     protected final ConcurrentLinkedQueue<CompletableFuture<Message<T>>> pendingReceives;
@@ -83,7 +83,7 @@ public abstract class ConsumerBase<T> extends HandlerState implements Consumer<T
     protected final Schema<T> schema;
     protected final ConsumerInterceptors<T> interceptors;
     protected final BatchReceivePolicy batchReceivePolicy;
-    protected ConcurrentLinkedQueue<OpBatchReceive<T>> pendingBatchReceives;
+    protected final ConcurrentLinkedQueue<OpBatchReceive<T>> pendingBatchReceives;
     private static final AtomicLongFieldUpdater<ConsumerBase> INCOMING_MESSAGES_SIZE_UPDATER = AtomicLongFieldUpdater
             .newUpdater(ConsumerBase.class, "incomingMessagesSize");
     protected volatile long incomingMessagesSize = 0;
@@ -93,7 +93,8 @@ public abstract class ConsumerBase<T> extends HandlerState implements Consumer<T
 
     protected ConsumerBase(PulsarClientImpl client, String topic, ConsumerConfigurationData<T> conf,
                            int receiverQueueSize, ExecutorProvider executorProvider,
-                           CompletableFuture<Consumer<T>> subscribeFuture, Schema<T> schema, ConsumerInterceptors interceptors) {
+                           CompletableFuture<Consumer<T>> subscribeFuture, Schema<T> schema,
+                           ConsumerInterceptors interceptors) {
         super(client, topic);
         this.maxReceiverQueueSize = receiverQueueSize;
         this.subscription = conf.getSubscriptionName();
@@ -106,8 +107,10 @@ public abstract class ConsumerBase<T> extends HandlerState implements Consumer<T
         this.incomingMessages = new GrowableArrayBlockingQueue<>();
         this.unAckedChunckedMessageIdSequenceMap = new ConcurrentOpenHashMap<>();
         this.executorProvider = executorProvider;
-        this.pinnedExecutor = (ScheduledExecutorService) executorProvider.getExecutor();
+        this.externalPinnedExecutor = (ScheduledExecutorService) executorProvider.getExecutor();
+        this.internalPinnedExecutor = (ScheduledExecutorService) client.getInternalExecutorService();
         this.pendingReceives = Queues.newConcurrentLinkedQueue();
+        this.pendingBatchReceives = Queues.newConcurrentLinkedQueue();
         this.schema = schema;
         this.interceptors = interceptors;
         if (conf.getBatchReceivePolicy() != null) {
@@ -206,26 +209,11 @@ public abstract class ConsumerBase<T> extends HandlerState implements Consumer<T
         }
     }
 
-    protected CompletableFuture<Message<T>> peekPendingReceive() {
-        CompletableFuture<Message<T>> receivedFuture = null;
-        while (receivedFuture == null) {
-            receivedFuture = pendingReceives.peek();
-            if (receivedFuture == null) {
-                break;
-            }
-            // skip done futures (cancelling a future could mark it done)
-            if (receivedFuture.isDone()) {
-                CompletableFuture<Message<T>> removed = pendingReceives.poll();
-                if (removed != receivedFuture) {
-                    log.error("Bug! Removed future wasn't the expected one. expected={} removed={}", receivedFuture, removed);
-                }
-                receivedFuture = null;
-            }
-        }
-        return receivedFuture;
+    protected boolean hasNextPendingReceive() {
+        return !pendingReceives.isEmpty();
     }
 
-    protected CompletableFuture<Message<T>> pollPendingReceive() {
+    protected CompletableFuture<Message<T>> nextPendingReceive() {
         CompletableFuture<Message<T>> receivedFuture;
         while (true) {
             receivedFuture = pendingReceives.poll();
@@ -238,7 +226,7 @@ public abstract class ConsumerBase<T> extends HandlerState implements Consumer<T
     }
 
     protected void completePendingReceive(CompletableFuture<Message<T>> receivedFuture, Message<T> message) {
-        getExecutor(message).execute(() -> {
+        getInternalExecutor(message).execute(() -> {
             if (!receivedFuture.complete(message)) {
                 log.warn("Race condition detected. receive future was already completed (cancelled={}) and message was dropped. message={}",
                         receivedFuture.isCancelled(), message);
@@ -246,7 +234,28 @@ public abstract class ConsumerBase<T> extends HandlerState implements Consumer<T
         });
     }
 
-    protected void failPendingReceives(ConcurrentLinkedQueue<CompletableFuture<Message<T>>> pendingReceives) {
+    protected CompletableFuture<Void> failPendingReceive() {
+        if (internalPinnedExecutor.isShutdown()) {
+            // we need to fail any pending receives no matter what,
+            // to avoid blocking user code
+            failPendingReceives();
+            failPendingBatchReceives();
+            return CompletableFuture.completedFuture(null);
+        } else {
+            CompletableFuture<Void> future = new CompletableFuture<>();
+            internalPinnedExecutor.execute(() -> {
+                try {
+                    failPendingReceives();
+                    failPendingBatchReceives();
+                } finally {
+                    future.complete(null);
+                }
+            });
+            return future;
+        }
+    }
+
+    private void failPendingReceives() {
         while (!pendingReceives.isEmpty()) {
             CompletableFuture<Message<T>> receiveFuture = pendingReceives.poll();
             if (receiveFuture == null) {
@@ -254,21 +263,23 @@ public abstract class ConsumerBase<T> extends HandlerState implements Consumer<T
             }
             if (!receiveFuture.isDone()) {
                 receiveFuture.completeExceptionally(
-                        new PulsarClientException.AlreadyClosedException(String.format("The consumer which subscribes the topic %s with subscription name %s " +
+                        new PulsarClientException.AlreadyClosedException(
+                                String.format("The consumer which subscribes the topic %s with subscription name %s " +
                                 "was already closed when cleaning and closing the consumers", topic, subscription)));
             }
         }
     }
 
-    protected void failPendingBatchReceives(ConcurrentLinkedQueue<OpBatchReceive<T>> pendingBatchReceives) {
-        while (!pendingBatchReceives.isEmpty()) {
-            OpBatchReceive<T> opBatchReceive = pendingBatchReceives.poll();
+    private void failPendingBatchReceives() {
+        while (hasNextBatchReceive()) {
+            OpBatchReceive<T> opBatchReceive = nextBatchReceive();
             if (opBatchReceive == null || opBatchReceive.future == null) {
                 break;
             }
             if (!opBatchReceive.future.isDone()) {
                 opBatchReceive.future.completeExceptionally(
-                        new PulsarClientException.AlreadyClosedException(String.format("The consumer which subscribes the topic %s with subscription name %s " +
+                        new PulsarClientException.AlreadyClosedException(
+                                String.format("The consumer which subscribes the topic %s with subscription name %s " +
                                 "was already closed when cleaning and closing the consumers", topic, subscription)));
             }
         }
@@ -733,7 +744,7 @@ public abstract class ConsumerBase<T> extends HandlerState implements Consumer<T
     }
 
     protected void notifyPendingBatchReceivedCallBack() {
-        OpBatchReceive<T> opBatchReceive = pollNextBatchReceive();
+        OpBatchReceive<T> opBatchReceive = nextBatchReceive();
         if (opBatchReceive == null) {
             return;
         }
@@ -745,31 +756,16 @@ public abstract class ConsumerBase<T> extends HandlerState implements Consumer<T
         }
     }
 
-    private OpBatchReceive<T> peekNextBatchReceive() {
-        OpBatchReceive<T> opBatchReceive = null;
-        while (opBatchReceive == null) {
-            opBatchReceive = pendingBatchReceives.peek();
-            // no entry available
-            if (opBatchReceive == null) {
-                return null;
-            }
-            // remove entries where future is null or has been completed (cancel / timeout)
-            if (opBatchReceive.future == null || opBatchReceive.future.isDone()) {
-                OpBatchReceive<T> removed = pendingBatchReceives.poll();
-                if (removed != opBatchReceive) {
-                    log.error("Bug: Removed entry wasn't the expected one. expected={}, removed={}", opBatchReceive, removed);
-                }
-                opBatchReceive = null;
-            }
-        }
-        return opBatchReceive;
+    private boolean hasNextBatchReceive() {
+        return !pendingBatchReceives.isEmpty();
     }
 
 
-    private OpBatchReceive<T> pollNextBatchReceive() {
+    private OpBatchReceive<T> nextBatchReceive() {
         OpBatchReceive<T> opBatchReceive = null;
         while (opBatchReceive == null) {
             opBatchReceive = pendingBatchReceives.poll();
+
             // no entry available
             if (opBatchReceive == null) {
                 return null;
@@ -807,7 +803,11 @@ public abstract class ConsumerBase<T> extends HandlerState implements Consumer<T
     protected abstract void messageProcessed(Message<?> msg);
 
 
-    private void pendingBatchReceiveTask(Timeout timeout) throws Exception {
+    private void pendingBatchReceiveTask(Timeout timeout) {
+        internalPinnedExecutor.execute(() -> doPendingBatchReceiveTask(timeout));
+    }
+
+    private void doPendingBatchReceiveTask(Timeout timeout) {
         if (timeout.isCancelled()) {
             return;
         }
@@ -819,32 +819,44 @@ public abstract class ConsumerBase<T> extends HandlerState implements Consumer<T
             if (getState() == State.Closing || getState() == State.Closed) {
                 return;
             }
-            if (pendingBatchReceives == null) {
-                pendingBatchReceives = Queues.newConcurrentLinkedQueue();
-            }
-            OpBatchReceive<T> firstOpBatchReceive = peekNextBatchReceive();
+
             timeToWaitMs = batchReceivePolicy.getTimeoutMs();
+            OpBatchReceive<T> opBatchReceive = pendingBatchReceives.peek();
 
-            while (firstOpBatchReceive != null) {
+            while (opBatchReceive != null) {
                 // If there is at least one batch receive, calculate the diff between the batch receive timeout
                 // and the elapsed time since the operation was created.
                 long diff = batchReceivePolicy.getTimeoutMs()
-                        - TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - firstOpBatchReceive.createdAt);
+                        - TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - opBatchReceive.createdAt);
+
                 if (diff <= 0) {
-                    // The diff is less than or equal to zero, meaning that the batch receive has been timed out.
-                    // complete the OpBatchReceive and continue to check the next OpBatchReceive in pendingBatchReceives.
-                    OpBatchReceive<T> op = pollNextBatchReceive();
-                    if (op != null) {
-                        completeOpBatchReceive(op);
+                    completeOpBatchReceive(opBatchReceive);
+
+                    // remove the peeked item from the queue
+                    OpBatchReceive<T> removed = pendingBatchReceives.poll();
+
+                    if (removed != opBatchReceive) {
+                        // regression check, if this were to happen due to incorrect code changes in the future,
+                        // (allowing multi-threaded calls to poll()), then ensure that the polled item is completed
+                        // to avoid blocking user code
+
+                        log.error("Race condition in consumer {} (should not cause data loss). "
+                                + " Concurrent operations on pendingBatchReceives is not safe", this.consumerName);
+                        if (removed != null && !removed.future.isDone()) {
+                            completeOpBatchReceive(removed);
+                        }
                     }
-                    firstOpBatchReceive = peekNextBatchReceive();
+
                 } else {
                     // The diff is greater than zero, set the timeout to the diff value
                     timeToWaitMs = diff;
                     break;
                 }
+
+                opBatchReceive = pendingBatchReceives.peek();
             }
-            batchReceiveTimeout = client.timer().newTimeout(this::pendingBatchReceiveTask, timeToWaitMs, TimeUnit.MILLISECONDS);
+            batchReceiveTimeout = client.timer().newTimeout(this::pendingBatchReceiveTask,
+                    timeToWaitMs, TimeUnit.MILLISECONDS);
         }
     }
 
@@ -861,7 +873,7 @@ public abstract class ConsumerBase<T> extends HandlerState implements Consumer<T
                         executorProvider.getExecutor(peekMessageKey(msg)).execute(() ->
                                 callMessageListener(msg));
                     } else {
-                        getExecutor(msg).execute(() -> {
+                        getExternalExecutor(msg).execute(() -> {
                             callMessageListener(msg);
                         });
                     }
@@ -911,7 +923,7 @@ public abstract class ConsumerBase<T> extends HandlerState implements Consumer<T
     }
 
     protected boolean hasPendingBatchReceive() {
-        return pendingBatchReceives != null && peekNextBatchReceive() != null;
+        return pendingBatchReceives != null && hasNextBatchReceive();
     }
 
     protected void increaseIncomingMessageSize(final Message<?> message) {
@@ -934,12 +946,21 @@ public abstract class ConsumerBase<T> extends HandlerState implements Consumer<T
 
     protected abstract void completeOpBatchReceive(OpBatchReceive<T> op);
 
-    private ExecutorService getExecutor(Message<T> msg) {
+    private ExecutorService getExternalExecutor(Message<T> msg) {
+        ConsumerImpl receivedConsumer = (msg instanceof TopicMessageImpl) ? ((TopicMessageImpl) msg).receivedByconsumer
+                : null;
+        ExecutorService executor = receivedConsumer != null && receivedConsumer.externalPinnedExecutor != null
+                ? receivedConsumer.externalPinnedExecutor
+                : externalPinnedExecutor;
+        return executor;
+    }
+
+    private ExecutorService getInternalExecutor(Message<T> msg) {
         ConsumerImpl receivedConsumer = (msg instanceof TopicMessageImpl) ? ((TopicMessageImpl) msg).receivedByconsumer
                 : null;
-        ExecutorService executor = receivedConsumer != null && receivedConsumer.pinnedExecutor != null
-                ? receivedConsumer.pinnedExecutor
-                : pinnedExecutor;
+        ExecutorService executor = receivedConsumer != null && receivedConsumer.internalPinnedExecutor != null
+                ? receivedConsumer.internalPinnedExecutor
+                : internalPinnedExecutor;
         return executor;
     }
 
diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java
index 64c0194..35c6618 100644
--- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java
+++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java
@@ -26,7 +26,6 @@ import static org.apache.pulsar.common.protocol.Commands.readChecksum;
 import com.google.common.annotations.VisibleForTesting;
 import com.google.common.collect.ComparisonChain;
 import com.google.common.collect.Iterables;
-import com.google.common.collect.Queues;
 import io.netty.buffer.ByteBuf;
 import io.netty.buffer.Unpooled;
 import io.netty.util.Timeout;
@@ -190,7 +189,6 @@ public class ConsumerImpl<T> extends ConsumerBase<T> implements ConnectionHandle
     private final ConcurrentLongHashMap<OpForAckCallBack> ackRequests;
 
     private final AtomicReference<ClientCnx> clientCnxUsedForConsumerRegistration = new AtomicReference<>();
-    private final ExecutorService internalPinnedExecutor;
 
     static <T> ConsumerImpl<T> newConsumerImpl(PulsarClientImpl client,
                                                String topic,
@@ -257,7 +255,6 @@ public class ConsumerImpl<T> extends ConsumerBase<T> implements ConnectionHandle
         this.pendingChunckedMessageUuidQueue = new GrowableArrayBlockingQueue<>();
         this.expireTimeOfIncompleteChunkedMessageMillis = conf.getExpireTimeOfIncompleteChunkedMessageMillis();
         this.autoAckOldestChunkedMessageOnQueueFull = conf.isAutoAckOldestChunkedMessageOnQueueFull();
-        this.internalPinnedExecutor = client.getInternalExecutorService();
 
         if (client.getConfiguration().getStatsIntervalSeconds() > 0) {
             stats = new ConsumerStatsRecorderImpl(client, conf, this);
@@ -473,9 +470,6 @@ public class ConsumerImpl<T> extends ConsumerBase<T> implements ConnectionHandle
         CompletableFuture<Messages<T>> result = cancellationHandler.createFuture();
 
         internalPinnedExecutor.execute(() -> {
-            if (pendingBatchReceives == null) {
-                pendingBatchReceives = Queues.newConcurrentLinkedQueue();
-            }
             if (hasEnoughMessagesForBatchReceive()) {
                 MessagesImpl<T> messages = getNewMessagesImpl();
                 Message<T> msgPeeked = incomingMessages.peek();
@@ -821,7 +815,6 @@ public class ConsumerImpl<T> extends ConsumerBase<T> implements ConnectionHandle
             closeConsumerTasks();
             deregisterFromClientCnx();
             client.cleanupConsumer(this);
-            failPendingReceive();
             clearReceiverQueue();
             return;
         }
@@ -1022,9 +1015,12 @@ public class ConsumerImpl<T> extends ConsumerBase<T> implements ConnectionHandle
 
     @Override
     public CompletableFuture<Void> closeAsync() {
+        CompletableFuture<Void> closeFuture = new CompletableFuture<>();
+
         if (getState() == State.Closing || getState() == State.Closed) {
             closeConsumerTasks();
-            return CompletableFuture.completedFuture(null);
+            failPendingReceive().whenComplete((r, t) -> closeFuture.complete(null));
+            return closeFuture;
         }
 
         if (!isConnected()) {
@@ -1033,7 +1029,8 @@ public class ConsumerImpl<T> extends ConsumerBase<T> implements ConnectionHandle
             closeConsumerTasks();
             deregisterFromClientCnx();
             client.cleanupConsumer(this);
-            return CompletableFuture.completedFuture(null);
+            failPendingReceive().whenComplete((r, t) -> closeFuture.complete(null));
+            return closeFuture;
         }
 
         stats.getStatTimeout().ifPresent(Timeout::cancel);
@@ -1044,7 +1041,6 @@ public class ConsumerImpl<T> extends ConsumerBase<T> implements ConnectionHandle
 
         long requestId = client.newRequestId();
 
-        CompletableFuture<Void> closeFuture = new CompletableFuture<>();
         ClientCnx cnx = cnx();
         if (null == cnx) {
             cleanupAtClose(closeFuture, null);
@@ -1067,15 +1063,17 @@ public class ConsumerImpl<T> extends ConsumerBase<T> implements ConnectionHandle
         log.info("[{}] [{}] Closed consumer", topic, subscription);
         setState(State.Closed);
         closeConsumerTasks();
-        if (exception != null) {
-            closeFuture.completeExceptionally(exception);
-        } else {
-            closeFuture.complete(null);
-        }
         deregisterFromClientCnx();
         client.cleanupConsumer(this);
+
         // fail all pending-receive futures to notify application
-        failPendingReceive();
+        failPendingReceive().whenComplete((r, t) -> {
+            if (exception != null) {
+                closeFuture.completeExceptionally(exception);
+            } else {
+                closeFuture.complete(null);
+            }
+        });
     }
 
     private void closeConsumerTasks() {
@@ -1101,21 +1099,12 @@ public class ConsumerImpl<T> extends ConsumerBase<T> implements ConnectionHandle
         stats.getStatTimeout().ifPresent(Timeout::cancel);
     }
 
-    private void failPendingReceive() {
-        internalPinnedExecutor.execute(() -> {
-            if (pinnedExecutor != null && !pinnedExecutor.isShutdown()) {
-                failPendingReceives(this.pendingReceives);
-                failPendingBatchReceives(this.pendingBatchReceives);
-            }
-        });
-    }
-
     void activeConsumerChanged(boolean isActive) {
         if (consumerEventListener == null) {
             return;
         }
 
-        pinnedExecutor.execute(() -> {
+        externalPinnedExecutor.execute(() -> {
             if (isActive) {
                 consumerEventListener.becameActive(this, partitionIndex);
             } else {
@@ -1215,7 +1204,7 @@ public class ConsumerImpl<T> extends ConsumerBase<T> implements ConnectionHandle
                     possibleSendToDeadLetterTopicMessages.put((MessageIdImpl) message.getMessageId(),
                             Collections.singletonList(message));
                 }
-                if (peekPendingReceive() != null) {
+                if (hasNextPendingReceive()) {
                     notifyPendingReceivedCallback(message, null);
                 } else if (enqueueMessageAndCheckBatchReceive(message)) {
                     if (hasPendingBatchReceive()) {
@@ -1250,7 +1239,7 @@ public class ConsumerImpl<T> extends ConsumerBase<T> implements ConnectionHandle
 
         // Lazy task scheduling to expire incomplete chunk message
         if (!expireChunkMessageTaskScheduled && expireTimeOfIncompleteChunkedMessageMillis > 0) {
-            pinnedExecutor.scheduleAtFixedRate(() -> {
+            internalPinnedExecutor.scheduleAtFixedRate(() -> {
                 removeExpireIncompleteChunkedMessages();
             }, expireTimeOfIncompleteChunkedMessageMillis, expireTimeOfIncompleteChunkedMessageMillis,
                     TimeUnit.MILLISECONDS);
@@ -1338,19 +1327,19 @@ public class ConsumerImpl<T> extends ConsumerBase<T> implements ConnectionHandle
         }
 
         // fetch receivedCallback from queue
-        final CompletableFuture<Message<T>> receivedFuture = pollPendingReceive();
+        final CompletableFuture<Message<T>> receivedFuture = nextPendingReceive();
         if (receivedFuture == null) {
             return;
         }
 
         if (exception != null) {
-            pinnedExecutor.execute(() -> receivedFuture.completeExceptionally(exception));
+            internalPinnedExecutor.execute(() -> receivedFuture.completeExceptionally(exception));
             return;
         }
 
         if (message == null) {
             IllegalStateException e = new IllegalStateException("received message can't be null");
-            pinnedExecutor.execute(() -> receivedFuture.completeExceptionally(e));
+            internalPinnedExecutor.execute(() -> receivedFuture.completeExceptionally(e));
             return;
         }
 
@@ -1442,7 +1431,7 @@ public class ConsumerImpl<T> extends ConsumerBase<T> implements ConnectionHandle
                     possibleToDeadLetter.add(message);
                 }
                 internalPinnedExecutor.execute(() -> {
-                    if (peekPendingReceive() != null) {
+                    if (hasNextPendingReceive()) {
                         notifyPendingReceivedCallback(message, null);
                     } else if (enqueueMessageAndCheckBatchReceive(message)) {
                         if (hasPendingBatchReceive()) {
@@ -2208,7 +2197,7 @@ public class ConsumerImpl<T> extends ConsumerBase<T> implements ConnectionHandle
                 return;
             }
 
-            pinnedExecutor.schedule(() -> {
+            internalPinnedExecutor.schedule(() -> {
                 log.warn("[{}] [{}] Could not get connection while getLastMessageId -- Will try again in {} ms",
                         topic, getHandlerName(), nextDelay);
                 remainingTime.addAndGet(-nextDelay);
diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/MultiTopicsConsumerImpl.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/MultiTopicsConsumerImpl.java
index 9eb3204..4255f1c 100644
--- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/MultiTopicsConsumerImpl.java
+++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/MultiTopicsConsumerImpl.java
@@ -22,7 +22,6 @@ import com.google.common.annotations.VisibleForTesting;
 import com.google.common.collect.ImmutableMap;
 import com.google.common.collect.ImmutableMap.Builder;
 import com.google.common.collect.Lists;
-import com.google.common.collect.Queues;
 import io.netty.util.Timeout;
 import io.netty.util.TimerTask;
 import org.apache.commons.lang3.tuple.Pair;
@@ -234,10 +233,11 @@ public class MultiTopicsConsumerImpl<T> extends ConsumerBase<T> {
             log.debug("[{}] startReceivingMessages for {} new consumers in topics consumer, state: {}",
                 topic, newConsumers.size(), getState());
         }
+
         if (getState() == State.Ready) {
             newConsumers.forEach(consumer -> {
                 consumer.increaseAvailablePermits(consumer.getConnectionHandler().cnx(), conf.getReceiverQueueSize());
-                receiveMessageFromConsumer(consumer);
+                internalPinnedExecutor.execute(() -> receiveMessageFromConsumer(consumer));
             });
         }
     }
@@ -260,7 +260,7 @@ public class MultiTopicsConsumerImpl<T> extends ConsumerBase<T> {
             } else {
                 // Schedule next receiveAsync() if the incoming queue is not full. Use a different thread to avoid
                 // recursion and stack overflow
-                client.getInternalExecutorService().execute(() -> receiveMessageFromConsumer(consumer));
+                internalPinnedExecutor.execute(() -> receiveMessageFromConsumer(consumer));
             }
         });
     }
@@ -276,7 +276,7 @@ public class MultiTopicsConsumerImpl<T> extends ConsumerBase<T> {
         }
 
         // if asyncReceive is waiting : return message to callback without adding to incomingMessages queue
-        CompletableFuture<Message<T>> receivedFuture = pollPendingReceive();
+        CompletableFuture<Message<T>> receivedFuture = nextPendingReceive();
         if (receivedFuture != null) {
             unAckedMessageTracker.add(topicMessage.getMessageId());
             completePendingReceive(receivedFuture, topicMessage);
@@ -303,7 +303,7 @@ public class MultiTopicsConsumerImpl<T> extends ConsumerBase<T> {
                     break;
                 }
 
-                client.getInternalExecutorService().execute(() -> {
+                internalPinnedExecutor.execute(() -> {
                     receiveMessageFromConsumer(consumer);
                 });
             }
@@ -363,9 +363,6 @@ public class MultiTopicsConsumerImpl<T> extends ConsumerBase<T> {
         CompletableFuture<Messages<T>> result = cancellationHandler.createFuture();
         try {
             lock.writeLock().lock();
-            if (pendingBatchReceives == null) {
-                pendingBatchReceives = Queues.newConcurrentLinkedQueue();
-            }
             if (hasEnoughMessagesForBatchReceive()) {
                 MessagesImpl<T> messages = getNewMessagesImpl();
                 Message<T> msgPeeked = incomingMessages.peek();
@@ -516,13 +513,17 @@ public class MultiTopicsConsumerImpl<T> extends ConsumerBase<T> {
             .map(c -> c.unsubscribeAsync()).collect(Collectors.toList());
 
         FutureUtil.waitForAll(futureList)
+            .thenCompose((r) -> {
+                setState(State.Closed);
+                cleanupMultiConsumer();
+                log.info("[{}] [{}] [{}] Unsubscribed Topics Consumer",
+                        topic, subscription, consumerName);
+                // fail all pending-receive futures to notify application
+                return failPendingReceive();
+            })
             .whenComplete((r, ex) -> {
                 if (ex == null) {
-                    setState(State.Closed);
-                    unAckedMessageTracker.close();
                     unsubscribeFuture.complete(null);
-                    log.info("[{}] [{}] [{}] Unsubscribed Topics Consumer",
-                        topic, subscription, consumerName);
                 } else {
                     setState(State.Failed);
                     unsubscribeFuture.completeExceptionally(ex);
@@ -552,15 +553,16 @@ public class MultiTopicsConsumerImpl<T> extends ConsumerBase<T> {
             .map(c -> c.closeAsync()).collect(Collectors.toList());
 
         FutureUtil.waitForAll(futureList)
+            .thenCompose((r) -> {
+                setState(State.Closed);
+                cleanupMultiConsumer();
+                log.info("[{}] [{}] Closed Topics Consumer", topic, subscription);
+                // fail all pending-receive futures to notify application
+                return failPendingReceive();
+            })
             .whenComplete((r, ex) -> {
                 if (ex == null) {
-                    setState(State.Closed);
-                    unAckedMessageTracker.close();
                     closeFuture.complete(null);
-                    log.info("[{}] [{}] Closed Topics Consumer", topic, subscription);
-                    client.cleanupConsumer(this);
-                    // fail all pending-receive futures to notify application
-                    failPendingReceive();
                 } else {
                     setState(State.Failed);
                     closeFuture.completeExceptionally(ex);
@@ -572,11 +574,9 @@ public class MultiTopicsConsumerImpl<T> extends ConsumerBase<T> {
         return closeFuture;
     }
 
-    private void failPendingReceive() {
-        if (pinnedExecutor != null && !pinnedExecutor.isShutdown()) {
-            failPendingReceives(pendingReceives);
-            failPendingBatchReceives(pendingBatchReceives);
-        }
+    private void cleanupMultiConsumer() {
+        unAckedMessageTracker.close();
+        client.cleanupConsumer(this);
     }
 
     @Override
@@ -972,14 +972,13 @@ public class MultiTopicsConsumerImpl<T> extends ConsumerBase<T> {
                 }
 
                 // We have successfully created new consumers, so we can start receiving messages for them
-                startReceivingMessages(
-                    consumers.values().stream()
-                        .filter(consumer1 -> {
-                            String consumerTopicName = consumer1.getTopic();
-                            return TopicName.get(consumerTopicName).getPartitionedTopicName().equals(
-                                    TopicName.get(topicName).getPartitionedTopicName());
-                        })
-                        .collect(Collectors.toList()));
+                startReceivingMessages(consumers.values().stream()
+                                .filter(consumer1 -> {
+                                    String consumerTopicName = consumer1.getTopic();
+                                    return TopicName.get(consumerTopicName).getPartitionedTopicName().equals(
+                                            TopicName.get(topicName).getPartitionedTopicName());
+                                })
+                                .collect(Collectors.toList()));
 
                 subscribeResult.complete(null);
                 log.info("[{}] [{}] Success subscribe new topic {} in topics consumer, partitions: {}, allTopicPartitionsNumber: {}",
diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ZeroQueueConsumerImpl.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ZeroQueueConsumerImpl.java
index 03d1315..0eac110 100644
--- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ZeroQueueConsumerImpl.java
+++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ZeroQueueConsumerImpl.java
@@ -152,7 +152,7 @@ public class ZeroQueueConsumerImpl<T> extends ConsumerImpl<T> {
         checkNotNull(listener, "listener can't be null");
         checkNotNull(message, "unqueued message can't be null");
 
-        pinnedExecutor.execute(() -> {
+        externalPinnedExecutor.execute(() -> {
             stats.updateNumMsgsReceived(message);
             try {
                 if (log.isDebugEnabled()) {
diff --git a/pulsar-client/src/test/java/org/apache/pulsar/client/impl/ClientTestFixtures.java b/pulsar-client/src/test/java/org/apache/pulsar/client/impl/ClientTestFixtures.java
index 0adb165..9e47e52 100644
--- a/pulsar-client/src/test/java/org/apache/pulsar/client/impl/ClientTestFixtures.java
+++ b/pulsar-client/src/test/java/org/apache/pulsar/client/impl/ClientTestFixtures.java
@@ -39,21 +39,29 @@ import static org.mockito.Mockito.*;
 class ClientTestFixtures {
     public static ScheduledExecutorService SCHEDULER = Executors.newSingleThreadScheduledExecutor();
 
-    static <T> PulsarClientImpl createPulsarClientMock() {
+//    static <T> PulsarClientImpl createPulsarClientMock() {
+//        return createPulsarClientMock(mock(ExecutorService.class));
+//    }
+
+    static <T> PulsarClientImpl createPulsarClientMock(ExecutorProvider executorProvider,
+                                                       ExecutorService internalExecutorService) {
         PulsarClientImpl clientMock = mock(PulsarClientImpl.class, Mockito.RETURNS_DEEP_STUBS);
 
         ClientConfigurationData clientConf = new ClientConfigurationData();
         when(clientMock.getConfiguration()).thenReturn(clientConf);
         when(clientMock.timer()).thenReturn(mock(Timer.class));
 
-        when(clientMock.externalExecutorProvider()).thenReturn(mock(ExecutorProvider.class));
+        when(clientMock.getInternalExecutorService()).thenReturn(internalExecutorService);
+        when(clientMock.externalExecutorProvider()).thenReturn(executorProvider);
         when(clientMock.eventLoopGroup().next()).thenReturn(mock(EventLoop.class));
 
         return clientMock;
     }
 
-    static <T> PulsarClientImpl createPulsarClientMockWithMockedClientCnx() {
-        return mockClientCnx(createPulsarClientMock());
+    static <T> PulsarClientImpl createPulsarClientMockWithMockedClientCnx(
+            ExecutorProvider executorProvider,
+            ExecutorService internalExecutorService) {
+        return mockClientCnx(createPulsarClientMock(executorProvider, internalExecutorService));
     }
 
     static PulsarClientImpl mockClientCnx(PulsarClientImpl clientMock) {
diff --git a/pulsar-client/src/test/java/org/apache/pulsar/client/impl/ConsumerImplTest.java b/pulsar-client/src/test/java/org/apache/pulsar/client/impl/ConsumerImplTest.java
index 0f32c41..6f0f83f 100644
--- a/pulsar-client/src/test/java/org/apache/pulsar/client/impl/ConsumerImplTest.java
+++ b/pulsar-client/src/test/java/org/apache/pulsar/client/impl/ConsumerImplTest.java
@@ -25,7 +25,6 @@ import static org.mockito.Mockito.mock;
 import static org.mockito.Mockito.spy;
 import static org.mockito.Mockito.times;
 import static org.mockito.Mockito.verify;
-import static org.mockito.Mockito.when;
 
 import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.CompletionException;
@@ -49,19 +48,17 @@ import org.testng.annotations.Test;
 
 public class ConsumerImplTest {
 
-
     private ExecutorProvider executorProvider;
+    private ExecutorService internalExecutor;
     private ConsumerImpl<byte[]> consumer;
     private ConsumerConfigurationData consumerConf;
-    private ExecutorService executorService;
 
     @BeforeMethod
     public void setUp() {
         executorProvider = new ExecutorProvider(1, new DefaultThreadFactory("ConsumerImplTest"));
+        internalExecutor = Executors.newSingleThreadScheduledExecutor();
         consumerConf = new ConsumerConfigurationData<>();
-        PulsarClientImpl client = ClientTestFixtures.createPulsarClientMock();
-        executorService = Executors.newSingleThreadExecutor();
-        when(client.getInternalExecutorService()).thenReturn(executorService);
+        PulsarClientImpl client = ClientTestFixtures.createPulsarClientMock(executorProvider, internalExecutor);
         ClientConfigurationData clientConf = client.getConfiguration();
         clientConf.setOperationTimeoutMs(100);
         clientConf.setStatsIntervalSeconds(0);
@@ -81,9 +78,9 @@ public class ConsumerImplTest {
             executorProvider.shutdownNow();
             executorProvider = null;
         }
-        if (executorService != null) {
-            executorService.shutdownNow();
-            executorService = null;
+        if (internalExecutor != null) {
+            internalExecutor.shutdownNow();
+            internalExecutor = null;
         }
     }
 
@@ -173,7 +170,7 @@ public class ConsumerImplTest {
     public void testReceiveAsyncCanBeCancelled() {
         // given
         CompletableFuture<Message<byte[]>> future = consumer.receiveAsync();
-        Awaitility.await().untilAsserted(() -> Assert.assertEquals(consumer.peekPendingReceive(), future));
+        Awaitility.await().untilAsserted(() -> Assert.assertTrue(consumer.hasNextPendingReceive()));
         // when
         future.cancel(true);
         // then
diff --git a/pulsar-client/src/test/java/org/apache/pulsar/client/impl/MultiTopicsConsumerImplTest.java b/pulsar-client/src/test/java/org/apache/pulsar/client/impl/MultiTopicsConsumerImplTest.java
index 953309e..d4845a3 100644
--- a/pulsar-client/src/test/java/org/apache/pulsar/client/impl/MultiTopicsConsumerImplTest.java
+++ b/pulsar-client/src/test/java/org/apache/pulsar/client/impl/MultiTopicsConsumerImplTest.java
@@ -23,6 +23,8 @@ import io.netty.channel.EventLoopGroup;
 import io.netty.util.HashedWheelTimer;
 import io.netty.util.Timer;
 import io.netty.util.concurrent.DefaultThreadFactory;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
 import lombok.Cleanup;
 import org.apache.pulsar.client.api.Consumer;
 import org.apache.pulsar.client.api.Message;
@@ -34,6 +36,10 @@ import org.apache.pulsar.client.impl.conf.ConsumerConfigurationData;
 import org.apache.pulsar.client.util.ExecutorProvider;
 import org.apache.pulsar.common.partition.PartitionedTopicMetadata;
 import org.apache.pulsar.common.util.netty.EventLoopUtil;
+import org.junit.After;
+import org.junit.Before;
+import org.testng.annotations.AfterMethod;
+import org.testng.annotations.BeforeMethod;
 import org.testng.annotations.Test;
 
 import java.util.Arrays;
@@ -59,6 +65,27 @@ import static org.testng.Assert.expectThrows;
  */
 public class MultiTopicsConsumerImplTest {
 
+    private ExecutorProvider executorProvider;
+    private ExecutorService internalExecutor;
+
+    @BeforeMethod(alwaysRun = true)
+    public void setUp() {
+        executorProvider = new ExecutorProvider(1, new DefaultThreadFactory("MultiTopicsConsumerImplTest"));
+        internalExecutor = Executors.newSingleThreadScheduledExecutor();
+    }
+
+    @AfterMethod(alwaysRun = true)
+    public void cleanUp() {
+        if (executorProvider != null) {
+            executorProvider.shutdownNow();
+            executorProvider = null;
+        }
+        if (internalExecutor != null) {
+            internalExecutor.shutdownNow();
+            internalExecutor = null;
+        }
+    }
+
     @Test
     public void testGetStats() throws Exception {
         String topicName = "test-stats";
@@ -110,18 +137,23 @@ public class MultiTopicsConsumerImplTest {
     }
 
     private MultiTopicsConsumerImpl<byte[]> createMultiTopicsConsumer() {
-        ExecutorProvider executorProvider = mock(ExecutorProvider.class);
         ConsumerConfigurationData<byte[]> consumerConfData = new ConsumerConfigurationData<>();
         consumerConfData.setSubscriptionName("subscriptionName");
+        return createMultiTopicsConsumer(consumerConfData);
+    }
+
+    private MultiTopicsConsumerImpl<byte[]> createMultiTopicsConsumer(
+            ConsumerConfigurationData<byte[]> consumerConfData) {
         int completionDelayMillis = 100;
         Schema<byte[]> schema = Schema.BYTES;
-        PulsarClientImpl clientMock = createPulsarClientMockWithMockedClientCnx();
+        PulsarClientImpl clientMock = createPulsarClientMockWithMockedClientCnx(executorProvider, internalExecutor);
         when(clientMock.getPartitionedTopicMetadata(any())).thenAnswer(invocation -> createDelayedCompletedFuture(
                 new PartitionedTopicMetadata(), completionDelayMillis));
         when(clientMock.<byte[]>preProcessSchemaBeforeSubscribe(any(), any(), any()))
                 .thenReturn(CompletableFuture.completedFuture(schema));
-        MultiTopicsConsumerImpl<byte[]> impl = new MultiTopicsConsumerImpl<byte[]>(clientMock, consumerConfData, executorProvider,
-            new CompletableFuture<>(), schema, null, true);
+        MultiTopicsConsumerImpl<byte[]> impl = new MultiTopicsConsumerImpl<byte[]>(
+                clientMock, consumerConfData, executorProvider,
+                new CompletableFuture<>(), schema, null, true);
         return impl;
     }
 
@@ -130,7 +162,7 @@ public class MultiTopicsConsumerImplTest {
         // given
         MultiTopicsConsumerImpl<byte[]> consumer = createMultiTopicsConsumer();
         CompletableFuture<Message<byte[]>> future = consumer.receiveAsync();
-        assertEquals(consumer.peekPendingReceive(), future);
+        assertTrue(consumer.hasNextPendingReceive());
         // when
         future.cancel(true);
         // then
@@ -150,19 +182,18 @@ public class MultiTopicsConsumerImplTest {
     }
 
     @Test
-    public void testConsumerCleanupOnSubscribeFailure() throws InterruptedException, TimeoutException, ExecutionException {
-        ExecutorProvider executorProvider = mock(ExecutorProvider.class);
+    public void testConsumerCleanupOnSubscribeFailure() {
         ConsumerConfigurationData<byte[]> consumerConfData = new ConsumerConfigurationData<>();
         consumerConfData.setSubscriptionName("subscriptionName");
         consumerConfData.setTopicNames(new HashSet<>(Arrays.asList("a", "b", "c")));
         int completionDelayMillis = 10;
         Schema<byte[]> schema = Schema.BYTES;
-        PulsarClientImpl clientMock = createPulsarClientMockWithMockedClientCnx();
+        PulsarClientImpl clientMock = createPulsarClientMockWithMockedClientCnx(executorProvider, internalExecutor);
         when(clientMock.getPartitionedTopicMetadata(any())).thenAnswer(invocation -> createExceptionFuture(
                 new PulsarClientException.InvalidConfigurationException("a mock exception"), completionDelayMillis));
         CompletableFuture<Consumer<byte[]>> completeFuture = new CompletableFuture<>();
-        MultiTopicsConsumerImpl<byte[]> impl = new MultiTopicsConsumerImpl<byte[]>(clientMock, consumerConfData, executorProvider,
-                completeFuture, schema, null, true);
+        MultiTopicsConsumerImpl<byte[]> impl = new MultiTopicsConsumerImpl<byte[]>(clientMock, consumerConfData,
+                executorProvider, completeFuture, schema, null, true);
         // assert that we don't start in closed, then we move to closed and get an exception
         // indicating that closeAsync was called
         assertEquals(impl.getState(), HandlerState.State.Uninitialized);
@@ -189,7 +220,7 @@ public class MultiTopicsConsumerImplTest {
         @Cleanup("stop")
         Timer timer = new HashedWheelTimer();
 
-        PulsarClientImpl clientMock = createPulsarClientMockWithMockedClientCnx();
+        PulsarClientImpl clientMock = createPulsarClientMockWithMockedClientCnx(executorProvider, internalExecutor);
         when(clientMock.timer()).thenReturn(timer);
         when(clientMock.preProcessSchemaBeforeSubscribe(any(), any(), any()))
                 .thenReturn(CompletableFuture.completedFuture(null));
diff --git a/pulsar-client/src/test/java/org/apache/pulsar/client/impl/ReaderImplTest.java b/pulsar-client/src/test/java/org/apache/pulsar/client/impl/ReaderImplTest.java
index d0c4023..155a75e 100644
--- a/pulsar-client/src/test/java/org/apache/pulsar/client/impl/ReaderImplTest.java
+++ b/pulsar-client/src/test/java/org/apache/pulsar/client/impl/ReaderImplTest.java
@@ -18,34 +18,36 @@
  */
 package org.apache.pulsar.client.impl;
 
+import static org.testng.AssertJUnit.assertFalse;
+import static org.testng.AssertJUnit.assertTrue;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+
+import io.netty.util.concurrent.DefaultThreadFactory;
 import org.apache.pulsar.client.api.Consumer;
 import org.apache.pulsar.client.api.Message;
 import org.apache.pulsar.client.api.Schema;
 import org.apache.pulsar.client.impl.conf.ReaderConfigurationData;
+import org.apache.pulsar.client.util.ExecutorProvider;
 import org.awaitility.Awaitility;
 import org.testng.annotations.AfterMethod;
 import org.testng.annotations.BeforeMethod;
 import org.testng.annotations.Test;
 
-import java.util.concurrent.CompletableFuture;
-import java.util.concurrent.ExecutorService;
-import java.util.concurrent.Executors;
-
-import static org.mockito.Mockito.when;
-import static org.testng.Assert.assertNotNull;
-import static org.testng.Assert.assertNull;
-
 public class ReaderImplTest {
-    ReaderImpl<byte[]> reader;
-    private ExecutorService executorService;
+    private ReaderImpl<byte[]> reader;
+    private ExecutorProvider executorProvider;
+    private ExecutorService internalExecutor;
 
     @BeforeMethod
     void setupReader() {
-        PulsarClientImpl mockedClient = ClientTestFixtures.createPulsarClientMockWithMockedClientCnx();
+        executorProvider = new ExecutorProvider(1, new DefaultThreadFactory("ReaderImplTest"));
+        internalExecutor = Executors.newSingleThreadScheduledExecutor();
+        PulsarClientImpl mockedClient = ClientTestFixtures.createPulsarClientMockWithMockedClientCnx(
+                executorProvider, internalExecutor);
         ReaderConfigurationData<byte[]> readerConfiguration = new ReaderConfigurationData<>();
         readerConfiguration.setTopicName("topicName");
-        executorService = Executors.newSingleThreadExecutor();
-        when(mockedClient.getInternalExecutorService()).thenReturn(executorService);
         CompletableFuture<Consumer<byte[]>> consumerFuture = new CompletableFuture<>();
         reader = new ReaderImpl<>(mockedClient, readerConfiguration, ClientTestFixtures.createMockedExecutorProvider(),
                 consumerFuture, Schema.BYTES);
@@ -53,9 +55,13 @@ public class ReaderImplTest {
 
     @AfterMethod
     public void clean() {
-        if (executorService != null) {
-            executorService.shutdownNow();
-            executorService = null;
+        if (executorProvider != null) {
+            executorProvider.shutdownNow();
+            executorProvider = null;
+        }
+        if (internalExecutor != null) {
+            internalExecutor.shutdownNow();
+            internalExecutor = null;
         }
     }
 
@@ -64,13 +70,13 @@ public class ReaderImplTest {
         // given
         CompletableFuture<Message<byte[]>> future = reader.readNextAsync();
         Awaitility.await().untilAsserted(() -> {
-            assertNotNull(reader.getConsumer().peekPendingReceive());
+            assertTrue(reader.getConsumer().hasNextPendingReceive());
         });
 
         // when
         future.cancel(false);
 
         // then
-        assertNull(reader.getConsumer().peekPendingReceive());
+        assertFalse(reader.getConsumer().hasNextPendingReceive());
     }
 }