Posted to commits@pulsar.apache.org by ja...@apache.org on 2018/05/20 23:21:53 UTC

[incubator-pulsar] branch branch-1.22 updated: Fixed mem leak when acknowledging while disconnected from broker (#1817)

This is an automated email from the ASF dual-hosted git repository.

jai1 pushed a commit to branch branch-1.22
in repository https://gitbox.apache.org/repos/asf/incubator-pulsar.git


The following commit(s) were added to refs/heads/branch-1.22 by this push:
     new b3d5256  Fixed mem leak when acknowledging while disconnected from broker (#1817)
b3d5256 is described below

commit b3d52562430da6bbeb7c0ab48edbe202a75b1099
Author: Matteo Merli <mm...@apache.org>
AuthorDate: Sun May 20 16:21:49 2018 -0700

    Fixed mem leak when acknowledging while disconnected from broker (#1817)
---
 .../src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java     | 4 ++--
 1 file changed, 2 insertions(+), 2 deletions(-)

diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java
index b74bb13..4629aca 100644
--- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java
+++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java
@@ -479,13 +479,13 @@ public class ConsumerImpl extends ConsumerBase {
     private CompletableFuture<Void> sendAcknowledge(MessageId messageId, AckType ackType,
                                                     Map<String,Long> properties) {
         MessageIdImpl msgId = (MessageIdImpl) messageId;
-        final ByteBuf cmd = Commands.newAck(consumerId, msgId.getLedgerId(), msgId.getEntryId(),
-                                            ackType, null, properties);
 
         // There's no actual response from ack messages
         final CompletableFuture<Void> ackFuture = new CompletableFuture<Void>();
 
         if (isConnected()) {
+            final ByteBuf cmd = Commands.newAck(consumerId, msgId.getLedgerId(), msgId.getEntryId(),
+                    ackType, null, properties);
             cnx().ctx().writeAndFlush(cmd).addListener(new GenericFutureListener<Future<Void>>() {
                 @Override
                 public void operationComplete(Future<Void> future) throws Exception {
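
Note on the change above: the commit message gives no rationale, so the following is a reading of the diff rather than the author's own explanation. The ack command buffer is reference-counted, and it is normally released by the transport once writeAndFlush() has consumed it. When the buffer was created before the isConnected() check, the disconnected path never handed it to anything that would release it. The sketch below reproduces that pattern in plain Java. FakeBuf, write(), ackBefore(), ackAfter() and leakedBy() are hypothetical stand-ins invented for illustration; they are not Pulsar or Netty APIs.

```java
import java.util.concurrent.atomic.AtomicInteger;

public class AckLeakSketch {
    // Number of buffers allocated but not yet released.
    static final AtomicInteger liveBuffers = new AtomicInteger();

    // Hypothetical stand-in for a reference-counted buffer (assumption, not Netty's ByteBuf).
    static final class FakeBuf {
        FakeBuf() { liveBuffers.incrementAndGet(); }
        void release() { liveBuffers.decrementAndGet(); }
    }

    // Stand-in for writeAndFlush(): the transport releases the buffer after writing it.
    static void write(FakeBuf buf) {
        buf.release();
    }

    // Shape of the code before the commit: allocate first, then check the connection.
    static void ackBefore(boolean connected) {
        FakeBuf cmd = new FakeBuf();
        if (connected) {
            write(cmd);
        }
        // When disconnected, cmd goes out of scope without release(): a leak.
    }

    // Shape of the code after the commit: allocate only on the path that consumes the buffer.
    static void ackAfter(boolean connected) {
        if (connected) {
            FakeBuf cmd = new FakeBuf();
            write(cmd);
        }
    }

    // Returns how many buffers the given action left unreleased.
    static int leakedBy(Runnable action) {
        int before = liveBuffers.get();
        action.run();
        return liveBuffers.get() - before;
    }

    public static void main(String[] args) {
        System.out.println("before fix, disconnected: " + leakedBy(() -> ackBefore(false)));
        System.out.println("before fix, connected:    " + leakedBy(() -> ackBefore(true)));
        System.out.println("after fix, disconnected:  " + leakedBy(() -> ackAfter(false)));
        System.out.println("after fix, connected:     " + leakedBy(() -> ackAfter(true)));
    }
}
```

An alternative would be to keep the early allocation and release the buffer explicitly on the disconnected path; deferring the allocation, as the diff does, avoids creating the buffer at all when it cannot be sent.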
