You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by GitBox <gi...@apache.org> on 2018/01/04 08:21:08 UTC

[GitHub] merlimat closed pull request #1006: Acknowledgement with properties for RawReader

merlimat closed pull request #1006: Acknowledgement with properties for RawReader
URL: https://github.com/apache/incubator-pulsar/pull/1006
 
 
   

This pull request was merged from a forked repository. Because GitHub
does not show the original diff of a fork-based pull request once it has
been merged, the diff is reproduced below for the sake of provenance:

diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/Consumer.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/Consumer.java
index db52b75db..0eaec7010 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/Consumer.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/Consumer.java
@@ -21,11 +21,14 @@
 import static com.google.common.base.Preconditions.checkArgument;
 import static org.apache.pulsar.common.api.Commands.readChecksum;
 
+import java.util.Collections;
 import java.util.Iterator;
 import java.util.List;
+import java.util.Map;
 import java.util.Objects;
 import java.util.concurrent.atomic.AtomicInteger;
 import java.util.concurrent.atomic.AtomicIntegerFieldUpdater;
+import java.util.stream.Collectors;
 
 import org.apache.bookkeeper.mledger.Entry;
 import org.apache.bookkeeper.mledger.impl.PositionImpl;
@@ -251,7 +254,7 @@ void updatePermitsAndPendingAcks(final List<Entry> entries, SendMessageInfo sent
                 iter.remove();
                 PositionImpl pos = (PositionImpl) entry.getPosition();
                 entry.release();
-                subscription.acknowledgeMessage(pos, AckType.Individual);
+                subscription.acknowledgeMessage(pos, AckType.Individual, Collections.emptyMap());
                 continue;
             }
             if (pendingAcks != null) {
@@ -334,15 +337,21 @@ void messageAcked(CommandAck ack) {
                     position, ack.getValidationError());
         }
 
+        Map<String,Long> properties = Collections.emptyMap();
+        if (ack.getPropertiesCount() > 0) {
+            properties = ack.getPropertiesList().stream()
+                .collect(Collectors.toMap((e) -> e.getKey(),
+                                          (e) -> e.getValue()));
+        }
         if (subType == SubType.Shared) {
             // On shared subscriptions, cumulative ack is not supported
             checkArgument(ack.getAckType() == AckType.Individual);
 
             // Only ack a single message
             removePendingAcks(position);
-            subscription.acknowledgeMessage(position, AckType.Individual);
+            subscription.acknowledgeMessage(position, AckType.Individual, properties);
         } else {
-            subscription.acknowledgeMessage(position, ack.getAckType());
+            subscription.acknowledgeMessage(position, ack.getAckType(), properties);
         }
 
     }
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/Subscription.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/Subscription.java
index 6f728ae03..f50229a1b 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/Subscription.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/Subscription.java
@@ -19,6 +19,7 @@
 package org.apache.pulsar.broker.service;
 
 import java.util.List;
+import java.util.Map;
 import java.util.concurrent.CompletableFuture;
 
 import org.apache.bookkeeper.mledger.Entry;
@@ -39,7 +40,7 @@
 
     void consumerFlow(Consumer consumer, int additionalNumberOfMessages);
 
-    void acknowledgeMessage(PositionImpl position, AckType ackType);
+    void acknowledgeMessage(PositionImpl position, AckType ackType, Map<String,Long> properties);
 
     String getDestination();
 
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/nonpersistent/NonPersistentSubscription.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/nonpersistent/NonPersistentSubscription.java
index c58159b6c..e2c074565 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/nonpersistent/NonPersistentSubscription.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/nonpersistent/NonPersistentSubscription.java
@@ -19,6 +19,7 @@
 package org.apache.pulsar.broker.service.nonpersistent;
 
 import java.util.List;
+import java.util.Map;
 import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.atomic.AtomicIntegerFieldUpdater;
 
@@ -138,7 +139,7 @@ public void consumerFlow(Consumer consumer, int additionalNumberOfMessages) {
     }
 
     @Override
