You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by mm...@apache.org on 2018/09/25 11:16:44 UTC

[pulsar] branch master updated: [tests] Issue 2639: NullPointerException at closing consumer (#2640)

This is an automated email from the ASF dual-hosted git repository.

mmerli pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git


The following commit(s) were added to refs/heads/master by this push:
     new a4d1624  [tests] Issue 2639: NullPointerException at closing consumer (#2640)
a4d1624 is described below

commit a4d162448d63b4eea8edcdde0797dce877bba2f2
Author: Sijie Guo <gu...@gmail.com>
AuthorDate: Tue Sep 25 04:16:39 2018 -0700

    [tests] Issue 2639: NullPointerException at closing consumer (#2640)
    
    * [tests] Issue 2639: NullPointerException at closing consumer
    
    *Motivation*
    
    Connection can be null when closing consumer.
    
    *Changes*
    
    Check if the connection is null or not during closing.
    
    * Address comments
---
 .../apache/pulsar/client/impl/ConsumerImpl.java    | 44 +++++++++++++---------
 1 file changed, 26 insertions(+), 18 deletions(-)

diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java
index f1db613..826d977 100644
--- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java
+++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java
@@ -689,32 +689,40 @@ public class ConsumerImpl<T> extends ConsumerBase<T> implements ConnectionHandle
         acknowledgmentsGroupingTracker.close();
 
         long requestId = client.newRequestId();
-        ByteBuf cmd = Commands.newCloseConsumer(consumerId, requestId);
 
         CompletableFuture<Void> closeFuture = new CompletableFuture<>();
         ClientCnx cnx = cnx();
-        cnx.sendRequestWithId(cmd, requestId).handle((v, exception) -> {
-            cnx.removeConsumer(consumerId);
-            if (exception == null || !cnx.ctx().channel().isActive()) {
-                log.info("[{}] [{}] Closed consumer", topic, subscription);
-                setState(State.Closed);
-                unAckedMessageTracker.close();
-                if (possibleSendToDeadLetterTopicMessages != null) {
-                    possibleSendToDeadLetterTopicMessages.clear();
+        if (null == cnx) {
+            cleanupAtClose(closeFuture);
+        } else {
+            ByteBuf cmd = Commands.newCloseConsumer(consumerId, requestId);
+            cnx.sendRequestWithId(cmd, requestId).handle((v, exception) -> {
+                cnx.removeConsumer(consumerId);
+                if (exception == null || !cnx.ctx().channel().isActive()) {
+                    cleanupAtClose(closeFuture);
+                } else {
+                    closeFuture.completeExceptionally(exception);
                 }
-                closeFuture.complete(null);
-                client.cleanupConsumer(this);
-                // fail all pending-receive futures to notify application
-                failPendingReceive();
-            } else {
-                closeFuture.completeExceptionally(exception);
-            }
-            return null;
-        });
+                return null;
+            });
+        }
 
         return closeFuture;
     }
 
+    private void cleanupAtClose(CompletableFuture<Void> closeFuture) {
+        log.info("[{}] [{}] Closed consumer", topic, subscription);
+        setState(State.Closed);
+        unAckedMessageTracker.close();
+        if (possibleSendToDeadLetterTopicMessages != null) {
+            possibleSendToDeadLetterTopicMessages.clear();
+        }
+        closeFuture.complete(null);
+        client.cleanupConsumer(this);
+        // fail all pending-receive futures to notify application
+        failPendingReceive();
+    }
+
     private void failPendingReceive() {
         lock.readLock().lock();
         try {