You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by rd...@apache.org on 2020/12/16 22:35:50 UTC

[pulsar] branch master updated: [pulsar-client] Handle NPE while receiving ack for closed producer (#8979)

This is an automated email from the ASF dual-hosted git repository.

rdhabalia pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git


The following commit(s) were added to refs/heads/master by this push:
     new 1d41036  [pulsar-client] Handle NPE while receiving ack for closed producer (#8979)
1d41036 is described below

commit 1d410365860c342a09c2aacad9b2f78e737a5dea
Author: Rajan Dhabalia <rd...@apache.org>
AuthorDate: Wed Dec 16 14:35:18 2020 -0800

    [pulsar-client] Handle NPE while receiving ack for closed producer (#8979)
---
 .../src/main/java/org/apache/pulsar/client/impl/ClientCnx.java | 10 +++++++++-
 1 file changed, 9 insertions(+), 1 deletion(-)

diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ClientCnx.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ClientCnx.java
index 212292e..3c66065 100644
--- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ClientCnx.java
+++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ClientCnx.java
@@ -376,7 +376,15 @@ public class ClientCnx extends PulsarHandler {
                     ledgerId, entryId);
         }
 
-        producers.get(producerId).ackReceived(this, sequenceId, highestSequenceId, ledgerId, entryId);
+        ProducerImpl<?> producer = producers.get(producerId);
+        if (producer != null) {
+            producer.ackReceived(this, sequenceId, highestSequenceId, ledgerId, entryId);
+        } else {
+            if (log.isDebugEnabled()) {
+                log.debug("Producer is {} already closed, ignore published message [{}-{}]", producerId, ledgerId,
+                        entryId);
+            }
+        }
     }
 
     @Override