-    public void acknowledgeMessage(PositionImpl position, AckType ackType) {
+    public void acknowledgeMessage(PositionImpl position, AckType ackType, Map<String,Long> properties) {
         // No-op
     }
 
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentSubscription.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentSubscription.java
index c58fd89e9..823daa24d 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentSubscription.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentSubscription.java
@@ -19,6 +19,7 @@
 package org.apache.pulsar.broker.service.persistent;
 
 import java.util.List;
+import java.util.Map;
 import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.atomic.AtomicIntegerFieldUpdater;
 
@@ -175,12 +176,12 @@ public void consumerFlow(Consumer consumer, int additionalNumberOfMessages) {
     }
 
     @Override
-    public void acknowledgeMessage(PositionImpl position, AckType ackType) {
+    public void acknowledgeMessage(PositionImpl position, AckType ackType, Map<String,Long> properties) {
         if (ackType == AckType.Cumulative) {
             if (log.isDebugEnabled()) {
                 log.debug("[{}][{}] Cumulative ack on {}", topicName, subName, position);
             }
-            cursor.asyncMarkDelete(position, markDeleteCallback, position);
+            cursor.asyncMarkDelete(position, properties, markDeleteCallback, position);
         } else {
             if (log.isDebugEnabled()) {
                 log.debug("[{}][{}] Individual ack on {}", topicName, subName, position);
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/client/api/RawReader.java b/pulsar-broker/src/main/java/org/apache/pulsar/client/api/RawReader.java
index 5883a611e..b5a4a6575 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/client/api/RawReader.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/client/api/RawReader.java
@@ -18,6 +18,7 @@
  */
 package org.apache.pulsar.client.api;
 
+import java.util.Map;
 import java.util.concurrent.CompletableFuture;
 import org.apache.pulsar.client.api.PulsarClient;
 import org.apache.pulsar.client.impl.PulsarClientImpl;
@@ -30,9 +31,9 @@
     /**
      * Create a raw reader for a topic.
      */
-    public static CompletableFuture<RawReader> create(PulsarClient client, String topic) {
+    public static CompletableFuture<RawReader> create(PulsarClient client, String topic, String subscription) {
         CompletableFuture<Consumer> future = new CompletableFuture<>();
-        RawReader r = new RawReaderImpl((PulsarClientImpl)client, topic, future);
+        RawReader r = new RawReaderImpl((PulsarClientImpl)client, topic, subscription, future);
         return future.thenCompose((consumer) -> r.seekAsync(MessageId.earliest)).thenApply((ignore) -> r);
     }
 
@@ -49,6 +50,16 @@
      */
     CompletableFuture<RawMessage> readNextAsync();
 
+    /**
+     * Acknowledge all messages as read up until <i>messageId</i>. The properties are stored
+     * with this acknowledgement only, so a later acknowledgement overwrites all properties
+     * from previous acknowledgements.
+     * @param messageId to cumulatively acknowledge to
+     * @param properties a map of properties which will be stored with the acknowledgement
+     * @return a future for the acknowledgement request (completion semantics are implementation-defined)
+     */
+    CompletableFuture<Void> acknowledgeCumulativeAsync(MessageId messageId, Map<String,Long> properties);
+
     /**
      * Close the raw reader.
      */
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/client/impl/RawReaderImpl.java b/pulsar-broker/src/main/java/org/apache/pulsar/client/impl/RawReaderImpl.java
index c861fdd00..8057220ac 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/client/impl/RawReaderImpl.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/client/impl/RawReaderImpl.java
@@ -20,6 +20,7 @@
 
 import java.util.ArrayList;
 import java.util.List;
+import java.util.Map;
 import java.util.Queue;
 import java.util.UUID;
 import java.util.concurrent.BlockingQueue;
@@ -38,6 +39,7 @@
 import org.apache.pulsar.client.impl.PulsarClientImpl;
 import org.apache.pulsar.client.impl.ConsumerImpl.SubscriptionMode;
 import org.apache.pulsar.common.api.proto.PulsarApi.MessageIdData;
+import org.apache.pulsar.common.api.proto.PulsarApi.CommandAck.AckType;
 import org.apache.pulsar.common.util.collections.GrowableArrayBlockingQueue;
 import io.netty.buffer.ByteBuf;
 import org.slf4j.Logger;
@@ -52,12 +54,12 @@
     private final ConsumerConfiguration consumerConfiguration;
     private RawConsumerImpl consumer;
 
-    public RawReaderImpl(PulsarClientImpl client, String topic, CompletableFuture<Consumer> consumerFuture) {
+    public RawReaderImpl(PulsarClientImpl client, String topic, String subscription,
+                         CompletableFuture<Consumer> consumerFuture) {
         this.client = client;
+        this.subscription = subscription;
         this.topic = topic;
 
-        subscription = "raw-reader";
-
         consumerConfiguration = new ConsumerConfiguration();
         consumerConfiguration.setSubscriptionType(SubscriptionType.Exclusive);
         consumerConfiguration.setReceiverQueueSize(DEFAULT_RECEIVER_QUEUE_SIZE);
@@ -76,6 +78,11 @@ public RawReaderImpl(PulsarClientImpl client, String topic, CompletableFuture<Co
         return consumer.receiveRawAsync();
     }
 
+    @Override
+    public CompletableFuture<Void> acknowledgeCumulativeAsync(MessageId messageId, Map<String,Long> properties) {
+        return consumer.doAcknowledge(messageId, AckType.Cumulative, properties);
+    }
+
     @Override
     public CompletableFuture<Void> closeAsync() {
         return consumer.closeAsync();
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/ServerCnxTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/ServerCnxTest.java
index 269aaf50d..11d69f187 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/ServerCnxTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/ServerCnxTest.java
@@ -36,6 +36,7 @@
 import java.io.IOException;
 import java.lang.reflect.Field;
 import java.util.ArrayList;
+import java.util.Collections;
 import java.util.Optional;
 import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.CountDownLatch;
@@ -1168,7 +1169,7 @@ public void testAckCommand() throws Exception {
         PositionImpl pos = new PositionImpl(0, 0);
 
         clientCommand = Commands.newAck(1 /* consumer id */, pos.getLedgerId(), pos.getEntryId(), AckType.Individual,
-                null);
+                                        null, Collections.emptyMap());
         channel.writeInbound(clientCommand);
 
         // verify nothing is sent out on the wire after ack
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/RawReaderTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/RawReaderTest.java
index 001d7e04c..a2e6ccabb 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/RawReaderTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/RawReaderTest.java
@@ -26,13 +26,19 @@
 import io.netty.buffer.ByteBuf;
 
 import java.util.ArrayList;
+import java.util.HashMap;
 import java.util.HashSet;
 import java.util.List;
+import java.util.Map;
 import java.util.Set;
 import java.util.concurrent.Future;
 import java.util.concurrent.TimeoutException;
 import java.util.concurrent.TimeUnit;
 
+import org.apache.bookkeeper.mledger.ManagedLedger;
+
+import org.apache.pulsar.broker.auth.MockedPulsarServiceBaseTest;
+import org.apache.pulsar.broker.service.persistent.PersistentTopic;
 import org.apache.pulsar.common.api.Commands;
 import org.apache.pulsar.client.api.MessageBuilder;
 import org.apache.pulsar.client.api.MessageId;
@@ -41,7 +47,6 @@
 import org.apache.pulsar.client.api.RawMessage;
 import org.apache.pulsar.client.api.RawReader;
 import org.apache.pulsar.common.api.proto.PulsarApi.MessageMetadata;
-import org.apache.pulsar.broker.auth.MockedPulsarServiceBaseTest;
 import org.apache.pulsar.common.policies.data.ClusterData;
 import org.apache.pulsar.common.policies.data.PropertyAdmin;
 
@@ -54,6 +59,7 @@
 
 public class RawReaderTest extends MockedPulsarServiceBaseTest {
     private static final Logger log = LoggerFactory.getLogger(RawReaderTest.class);
+    private static final String subscription = "foobar-sub";
 
     @BeforeMethod
     @Override
@@ -104,7 +110,7 @@ public void testRawReader() throws Exception {
 
         Set<String> keys = publishMessages(topic, numKeys);
 
-        RawReader reader = RawReader.create(pulsarClient, topic).get();
+        RawReader reader = RawReader.create(pulsarClient, topic, subscription).get();
         try {
             while (true) { // should break out with TimeoutException
                 try (RawMessage m = reader.readNextAsync().get(1, TimeUnit.SECONDS)) {
@@ -126,7 +132,7 @@ public void testSeekToStart() throws Exception {
         publishMessages(topic, numKeys);
 
         Set<String> readKeys = new HashSet<>();
-        RawReader reader = RawReader.create(pulsarClient, topic).get();
+        RawReader reader = RawReader.create(pulsarClient, topic, subscription).get();
         try {
             while (true) { // should break out with TimeoutException
                 try (RawMessage m = reader.readNextAsync().get(1, TimeUnit.SECONDS)) {
@@ -161,7 +167,7 @@ public void testSeekToMiddle() throws Exception {
         publishMessages(topic, numKeys);
 
         Set<String> readKeys = new HashSet<>();
-        RawReader reader = RawReader.create(pulsarClient, topic).get();
+        RawReader reader = RawReader.create(pulsarClient, topic, subscription).get();
         int i = 0;
         MessageId seekTo = null;
         try {
@@ -206,7 +212,7 @@ public void testFlowControl() throws Exception {
 
         publishMessages(topic, numMessages);
 
-        RawReader reader = RawReader.create(pulsarClient, topic).get();
+        RawReader reader = RawReader.create(pulsarClient, topic, subscription).get();
         List<Future<RawMessage>> futures = new ArrayList<>();
         Set<String> keys = new HashSet<>();
 
@@ -226,4 +232,44 @@ public void testFlowControl() throws Exception {
         Assert.assertEquals(timeouts, 1);
         Assert.assertEquals(keys.size(), numMessages);
     }
+
+    @Test
+    public void testAcknowledgeWithProperties() throws Exception {
+        // Read all messages, ack cumulatively with a property, and verify the cursor reports it.
+        int numKeys = 10;
+        String topic = "persistent://my-property/use/my-ns/my-raw-topic";
+
+        Set<String> keys = publishMessages(topic, numKeys);
+
+        MessageId lastMessageId = null;
+        RawReader reader = RawReader.create(pulsarClient, topic, subscription).get();
+        try {
+            while (true) { // should break out with TimeoutException
+                try (RawMessage m = reader.readNextAsync().get(1, TimeUnit.SECONDS)) {
+                    lastMessageId = m.getMessageId();
+                    Assert.assertTrue(keys.remove(extractKey(m)));
+                }
+            }
+        } catch (TimeoutException te) {
+            // expected: no more messages arrived within the read timeout
+        }
+
+        Assert.assertTrue(keys.isEmpty());
+        // Attach a property to a cumulative ack of the last message that was read.
+        Map<String,Long> properties = new HashMap<>();
+        properties.put("foobar", 0xdeadbeefdecaL);
+        reader.acknowledgeCumulativeAsync(lastMessageId, properties).get(5, TimeUnit.SECONDS);
+        // The broker applies the ack asynchronously, so poll (up to ~3s) until the property is visible.
+        PersistentTopic topicRef = (PersistentTopic) pulsar.getBrokerService().getTopicReference(topic);
+        ManagedLedger ledger = topicRef.getManagedLedger();
+        for (int i = 0; i < 30; i++) {
+            if (Long.valueOf(0xdeadbeefdecaL).equals(ledger.openCursor(subscription).getProperties().get("foobar"))) {
+                break; // boxed Longs must be compared with equals(); == compares references
+            }
+            Thread.sleep(100);
+        }
+        Assert.assertEquals(ledger.openCursor(subscription).getProperties().get("foobar"),
+                Long.valueOf(0xdeadbeefdecaL));
+    }
+
 }
diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerBase.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerBase.java
index 96e2a84a7..6ef58b26e 100644
--- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerBase.java
+++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerBase.java
@@ -18,6 +18,8 @@
  */
 package org.apache.pulsar.client.impl;
 
+import java.util.Collections;
+import java.util.Map;
 import java.util.Set;
 import java.util.concurrent.BlockingQueue;
 import java.util.concurrent.CompletableFuture;
@@ -225,7 +227,7 @@ public void acknowledgeCumulative(MessageId messageId) throws PulsarClientExcept
 
     @Override
     public CompletableFuture<Void> acknowledgeAsync(MessageId messageId) {
-        return doAcknowledge(messageId, AckType.Individual);
+        return doAcknowledge(messageId, AckType.Individual, Collections.emptyMap());
     }
 
     @Override
@@ -235,10 +237,11 @@ public void acknowledgeCumulative(MessageId messageId) throws PulsarClientExcept
                     "Cannot use cumulative acks on a non-exclusive subscription"));
         }
 
-        return doAcknowledge(messageId, AckType.Cumulative);
+        return doAcknowledge(messageId, AckType.Cumulative, Collections.emptyMap());
     }
 
-    abstract protected CompletableFuture<Void> doAcknowledge(MessageId messageId, AckType ackType);
+    abstract protected CompletableFuture<Void> doAcknowledge(MessageId messageId, AckType ackType,
+                                                             Map<String,Long> properties);
 
     @Override
     public void unsubscribe() throws PulsarClientException {
diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java
index c47c17fb4..788fc719b 100644
--- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java
+++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java
@@ -28,7 +28,9 @@
 import java.io.IOException;
 import java.util.ArrayList;
 import java.util.BitSet;
+import java.util.Collections;
 import java.util.List;
+import java.util.Map;
 import java.util.NavigableMap;
 import java.util.Objects;
 import java.util.Set;
@@ -322,7 +324,8 @@ protected Message internalReceive(int timeout, TimeUnit unit) throws PulsarClien
 
     // we may not be able to ack message being acked by client. However messages in prior
     // batch may be ackable
-    private void ackMessagesInEarlierBatch(BatchMessageIdImpl batchMessageId, MessageIdImpl message) {
+    private void ackMessagesInEarlierBatch(BatchMessageIdImpl batchMessageId, MessageIdImpl message,
+                                           Map<String,Long> properties) {
         // get entry before this message and ack that message on broker
         MessageIdImpl lowerKey = batchMessageAckTracker.lowerKey(message);
         if (lowerKey != null) {
@@ -334,7 +337,7 @@ private void ackMessagesInEarlierBatch(BatchMessageIdImpl batchMessageId, Messag
                 log.debug("[{}] [{}] ack prior message {} to broker on cumulative ack for message {}", subscription,
                         consumerId, lowerKey, batchMessageId);
             }
-            sendAcknowledge(lowerKey, AckType.Cumulative);
+            sendAcknowledge(lowerKey, AckType.Cumulative, properties);
         } else {
             if (log.isDebugEnabled()) {
                 log.debug("[{}] [{}] no messages prior to message {}", subscription, consumerId, batchMessageId);
@@ -342,7 +345,8 @@ private void ackMessagesInEarlierBatch(BatchMessageIdImpl batchMessageId, Messag
         }
     }
 
-    boolean markAckForBatchMessage(BatchMessageIdImpl batchMessageId, AckType ackType) {
+    boolean markAckForBatchMessage(BatchMessageIdImpl batchMessageId, AckType ackType,
+                                   Map<String,Long> properties) {
         // we keep track of entire batch and so need MessageIdImpl and cannot use BatchMessageIdImpl
         MessageIdImpl message = new MessageIdImpl(batchMessageId.getLedgerId(), batchMessageId.getEntryId(),
                 batchMessageId.getPartitionIndex());
@@ -396,7 +400,7 @@ boolean markAckForBatchMessage(BatchMessageIdImpl batchMessageId, AckType ackTyp
         } else {
             // we cannot ack this message to broker. but prior message may be ackable
             if (ackType == AckType.Cumulative) {
-                ackMessagesInEarlierBatch(batchMessageId, message);
+                ackMessagesInEarlierBatch(batchMessageId, message, properties);
             }
             if (log.isDebugEnabled()) {
                 log.debug("[{}] [{}] cannot ack message to broker {}, acktype {}, pending acks - {}", subscription,
@@ -439,7 +443,8 @@ public boolean isBatchingAckTrackerEmpty() {
     }
 
     @Override
-    protected CompletableFuture<Void> doAcknowledge(MessageId messageId, AckType ackType) {
+    protected CompletableFuture<Void> doAcknowledge(MessageId messageId, AckType ackType,
+                                                    Map<String,Long> properties) {
         checkArgument(messageId instanceof MessageIdImpl);
         if (getState() != State.Ready && getState() != State.Connecting) {
             stats.incrementNumAcksFailed();
@@ -447,7 +452,7 @@ public boolean isBatchingAckTrackerEmpty() {
         }
 
         if (messageId instanceof BatchMessageIdImpl) {
-            if (markAckForBatchMessage((BatchMessageIdImpl) messageId, ackType)) {
+            if (markAckForBatchMessage((BatchMessageIdImpl) messageId, ackType, properties)) {
                 // all messages in batch have been acked so broker can be acked via sendAcknowledge()
                 if (log.isDebugEnabled()) {
                     log.debug("[{}] [{}] acknowledging message - {}, acktype {}", subscription, consumerName, messageId,
@@ -463,12 +468,14 @@ public boolean isBatchingAckTrackerEmpty() {
         if (ackType == AckType.Cumulative && !(messageId instanceof BatchMessageIdImpl)) {
             updateBatchAckTracker((MessageIdImpl) messageId, ackType);
         }
-        return sendAcknowledge(messageId, ackType);
+        return sendAcknowledge(messageId, ackType, properties);
     }
 
-    private CompletableFuture<Void> sendAcknowledge(MessageId messageId, AckType ackType) {
+    private CompletableFuture<Void> sendAcknowledge(MessageId messageId, AckType ackType,
+                                                    Map<String,Long> properties) {
         MessageIdImpl msgId = (MessageIdImpl) messageId;
-        final ByteBuf cmd = Commands.newAck(consumerId, msgId.getLedgerId(), msgId.getEntryId(), ackType, null);
+        final ByteBuf cmd = Commands.newAck(consumerId, msgId.getLedgerId(), msgId.getEntryId(),
+                                            ackType, null, properties);
 
         // There's no actual response from ack messages
         final CompletableFuture<Void> ackFuture = new CompletableFuture<Void>();
@@ -1094,7 +1101,7 @@ private void discardCorruptedMessage(MessageIdData messageId, ClientCnx currentC
 
     private void discardMessage(MessageIdData messageId, ClientCnx currentCnx, ValidationError validationError) {
         ByteBuf cmd = Commands.newAck(consumerId, messageId.getLedgerId(), messageId.getEntryId(), AckType.Individual,
-                validationError);
+                                      validationError, Collections.emptyMap());
         currentCnx.ctx().writeAndFlush(cmd, currentCnx.ctx().voidPromise());
         increaseAvailablePermits(currentCnx);
         stats.incrementNumReceiveFailed();
diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/PartitionedConsumerImpl.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/PartitionedConsumerImpl.java
index bc80b9089..2d7830e2a 100644
--- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/PartitionedConsumerImpl.java
+++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/PartitionedConsumerImpl.java
@@ -236,7 +236,8 @@ protected Message internalReceive(int timeout, TimeUnit unit) throws PulsarClien
     }
 
     @Override
-    protected CompletableFuture<Void> doAcknowledge(MessageId messageId, AckType ackType) {
+    protected CompletableFuture<Void> doAcknowledge(MessageId messageId, AckType ackType,
+                                                    Map<String,Long> properties) {
         checkArgument(messageId instanceof MessageIdImpl);
 
         if (getState() != State.Ready) {
@@ -249,7 +250,7 @@ protected Message internalReceive(int timeout, TimeUnit unit) throws PulsarClien
         } else {
 
             ConsumerImpl consumer = consumers.get(((MessageIdImpl) messageId).getPartitionIndex());
-            return consumer.doAcknowledge(messageId, ackType).thenRun(() ->
+            return consumer.doAcknowledge(messageId, ackType, properties).thenRun(() ->
                     unAckedMessageTracker.remove((MessageIdImpl) messageId));
         }
 
diff --git a/pulsar-common/src/main/java/org/apache/pulsar/common/api/Commands.java b/pulsar-common/src/main/java/org/apache/pulsar/common/api/Commands.java
index 1ea16d7fb..4a5211d06 100644
--- a/pulsar-common/src/main/java/org/apache/pulsar/common/api/Commands.java
+++ b/pulsar-common/src/main/java/org/apache/pulsar/common/api/Commands.java
@@ -23,6 +23,7 @@
 
 import java.io.IOException;
 import java.util.List;
+import java.util.Map;
 
 import org.apache.pulsar.common.api.proto.PulsarApi;
 import org.apache.pulsar.common.api.proto.PulsarApi.AuthMethod;
@@ -512,7 +513,7 @@ public static ByteBuf newLookupErrorResponse(ServerError error, String errorMsg,
     }
 
     public static ByteBuf newAck(long consumerId, long ledgerId, long entryId, AckType ackType,
-            ValidationError validationError) {
+                                 ValidationError validationError, Map<String,Long> properties) {
         CommandAck.Builder ackBuilder = CommandAck.newBuilder();
         ackBuilder.setConsumerId(consumerId);
         ackBuilder.setAckType(ackType);
@@ -524,6 +525,10 @@ public static ByteBuf newAck(long consumerId, long ledgerId, long entryId, AckTy
         if (validationError != null) {
             ackBuilder.setValidationError(validationError);
         }
+        for (Map.Entry<String,Long> e : properties.entrySet()) {
+            ackBuilder.addProperties(
+                    PulsarApi.KeyLongValue.newBuilder().setKey(e.getKey()).setValue(e.getValue()).build());
+        }
         CommandAck ack = ackBuilder.build();
 
         ByteBuf res = serializeWithSize(BaseCommand.newBuilder().setType(Type.ACK).setAck(ack));
diff --git a/pulsar-common/src/main/java/org/apache/pulsar/common/api/proto/PulsarApi.java b/pulsar-common/src/main/java/org/apache/pulsar/common/api/proto/PulsarApi.java
index a3ae02ac6..ee2fe84d0 100644
--- a/pulsar-common/src/main/java/org/apache/pulsar/common/api/proto/PulsarApi.java
+++ b/pulsar-common/src/main/java/org/apache/pulsar/common/api/proto/PulsarApi.java
@@ -1225,6 +1225,438 @@ void setValue(com.google.protobuf.ByteString value) {
     // @@protoc_insertion_point(class_scope:pulsar.proto.KeyValue)
   }
   
+  public interface KeyLongValueOrBuilder
+      extends com.google.protobuf.MessageLiteOrBuilder {
+    
+    // required string key = 1;
+    boolean hasKey();
+    String getKey();
+    
+    // required uint64 value = 2;
+    boolean hasValue();
+    long getValue();
+  }
+  public static final class KeyLongValue extends
+      com.google.protobuf.GeneratedMessageLite
+      implements KeyLongValueOrBuilder, org.apache.pulsar.common.util.protobuf.ByteBufCodedOutputStream.ByteBufGeneratedMessage  {
+    // Use KeyLongValue.newBuilder() to construct.
+    private final io.netty.util.Recycler.Handle<KeyLongValue> handle;
+    private KeyLongValue(io.netty.util.Recycler.Handle<KeyLongValue> handle) {
+      this.handle = handle;
+    }
+    
+     private static final io.netty.util.Recycler<KeyLongValue> RECYCLER = new io.netty.util.Recycler<KeyLongValue>() {
+            protected KeyLongValue newObject(Handle<KeyLongValue> handle) {
+              return new KeyLongValue(handle);
+            }
+          };
+        
+        public void recycle() {
+            this.initFields();
+            this.memoizedIsInitialized = -1;
+            this.bitField0_ = 0;
+            this.memoizedSerializedSize = -1;
+            handle.recycle(this);
+        }
+         
+    private KeyLongValue(boolean noInit) {
+        this.handle = null;
+    }
+    
+    private static final KeyLongValue defaultInstance;
+    public static KeyLongValue getDefaultInstance() {
+      return defaultInstance;
+    }
+    
+    public KeyLongValue getDefaultInstanceForType() {
+      return defaultInstance;
+    }
+    
+    private int bitField0_;
+    // required string key = 1;
+    public static final int KEY_FIELD_NUMBER = 1;
+    private java.lang.Object key_;
+    public boolean hasKey() {
+      return ((bitField0_ & 0x00000001) == 0x00000001);
+    }
+    public String getKey() {
+      java.lang.Object ref = key_;
+      if (ref instanceof String) {
+        return (String) ref;
+      } else {
+        com.google.protobuf.ByteString bs = 
+            (com.google.protobuf.ByteString) ref;
+        String s = bs.toStringUtf8();
+        if (com.google.protobuf.Internal.isValidUtf8(bs)) {
+          key_ = s;
+        }
+        return s;
+      }
+    }
+    private com.google.protobuf.ByteString getKeyBytes() {
+      java.lang.Object ref = key_;
+      if (ref instanceof String) {
+        com.google.protobuf.ByteString b = 
+            com.google.protobuf.ByteString.copyFromUtf8((String) ref);
+        key_ = b;
+        return b;
+      } else {
+        return (com.google.protobuf.ByteString) ref;
+      }
+    }
+    
+    // required uint64 value = 2;
+    public static final int VALUE_FIELD_NUMBER = 2;
+    private long value_;
+    public boolean hasValue() {
+      return ((bitField0_ & 0x00000002) == 0x00000002);
+    }
+    public long getValue() {
+      return value_;
+    }
+    
+    private void initFields() {
+      key_ = "";
+      value_ = 0L;
+    }
+    private byte memoizedIsInitialized = -1;
+    public final boolean isInitialized() {
+      byte isInitialized = memoizedIsInitialized;
+      if (isInitialized != -1) return isInitialized == 1;
+      
+      if (!hasKey()) {
+        memoizedIsInitialized = 0;
+        return false;
+      }
+      if (!hasValue()) {
+        memoizedIsInitialized = 0;
+        return false;
+      }
+      memoizedIsInitialized = 1;
+      return true;
+    }
+    
+    public void writeTo(com.google.protobuf.CodedOutputStream output)
+                        throws java.io.IOException {
+        throw new RuntimeException("Cannot use CodedOutputStream");
+    }
+    
+    public void writeTo(org.apache.pulsar.common.util.protobuf.ByteBufCodedOutputStream output)
+                        throws java.io.IOException {
+      getSerializedSize();
+      if (((bitField0_ & 0x00000001) == 0x00000001)) {
+        output.writeBytes(1, getKeyBytes());
+      }
+      if (((bitField0_ & 0x00000002) == 0x00000002)) {
+        output.writeUInt64(2, value_);
+      }
+    }
+    
+    private int memoizedSerializedSize = -1;
+    public int getSerializedSize() {
+      int size = memoizedSerializedSize;
+      if (size != -1) return size;
+    
+      size = 0;
+      if (((bitField0_ & 0x00000001) == 0x00000001)) {
+        size += com.google.protobuf.CodedOutputStream
+          .computeBytesSize(1, getKeyBytes());
+      }
+      if (((bitField0_ & 0x00000002) == 0x00000002)) {
+        size += com.google.protobuf.CodedOutputStream
+          .computeUInt64Size(2, value_);
+      }
+      memoizedSerializedSize = size;
+      return size;
+    }
+    
+    private static final long serialVersionUID = 0L;
+    @java.lang.Override
+    protected java.lang.Object writeReplace()
+        throws java.io.ObjectStreamException {
+      return super.writeReplace();
+    }
+    
+    public static org.apache.pulsar.common.api.proto.PulsarApi.KeyLongValue parseFrom(
+        com.google.protobuf.ByteString data)
+        throws com.google.protobuf.InvalidProtocolBufferException {
+         throw new RuntimeException("Disabled");
+    }
+    public static org.apache.pulsar.common.api.proto.PulsarApi.KeyLongValue parseFrom(
+        com.google.protobuf.ByteString data,
+        com.google.protobuf.ExtensionRegistryLite extensionRegistry)
+        throws com.google.protobuf.InvalidProtocolBufferException {
+         throw new RuntimeException("Disabled");
+    }
+    public static org.apache.pulsar.common.api.proto.PulsarApi.KeyLongValue parseFrom(byte[] data)
+        throws com.google.protobuf.InvalidProtocolBufferException {
+      return newBuilder().mergeFrom(data).buildParsed();
+    }
+    public static org.apache.pulsar.common.api.proto.PulsarApi.KeyLongValue parseFrom(
+        byte[] data,
+        com.google.protobuf.ExtensionRegistryLite extensionRegistry)
+        throws com.google.protobuf.InvalidProtocolBufferException {
+      return newBuilder().mergeFrom(data, extensionRegistry)
+               .buildParsed();
+    }
+    public static org.apache.pulsar.common.api.proto.PulsarApi.KeyLongValue parseFrom(java.io.InputStream input)
+        throws java.io.IOException {
+      return newBuilder().mergeFrom(input).buildParsed();
+    }
+    public static org.apache.pulsar.common.api.proto.PulsarApi.KeyLongValue parseFrom(
+        java.io.InputStream input,
+        com.google.protobuf.ExtensionRegistryLite extensionRegistry)
+        throws java.io.IOException {
+      return newBuilder().mergeFrom(input, extensionRegistry)
+               .buildParsed();
+    }
+    public static org.apache.pulsar.common.api.proto.PulsarApi.KeyLongValue parseDelimitedFrom(java.io.InputStream input)
+        throws java.io.IOException {
+      Builder builder = newBuilder();
+      if (builder.mergeDelimitedFrom(input)) {
+        return builder.buildParsed();
+      } else {
+        return null;
+      }
+    }
+    public static org.apache.pulsar.common.api.proto.PulsarApi.KeyLongValue parseDelimitedFrom(
+        java.io.InputStream input,
+        com.google.protobuf.ExtensionRegistryLite extensionRegistry)
+        throws java.io.IOException {
+      Builder builder = newBuilder();
+      if (builder.mergeDelimitedFrom(input, extensionRegistry)) {
+        return builder.buildParsed();
+      } else {
+        return null;
+      }
+    }
+    public static org.apache.pulsar.common.api.proto.PulsarApi.KeyLongValue parseFrom(
+        com.google.protobuf.CodedInputStream input)
+        throws java.io.IOException {
+      return newBuilder().mergeFrom(input).buildParsed();
+    }
+    public static org.apache.pulsar.common.api.proto.PulsarApi.KeyLongValue parseFrom(
+        com.google.protobuf.CodedInputStream input,
+        com.google.protobuf.ExtensionRegistryLite extensionRegistry)
+        throws java.io.IOException {
+      return newBuilder().mergeFrom(input, extensionRegistry)
+               .buildParsed();
+    }
+    
+    public static Builder newBuilder() { return Builder.create(); }
+    public Builder newBuilderForType() { return newBuilder(); }
+    public static Builder newBuilder(org.apache.pulsar.common.api.proto.PulsarApi.KeyLongValue prototype) {
+      return newBuilder().mergeFrom(prototype);
+    }
+    public Builder toBuilder() { return newBuilder(this); }
+    
+    public static final class Builder extends
+        com.google.protobuf.GeneratedMessageLite.Builder<
+          org.apache.pulsar.common.api.proto.PulsarApi.KeyLongValue, Builder>
+        implements org.apache.pulsar.common.api.proto.PulsarApi.KeyLongValueOrBuilder, org.apache.pulsar.common.util.protobuf.ByteBufCodedInputStream.ByteBufMessageBuilder  {
+      // Construct using org.apache.pulsar.common.api.proto.PulsarApi.KeyLongValue.newBuilder()
+      private final io.netty.util.Recycler.Handle<Builder> handle;
+      private Builder(io.netty.util.Recycler.Handle<Builder> handle) {
+        this.handle = handle;
+        maybeForceBuilderInitialization();
+      }
+      private final static io.netty.util.Recycler<Builder> RECYCLER = new io.netty.util.Recycler<Builder>() {
+         protected Builder newObject(io.netty.util.Recycler.Handle<Builder> handle) {
+               return new Builder(handle);
+             }
+            };
+      
+       public void recycle() {
+                clear();
+                handle.recycle(this);
+            }
+      
+      private void maybeForceBuilderInitialization() {
+      }
+      private static Builder create() {
+        return RECYCLER.get();
+      }
+      
+      public Builder clear() {
+        super.clear();
+        key_ = "";
+        bitField0_ = (bitField0_ & ~0x00000001);
+        value_ = 0L;
+        bitField0_ = (bitField0_ & ~0x00000002);
+        return this;
+      }
+      
+      public Builder clone() {
+        return create().mergeFrom(buildPartial());
+      }
+      
+      public org.apache.pulsar.common.api.proto.PulsarApi.KeyLongValue getDefaultInstanceForType() {
+        return org.apache.pulsar.common.api.proto.PulsarApi.KeyLongValue.getDefaultInstance();
+      }
+      
+      public org.apache.pulsar.common.api.proto.PulsarApi.KeyLongValue build() {
+        org.apache.pulsar.common.api.proto.PulsarApi.KeyLongValue result = buildPartial();
+        if (!result.isInitialized()) {
+          throw newUninitializedMessageException(result);
+        }
+        return result;
+      }
+      
+      private org.apache.pulsar.common.api.proto.PulsarApi.KeyLongValue buildParsed()
+          throws com.google.protobuf.InvalidProtocolBufferException {
+        org.apache.pulsar.common.api.proto.PulsarApi.KeyLongValue result = buildPartial();
+        if (!result.isInitialized()) {
+          throw newUninitializedMessageException(
+            result).asInvalidProtocolBufferException();
+        }
+        return result;
+      }
+      
+      public org.apache.pulsar.common.api.proto.PulsarApi.KeyLongValue buildPartial() {
+        org.apache.pulsar.common.api.proto.PulsarApi.KeyLongValue result = org.apache.pulsar.common.api.proto.PulsarApi.KeyLongValue.RECYCLER.get();
+        int from_bitField0_ = bitField0_;
+        int to_bitField0_ = 0;
+        if (((from_bitField0_ & 0x00000001) == 0x00000001)) {
+          to_bitField0_ |= 0x00000001;
+        }
+        result.key_ = key_;
+        if (((from_bitField0_ & 0x00000002) == 0x00000002)) {
+          to_bitField0_ |= 0x00000002;
+        }
+        result.value_ = value_;
+        result.bitField0_ = to_bitField0_;
+        return result;
+      }
+      
+      public Builder mergeFrom(org.apache.pulsar.common.api.proto.PulsarApi.KeyLongValue other) {
+        if (other == org.apache.pulsar.common.api.proto.PulsarApi.KeyLongValue.getDefaultInstance()) return this;
+        if (other.hasKey()) {
+          setKey(other.getKey());
+        }
+        if (other.hasValue()) {
+          setValue(other.getValue());
+        }
+        return this;
+      }
+      
+      public final boolean isInitialized() {
+        if (!hasKey()) {
+          
+          return false;
+        }
+        if (!hasValue()) {
+          
+          return false;
+        }
+        return true;
+      }
+      
+      public Builder mergeFrom(com.google.protobuf.CodedInputStream input,
+                              com.google.protobuf.ExtensionRegistryLite extensionRegistry)
+                              throws java.io.IOException {
+         throw new java.io.IOException("Merge from CodedInputStream is disabled");
+                              }
+      public Builder mergeFrom(
+          org.apache.pulsar.common.util.protobuf.ByteBufCodedInputStream input,
+          com.google.protobuf.ExtensionRegistryLite extensionRegistry)
+          throws java.io.IOException {
+        while (true) {
+          int tag = input.readTag();
+          switch (tag) {
+            case 0:
+              
+              return this;
+            default: {
+              if (!input.skipField(tag)) {
+                
+                return this;
+              }
+              break;
+            }
+            case 10: {
+              bitField0_ |= 0x00000001;
+              key_ = input.readBytes();
+              break;
+            }
+            case 16: {
+              bitField0_ |= 0x00000002;
+              value_ = input.readUInt64();
+              break;
+            }
+          }
+        }
+      }
+      
+      private int bitField0_;
+      
+      // required string key = 1;
+      private java.lang.Object key_ = "";
+      public boolean hasKey() {
+        return ((bitField0_ & 0x00000001) == 0x00000001);
+      }
+      public String getKey() {
+        java.lang.Object ref = key_;
+        if (!(ref instanceof String)) {
+          String s = ((com.google.protobuf.ByteString) ref).toStringUtf8();
+          key_ = s;
+          return s;
+        } else {
+          return (String) ref;
+        }
+      }
+      public Builder setKey(String value) {
+        if (value == null) {
+    throw new NullPointerException();
+  }
+  bitField0_ |= 0x00000001;
+        key_ = value;
+        
+        return this;
+      }
+      public Builder clearKey() {
+        bitField0_ = (bitField0_ & ~0x00000001);
+        key_ = getDefaultInstance().getKey();
+        
+        return this;
+      }
+      void setKey(com.google.protobuf.ByteString value) {
+        bitField0_ |= 0x00000001;
+        key_ = value;
+        
+      }
+      
+      // required uint64 value = 2;
+      private long value_ ;
+      public boolean hasValue() {
+        return ((bitField0_ & 0x00000002) == 0x00000002);
+      }
+      public long getValue() {
+        return value_;
+      }
+      public Builder setValue(long value) {
+        bitField0_ |= 0x00000002;
+        value_ = value;
+        
+        return this;
+      }
+      public Builder clearValue() {
+        bitField0_ = (bitField0_ & ~0x00000002);
+        value_ = 0L;
+        
+        return this;
+      }
+      
+      // @@protoc_insertion_point(builder_scope:pulsar.proto.KeyLongValue)
+    }
+    
+    static {
+      defaultInstance = new KeyLongValue(true);
+      defaultInstance.initFields();
+    }
+    
+    // @@protoc_insertion_point(class_scope:pulsar.proto.KeyLongValue)
+  }
+  
   public interface EncryptionKeysOrBuilder
       extends com.google.protobuf.MessageLiteOrBuilder {
     
@@ -11198,6 +11630,12 @@ public Builder clearMessageId() {
     // optional .pulsar.proto.CommandAck.ValidationError validation_error = 4;
     boolean hasValidationError();
     org.apache.pulsar.common.api.proto.PulsarApi.CommandAck.ValidationError getValidationError();
+    
+    // repeated .pulsar.proto.KeyLongValue properties = 5;
+    java.util.List<org.apache.pulsar.common.api.proto.PulsarApi.KeyLongValue> 
+        getPropertiesList();
+    org.apache.pulsar.common.api.proto.PulsarApi.KeyLongValue getProperties(int index);
+    int getPropertiesCount();
   }
   public static final class CommandAck extends
       com.google.protobuf.GeneratedMessageLite
@@ -11367,11 +11805,33 @@ public boolean hasValidationError() {
       return validationError_;
     }
     
+    // repeated .pulsar.proto.KeyLongValue properties = 5;
+    public static final int PROPERTIES_FIELD_NUMBER = 5;
+    private java.util.List<org.apache.pulsar.common.api.proto.PulsarApi.KeyLongValue> properties_;
+    public java.util.List<org.apache.pulsar.common.api.proto.PulsarApi.KeyLongValue> getPropertiesList() {
+      return properties_;
+    }
+    public java.util.List<? extends org.apache.pulsar.common.api.proto.PulsarApi.KeyLongValueOrBuilder> 
+        getPropertiesOrBuilderList() {
+      return properties_;
+    }
+    public int getPropertiesCount() {
+      return properties_.size();
+    }
+    public org.apache.pulsar.common.api.proto.PulsarApi.KeyLongValue getProperties(int index) {
+      return properties_.get(index);
+    }
+    public org.apache.pulsar.common.api.proto.PulsarApi.KeyLongValueOrBuilder getPropertiesOrBuilder(
+        int index) {
+      return properties_.get(index);
+    }
+    
     private void initFields() {
       consumerId_ = 0L;
       ackType_ = org.apache.pulsar.common.api.proto.PulsarApi.CommandAck.AckType.Individual;
       messageId_ = org.apache.pulsar.common.api.proto.PulsarApi.MessageIdData.getDefaultInstance();
       validationError_ = org.apache.pulsar.common.api.proto.PulsarApi.CommandAck.ValidationError.UncompressedSizeCorruption;
+      properties_ = java.util.Collections.emptyList();
     }
     private byte memoizedIsInitialized = -1;
     public final boolean isInitialized() {
@@ -11394,6 +11854,12 @@ public final boolean isInitialized() {
         memoizedIsInitialized = 0;
         return false;
       }
+      for (int i = 0; i < getPropertiesCount(); i++) {
+        if (!getProperties(i).isInitialized()) {
+          memoizedIsInitialized = 0;
+          return false;
+        }
+      }
       memoizedIsInitialized = 1;
       return true;
     }
@@ -11418,6 +11884,9 @@ public void writeTo(org.apache.pulsar.common.util.protobuf.ByteBufCodedOutputStr
       if (((bitField0_ & 0x00000008) == 0x00000008)) {
         output.writeEnum(4, validationError_.getNumber());
       }
+      for (int i = 0; i < properties_.size(); i++) {
+        output.writeMessage(5, properties_.get(i));
+      }
     }
     
     private int memoizedSerializedSize = -1;
@@ -11442,6 +11911,10 @@ public int getSerializedSize() {
         size += com.google.protobuf.CodedOutputStream
           .computeEnumSize(4, validationError_.getNumber());
       }
+      for (int i = 0; i < properties_.size(); i++) {
+        size += com.google.protobuf.CodedOutputStream
+          .computeMessageSize(5, properties_.get(i));
+      }
       memoizedSerializedSize = size;
       return size;
     }
@@ -11563,6 +12036,8 @@ public Builder clear() {
         bitField0_ = (bitField0_ & ~0x00000004);
         validationError_ = org.apache.pulsar.common.api.proto.PulsarApi.CommandAck.ValidationError.UncompressedSizeCorruption;
         bitField0_ = (bitField0_ & ~0x00000008);
+        properties_ = java.util.Collections.emptyList();
+        bitField0_ = (bitField0_ & ~0x00000010);
         return this;
       }
       
@@ -11612,6 +12087,11 @@ public Builder clone() {
           to_bitField0_ |= 0x00000008;
         }
         result.validationError_ = validationError_;
+        if (((bitField0_ & 0x00000010) == 0x00000010)) {
+          properties_ = java.util.Collections.unmodifiableList(properties_);
+          bitField0_ = (bitField0_ & ~0x00000010);
+        }
+        result.properties_ = properties_;
         result.bitField0_ = to_bitField0_;
         return result;
       }
@@ -11630,6 +12110,16 @@ public Builder mergeFrom(org.apache.pulsar.common.api.proto.PulsarApi.CommandAck
         if (other.hasValidationError()) {
           setValidationError(other.getValidationError());
         }
+        if (!other.properties_.isEmpty()) {
+          if (properties_.isEmpty()) {
+            properties_ = other.properties_;
+            bitField0_ = (bitField0_ & ~0x00000010);
+          } else {
+            ensurePropertiesIsMutable();
+            properties_.addAll(other.properties_);
+          }
+          
+        }
         return this;
       }
       
@@ -11650,6 +12140,12 @@ public final boolean isInitialized() {
           
           return false;
         }
+        for (int i = 0; i < getPropertiesCount(); i++) {
+          if (!getProperties(i).isInitialized()) {
+            
+            return false;
+          }
+        }
         return true;
       }
       
@@ -11708,6 +12204,12 @@ public Builder mergeFrom(
               }
               break;
             }
+            case 42: {
+              org.apache.pulsar.common.api.proto.PulsarApi.KeyLongValue.Builder subBuilder = org.apache.pulsar.common.api.proto.PulsarApi.KeyLongValue.newBuilder();
+              input.readMessage(subBuilder, extensionRegistry);
+              addProperties(subBuilder.buildPartial());
+              break;
+            }
           }
         }
       }
@@ -11826,6 +12328,95 @@ public Builder clearValidationError() {
         return this;
       }
       
+      // repeated .pulsar.proto.KeyLongValue properties = 5;
+      private java.util.List<org.apache.pulsar.common.api.proto.PulsarApi.KeyLongValue> properties_ =
+        java.util.Collections.emptyList();
+      private void ensurePropertiesIsMutable() {
+        if (!((bitField0_ & 0x00000010) == 0x00000010)) {
+          properties_ = new java.util.ArrayList<org.apache.pulsar.common.api.proto.PulsarApi.KeyLongValue>(properties_);
+          bitField0_ |= 0x00000010;
+         }
+      }
+      
+      public java.util.List<org.apache.pulsar.common.api.proto.PulsarApi.KeyLongValue> getPropertiesList() {
+        return java.util.Collections.unmodifiableList(properties_);
+      }
+      public int getPropertiesCount() {
+        return properties_.size();
+      }
+      public org.apache.pulsar.common.api.proto.PulsarApi.KeyLongValue getProperties(int index) {
+        return properties_.get(index);
+      }
+      public Builder setProperties(
+          int index, org.apache.pulsar.common.api.proto.PulsarApi.KeyLongValue value) {
+        if (value == null) {
+          throw new NullPointerException();
+        }
+        ensurePropertiesIsMutable();
+        properties_.set(index, value);
+        
+        return this;
+      }
+      public Builder setProperties(
+          int index, org.apache.pulsar.common.api.proto.PulsarApi.KeyLongValue.Builder builderForValue) {
+        ensurePropertiesIsMutable();
+        properties_.set(index, builderForValue.build());
+        
+        return this;
+      }
+      public Builder addProperties(org.apache.pulsar.common.api.proto.PulsarApi.KeyLongValue value) {
+        if (value == null) {
+          throw new NullPointerException();
+        }
+        ensurePropertiesIsMutable();
+        properties_.add(value);
+        
+        return this;
+      }
+      public Builder addProperties(
+          int index, org.apache.pulsar.common.api.proto.PulsarApi.KeyLongValue value) {
+        if (value == null) {
+          throw new NullPointerException();
+        }
+        ensurePropertiesIsMutable();
+        properties_.add(index, value);
+        
+        return this;
+      }
+      public Builder addProperties(
+          org.apache.pulsar.common.api.proto.PulsarApi.KeyLongValue.Builder builderForValue) {
+        ensurePropertiesIsMutable();
+        properties_.add(builderForValue.build());
+        
+        return this;
+      }
+      public Builder addProperties(
+          int index, org.apache.pulsar.common.api.proto.PulsarApi.KeyLongValue.Builder builderForValue) {
+        ensurePropertiesIsMutable();
+        properties_.add(index, builderForValue.build());
+        
+        return this;
+      }
+      public Builder addAllProperties(
+          java.lang.Iterable<? extends org.apache.pulsar.common.api.proto.PulsarApi.KeyLongValue> values) {
+        ensurePropertiesIsMutable();
+        super.addAll(values, properties_);
+        
+        return this;
+      }
+      public Builder clearProperties() {
+        properties_ = java.util.Collections.emptyList();
+        bitField0_ = (bitField0_ & ~0x00000010);
+        
+        return this;
+      }
+      public Builder removeProperties(int index) {
+        ensurePropertiesIsMutable();
+        properties_.remove(index);
+        
+        return this;
+      }
+      
       // @@protoc_insertion_point(builder_scope:pulsar.proto.CommandAck)
     }
     
diff --git a/pulsar-common/src/main/proto/PulsarApi.proto b/pulsar-common/src/main/proto/PulsarApi.proto
index 5bc2f39fe..0ec525435 100644
--- a/pulsar-common/src/main/proto/PulsarApi.proto
+++ b/pulsar-common/src/main/proto/PulsarApi.proto
@@ -34,6 +34,11 @@ message KeyValue {
 	required string value = 2;
 }
 
+message KeyLongValue {
+        required string key = 1;
+        required uint64 value = 2;
+}
+
 message EncryptionKeys {
 	required string key = 1;
 	required bytes value = 2;
@@ -278,6 +283,7 @@ message CommandAck {
 	}
 
 	optional ValidationError validation_error = 4;
+        repeated KeyLongValue properties = 5;
 }
 
 message CommandFlow {


 

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
users@infra.apache.org


With regards,
Apache Git Services