Posted to commits@pulsar.apache.org by mm...@apache.org on 2019/06/01 02:45:20 UTC

[pulsar] branch master updated: Fixed NPE when closing batch during a reconnection (#4427)

This is an automated email from the ASF dual-hosted git repository.

mmerli pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git


The following commit(s) were added to refs/heads/master by this push:
     new 2a5b772  Fixed NPE when closing batch during a reconnection (#4427)
2a5b772 is described below

commit 2a5b772e229692302c051f2de8928a007ad7713b
Author: Matteo Merli <mm...@apache.org>
AuthorDate: Fri May 31 19:45:14 2019 -0700

    Fixed NPE when closing batch during a reconnection (#4427)
---
 .../src/main/java/org/apache/pulsar/client/impl/ProducerImpl.java      | 3 ++-
 1 file changed, 2 insertions(+), 1 deletion(-)
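
Note on the pattern in this change: the patch below reads cnx() once into a local and reuses that value, rather than calling cnx() again after the connectivity check. The following is a minimal, self-contained sketch of why that matters when another thread can clear the connection between the check and the use. It is not Pulsar code. CnxSnapshotSketch, Conn, and the betweenCheckAndUse hook are hypothetical names invented for illustration, and the hook simulates a reconnection deterministically instead of relying on thread timing. Unlike the patch, which keeps its isConnected() check, the sketch null-checks the local directly.

```java
import java.util.concurrent.atomic.AtomicReference;

// Hypothetical stand-in for a producer whose connection reference can be
// replaced or cleared by another thread. Not part of Pulsar.
class CnxSnapshotSketch {

    // Hypothetical connection type; this is not Pulsar's ClientCnx.
    static final class Conn {
        final String name;

        Conn(String name) {
            this.name = name;
        }
    }

    private final AtomicReference<Conn> current = new AtomicReference<>();

    Conn cnx() {
        return current.get();
    }

    void setCnx(Conn c) {
        current.set(c);
    }

    // Re-reads the accessor after the check. If the reference is cleared in
    // between (simulated by the hook), the second read returns null.
    String sendRereading(Runnable betweenCheckAndUse) {
        if (cnx() != null) {
            betweenCheckAndUse.run();
            return "sent via " + cnx().name; // may throw NullPointerException
        }
        return "queued";
    }

    // Reads the accessor once. The check and the use see the same value, so a
    // concurrent change cannot turn the use into a null dereference.
    String sendWithSnapshot(Runnable betweenCheckAndUse) {
        Conn cnx = cnx();
        if (cnx != null) {
            betweenCheckAndUse.run();
            return "sent via " + cnx.name;
        }
        return "queued";
    }

    public static void main(String[] args) {
        CnxSnapshotSketch p = new CnxSnapshotSketch();

        p.setCnx(new Conn("c1"));
        // The hook clears the connection after the check, as a reconnection might.
        System.out.println(p.sendWithSnapshot(() -> p.setCnx(null)));

        p.setCnx(new Conn("c2"));
        try {
            p.sendRereading(() -> p.setCnx(null));
        } catch (NullPointerException e) {
            System.out.println("NullPointerException on second read");
        }
    }
}
```

The snapshot version still sends on a connection that may already be stale. It only guarantees that the reference being used is the one that was checked.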

diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ProducerImpl.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ProducerImpl.java
index fecdc1f..7e7f865 100644
--- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ProducerImpl.java
+++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ProducerImpl.java
@@ -1322,11 +1322,12 @@ public class ProducerImpl<T> extends ProducerBase<T> implements TimerTask, Conne
 
                 pendingMessages.put(op);
 
+                ClientCnx cnx = cnx();
                 if (isConnected()) {
                     // If we do have a connection, the message is sent immediately, otherwise we'll try again once a new
                     // connection is established
                     cmd.retain();
-                    cnx().ctx().channel().eventLoop().execute(WriteInEventLoopCallback.create(this, cnx(), op));
+                    cnx.ctx().channel().eventLoop().execute(WriteInEventLoopCallback.create(this, cnx, op));
                     stats.updateNumMsgsSent(numMessagesInBatch, op.batchSizeByte);
                 } else {
                     if (log.isDebugEnabled()) {