You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by GitBox <gi...@apache.org> on 2021/01/11 05:44:08 UTC

[GitHub] [pulsar] congbobo184 opened a new pull request #8996: Ack response implementation

congbobo184 opened a new pull request #8996:
URL: https://github.com/apache/pulsar/pull/8996


   ## Motivation
   in order to handle ack response implementation. When this PR commit, I will handle #8997.
   
   ## implement
   1. we implement a new PersistentAcknowledgmentsWithResponseGroupingTracker.
   2. we will add two ackRequests struct for async and sync flush.
   3. add a timer to handle timeout and the timeout task don't need to lock, because the timeout is sequential.
   ### Verifying this change
   Add the tests for it
   
   Does this pull request potentially affect one of the following parts:
   If yes was chosen, please highlight the changes
   
   Dependencies (does it add or upgrade a dependency): (no)
   The public API: (no)
   The schema: (no)
   The default values of configurations: (no)
   The wire protocol: (no)
   The rest endpoints: (no)
   The admin cli options: (no)
   Anything that affects deployment: (no)
   
   


----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [pulsar] congbobo184 removed a comment on pull request #8996: Ack response implementation

Posted by GitBox <gi...@apache.org>.
congbobo184 removed a comment on pull request #8996:
URL: https://github.com/apache/pulsar/pull/8996#issuecomment-758380747


   /pulsarbot run-failure-checks


----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [pulsar] congbobo184 commented on pull request #8996: Ack response implementation

Posted by GitBox <gi...@apache.org>.
congbobo184 commented on pull request #8996:
URL: https://github.com/apache/pulsar/pull/8996#issuecomment-757636178


   /pulsarbot run-failure-checks


----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [pulsar] congbobo184 removed a comment on pull request #8996: Ack response implementation

Posted by GitBox <gi...@apache.org>.
congbobo184 removed a comment on pull request #8996:
URL: https://github.com/apache/pulsar/pull/8996#issuecomment-757602045


   /pulsarbot run-failure-checks


----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [pulsar] congbobo184 commented on pull request #8996: Ack response implementation

Posted by GitBox <gi...@apache.org>.
congbobo184 commented on pull request #8996:
URL: https://github.com/apache/pulsar/pull/8996#issuecomment-752294378


   /pulsarbot run-failure-checks


----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [pulsar] congbobo184 commented on a change in pull request #8996: Ack response implementation

Posted by GitBox <gi...@apache.org>.
congbobo184 commented on a change in pull request #8996:
URL: https://github.com/apache/pulsar/pull/8996#discussion_r557827167



##########
File path: pulsar-client-api/src/main/java/org/apache/pulsar/client/api/PulsarClientException.java
##########
@@ -864,6 +864,32 @@ public TransactionConflictException(String msg) {
         }
     }
 
+    /**
+     * Consumer ack for response timeout.
+     */
+    public static class AckResponseTimeoutException extends PulsarClientException {

Review comment:
       No, it can be deleted.




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [pulsar] congbobo184 commented on pull request #8996: Ack response implementation

Posted by GitBox <gi...@apache.org>.
congbobo184 commented on pull request #8996:
URL: https://github.com/apache/pulsar/pull/8996#issuecomment-758381688


   /pulsarbot run-failure-checks


----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [pulsar] congbobo184 commented on a change in pull request #8996: Ack response implementation

Posted by GitBox <gi...@apache.org>.
congbobo184 commented on a change in pull request #8996:
URL: https://github.com/apache/pulsar/pull/8996#discussion_r557844899



##########
File path: pulsar-client/src/main/java/org/apache/pulsar/client/impl/PersistentAcknowledgmentsGroupingTracker.java
##########
@@ -497,46 +586,84 @@ public void close() {
         }
     }
 
-    private void newAckCommand(long consumerId, MessageIdImpl msgId, BitSetRecyclable lastCumulativeAckSet,
-            AckType ackType, ValidationError validationError, Map<String, Long> map, ClientCnx cnx,
-                               boolean flush, long txnidMostBits, long txnidLeastBits) {
-
-        MessageIdImpl[] chunkMsgIds = this.consumer.unAckedChunkedMessageIdSequenceMap.get(msgId);
-        if (chunkMsgIds != null && txnidLeastBits < 0 && txnidMostBits < 0) {
-            if (Commands.peerSupportsMultiMessageAcknowledgment(cnx.getRemoteEndpointProtocolVersion())
-                    && ackType != AckType.Cumulative) {
+    private CompletableFuture<Void> newImmediateAckAndFlush(long consumerId, MessageIdImpl msgId,
+                                                            BitSetRecyclable bitSet, AckType ackType,
+                                                            Map<String, Long> map, ClientCnx cnx) {
+        MessageIdImpl[] chunkMsgIds = this.consumer.unAckedChunkedMessageIdSequenceMap.remove(msgId);
+        final CompletableFuture<Void> completableFuture;
+        // cumulative ack chunk by the last messageId
+        if (chunkMsgIds != null &&  ackType != AckType.Cumulative) {
+            if (Commands.peerSupportsMultiMessageAcknowledgment(cnx.getRemoteEndpointProtocolVersion())) {
                 List<Triple<Long, Long, ConcurrentBitSetRecyclable>> entriesToAck = new ArrayList<>(chunkMsgIds.length);
                 for (MessageIdImpl cMsgId : chunkMsgIds) {
                     if (cMsgId != null && chunkMsgIds.length > 1) {
                         entriesToAck.add(Triple.of(cMsgId.getLedgerId(), cMsgId.getEntryId(), null));
                     }
                 }
-                ByteBuf cmd = Commands.newMultiMessageAck(consumer.consumerId, entriesToAck);
-                if (flush) {
-                    cnx.ctx().writeAndFlush(cmd, cnx.ctx().voidPromise());
+                if (ackResponseEnabled) {
+                    long requestId = consumer.getClient().newRequestId();
+                    ByteBuf cmd = Commands.newMultiMessageAck(consumer.consumerId, entriesToAck, requestId);
+                    completableFuture = cnx.newAckForResponse(cmd, requestId);

Review comment:
       because newImmediateAckAndFlush so we should write the command to broker immediately.




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [pulsar] codelipenghui commented on a change in pull request #8996: Ack response implementation

Posted by GitBox <gi...@apache.org>.
codelipenghui commented on a change in pull request #8996:
URL: https://github.com/apache/pulsar/pull/8996#discussion_r559544860



##########
File path: pulsar-common/src/main/proto/PulsarApi.proto
##########
@@ -255,6 +255,7 @@ enum ProtocolVersion {
               // Added Key_Shared subscription
     v15 = 15; // Add CommandGetOrCreateSchema and CommandGetOrCreateSchemaResponse
     v16 = 16; // Add support for raw message metadata
+    v17 = 17; // Added support ack response

Review comment:
       Please check all "ack response" related code and comments, Unify them to "ack receipt" 

##########
File path: pulsar-client/src/main/java/org/apache/pulsar/client/impl/PersistentAcknowledgmentsGroupingTracker.java
##########
@@ -115,186 +123,302 @@ public boolean isDuplicate(MessageId messageId) {
     }
 
     @Override
-    public void addListAcknowledgment(List<MessageIdImpl> messageIds, AckType ackType, Map<String, Long> properties) {
-        if (ackType == AckType.Cumulative) {
-            messageIds.forEach(messageId -> doCumulativeAck(messageId, null));
-            return;
+    public CompletableFuture<Void> addListAcknowledgment(List<MessageId> messageIds,
+                                                         AckType ackType, Map<String, Long> properties) {
+        if (AckType.Cumulative.equals(ackType)) {
+            if (ackReceiptEnabled) {
+                Set<CompletableFuture<Void>> completableFutureSet = new HashSet<>();
+                messageIds.forEach(messageId ->
+                        completableFutureSet.add(addAcknowledgment((MessageIdImpl) messageId, ackType, properties)));
+                return FutureUtil.waitForAll(new ArrayList<>(completableFutureSet));
+            } else {
+                messageIds.forEach(messageId -> addAcknowledgment((MessageIdImpl) messageId, ackType, properties));
+                return CompletableFuture.completedFuture(null);
+            }
+        } else {
+            if (ackReceiptEnabled) {
+                try {
+                    // when flush the ack, we should bind the this ack in the currentFuture, during this time we can't
+                    // change currentFuture. but we can lock by the read lock, because the currentFuture is not change
+                    // any ack operation is allowed.
+                    this.lock.readLock().lock();
+                    addListAcknowledgment(messageIds);
+                    return this.currentIndividualAckFuture;
+                } finally {
+                    this.lock.readLock().unlock();
+                    if (acknowledgementGroupTimeMicros == 0 || pendingIndividualAcks.size() >= MAX_ACK_GROUP_SIZE) {
+                        flush();
+                    }
+                }
+            } else {
+                addListAcknowledgment(messageIds);
+                if (acknowledgementGroupTimeMicros == 0 || pendingIndividualAcks.size() >= MAX_ACK_GROUP_SIZE) {
+                    flush();
+                }
+                return CompletableFuture.completedFuture(null);
+            }
         }
-        messageIds.forEach(messageId -> {
+    }
+
+    private void addListAcknowledgment(List<MessageId> messageIds) {
+        for (MessageId messageId : messageIds) {
+            consumer.onAcknowledge(messageId, null);
             if (messageId instanceof BatchMessageIdImpl) {
                 BatchMessageIdImpl batchMessageId = (BatchMessageIdImpl) messageId;
-                pendingIndividualAcks.add(new MessageIdImpl(batchMessageId.getLedgerId(),
-                        batchMessageId.getEntryId(), batchMessageId.getPartitionIndex()));
+                if (!batchMessageId.ackIndividual()) {
+                    doIndividualBatchAckAsync((BatchMessageIdImpl) messageId);
+                } else {
+                    messageId = modifyBatchMessageIdAndStatesInConsumer(batchMessageId);
+                    doIndividualAckAsync((MessageIdImpl) messageId);
+                }
             } else {
-                pendingIndividualAcks.add(messageId);
-            }
-            pendingIndividualBatchIndexAcks.remove(messageId);
-            if (pendingIndividualAcks.size() >= MAX_ACK_GROUP_SIZE) {
-                flush();
+                modifyMessageIdStatesInConsumer((MessageIdImpl) messageId);
+                doIndividualAckAsync((MessageIdImpl) messageId);
             }
-        });
-        if (acknowledgementGroupTimeMicros == 0) {
-            flush();
         }
     }
 
     @Override
-    public void addAcknowledgment(MessageIdImpl msgId, AckType ackType, Map<String, Long> properties,
-                                  TransactionImpl txn) {
-        if (acknowledgementGroupTimeMicros == 0 || !properties.isEmpty() ||
-                (txn != null && ackType == AckType.Cumulative)) {
-                if (msgId instanceof BatchMessageIdImpl && txn != null) {
-                    BatchMessageIdImpl batchMessageId = (BatchMessageIdImpl) msgId;
-                    doImmediateBatchIndexAck(batchMessageId, batchMessageId.getBatchIndex(),
-                            batchMessageId.getBatchIndex(),
-                            ackType, properties, txn.getTxnIdMostBits(), txn.getTxnIdLeastBits());
-                    return;
+    public CompletableFuture<Void> addAcknowledgment(MessageIdImpl msgId, AckType ackType,
+                                                     Map<String, Long> properties) {
+        if (msgId instanceof BatchMessageIdImpl) {
+            BatchMessageIdImpl batchMessageId = (BatchMessageIdImpl) msgId;
+            if (ackType == AckType.Individual) {
+                consumer.onAcknowledge(msgId, null);
+                // ack this ack carry bitSet index and judge bit set are all ack
+                if (batchMessageId.ackIndividual()) {
+                    MessageIdImpl messageId = modifyBatchMessageIdAndStatesInConsumer(batchMessageId);
+                    return doIndividualAck(messageId, properties);
+                } else if (batchIndexAckEnabled){
+                    return doIndividualBatchAck(batchMessageId, properties);
+                } else {
+                    // if we prevent batchIndexAck, we can't send the ack command to broker when the batch message are
+                    // all ack complete
+                    return CompletableFuture.completedFuture(null);
                 }
-            // We cannot group acks if the delay is 0 or when there are properties attached to it. Fortunately that's an
-            // uncommon condition since it's only used for the compaction subscription.
-            doImmediateAck(msgId, ackType, properties, txn);
-        } else if (ackType == AckType.Cumulative) {
-            doCumulativeAck(msgId, null);
-        } else {
-            // Individual ack
-            if (msgId instanceof BatchMessageIdImpl) {
-                pendingIndividualAcks.add(new MessageIdImpl(msgId.getLedgerId(),
-                        msgId.getEntryId(), msgId.getPartitionIndex()));
             } else {
-                if (txn != null) {
-                    pendingIndividualTransactionAcks
-                            .add(Triple.of(txn.getTxnIdMostBits(), txn.getTxnIdLeastBits(), msgId));
+                consumer.onAcknowledgeCumulative(msgId, null);
+                if (batchMessageId.ackCumulative()) {
+                    return doCumulativeAck(msgId, properties, null);
                 } else {
-                    pendingIndividualAcks.add(msgId);
+                    if (batchIndexAckEnabled) {
+                        return doCumulativeBatchIndexAck(batchMessageId, properties);
+                    } else {
+                        // ack the pre messageId, because we prevent the batchIndexAck, we can ensure pre messageId can
+                        // ack
+                        if (AckType.Cumulative == ackType
+                                && !batchMessageId.getAcker().isPrevBatchCumulativelyAcked()) {
+                            doCumulativeAck(batchMessageId.prevBatchMessageId(), properties, null);
+                            batchMessageId.getAcker().setPrevBatchCumulativelyAcked(true);
+                        }
+                        return CompletableFuture.completedFuture(null);
+                    }
                 }
             }
-            pendingIndividualBatchIndexAcks.remove(msgId);
-            if (pendingIndividualAcks.size() >= MAX_ACK_GROUP_SIZE) {
-                flush();
+        } else {
+            if (ackType == AckType.Individual) {
+                consumer.onAcknowledge(msgId, null);
+                modifyMessageIdStatesInConsumer(msgId);
+                return doIndividualAck(msgId, properties);
+            } else {
+                consumer.onAcknowledgeCumulative(msgId, null);
+                return doCumulativeAck(msgId, properties, null);
             }
         }
     }
 
-    public void addBatchIndexAcknowledgment(BatchMessageIdImpl msgId, int batchIndex, int batchSize, AckType ackType,
-                                            Map<String, Long> properties, TransactionImpl txn) {
-        if (acknowledgementGroupTimeMicros == 0 || !properties.isEmpty()) {
-            doImmediateBatchIndexAck(msgId, batchIndex, batchSize, ackType, properties,
-                    txn == null ? -1 : txn.getTxnIdMostBits(),
-                    txn == null ? -1 : txn.getTxnIdLeastBits());
-        } else if (ackType == AckType.Cumulative) {
-            BitSetRecyclable bitSet = BitSetRecyclable.create();
-            bitSet.set(0, batchSize);
-            bitSet.clear(0, batchIndex + 1);
-            doCumulativeAck(msgId, bitSet);
-        } else if (ackType == AckType.Individual) {
-            ConcurrentBitSetRecyclable bitSet;
-            if (txn != null) {
-                synchronized (txn) {
-                    ConcurrentHashMap<MessageIdImpl, ConcurrentBitSetRecyclable> transactionIndividualBatchIndexAcks =
-                            pendingIndividualTransactionBatchIndexAcks
-                                    .computeIfAbsent(txn, (v) -> new ConcurrentHashMap<>());
-                    bitSet = transactionIndividualBatchIndexAcks.computeIfAbsent(msgId, (v) -> {
-                        ConcurrentBitSetRecyclable value;
-                        value = ConcurrentBitSetRecyclable.create();
-                        value.set(0, msgId.getAcker().getBatchSize());
-                        return value;
-                    });
-                    bitSet.clear(batchIndex);
+    private MessageIdImpl modifyBatchMessageIdAndStatesInConsumer(BatchMessageIdImpl batchMessageId) {
+        MessageIdImpl messageId = new MessageIdImpl(batchMessageId.getLedgerId(),
+                batchMessageId.getEntryId(), batchMessageId.getPartitionIndex());
+        consumer.getStats().incrementNumAcksSent(batchMessageId.getBatchSize());
+        clearMessageIdFromUnAckTrackerAndDeadLetter(messageId);
+        return messageId;
+    }
+
+    private void modifyMessageIdStatesInConsumer(MessageIdImpl messageId) {
+        consumer.getStats().incrementNumAcksSent(1);
+        clearMessageIdFromUnAckTrackerAndDeadLetter(messageId);
+    }
+
+    private void clearMessageIdFromUnAckTrackerAndDeadLetter(MessageIdImpl messageId) {
+        consumer.getUnAckedMessageTracker().remove(messageId);
+        if (consumer.getPossibleSendToDeadLetterTopicMessages() != null) {
+            consumer.getPossibleSendToDeadLetterTopicMessages().remove(messageId);
+        }
+    }
+
+    private CompletableFuture<Void> doIndividualAck(MessageIdImpl messageId, Map<String, Long> properties) {
+        if (acknowledgementGroupTimeMicros == 0 || (properties != null && !properties.isEmpty())) {
+            // We cannot group acks if the delay is 0 or when there are properties attached to it. Fortunately that's an
+            // uncommon condition since it's only used for the compaction subscription.
+            return doImmediateAck(messageId, AckType.Individual, properties, null);
+        } else {
+            if (ackReceiptEnabled) {

Review comment:
       It's better to use a method isAckReceiptEnabled() to check if the consumer enabled the ack receipt and the broker support the ack receipt.




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [pulsar] congbobo184 commented on pull request #8996: Ack response implementation

Posted by GitBox <gi...@apache.org>.
congbobo184 commented on pull request #8996:
URL: https://github.com/apache/pulsar/pull/8996#issuecomment-762076879


   /pulsarbot run-failure-checks


----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [pulsar] congbobo184 commented on pull request #8996: Ack response implementation

Posted by GitBox <gi...@apache.org>.
congbobo184 commented on pull request #8996:
URL: https://github.com/apache/pulsar/pull/8996#issuecomment-763383201


   /pulsarbot run-failure-checks


----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [pulsar] Anonymitaet commented on pull request #8996: Ack response implementation

Posted by GitBox <gi...@apache.org>.
Anonymitaet commented on pull request #8996:
URL: https://github.com/apache/pulsar/pull/8996#issuecomment-876889884


   I will work with @congbobo184 on the docs and track the issue here: https://github.com/apache/pulsar/issues/11272


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@pulsar.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [pulsar] congbobo184 commented on pull request #8996: Ack response implementation

Posted by GitBox <gi...@apache.org>.
congbobo184 commented on pull request #8996:
URL: https://github.com/apache/pulsar/pull/8996#issuecomment-761748755


   /pulsarbot run-failure-checks


----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [pulsar] congbobo184 commented on pull request #8996: Ack response implementation

Posted by GitBox <gi...@apache.org>.
congbobo184 commented on pull request #8996:
URL: https://github.com/apache/pulsar/pull/8996#issuecomment-876043198


   @lhotari hi, this pr only means server receive client ack request and then send response to client. `"Ack will return receipt but does not mean that the message will not be resent after get receipt." .` can add to the Pulsar documentation.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@pulsar.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [pulsar] congbobo184 commented on pull request #8996: Ack response implementation

Posted by GitBox <gi...@apache.org>.
congbobo184 commented on pull request #8996:
URL: https://github.com/apache/pulsar/pull/8996#issuecomment-761916708


   /pulsarbot run-failure-checks


----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [pulsar] eolivelli commented on a change in pull request #8996: Ack response implementation

Posted by GitBox <gi...@apache.org>.
eolivelli commented on a change in pull request #8996:
URL: https://github.com/apache/pulsar/pull/8996#discussion_r549275906



##########
File path: pulsar-broker/src/test/java/org/apache/pulsar/client/api/SimpleProducerConsumerTest.java
##########
@@ -1079,7 +1092,7 @@ public void testDeactivatingBacklogConsumer() throws Exception {
         // 3. Consume messages: at Faster subscriber
         for (int i = 0; i < totalMsgs; i++) {
             msg = subscriber1.receive(100, TimeUnit.MILLISECONDS);
-            subscriber1.acknowledge(msg);
+            subscriber1.acknowledgeAsync(msg);

Review comment:
       do we have to wait for this operation to complete ?

##########
File path: pulsar-broker/src/test/java/org/apache/pulsar/client/api/SimpleProducerConsumerTest.java
##########
@@ -1802,7 +1816,7 @@ public void testUnackedBlockAtBatch(int batchMessageDelayMs) throws Exception {
                 if (msg != null) {
                     messages.add(msg);
                     totalReceiveMessages++;
-                    consumer1.acknowledge(msg);
+                    consumer1.acknowledgeAsync(msg);

Review comment:
       do we have to wait for this operation to complete ?

##########
File path: pulsar-broker/src/test/java/org/apache/pulsar/client/api/SimpleProducerConsumerTest.java
##########
@@ -1397,13 +1412,7 @@ public void testConsumerBlockingWithUnAckedMessages() throws Exception {
             assertEquals(messages.size(), unAckedMessagesBufferSize);
 
             // start acknowledging messages
-            messages.forEach(m -> {
-                try {
-                    consumer.acknowledge(m);
-                } catch (PulsarClientException e) {
-                    fail("ack failed", e);
-                }
-            });
+            messages.forEach(consumer::acknowledgeAsync);

Review comment:
       do we have to wait for this operation to complete and check the result ?

##########
File path: pulsar-broker/src/test/java/org/apache/pulsar/client/api/SimpleProducerConsumerTest.java
##########
@@ -1098,7 +1111,7 @@ public void testDeactivatingBacklogConsumer() throws Exception {
         // 6. consume messages : at slower subscriber
         for (int i = 0; i < totalMsgs; i++) {
             msg = subscriber2.receive(100, TimeUnit.MILLISECONDS);
-            subscriber2.acknowledge(msg);
+            subscriber2.acknowledgeAsync(msg);

Review comment:
       do we have to wait for this operation to complete ?




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [pulsar] lhotari commented on pull request #8996: Ack response implementation

Posted by GitBox <gi...@apache.org>.
lhotari commented on pull request #8996:
URL: https://github.com/apache/pulsar/pull/8996#issuecomment-883220843


   @congbobo184 I recently came across #7683 . Does this PR somehow help resolve that issue?


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@pulsar.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [pulsar] zymap commented on a change in pull request #8996: Ack response implementation

Posted by GitBox <gi...@apache.org>.
zymap commented on a change in pull request #8996:
URL: https://github.com/apache/pulsar/pull/8996#discussion_r545742033



##########
File path: pulsar-client/src/main/java/org/apache/pulsar/client/impl/AcknowledgmentsGroupingTracker.java
##########
@@ -31,12 +34,27 @@
 
     boolean isDuplicate(MessageId messageId);
 
-    void addAcknowledgment(MessageIdImpl msgId, AckType ackType, Map<String, Long> properties, TransactionImpl txn);
+    default CompletableFuture<Void> addAcknowledgment(MessageIdImpl msgId, AckType ackType,

Review comment:
       Do we need to consider the compatibility of the client? 




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [pulsar] lhotari commented on pull request #8996: Ack response implementation

Posted by GitBox <gi...@apache.org>.
lhotari commented on pull request #8996:
URL: https://github.com/apache/pulsar/pull/8996#issuecomment-876265103


   Hi @Anonymitaet , could you help @congbobo184 in documenting this feature? 
   
   Since this PR also changed the wire protocol (there were protobuf changes), I think that this requires creating a PIP document. This is a common rule in Pulsar development that there must be a PIP for wire protocol changes. @codelipenghui Could you help with a PIP document for the "ack receipt" feature? /cc @merlimat @sijie


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@pulsar.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [pulsar] congbobo184 closed pull request #8996: Ack response implementation

Posted by GitBox <gi...@apache.org>.
congbobo184 closed pull request #8996:
URL: https://github.com/apache/pulsar/pull/8996


   


----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [pulsar] congbobo184 commented on pull request #8996: Ack response implementation

Posted by GitBox <gi...@apache.org>.
congbobo184 commented on pull request #8996:
URL: https://github.com/apache/pulsar/pull/8996#issuecomment-761751053


   /pulsarbot run-failure-checks


----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [pulsar] lhotari commented on pull request #8996: Ack response implementation

Posted by GitBox <gi...@apache.org>.
lhotari commented on pull request #8996:
URL: https://github.com/apache/pulsar/pull/8996#issuecomment-875455717


   @congbobo184 @codelipenghui @Anonymitaet Has the feature added by this PR been documented? The only documentation that I can find is in the javadoc of the isAckReceiptEnabled method "Ack will return receipt but does not mean that the message will not be resent after get receipt." . 
   Would it be possible to clarify this in the Pulsar documentation?
   Usually PIP documents are created for changes which change the wire protocol. I didn't find and PIP documents or mailing list discussions about this change. What problem is this change solving? The issue and PR description aren't very informative. Please elaborate more about the context behind this change.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@pulsar.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [pulsar] congbobo184 commented on a change in pull request #8996: Ack response implementation

Posted by GitBox <gi...@apache.org>.
congbobo184 commented on a change in pull request #8996:
URL: https://github.com/apache/pulsar/pull/8996#discussion_r557844360



##########
File path: pulsar-client/src/main/java/org/apache/pulsar/client/impl/PersistentAcknowledgmentsGroupingTracker.java
##########
@@ -330,16 +462,41 @@ public void flush() {
             return;
         }
 
+        if (ackResponseEnabled) {
+            this.lock.writeLock().lock();
+            try {
+                flushAsync(cnx);
+            } finally {
+                this.lock.writeLock().unlock();
+            }
+        } else {
+            flushAsync(cnx);
+        }
+    }
+
+    private void flushAsync(ClientCnx cnx) {
         boolean shouldFlush = false;
         if (cumulativeAckFlushRequired) {
-            newAckCommand(consumer.consumerId, lastCumulativeAck, lastCumulativeAckSet, AckType.Cumulative, null, Collections.emptyMap(), cnx, false /* flush */, -1, -1);
+            if (ackResponseEnabled) {
+                long requestId = consumer.getClient().newRequestId();
+                ByteBuf cmd = Commands.newAck(consumer.consumerId, lastCumulativeAck.messageId.ledgerId,
+                        lastCumulativeAck.messageId.getEntryId(), lastCumulativeAck.bitSetRecyclable,
+                        AckType.Cumulative, null, Collections.emptyMap(), requestId);
+                cnx.newAckForResponseWithFuture(cmd, requestId, currentCumulativeAckFuture);
+                this.currentCumulativeAckFuture = new TimedCompletableFuture<>();
+            } else {
+                ByteBuf cmd = Commands.newAck(consumer.consumerId, lastCumulativeAck.messageId.ledgerId,
+                        lastCumulativeAck.messageId.getEntryId(), lastCumulativeAck.bitSetRecyclable,
+                        AckType.Cumulative, null, Collections.emptyMap(), -1);
+                cnx.ctx().write(cmd, cnx.ctx().voidPromise());

Review comment:
       if we optimize the code, we will judge the ackResponseEnabled twice, because get requestId then new cmd and then write. get requestId and write are different from normal ack and ack response. the middle operation newAckComand can't be optimized.




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [pulsar] codelipenghui merged pull request #8996: Ack response implementation

Posted by GitBox <gi...@apache.org>.
codelipenghui merged pull request #8996:
URL: https://github.com/apache/pulsar/pull/8996


   


----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [pulsar] sijie commented on pull request #8996: Ack response implementation

Posted by GitBox <gi...@apache.org>.
sijie commented on pull request #8996:
URL: https://github.com/apache/pulsar/pull/8996#issuecomment-755232950


   @codelipenghui Can you review it?


----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [pulsar] lhotari commented on pull request #8996: Ack response implementation

Posted by GitBox <gi...@apache.org>.
lhotari commented on pull request #8996:
URL: https://github.com/apache/pulsar/pull/8996#issuecomment-883220843


   @congbobo184 I recently came across #7683 . Does this PR somehow help resolve that issue?


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@pulsar.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [pulsar] congbobo184 commented on pull request #8996: Ack response implementation

Posted by GitBox <gi...@apache.org>.
congbobo184 commented on pull request #8996:
URL: https://github.com/apache/pulsar/pull/8996#issuecomment-752808613


   /pulsarbot run-failure-checks


----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [pulsar] lhotari commented on pull request #8996: Ack response implementation

Posted by GitBox <gi...@apache.org>.
lhotari commented on pull request #8996:
URL: https://github.com/apache/pulsar/pull/8996#issuecomment-883220843


   @congbobo184 I recently came across #7683 . Does this PR somehow help resolve that issue?


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@pulsar.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [pulsar] congbobo184 commented on a change in pull request #8996: Ack response implementation

Posted by GitBox <gi...@apache.org>.
congbobo184 commented on a change in pull request #8996:
URL: https://github.com/apache/pulsar/pull/8996#discussion_r558251500



##########
File path: pulsar-broker/src/test/java/org/apache/pulsar/client/api/SimpleProducerConsumerTest.java
##########
@@ -1802,7 +1816,7 @@ public void testUnackedBlockAtBatch(int batchMessageDelayMs) throws Exception {
                 if (msg != null) {
                     messages.add(msg);
                     totalReceiveMessages++;
-                    consumer1.acknowledge(msg);
+                    consumer1.acknowledgeAsync(msg);

Review comment:
       it won't influence the result, if wait it the test will spend a lot of time when enable the ack response.




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [pulsar] congbobo184 commented on pull request #8996: Ack response implementation

Posted by GitBox <gi...@apache.org>.
congbobo184 commented on pull request #8996:
URL: https://github.com/apache/pulsar/pull/8996#issuecomment-763379934


   /pulsarbot run-faliure-checks


----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [pulsar] congbobo184 commented on a change in pull request #8996: Ack response implementation

Posted by GitBox <gi...@apache.org>.
congbobo184 commented on a change in pull request #8996:
URL: https://github.com/apache/pulsar/pull/8996#discussion_r558250324



##########
File path: pulsar-broker/src/test/java/org/apache/pulsar/client/api/SimpleProducerConsumerTest.java
##########
@@ -1098,7 +1111,7 @@ public void testDeactivatingBacklogConsumer() throws Exception {
         // 6. consume messages : at slower subscriber
         for (int i = 0; i < totalMsgs; i++) {
             msg = subscriber2.receive(100, TimeUnit.MILLISECONDS);
-            subscriber2.acknowledge(msg);
+            subscriber2.acknowledgeAsync(msg);

Review comment:
       it won't influence the result, if wait it the test will spend a lot of time when enable the ack response.




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [pulsar] congbobo184 commented on pull request #8996: Ack response implementation

Posted by GitBox <gi...@apache.org>.
congbobo184 commented on pull request #8996:
URL: https://github.com/apache/pulsar/pull/8996#issuecomment-747918684


   /pulsarbot run-failure-checks


----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [pulsar] congbobo184 commented on pull request #8996: Ack response implementation

Posted by GitBox <gi...@apache.org>.
congbobo184 commented on pull request #8996:
URL: https://github.com/apache/pulsar/pull/8996#issuecomment-763709457


   /pulsarbot run-failure-checks


----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [pulsar] congbobo184 commented on pull request #8996: Ack response implementation

Posted by GitBox <gi...@apache.org>.
congbobo184 commented on pull request #8996:
URL: https://github.com/apache/pulsar/pull/8996#issuecomment-761545040


   /pulsarbot run-failure-checks


----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [pulsar] congbobo184 commented on pull request #8996: Ack response implementation

Posted by GitBox <gi...@apache.org>.
congbobo184 commented on pull request #8996:
URL: https://github.com/apache/pulsar/pull/8996#issuecomment-754317374


   /pulsarbot run-failure-checks


----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [pulsar] eolivelli commented on a change in pull request #8996: Ack response implementation

Posted by GitBox <gi...@apache.org>.
eolivelli commented on a change in pull request #8996:
URL: https://github.com/apache/pulsar/pull/8996#discussion_r549152589



##########
File path: pulsar-client/src/main/java/org/apache/pulsar/client/impl/PersistentAcknowledgmentsGroupingTracker.java
##########
@@ -118,175 +122,302 @@ public boolean isDuplicate(MessageId messageId) {
     }
 
     @Override
-    public CompletableFuture<Void> addListAcknowledgment(List<MessageIdImpl> messageIds,
+    public CompletableFuture<Void> addListAcknowledgment(List<MessageId> messageIds,
                                                          AckType ackType, Map<String, Long> properties) {
-        if (ackType == AckType.Cumulative) {
-            messageIds.forEach(messageId -> doCumulativeAck(messageId, null));
-            return CompletableFuture.completedFuture(null);
+        if (AckType.Cumulative.equals(ackType)) {
+            if (ackResponseEnabled) {
+                Set<CompletableFuture<Void>> completableFutureSet = new HashSet<>();
+                messageIds.forEach(messageId ->
+                        completableFutureSet.add(addAcknowledgment((MessageIdImpl) messageId, ackType, properties)));
+                return FutureUtil.waitForAll(new ArrayList<>(completableFutureSet));
+            } else {
+                messageIds.forEach(messageId -> addAcknowledgment((MessageIdImpl) messageId, ackType, properties));
+                return CompletableFuture.completedFuture(null);
+            }
+        } else {
+            if (ackResponseEnabled) {
+                try {
+                    // when flush the ack, we should bind the this ack in the currentFuture, during this time we can't
+                    // change currentFuture. but we can lock by the read lock, because the currentFuture is not change
+                    // any ack operation is allowed.
+                    this.lock.readLock().lock();
+                    addListAcknowledgment(messageIds);
+                    return this.currentIndividualAckFuture;
+                } finally {
+                    this.lock.readLock().unlock();
+                    if (acknowledgementGroupTimeMicros == 0 || pendingIndividualAcks.size() >= MAX_ACK_GROUP_SIZE) {
+                        flush();
+                    }
+                }
+            } else {
+                addListAcknowledgment(messageIds);
+                if (acknowledgementGroupTimeMicros == 0 || pendingIndividualAcks.size() >= MAX_ACK_GROUP_SIZE) {
+                    flush();
+                }
+                return CompletableFuture.completedFuture(null);
+            }
         }
-        messageIds.forEach(messageId -> {
+    }
+
+    private void addListAcknowledgment(List<MessageId> messageIds) {
+        for (MessageId messageId : messageIds) {
+            consumer.onAcknowledge(messageId, null);
             if (messageId instanceof BatchMessageIdImpl) {
                 BatchMessageIdImpl batchMessageId = (BatchMessageIdImpl) messageId;
-                pendingIndividualAcks.add(new MessageIdImpl(batchMessageId.getLedgerId(),
-                        batchMessageId.getEntryId(), batchMessageId.getPartitionIndex()));
+                if (!batchMessageId.ackIndividual()) {
+                    doIndividualBatchAckAsync((BatchMessageIdImpl) messageId);
+                } else {
+                    messageId = modifyBatchMessageIdAndStatusInConsumer(batchMessageId);
+                    doIndividualAckAsync((MessageIdImpl) messageId);
+                }
             } else {
-                pendingIndividualAcks.add(messageId);
-            }
-            pendingIndividualBatchIndexAcks.remove(messageId);
-            if (pendingIndividualAcks.size() >= MAX_ACK_GROUP_SIZE) {
-                flush();
+                modifyMessageIdStatusInConsumer((MessageIdImpl) messageId);
+                doIndividualAckAsync((MessageIdImpl) messageId);
             }
-        });
-        if (acknowledgementGroupTimeMicros == 0) {
-            flush();
         }
-        return CompletableFuture.completedFuture(null);
     }
 
     @Override
-    public CompletableFuture<Void> addAcknowledgment(MessageIdImpl msgId, AckType ackType, Map<String, Long> properties,
-                                  TransactionImpl txn) {
-        if (acknowledgementGroupTimeMicros == 0 || !properties.isEmpty() ||
-                (txn != null && ackType == AckType.Cumulative)) {
-                if (msgId instanceof BatchMessageIdImpl && txn != null) {
-                    BatchMessageIdImpl batchMessageId = (BatchMessageIdImpl) msgId;
-                    doImmediateBatchIndexAck(batchMessageId, batchMessageId.getBatchIndex(),
-                            batchMessageId.getBatchIndex(),
-                            ackType, properties, txn.getTxnIdMostBits(), txn.getTxnIdLeastBits());
+    public CompletableFuture<Void> addAcknowledgment(MessageIdImpl msgId, AckType ackType,
+                                                     Map<String, Long> properties) {
+        if (msgId instanceof BatchMessageIdImpl) {
+            BatchMessageIdImpl batchMessageId = (BatchMessageIdImpl) msgId;
+            if (ackType == AckType.Individual) {
+                consumer.onAcknowledge(msgId, null);
+                // ack this ack carry bitSet index and judge bit set are all ack
+                if (batchMessageId.ackIndividual()) {
+                    MessageIdImpl messageId = modifyBatchMessageIdAndStatusInConsumer(batchMessageId);
+                    return doIndividualAck(messageId, properties);
+                } else if (batchIndexAckEnabled){
+                    return doIndividualBatchAck(batchMessageId, properties);
+                } else {
+                    // if we prevent batchIndexAck, we can't send the ack command to broker when the batch message are
+                    // all ack complete
                     return CompletableFuture.completedFuture(null);
                 }
-            // We cannot group acks if the delay is 0 or when there are properties attached to it. Fortunately that's an
-            // uncommon condition since it's only used for the compaction subscription.
-            doImmediateAck(msgId, ackType, properties, txn);
-        } else if (ackType == AckType.Cumulative) {
-            doCumulativeAck(msgId, null);
-        } else {
-            // Individual ack
-            if (msgId instanceof BatchMessageIdImpl) {
-                pendingIndividualAcks.add(new MessageIdImpl(msgId.getLedgerId(),
-                        msgId.getEntryId(), msgId.getPartitionIndex()));
             } else {
-                if (txn != null) {
-                    pendingIndividualTransactionAcks
-                            .add(Triple.of(txn.getTxnIdMostBits(), txn.getTxnIdLeastBits(), msgId));
+                consumer.onAcknowledgeCumulative(msgId, null);
+                if (((BatchMessageIdImpl) msgId).ackCumulative()) {

Review comment:
       What happens if we don't have this class? Should we add an *instanceof* test?

##########
File path: pulsar-client/src/main/java/org/apache/pulsar/client/impl/PersistentAcknowledgmentsGroupingTracker.java
##########
@@ -118,175 +122,302 @@ public boolean isDuplicate(MessageId messageId) {
     }
 
     @Override
-    public CompletableFuture<Void> addListAcknowledgment(List<MessageIdImpl> messageIds,
+    public CompletableFuture<Void> addListAcknowledgment(List<MessageId> messageIds,
                                                          AckType ackType, Map<String, Long> properties) {
-        if (ackType == AckType.Cumulative) {
-            messageIds.forEach(messageId -> doCumulativeAck(messageId, null));
-            return CompletableFuture.completedFuture(null);
+        if (AckType.Cumulative.equals(ackType)) {
+            if (ackResponseEnabled) {
+                Set<CompletableFuture<Void>> completableFutureSet = new HashSet<>();
+                messageIds.forEach(messageId ->
+                        completableFutureSet.add(addAcknowledgment((MessageIdImpl) messageId, ackType, properties)));
+                return FutureUtil.waitForAll(new ArrayList<>(completableFutureSet));
+            } else {
+                messageIds.forEach(messageId -> addAcknowledgment((MessageIdImpl) messageId, ackType, properties));
+                return CompletableFuture.completedFuture(null);
+            }
+        } else {
+            if (ackResponseEnabled) {
+                try {
+                    // when flush the ack, we should bind the this ack in the currentFuture, during this time we can't
+                    // change currentFuture. but we can lock by the read lock, because the currentFuture is not change
+                    // any ack operation is allowed.
+                    this.lock.readLock().lock();

Review comment:
       Move this line out of the try block

##########
File path: pulsar-client/src/main/java/org/apache/pulsar/client/impl/PersistentAcknowledgmentsGroupingTracker.java
##########
@@ -118,175 +122,302 @@ public boolean isDuplicate(MessageId messageId) {
     }
 
     @Override
-    public CompletableFuture<Void> addListAcknowledgment(List<MessageIdImpl> messageIds,
+    public CompletableFuture<Void> addListAcknowledgment(List<MessageId> messageIds,
                                                          AckType ackType, Map<String, Long> properties) {
-        if (ackType == AckType.Cumulative) {
-            messageIds.forEach(messageId -> doCumulativeAck(messageId, null));
-            return CompletableFuture.completedFuture(null);
+        if (AckType.Cumulative.equals(ackType)) {
+            if (ackResponseEnabled) {
+                Set<CompletableFuture<Void>> completableFutureSet = new HashSet<>();
+                messageIds.forEach(messageId ->
+                        completableFutureSet.add(addAcknowledgment((MessageIdImpl) messageId, ackType, properties)));
+                return FutureUtil.waitForAll(new ArrayList<>(completableFutureSet));
+            } else {
+                messageIds.forEach(messageId -> addAcknowledgment((MessageIdImpl) messageId, ackType, properties));
+                return CompletableFuture.completedFuture(null);
+            }
+        } else {
+            if (ackResponseEnabled) {
+                try {
+                    // when flush the ack, we should bind the this ack in the currentFuture, during this time we can't
+                    // change currentFuture. but we can lock by the read lock, because the currentFuture is not change
+                    // any ack operation is allowed.
+                    this.lock.readLock().lock();
+                    addListAcknowledgment(messageIds);
+                    return this.currentIndividualAckFuture;
+                } finally {
+                    this.lock.readLock().unlock();
+                    if (acknowledgementGroupTimeMicros == 0 || pendingIndividualAcks.size() >= MAX_ACK_GROUP_SIZE) {
+                        flush();
+                    }
+                }
+            } else {
+                addListAcknowledgment(messageIds);
+                if (acknowledgementGroupTimeMicros == 0 || pendingIndividualAcks.size() >= MAX_ACK_GROUP_SIZE) {
+                    flush();
+                }
+                return CompletableFuture.completedFuture(null);
+            }
         }
-        messageIds.forEach(messageId -> {
+    }
+
+    private void addListAcknowledgment(List<MessageId> messageIds) {
+        for (MessageId messageId : messageIds) {
+            consumer.onAcknowledge(messageId, null);
             if (messageId instanceof BatchMessageIdImpl) {
                 BatchMessageIdImpl batchMessageId = (BatchMessageIdImpl) messageId;
-                pendingIndividualAcks.add(new MessageIdImpl(batchMessageId.getLedgerId(),
-                        batchMessageId.getEntryId(), batchMessageId.getPartitionIndex()));
+                if (!batchMessageId.ackIndividual()) {
+                    doIndividualBatchAckAsync((BatchMessageIdImpl) messageId);
+                } else {
+                    messageId = modifyBatchMessageIdAndStatusInConsumer(batchMessageId);
+                    doIndividualAckAsync((MessageIdImpl) messageId);
+                }
             } else {
-                pendingIndividualAcks.add(messageId);
-            }
-            pendingIndividualBatchIndexAcks.remove(messageId);
-            if (pendingIndividualAcks.size() >= MAX_ACK_GROUP_SIZE) {
-                flush();
+                modifyMessageIdStatusInConsumer((MessageIdImpl) messageId);
+                doIndividualAckAsync((MessageIdImpl) messageId);
             }
-        });
-        if (acknowledgementGroupTimeMicros == 0) {
-            flush();
         }
-        return CompletableFuture.completedFuture(null);
     }
 
     @Override
-    public CompletableFuture<Void> addAcknowledgment(MessageIdImpl msgId, AckType ackType, Map<String, Long> properties,
-                                  TransactionImpl txn) {
-        if (acknowledgementGroupTimeMicros == 0 || !properties.isEmpty() ||
-                (txn != null && ackType == AckType.Cumulative)) {
-                if (msgId instanceof BatchMessageIdImpl && txn != null) {
-                    BatchMessageIdImpl batchMessageId = (BatchMessageIdImpl) msgId;
-                    doImmediateBatchIndexAck(batchMessageId, batchMessageId.getBatchIndex(),
-                            batchMessageId.getBatchIndex(),
-                            ackType, properties, txn.getTxnIdMostBits(), txn.getTxnIdLeastBits());
+    public CompletableFuture<Void> addAcknowledgment(MessageIdImpl msgId, AckType ackType,
+                                                     Map<String, Long> properties) {
+        if (msgId instanceof BatchMessageIdImpl) {
+            BatchMessageIdImpl batchMessageId = (BatchMessageIdImpl) msgId;
+            if (ackType == AckType.Individual) {
+                consumer.onAcknowledge(msgId, null);
+                // ack this ack carry bitSet index and judge bit set are all ack
+                if (batchMessageId.ackIndividual()) {
+                    MessageIdImpl messageId = modifyBatchMessageIdAndStatusInConsumer(batchMessageId);
+                    return doIndividualAck(messageId, properties);
+                } else if (batchIndexAckEnabled){
+                    return doIndividualBatchAck(batchMessageId, properties);
+                } else {
+                    // if we prevent batchIndexAck, we can't send the ack command to broker when the batch message are
+                    // all ack complete
                     return CompletableFuture.completedFuture(null);
                 }
-            // We cannot group acks if the delay is 0 or when there are properties attached to it. Fortunately that's an
-            // uncommon condition since it's only used for the compaction subscription.
-            doImmediateAck(msgId, ackType, properties, txn);
-        } else if (ackType == AckType.Cumulative) {
-            doCumulativeAck(msgId, null);
-        } else {
-            // Individual ack
-            if (msgId instanceof BatchMessageIdImpl) {
-                pendingIndividualAcks.add(new MessageIdImpl(msgId.getLedgerId(),
-                        msgId.getEntryId(), msgId.getPartitionIndex()));
             } else {
-                if (txn != null) {
-                    pendingIndividualTransactionAcks
-                            .add(Triple.of(txn.getTxnIdMostBits(), txn.getTxnIdLeastBits(), msgId));
+                consumer.onAcknowledgeCumulative(msgId, null);
+                if (((BatchMessageIdImpl) msgId).ackCumulative()) {
+                    return doCumulativeAck(msgId, properties, null);
                 } else {
-                    pendingIndividualAcks.add(msgId);
+                    if (batchIndexAckEnabled) {
+                        return doCumulativeBatchAck(batchMessageId, properties);
+                    } else {
+                        // ack the pre messageId, because we prevent the batchIndexAck, we can ensure pre messageId can
+                        // ack
+                        if (AckType.Cumulative == ackType
+                                && !batchMessageId.getAcker().isPrevBatchCumulativelyAcked()) {
+                            doCumulativeAck(batchMessageId.prevBatchMessageId(), properties, null);
+                            batchMessageId.getAcker().setPrevBatchCumulativelyAcked(true);
+                        }
+                        return CompletableFuture.completedFuture(null);
+                    }
                 }
             }
-            pendingIndividualBatchIndexAcks.remove(msgId);
-            if (pendingIndividualAcks.size() >= MAX_ACK_GROUP_SIZE) {
-                flush();
+        } else {
+            if (ackType == AckType.Individual) {
+                consumer.onAcknowledge(msgId, null);
+                modifyMessageIdStatusInConsumer(msgId);
+                return doIndividualAck(msgId, properties);
+            } else {
+                consumer.onAcknowledgeCumulative(msgId, null);
+                return doCumulativeAck(msgId, properties, null);
             }
         }
-        return CompletableFuture.completedFuture(null);
     }
 
-    @Override
-    public CompletableFuture<Void> addBatchIndexAcknowledgment(BatchMessageIdImpl msgId, int batchIndex, int batchSize, AckType ackType,
-                                            Map<String, Long> properties, TransactionImpl txn) {
-        if (batchIndexAckEnabled) {
-            if (acknowledgementGroupTimeMicros == 0 || !properties.isEmpty()) {
-                doImmediateBatchIndexAck(msgId, batchIndex, batchSize, ackType, properties,
-                        txn == null ? -1 : txn.getTxnIdMostBits(),
-                        txn == null ? -1 : txn.getTxnIdLeastBits());
-            } else if (ackType == AckType.Cumulative) {
-                BitSetRecyclable bitSet = BitSetRecyclable.create();
-                bitSet.set(0, batchSize);
-                bitSet.clear(0, batchIndex + 1);
-                doCumulativeAck(msgId, bitSet);
-            } else if (ackType == AckType.Individual) {
-                ConcurrentBitSetRecyclable bitSet;
-                bitSet = pendingIndividualBatchIndexAcks.computeIfAbsent(
-                        new MessageIdImpl(msgId.getLedgerId(), msgId.getEntryId(),
-                                msgId.getPartitionIndex()), (v) -> {
-                            ConcurrentBitSetRecyclable value;
-                            if (msgId.getAcker() != null &&
-                                    !(msgId.getAcker() instanceof BatchMessageAckerDisabled)) {
-                                value = ConcurrentBitSetRecyclable.create(msgId.getAcker().getBitSet());
-                            } else {
-                                value = ConcurrentBitSetRecyclable.create();
-                                value.set(0, batchSize);
-                            }
-                            return value;
-                        });
-                bitSet.clear(batchIndex);
-                if (pendingIndividualBatchIndexAcks.size() >= MAX_ACK_GROUP_SIZE) {
+    private MessageIdImpl modifyBatchMessageIdAndStatusInConsumer(BatchMessageIdImpl batchMessageId) {
+        MessageIdImpl messageId = new MessageIdImpl(batchMessageId.getLedgerId(),
+                batchMessageId.getEntryId(), batchMessageId.getPartitionIndex());
+        consumer.getStats().incrementNumAcksSent(batchMessageId.getBatchSize());
+        modifyMessageIdStatusInConsumerCommon(messageId);
+        return messageId;
+    }
+
+    private void modifyMessageIdStatusInConsumer(MessageIdImpl messageId) {
+        consumer.getStats().incrementNumAcksSent(1);
+        modifyMessageIdStatusInConsumerCommon(messageId);
+    }
+
+    private void modifyMessageIdStatusInConsumerCommon(MessageIdImpl messageId) {
+        consumer.getUnAckedMessageTracker().remove(messageId);
+        if (consumer.getPossibleSendToDeadLetterTopicMessages() != null) {
+            consumer.getPossibleSendToDeadLetterTopicMessages().remove(messageId);
+        }
+    }
+
+    private CompletableFuture<Void> doIndividualAck(MessageIdImpl messageId, Map<String, Long> properties) {
+        if (acknowledgementGroupTimeMicros == 0 || (properties != null && !properties.isEmpty())) {
+            // We cannot group acks if the delay is 0 or when there are properties attached to it. Fortunately that's an
+            // uncommon condition since it's only used for the compaction subscription.
+            return doImmediateAck(messageId, AckType.Individual, properties, null);
+        } else {
+            if (ackResponseEnabled) {
+                try {
+                    // when flush the ack, we should bind the this ack in the currentFuture, during this time we can't
+                    // change currentFuture. but we can lock by the read lock, because the currentFuture is not change
+                    // any ack operation is allowed.
+                    this.lock.readLock().lock();

Review comment:
       Move this line out of the finally block




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [pulsar] congbobo184 commented on pull request #8996: Ack response implementation

Posted by GitBox <gi...@apache.org>.
congbobo184 commented on pull request #8996:
URL: https://github.com/apache/pulsar/pull/8996#issuecomment-762011159


   /pulsarbot run-failure-checks


----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [pulsar] congbobo184 commented on a change in pull request #8996: Ack response implementation

Posted by GitBox <gi...@apache.org>.
congbobo184 commented on a change in pull request #8996:
URL: https://github.com/apache/pulsar/pull/8996#discussion_r558250117



##########
File path: pulsar-broker/src/test/java/org/apache/pulsar/client/api/SimpleProducerConsumerTest.java
##########
@@ -1079,7 +1092,7 @@ public void testDeactivatingBacklogConsumer() throws Exception {
         // 3. Consume messages: at Faster subscriber
         for (int i = 0; i < totalMsgs; i++) {
             msg = subscriber1.receive(100, TimeUnit.MILLISECONDS);
-            subscriber1.acknowledge(msg);
+            subscriber1.acknowledgeAsync(msg);

Review comment:
       it won't influence the result, if wait it the test will spend a lot of time when enable the ack response.




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [pulsar] codelipenghui commented on pull request #8996: Ack response implementation

Posted by GitBox <gi...@apache.org>.
codelipenghui commented on pull request #8996:
URL: https://github.com/apache/pulsar/pull/8996#issuecomment-876310669


   @lhotari The ack response proposal contained in the transaction proposal, you can find here https://docs.google.com/document/d/145VYp09JKTw9jAT-7yNyFU255FptB2_B2Fye100ZXDI/edit#bookmark=id.r370pcotoarj
   
   For the documentation, you can just create an issue to track it.
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@pulsar.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [pulsar] codelipenghui commented on a change in pull request #8996: Ack response implementation

Posted by GitBox <gi...@apache.org>.
codelipenghui commented on a change in pull request #8996:
URL: https://github.com/apache/pulsar/pull/8996#discussion_r557265929



##########
File path: pulsar-client-api/src/main/java/org/apache/pulsar/client/api/PulsarClientException.java
##########
@@ -864,6 +864,32 @@ public TransactionConflictException(String msg) {
         }
     }
 
+    /**
+     * Consumer ack for response timeout.
+     */
+    public static class AckResponseTimeoutException extends PulsarClientException {

Review comment:
       Is the `PulsarClientException.TimeoutException` works?

##########
File path: pulsar-client-api/src/main/java/org/apache/pulsar/client/api/ConsumerBuilder.java
##########
@@ -186,6 +186,14 @@
      */
     ConsumerBuilder<T> ackTimeout(long ackTimeout, TimeUnit timeUnit);
 
+    /**
+     * Ack will return response but does not mean that the message will not be resent after get response.
+     *
+     * @param ackResponseEnabled {@link Boolean} is enable ack for response
+     * @return the consumer builder instance
+     */
+    ConsumerBuilder<T> enableAckResponse(boolean ackResponseEnabled);

Review comment:
       ```suggestion
       ConsumerBuilder<T> isAckReceiptEnabled(boolean ackReceiptEnabled);
   ```

##########
File path: pulsar-client/src/main/java/org/apache/pulsar/client/impl/PersistentAcknowledgmentsGroupingTracker.java
##########
@@ -330,16 +462,41 @@ public void flush() {
             return;
         }
 
+        if (ackResponseEnabled) {
+            this.lock.writeLock().lock();
+            try {
+                flushAsync(cnx);
+            } finally {
+                this.lock.writeLock().unlock();
+            }
+        } else {
+            flushAsync(cnx);
+        }
+    }
+
+    private void flushAsync(ClientCnx cnx) {
         boolean shouldFlush = false;
         if (cumulativeAckFlushRequired) {
-            newAckCommand(consumer.consumerId, lastCumulativeAck, lastCumulativeAckSet, AckType.Cumulative, null, Collections.emptyMap(), cnx, false /* flush */, -1, -1);
+            if (ackResponseEnabled) {
+                long requestId = consumer.getClient().newRequestId();
+                ByteBuf cmd = Commands.newAck(consumer.consumerId, lastCumulativeAck.messageId.ledgerId,
+                        lastCumulativeAck.messageId.getEntryId(), lastCumulativeAck.bitSetRecyclable,
+                        AckType.Cumulative, null, Collections.emptyMap(), requestId);
+                cnx.newAckForResponseWithFuture(cmd, requestId, currentCumulativeAckFuture);
+                this.currentCumulativeAckFuture = new TimedCompletableFuture<>();
+            } else {
+                ByteBuf cmd = Commands.newAck(consumer.consumerId, lastCumulativeAck.messageId.ledgerId,
+                        lastCumulativeAck.messageId.getEntryId(), lastCumulativeAck.bitSetRecyclable,
+                        AckType.Cumulative, null, Collections.emptyMap(), -1);
+                cnx.ctx().write(cmd, cnx.ctx().voidPromise());

Review comment:
       Please optimize duplicate code

##########
File path: pulsar-client/src/main/java/org/apache/pulsar/client/impl/PersistentAcknowledgmentsGroupingTracker.java
##########
@@ -115,186 +122,302 @@ public boolean isDuplicate(MessageId messageId) {
     }
 
     @Override
-    public void addListAcknowledgment(List<MessageIdImpl> messageIds, AckType ackType, Map<String, Long> properties) {
-        if (ackType == AckType.Cumulative) {
-            messageIds.forEach(messageId -> doCumulativeAck(messageId, null));
-            return;
+    public CompletableFuture<Void> addListAcknowledgment(List<MessageId> messageIds,
+                                                         AckType ackType, Map<String, Long> properties) {
+        if (AckType.Cumulative.equals(ackType)) {
+            if (ackResponseEnabled) {
+                Set<CompletableFuture<Void>> completableFutureSet = new HashSet<>();
+                messageIds.forEach(messageId ->
+                        completableFutureSet.add(addAcknowledgment((MessageIdImpl) messageId, ackType, properties)));
+                return FutureUtil.waitForAll(new ArrayList<>(completableFutureSet));
+            } else {
+                messageIds.forEach(messageId -> addAcknowledgment((MessageIdImpl) messageId, ackType, properties));
+                return CompletableFuture.completedFuture(null);
+            }
+        } else {
+            if (ackResponseEnabled) {
+                try {
+                    // when flush the ack, we should bind the this ack in the currentFuture, during this time we can't
+                    // change currentFuture. but we can lock by the read lock, because the currentFuture is not change
+                    // any ack operation is allowed.
+                    this.lock.readLock().lock();
+                    addListAcknowledgment(messageIds);
+                    return this.currentIndividualAckFuture;
+                } finally {
+                    this.lock.readLock().unlock();
+                    if (acknowledgementGroupTimeMicros == 0 || pendingIndividualAcks.size() >= MAX_ACK_GROUP_SIZE) {
+                        flush();
+                    }
+                }
+            } else {
+                addListAcknowledgment(messageIds);
+                if (acknowledgementGroupTimeMicros == 0 || pendingIndividualAcks.size() >= MAX_ACK_GROUP_SIZE) {
+                    flush();
+                }
+                return CompletableFuture.completedFuture(null);
+            }
         }
-        messageIds.forEach(messageId -> {
+    }
+
+    private void addListAcknowledgment(List<MessageId> messageIds) {
+        for (MessageId messageId : messageIds) {
+            consumer.onAcknowledge(messageId, null);
             if (messageId instanceof BatchMessageIdImpl) {
                 BatchMessageIdImpl batchMessageId = (BatchMessageIdImpl) messageId;
-                pendingIndividualAcks.add(new MessageIdImpl(batchMessageId.getLedgerId(),
-                        batchMessageId.getEntryId(), batchMessageId.getPartitionIndex()));
+                if (!batchMessageId.ackIndividual()) {
+                    doIndividualBatchAckAsync((BatchMessageIdImpl) messageId);
+                } else {
+                    messageId = modifyBatchMessageIdAndStatusInConsumer(batchMessageId);
+                    doIndividualAckAsync((MessageIdImpl) messageId);
+                }
             } else {
-                pendingIndividualAcks.add(messageId);
-            }
-            pendingIndividualBatchIndexAcks.remove(messageId);
-            if (pendingIndividualAcks.size() >= MAX_ACK_GROUP_SIZE) {
-                flush();
+                modifyMessageIdStatusInConsumer((MessageIdImpl) messageId);
+                doIndividualAckAsync((MessageIdImpl) messageId);
             }
-        });
-        if (acknowledgementGroupTimeMicros == 0) {
-            flush();
         }
     }
 
     @Override
-    public void addAcknowledgment(MessageIdImpl msgId, AckType ackType, Map<String, Long> properties,
-                                  TransactionImpl txn) {
-        if (acknowledgementGroupTimeMicros == 0 || !properties.isEmpty() ||
-                (txn != null && ackType == AckType.Cumulative)) {
-                if (msgId instanceof BatchMessageIdImpl && txn != null) {
-                    BatchMessageIdImpl batchMessageId = (BatchMessageIdImpl) msgId;
-                    doImmediateBatchIndexAck(batchMessageId, batchMessageId.getBatchIndex(),
-                            batchMessageId.getBatchIndex(),
-                            ackType, properties, txn.getTxnIdMostBits(), txn.getTxnIdLeastBits());
-                    return;
+    public CompletableFuture<Void> addAcknowledgment(MessageIdImpl msgId, AckType ackType,
+                                                     Map<String, Long> properties) {
+        if (msgId instanceof BatchMessageIdImpl) {
+            BatchMessageIdImpl batchMessageId = (BatchMessageIdImpl) msgId;
+            if (ackType == AckType.Individual) {
+                consumer.onAcknowledge(msgId, null);
+                // ack this ack carry bitSet index and judge bit set are all ack
+                if (batchMessageId.ackIndividual()) {
+                    MessageIdImpl messageId = modifyBatchMessageIdAndStatusInConsumer(batchMessageId);
+                    return doIndividualAck(messageId, properties);
+                } else if (batchIndexAckEnabled){
+                    return doIndividualBatchAck(batchMessageId, properties);
+                } else {
+                    // if we prevent batchIndexAck, we can't send the ack command to broker when the batch message are
+                    // all ack complete
+                    return CompletableFuture.completedFuture(null);
                 }
-            // We cannot group acks if the delay is 0 or when there are properties attached to it. Fortunately that's an
-            // uncommon condition since it's only used for the compaction subscription.
-            doImmediateAck(msgId, ackType, properties, txn);
-        } else if (ackType == AckType.Cumulative) {
-            doCumulativeAck(msgId, null);
-        } else {
-            // Individual ack
-            if (msgId instanceof BatchMessageIdImpl) {
-                pendingIndividualAcks.add(new MessageIdImpl(msgId.getLedgerId(),
-                        msgId.getEntryId(), msgId.getPartitionIndex()));
             } else {
-                if (txn != null) {
-                    pendingIndividualTransactionAcks
-                            .add(Triple.of(txn.getTxnIdMostBits(), txn.getTxnIdLeastBits(), msgId));
+                consumer.onAcknowledgeCumulative(msgId, null);
+                if (((BatchMessageIdImpl) msgId).ackCumulative()) {
+                    return doCumulativeAck(msgId, properties, null);
                 } else {
-                    pendingIndividualAcks.add(msgId);
+                    if (batchIndexAckEnabled) {
+                        return doCumulativeBatchAck(batchMessageId, properties);
+                    } else {
+                        // ack the pre messageId, because we prevent the batchIndexAck, we can ensure pre messageId can
+                        // ack
+                        if (AckType.Cumulative == ackType
+                                && !batchMessageId.getAcker().isPrevBatchCumulativelyAcked()) {
+                            doCumulativeAck(batchMessageId.prevBatchMessageId(), properties, null);
+                            batchMessageId.getAcker().setPrevBatchCumulativelyAcked(true);
+                        }
+                        return CompletableFuture.completedFuture(null);
+                    }
                 }
             }
-            pendingIndividualBatchIndexAcks.remove(msgId);
-            if (pendingIndividualAcks.size() >= MAX_ACK_GROUP_SIZE) {
-                flush();
+        } else {
+            if (ackType == AckType.Individual) {
+                consumer.onAcknowledge(msgId, null);
+                modifyMessageIdStatusInConsumer(msgId);
+                return doIndividualAck(msgId, properties);
+            } else {
+                consumer.onAcknowledgeCumulative(msgId, null);
+                return doCumulativeAck(msgId, properties, null);
             }
         }
     }
 
-    public void addBatchIndexAcknowledgment(BatchMessageIdImpl msgId, int batchIndex, int batchSize, AckType ackType,
-                                            Map<String, Long> properties, TransactionImpl txn) {
-        if (acknowledgementGroupTimeMicros == 0 || !properties.isEmpty()) {
-            doImmediateBatchIndexAck(msgId, batchIndex, batchSize, ackType, properties,
-                    txn == null ? -1 : txn.getTxnIdMostBits(),
-                    txn == null ? -1 : txn.getTxnIdLeastBits());
-        } else if (ackType == AckType.Cumulative) {
-            BitSetRecyclable bitSet = BitSetRecyclable.create();
-            bitSet.set(0, batchSize);
-            bitSet.clear(0, batchIndex + 1);
-            doCumulativeAck(msgId, bitSet);
-        } else if (ackType == AckType.Individual) {
-            ConcurrentBitSetRecyclable bitSet;
-            if (txn != null) {
-                synchronized (txn) {
-                    ConcurrentHashMap<MessageIdImpl, ConcurrentBitSetRecyclable> transactionIndividualBatchIndexAcks =
-                            pendingIndividualTransactionBatchIndexAcks
-                                    .computeIfAbsent(txn, (v) -> new ConcurrentHashMap<>());
-                    bitSet = transactionIndividualBatchIndexAcks.computeIfAbsent(msgId, (v) -> {
-                        ConcurrentBitSetRecyclable value;
-                        value = ConcurrentBitSetRecyclable.create();
-                        value.set(0, msgId.getAcker().getBatchSize());
-                        return value;
-                    });
-                    bitSet.clear(batchIndex);
+    private MessageIdImpl modifyBatchMessageIdAndStatusInConsumer(BatchMessageIdImpl batchMessageId) {
+        MessageIdImpl messageId = new MessageIdImpl(batchMessageId.getLedgerId(),
+                batchMessageId.getEntryId(), batchMessageId.getPartitionIndex());
+        consumer.getStats().incrementNumAcksSent(batchMessageId.getBatchSize());
+        modifyMessageIdStatusInConsumerCommon(messageId);
+        return messageId;
+    }
+
+    private void modifyMessageIdStatusInConsumer(MessageIdImpl messageId) {
+        consumer.getStats().incrementNumAcksSent(1);
+        modifyMessageIdStatusInConsumerCommon(messageId);
+    }
+
+    private void modifyMessageIdStatusInConsumerCommon(MessageIdImpl messageId) {
+        consumer.getUnAckedMessageTracker().remove(messageId);
+        if (consumer.getPossibleSendToDeadLetterTopicMessages() != null) {
+            consumer.getPossibleSendToDeadLetterTopicMessages().remove(messageId);
+        }
+    }

Review comment:
       These methods not clear here. Stats, not status. And should split the update stats method and cleanup consumer method stay independent, this will improve the code readability

##########
File path: pulsar-client/src/main/java/org/apache/pulsar/client/impl/PersistentAcknowledgmentsGroupingTracker.java
##########
@@ -497,46 +586,84 @@ public void close() {
         }
     }
 
-    private void newAckCommand(long consumerId, MessageIdImpl msgId, BitSetRecyclable lastCumulativeAckSet,
-            AckType ackType, ValidationError validationError, Map<String, Long> map, ClientCnx cnx,
-                               boolean flush, long txnidMostBits, long txnidLeastBits) {
-
-        MessageIdImpl[] chunkMsgIds = this.consumer.unAckedChunkedMessageIdSequenceMap.get(msgId);
-        if (chunkMsgIds != null && txnidLeastBits < 0 && txnidMostBits < 0) {
-            if (Commands.peerSupportsMultiMessageAcknowledgment(cnx.getRemoteEndpointProtocolVersion())
-                    && ackType != AckType.Cumulative) {
+    private CompletableFuture<Void> newImmediateAckAndFlush(long consumerId, MessageIdImpl msgId,
+                                                            BitSetRecyclable bitSet, AckType ackType,
+                                                            Map<String, Long> map, ClientCnx cnx) {
+        MessageIdImpl[] chunkMsgIds = this.consumer.unAckedChunkedMessageIdSequenceMap.remove(msgId);
+        final CompletableFuture<Void> completableFuture;
+        // cumulative ack chunk by the last messageId
+        if (chunkMsgIds != null &&  ackType != AckType.Cumulative) {
+            if (Commands.peerSupportsMultiMessageAcknowledgment(cnx.getRemoteEndpointProtocolVersion())) {
                 List<Triple<Long, Long, ConcurrentBitSetRecyclable>> entriesToAck = new ArrayList<>(chunkMsgIds.length);
                 for (MessageIdImpl cMsgId : chunkMsgIds) {
                     if (cMsgId != null && chunkMsgIds.length > 1) {
                         entriesToAck.add(Triple.of(cMsgId.getLedgerId(), cMsgId.getEntryId(), null));
                     }
                 }
-                ByteBuf cmd = Commands.newMultiMessageAck(consumer.consumerId, entriesToAck);
-                if (flush) {
-                    cnx.ctx().writeAndFlush(cmd, cnx.ctx().voidPromise());
+                if (ackResponseEnabled) {
+                    long requestId = consumer.getClient().newRequestId();
+                    ByteBuf cmd = Commands.newMultiMessageAck(consumer.consumerId, entriesToAck, requestId);
+                    completableFuture = cnx.newAckForResponse(cmd, requestId);

Review comment:
       Should write the command to the broker?  And please consider reducing the duplicate code 

##########
File path: pulsar-client/src/main/java/org/apache/pulsar/client/impl/ClientCnx.java
##########
@@ -816,25 +833,35 @@ SocketAddress serverAddrees() {
     }
 
     CompletableFuture<ProducerResponse> sendRequestWithId(ByteBuf cmd, long requestId) {
-        return sendRequestAndHandleTimeout(cmd, requestId, RequestType.Command);
+        return sendRequestAndHandleTimeout(cmd, requestId, RequestType.Command, true,
+                new TimedCompletableFuture<>());
     }
 
-    private <T> CompletableFuture<T> sendRequestAndHandleTimeout(ByteBuf requestMessage, long requestId, RequestType requestType) {
-        TimedCompletableFuture<T> future = new TimedCompletableFuture<>();
+    private <T> CompletableFuture<T> sendRequestAndHandleTimeout(ByteBuf requestMessage, long requestId,
+                                                                 RequestType requestType, boolean flush,
+                                                                 TimedCompletableFuture<T> future) {

Review comment:
       Do we need to return the future again?

##########
File path: pulsar-client-api/src/main/java/org/apache/pulsar/client/api/ConsumerBuilder.java
##########
@@ -186,6 +186,14 @@
      */
     ConsumerBuilder<T> ackTimeout(long ackTimeout, TimeUnit timeUnit);
 
+    /**
+     * Ack will return response but does not mean that the message will not be resent after get response.
+     *
+     * @param ackResponseEnabled {@link Boolean} is enable ack for response
+     * @return the consumer builder instance
+     */
+    ConsumerBuilder<T> enableAckResponse(boolean ackResponseEnabled);

Review comment:
       I think `response` here a little bit ambiguous since the client also can get the returned future without this feature. 

##########
File path: pulsar-client/src/main/java/org/apache/pulsar/client/impl/PersistentAcknowledgmentsGroupingTracker.java
##########
@@ -118,175 +122,302 @@ public boolean isDuplicate(MessageId messageId) {
     }
 
     @Override
-    public CompletableFuture<Void> addListAcknowledgment(List<MessageIdImpl> messageIds,
+    public CompletableFuture<Void> addListAcknowledgment(List<MessageId> messageIds,
                                                          AckType ackType, Map<String, Long> properties) {
-        if (ackType == AckType.Cumulative) {
-            messageIds.forEach(messageId -> doCumulativeAck(messageId, null));
-            return CompletableFuture.completedFuture(null);
+        if (AckType.Cumulative.equals(ackType)) {
+            if (ackResponseEnabled) {
+                Set<CompletableFuture<Void>> completableFutureSet = new HashSet<>();
+                messageIds.forEach(messageId ->
+                        completableFutureSet.add(addAcknowledgment((MessageIdImpl) messageId, ackType, properties)));
+                return FutureUtil.waitForAll(new ArrayList<>(completableFutureSet));
+            } else {
+                messageIds.forEach(messageId -> addAcknowledgment((MessageIdImpl) messageId, ackType, properties));
+                return CompletableFuture.completedFuture(null);
+            }
+        } else {
+            if (ackResponseEnabled) {
+                try {
+                    // when flush the ack, we should bind the this ack in the currentFuture, during this time we can't
+                    // change currentFuture. but we can lock by the read lock, because the currentFuture is not change
+                    // any ack operation is allowed.
+                    this.lock.readLock().lock();
+                    addListAcknowledgment(messageIds);
+                    return this.currentIndividualAckFuture;
+                } finally {
+                    this.lock.readLock().unlock();
+                    if (acknowledgementGroupTimeMicros == 0 || pendingIndividualAcks.size() >= MAX_ACK_GROUP_SIZE) {
+                        flush();
+                    }
+                }
+            } else {
+                addListAcknowledgment(messageIds);
+                if (acknowledgementGroupTimeMicros == 0 || pendingIndividualAcks.size() >= MAX_ACK_GROUP_SIZE) {
+                    flush();
+                }
+                return CompletableFuture.completedFuture(null);
+            }
         }
-        messageIds.forEach(messageId -> {
+    }
+
+    private void addListAcknowledgment(List<MessageId> messageIds) {
+        for (MessageId messageId : messageIds) {
+            consumer.onAcknowledge(messageId, null);
             if (messageId instanceof BatchMessageIdImpl) {
                 BatchMessageIdImpl batchMessageId = (BatchMessageIdImpl) messageId;
-                pendingIndividualAcks.add(new MessageIdImpl(batchMessageId.getLedgerId(),
-                        batchMessageId.getEntryId(), batchMessageId.getPartitionIndex()));
+                if (!batchMessageId.ackIndividual()) {
+                    doIndividualBatchAckAsync((BatchMessageIdImpl) messageId);
+                } else {
+                    messageId = modifyBatchMessageIdAndStatusInConsumer(batchMessageId);
+                    doIndividualAckAsync((MessageIdImpl) messageId);
+                }
             } else {
-                pendingIndividualAcks.add(messageId);
-            }
-            pendingIndividualBatchIndexAcks.remove(messageId);
-            if (pendingIndividualAcks.size() >= MAX_ACK_GROUP_SIZE) {
-                flush();
+                modifyMessageIdStatusInConsumer((MessageIdImpl) messageId);
+                doIndividualAckAsync((MessageIdImpl) messageId);
             }
-        });
-        if (acknowledgementGroupTimeMicros == 0) {
-            flush();
         }
-        return CompletableFuture.completedFuture(null);
     }
 
     @Override
-    public CompletableFuture<Void> addAcknowledgment(MessageIdImpl msgId, AckType ackType, Map<String, Long> properties,
-                                  TransactionImpl txn) {
-        if (acknowledgementGroupTimeMicros == 0 || !properties.isEmpty() ||
-                (txn != null && ackType == AckType.Cumulative)) {
-                if (msgId instanceof BatchMessageIdImpl && txn != null) {
-                    BatchMessageIdImpl batchMessageId = (BatchMessageIdImpl) msgId;
-                    doImmediateBatchIndexAck(batchMessageId, batchMessageId.getBatchIndex(),
-                            batchMessageId.getBatchIndex(),
-                            ackType, properties, txn.getTxnIdMostBits(), txn.getTxnIdLeastBits());
+    public CompletableFuture<Void> addAcknowledgment(MessageIdImpl msgId, AckType ackType,
+                                                     Map<String, Long> properties) {
+        if (msgId instanceof BatchMessageIdImpl) {
+            BatchMessageIdImpl batchMessageId = (BatchMessageIdImpl) msgId;
+            if (ackType == AckType.Individual) {
+                consumer.onAcknowledge(msgId, null);
+                // ack this ack carry bitSet index and judge bit set are all ack
+                if (batchMessageId.ackIndividual()) {
+                    MessageIdImpl messageId = modifyBatchMessageIdAndStatusInConsumer(batchMessageId);
+                    return doIndividualAck(messageId, properties);
+                } else if (batchIndexAckEnabled){
+                    return doIndividualBatchAck(batchMessageId, properties);
+                } else {
+                    // if we prevent batchIndexAck, we can't send the ack command to broker when the batch message are
+                    // all ack complete
                     return CompletableFuture.completedFuture(null);
                 }
-            // We cannot group acks if the delay is 0 or when there are properties attached to it. Fortunately that's an
-            // uncommon condition since it's only used for the compaction subscription.
-            doImmediateAck(msgId, ackType, properties, txn);
-        } else if (ackType == AckType.Cumulative) {
-            doCumulativeAck(msgId, null);
-        } else {
-            // Individual ack
-            if (msgId instanceof BatchMessageIdImpl) {
-                pendingIndividualAcks.add(new MessageIdImpl(msgId.getLedgerId(),
-                        msgId.getEntryId(), msgId.getPartitionIndex()));
             } else {
-                if (txn != null) {
-                    pendingIndividualTransactionAcks
-                            .add(Triple.of(txn.getTxnIdMostBits(), txn.getTxnIdLeastBits(), msgId));
+                consumer.onAcknowledgeCumulative(msgId, null);
+                if (((BatchMessageIdImpl) msgId).ackCumulative()) {

Review comment:
       line 183 is checked, I think @congbobo184 you can use `batchMessageId` since you already convert to `BatchMessageIdImpl` in 184

##########
File path: pulsar-client/src/main/java/org/apache/pulsar/client/impl/ClientCnx.java
##########
@@ -816,25 +833,35 @@ SocketAddress serverAddrees() {
     }
 
     CompletableFuture<ProducerResponse> sendRequestWithId(ByteBuf cmd, long requestId) {
-        return sendRequestAndHandleTimeout(cmd, requestId, RequestType.Command);
+        return sendRequestAndHandleTimeout(cmd, requestId, RequestType.Command, true,
+                new TimedCompletableFuture<>());
     }
 
-    private <T> CompletableFuture<T> sendRequestAndHandleTimeout(ByteBuf requestMessage, long requestId, RequestType requestType) {
-        TimedCompletableFuture<T> future = new TimedCompletableFuture<>();
+    private <T> CompletableFuture<T> sendRequestAndHandleTimeout(ByteBuf requestMessage, long requestId,
+                                                                 RequestType requestType, boolean flush,
+                                                                 TimedCompletableFuture<T> future) {

Review comment:
       I think you can use 2 methods to handle this case
   
   ```
   private <T> void sendRequestAndHandleTimeout(ByteBuf requestMessage, long requestId,
                                                                    RequestType requestType, boolean flush,
                                                                    TimedCompletableFuture<T> future) {
   
   }
   ```
   
   ```
   private <T> CompletableFuture<T> sendRequestAndHandleTimeout(ByteBuf requestMessage, long requestId, RequestType requestType, boolean flush) {
          TimedCompletableFuture<T> future = new TimedCompletableFuture<>();
           sendRequestAndHandleTimeout(... future);
           return future;
   }
   ```

##########
File path: pulsar-client/src/main/java/org/apache/pulsar/client/impl/PersistentAcknowledgmentsGroupingTracker.java
##########
@@ -497,46 +586,84 @@ public void close() {
         }
     }
 
-    private void newAckCommand(long consumerId, MessageIdImpl msgId, BitSetRecyclable lastCumulativeAckSet,
-            AckType ackType, ValidationError validationError, Map<String, Long> map, ClientCnx cnx,
-                               boolean flush, long txnidMostBits, long txnidLeastBits) {
-
-        MessageIdImpl[] chunkMsgIds = this.consumer.unAckedChunkedMessageIdSequenceMap.get(msgId);
-        if (chunkMsgIds != null && txnidLeastBits < 0 && txnidMostBits < 0) {
-            if (Commands.peerSupportsMultiMessageAcknowledgment(cnx.getRemoteEndpointProtocolVersion())
-                    && ackType != AckType.Cumulative) {
+    private CompletableFuture<Void> newImmediateAckAndFlush(long consumerId, MessageIdImpl msgId,
+                                                            BitSetRecyclable bitSet, AckType ackType,
+                                                            Map<String, Long> map, ClientCnx cnx) {
+        MessageIdImpl[] chunkMsgIds = this.consumer.unAckedChunkedMessageIdSequenceMap.remove(msgId);
+        final CompletableFuture<Void> completableFuture;
+        // cumulative ack chunk by the last messageId
+        if (chunkMsgIds != null &&  ackType != AckType.Cumulative) {
+            if (Commands.peerSupportsMultiMessageAcknowledgment(cnx.getRemoteEndpointProtocolVersion())) {
                 List<Triple<Long, Long, ConcurrentBitSetRecyclable>> entriesToAck = new ArrayList<>(chunkMsgIds.length);
                 for (MessageIdImpl cMsgId : chunkMsgIds) {
                     if (cMsgId != null && chunkMsgIds.length > 1) {
                         entriesToAck.add(Triple.of(cMsgId.getLedgerId(), cMsgId.getEntryId(), null));
                     }
                 }
-                ByteBuf cmd = Commands.newMultiMessageAck(consumer.consumerId, entriesToAck);
-                if (flush) {
-                    cnx.ctx().writeAndFlush(cmd, cnx.ctx().voidPromise());
+                if (ackResponseEnabled) {
+                    long requestId = consumer.getClient().newRequestId();
+                    ByteBuf cmd = Commands.newMultiMessageAck(consumer.consumerId, entriesToAck, requestId);
+                    completableFuture = cnx.newAckForResponse(cmd, requestId);
                 } else {
-                    cnx.ctx().write(cmd, cnx.ctx().voidPromise());
+                    ByteBuf cmd = Commands.newMultiMessageAck(consumer.consumerId, entriesToAck, -1);
+                    cnx.ctx().writeAndFlush(cmd, cnx.ctx().voidPromise());
+                    completableFuture = CompletableFuture.completedFuture(null);
                 }
             } else {
+                // if don't support multi message ack, it also support ack response, so we should not think about the
+                // ack response in this logic
                 for (MessageIdImpl cMsgId : chunkMsgIds) {
                     ByteBuf cmd = Commands.newAck(consumerId, cMsgId.getLedgerId(), cMsgId.getEntryId(),
-                            lastCumulativeAckSet, ackType, validationError, map);
-                    if (flush) {
-                        cnx.ctx().writeAndFlush(cmd, cnx.ctx().voidPromise());
-                    } else {
-                        cnx.ctx().write(cmd, cnx.ctx().voidPromise());
-                    }
+                            bitSet, ackType, null, map, -1);
+                    cnx.ctx().writeAndFlush(cmd, cnx.ctx().voidPromise());
                 }
+                completableFuture = CompletableFuture.completedFuture(null);
             }
-            this.consumer.unAckedChunkedMessageIdSequenceMap.remove(msgId);
         } else {
-            ByteBuf cmd = Commands.newAck(consumerId, msgId.getLedgerId(), msgId.getEntryId(), lastCumulativeAckSet,
-                    ackType, validationError, map, txnidLeastBits, txnidMostBits, -1);
-            if (flush) {
-                cnx.ctx().writeAndFlush(cmd, cnx.ctx().voidPromise());
+            if (ackResponseEnabled) {
+                long requestId = consumer.getClient().newRequestId();
+                ByteBuf cmd = Commands.newAck(consumerId, msgId.getLedgerId(), msgId.getEntryId(), bitSet,
+                        ackType, null, map, requestId);
+                completableFuture = cnx.newAckForResponse(cmd, requestId);
             } else {
-                cnx.ctx().write(cmd, cnx.ctx().voidPromise());
+                ByteBuf cmd = Commands.newAck(consumerId, msgId.getLedgerId(), msgId.getEntryId(), bitSet,
+                        ackType, null, map, -1);
+                cnx.ctx().writeAndFlush(cmd, cnx.ctx().voidPromise());
+                completableFuture = CompletableFuture.completedFuture(null);
+            }
+        }
+        return completableFuture;

Review comment:
       I think we can use a method `getRequestId()` and a method `getCompletableFuture()`. I think this will make the logic simpler

##########
File path: pulsar-client/src/main/java/org/apache/pulsar/client/impl/PersistentAcknowledgmentsGroupingTracker.java
##########
@@ -497,46 +586,84 @@ public void close() {
         }
     }
 
-    private void newAckCommand(long consumerId, MessageIdImpl msgId, BitSetRecyclable lastCumulativeAckSet,
-            AckType ackType, ValidationError validationError, Map<String, Long> map, ClientCnx cnx,
-                               boolean flush, long txnidMostBits, long txnidLeastBits) {
-
-        MessageIdImpl[] chunkMsgIds = this.consumer.unAckedChunkedMessageIdSequenceMap.get(msgId);
-        if (chunkMsgIds != null && txnidLeastBits < 0 && txnidMostBits < 0) {
-            if (Commands.peerSupportsMultiMessageAcknowledgment(cnx.getRemoteEndpointProtocolVersion())
-                    && ackType != AckType.Cumulative) {
+    private CompletableFuture<Void> newImmediateAckAndFlush(long consumerId, MessageIdImpl msgId,
+                                                            BitSetRecyclable bitSet, AckType ackType,
+                                                            Map<String, Long> map, ClientCnx cnx) {
+        MessageIdImpl[] chunkMsgIds = this.consumer.unAckedChunkedMessageIdSequenceMap.remove(msgId);
+        final CompletableFuture<Void> completableFuture;
+        // cumulative ack chunk by the last messageId
+        if (chunkMsgIds != null &&  ackType != AckType.Cumulative) {
+            if (Commands.peerSupportsMultiMessageAcknowledgment(cnx.getRemoteEndpointProtocolVersion())) {
                 List<Triple<Long, Long, ConcurrentBitSetRecyclable>> entriesToAck = new ArrayList<>(chunkMsgIds.length);
                 for (MessageIdImpl cMsgId : chunkMsgIds) {
                     if (cMsgId != null && chunkMsgIds.length > 1) {
                         entriesToAck.add(Triple.of(cMsgId.getLedgerId(), cMsgId.getEntryId(), null));
                     }
                 }
-                ByteBuf cmd = Commands.newMultiMessageAck(consumer.consumerId, entriesToAck);
-                if (flush) {
-                    cnx.ctx().writeAndFlush(cmd, cnx.ctx().voidPromise());
+                if (ackResponseEnabled) {
+                    long requestId = consumer.getClient().newRequestId();
+                    ByteBuf cmd = Commands.newMultiMessageAck(consumer.consumerId, entriesToAck, requestId);
+                    completableFuture = cnx.newAckForResponse(cmd, requestId);
                 } else {
-                    cnx.ctx().write(cmd, cnx.ctx().voidPromise());
+                    ByteBuf cmd = Commands.newMultiMessageAck(consumer.consumerId, entriesToAck, -1);
+                    cnx.ctx().writeAndFlush(cmd, cnx.ctx().voidPromise());
+                    completableFuture = CompletableFuture.completedFuture(null);
                 }
             } else {
+                // if don't support multi message ack, it also support ack response, so we should not think about the
+                // ack response in this logic

Review comment:
       Why only support for muti message ack. Single message acknowledge also can enable ack receipt?

##########
File path: pulsar-client/src/main/java/org/apache/pulsar/client/impl/PersistentAcknowledgmentsGroupingTracker.java
##########
@@ -118,175 +122,302 @@ public boolean isDuplicate(MessageId messageId) {
     }
 
     @Override
-    public CompletableFuture<Void> addListAcknowledgment(List<MessageIdImpl> messageIds,
+    public CompletableFuture<Void> addListAcknowledgment(List<MessageId> messageIds,
                                                          AckType ackType, Map<String, Long> properties) {
-        if (ackType == AckType.Cumulative) {
-            messageIds.forEach(messageId -> doCumulativeAck(messageId, null));
-            return CompletableFuture.completedFuture(null);
+        if (AckType.Cumulative.equals(ackType)) {
+            if (ackResponseEnabled) {
+                Set<CompletableFuture<Void>> completableFutureSet = new HashSet<>();
+                messageIds.forEach(messageId ->
+                        completableFutureSet.add(addAcknowledgment((MessageIdImpl) messageId, ackType, properties)));
+                return FutureUtil.waitForAll(new ArrayList<>(completableFutureSet));
+            } else {
+                messageIds.forEach(messageId -> addAcknowledgment((MessageIdImpl) messageId, ackType, properties));
+                return CompletableFuture.completedFuture(null);
+            }
+        } else {
+            if (ackResponseEnabled) {
+                try {
+                    // when flush the ack, we should bind the this ack in the currentFuture, during this time we can't
+                    // change currentFuture. but we can lock by the read lock, because the currentFuture is not change
+                    // any ack operation is allowed.
+                    this.lock.readLock().lock();

Review comment:
       +1

##########
File path: pulsar-client/src/main/java/org/apache/pulsar/client/impl/PersistentAcknowledgmentsGroupingTracker.java
##########
@@ -115,186 +122,302 @@ public boolean isDuplicate(MessageId messageId) {
     }
 
     @Override
-    public void addListAcknowledgment(List<MessageIdImpl> messageIds, AckType ackType, Map<String, Long> properties) {
-        if (ackType == AckType.Cumulative) {
-            messageIds.forEach(messageId -> doCumulativeAck(messageId, null));
-            return;
+    public CompletableFuture<Void> addListAcknowledgment(List<MessageId> messageIds,
+                                                         AckType ackType, Map<String, Long> properties) {
+        if (AckType.Cumulative.equals(ackType)) {
+            if (ackResponseEnabled) {
+                Set<CompletableFuture<Void>> completableFutureSet = new HashSet<>();
+                messageIds.forEach(messageId ->
+                        completableFutureSet.add(addAcknowledgment((MessageIdImpl) messageId, ackType, properties)));
+                return FutureUtil.waitForAll(new ArrayList<>(completableFutureSet));
+            } else {
+                messageIds.forEach(messageId -> addAcknowledgment((MessageIdImpl) messageId, ackType, properties));
+                return CompletableFuture.completedFuture(null);
+            }
+        } else {
+            if (ackResponseEnabled) {
+                try {
+                    // when flush the ack, we should bind the this ack in the currentFuture, during this time we can't
+                    // change currentFuture. but we can lock by the read lock, because the currentFuture is not change
+                    // any ack operation is allowed.
+                    this.lock.readLock().lock();
+                    addListAcknowledgment(messageIds);
+                    return this.currentIndividualAckFuture;
+                } finally {
+                    this.lock.readLock().unlock();
+                    if (acknowledgementGroupTimeMicros == 0 || pendingIndividualAcks.size() >= MAX_ACK_GROUP_SIZE) {
+                        flush();
+                    }
+                }
+            } else {
+                addListAcknowledgment(messageIds);
+                if (acknowledgementGroupTimeMicros == 0 || pendingIndividualAcks.size() >= MAX_ACK_GROUP_SIZE) {
+                    flush();
+                }
+                return CompletableFuture.completedFuture(null);
+            }
         }
-        messageIds.forEach(messageId -> {
+    }
+
+    private void addListAcknowledgment(List<MessageId> messageIds) {
+        for (MessageId messageId : messageIds) {
+            consumer.onAcknowledge(messageId, null);
             if (messageId instanceof BatchMessageIdImpl) {
                 BatchMessageIdImpl batchMessageId = (BatchMessageIdImpl) messageId;
-                pendingIndividualAcks.add(new MessageIdImpl(batchMessageId.getLedgerId(),
-                        batchMessageId.getEntryId(), batchMessageId.getPartitionIndex()));
+                if (!batchMessageId.ackIndividual()) {
+                    doIndividualBatchAckAsync((BatchMessageIdImpl) messageId);
+                } else {
+                    messageId = modifyBatchMessageIdAndStatusInConsumer(batchMessageId);
+                    doIndividualAckAsync((MessageIdImpl) messageId);
+                }
             } else {
-                pendingIndividualAcks.add(messageId);
-            }
-            pendingIndividualBatchIndexAcks.remove(messageId);
-            if (pendingIndividualAcks.size() >= MAX_ACK_GROUP_SIZE) {
-                flush();
+                modifyMessageIdStatusInConsumer((MessageIdImpl) messageId);
+                doIndividualAckAsync((MessageIdImpl) messageId);
             }
-        });
-        if (acknowledgementGroupTimeMicros == 0) {
-            flush();
         }
     }
 
     @Override
-    public void addAcknowledgment(MessageIdImpl msgId, AckType ackType, Map<String, Long> properties,
-                                  TransactionImpl txn) {
-        if (acknowledgementGroupTimeMicros == 0 || !properties.isEmpty() ||
-                (txn != null && ackType == AckType.Cumulative)) {
-                if (msgId instanceof BatchMessageIdImpl && txn != null) {
-                    BatchMessageIdImpl batchMessageId = (BatchMessageIdImpl) msgId;
-                    doImmediateBatchIndexAck(batchMessageId, batchMessageId.getBatchIndex(),
-                            batchMessageId.getBatchIndex(),
-                            ackType, properties, txn.getTxnIdMostBits(), txn.getTxnIdLeastBits());
-                    return;
+    public CompletableFuture<Void> addAcknowledgment(MessageIdImpl msgId, AckType ackType,
+                                                     Map<String, Long> properties) {
+        if (msgId instanceof BatchMessageIdImpl) {
+            BatchMessageIdImpl batchMessageId = (BatchMessageIdImpl) msgId;
+            if (ackType == AckType.Individual) {
+                consumer.onAcknowledge(msgId, null);
+                // ack this ack carry bitSet index and judge bit set are all ack
+                if (batchMessageId.ackIndividual()) {
+                    MessageIdImpl messageId = modifyBatchMessageIdAndStatusInConsumer(batchMessageId);
+                    return doIndividualAck(messageId, properties);
+                } else if (batchIndexAckEnabled){
+                    return doIndividualBatchAck(batchMessageId, properties);
+                } else {
+                    // if we prevent batchIndexAck, we can't send the ack command to broker when the batch message are
+                    // all ack complete
+                    return CompletableFuture.completedFuture(null);
                 }
-            // We cannot group acks if the delay is 0 or when there are properties attached to it. Fortunately that's an
-            // uncommon condition since it's only used for the compaction subscription.
-            doImmediateAck(msgId, ackType, properties, txn);
-        } else if (ackType == AckType.Cumulative) {
-            doCumulativeAck(msgId, null);
-        } else {
-            // Individual ack
-            if (msgId instanceof BatchMessageIdImpl) {
-                pendingIndividualAcks.add(new MessageIdImpl(msgId.getLedgerId(),
-                        msgId.getEntryId(), msgId.getPartitionIndex()));
             } else {
-                if (txn != null) {
-                    pendingIndividualTransactionAcks
-                            .add(Triple.of(txn.getTxnIdMostBits(), txn.getTxnIdLeastBits(), msgId));
+                consumer.onAcknowledgeCumulative(msgId, null);
+                if (((BatchMessageIdImpl) msgId).ackCumulative()) {
+                    return doCumulativeAck(msgId, properties, null);
                 } else {
-                    pendingIndividualAcks.add(msgId);
+                    if (batchIndexAckEnabled) {
+                        return doCumulativeBatchAck(batchMessageId, properties);
+                    } else {
+                        // ack the pre messageId, because we prevent the batchIndexAck, we can ensure pre messageId can
+                        // ack
+                        if (AckType.Cumulative == ackType
+                                && !batchMessageId.getAcker().isPrevBatchCumulativelyAcked()) {
+                            doCumulativeAck(batchMessageId.prevBatchMessageId(), properties, null);
+                            batchMessageId.getAcker().setPrevBatchCumulativelyAcked(true);
+                        }
+                        return CompletableFuture.completedFuture(null);
+                    }
                 }
             }
-            pendingIndividualBatchIndexAcks.remove(msgId);
-            if (pendingIndividualAcks.size() >= MAX_ACK_GROUP_SIZE) {
-                flush();
+        } else {
+            if (ackType == AckType.Individual) {
+                consumer.onAcknowledge(msgId, null);
+                modifyMessageIdStatusInConsumer(msgId);
+                return doIndividualAck(msgId, properties);
+            } else {
+                consumer.onAcknowledgeCumulative(msgId, null);
+                return doCumulativeAck(msgId, properties, null);
             }
         }
     }
 
-    public void addBatchIndexAcknowledgment(BatchMessageIdImpl msgId, int batchIndex, int batchSize, AckType ackType,
-                                            Map<String, Long> properties, TransactionImpl txn) {
-        if (acknowledgementGroupTimeMicros == 0 || !properties.isEmpty()) {
-            doImmediateBatchIndexAck(msgId, batchIndex, batchSize, ackType, properties,
-                    txn == null ? -1 : txn.getTxnIdMostBits(),
-                    txn == null ? -1 : txn.getTxnIdLeastBits());
-        } else if (ackType == AckType.Cumulative) {
-            BitSetRecyclable bitSet = BitSetRecyclable.create();
-            bitSet.set(0, batchSize);
-            bitSet.clear(0, batchIndex + 1);
-            doCumulativeAck(msgId, bitSet);
-        } else if (ackType == AckType.Individual) {
-            ConcurrentBitSetRecyclable bitSet;
-            if (txn != null) {
-                synchronized (txn) {
-                    ConcurrentHashMap<MessageIdImpl, ConcurrentBitSetRecyclable> transactionIndividualBatchIndexAcks =
-                            pendingIndividualTransactionBatchIndexAcks
-                                    .computeIfAbsent(txn, (v) -> new ConcurrentHashMap<>());
-                    bitSet = transactionIndividualBatchIndexAcks.computeIfAbsent(msgId, (v) -> {
-                        ConcurrentBitSetRecyclable value;
-                        value = ConcurrentBitSetRecyclable.create();
-                        value.set(0, msgId.getAcker().getBatchSize());
-                        return value;
-                    });
-                    bitSet.clear(batchIndex);
+    private MessageIdImpl modifyBatchMessageIdAndStatusInConsumer(BatchMessageIdImpl batchMessageId) {
+        MessageIdImpl messageId = new MessageIdImpl(batchMessageId.getLedgerId(),
+                batchMessageId.getEntryId(), batchMessageId.getPartitionIndex());
+        consumer.getStats().incrementNumAcksSent(batchMessageId.getBatchSize());
+        modifyMessageIdStatusInConsumerCommon(messageId);
+        return messageId;
+    }
+
+    private void modifyMessageIdStatusInConsumer(MessageIdImpl messageId) {
+        consumer.getStats().incrementNumAcksSent(1);
+        modifyMessageIdStatusInConsumerCommon(messageId);
+    }
+
+    private void modifyMessageIdStatusInConsumerCommon(MessageIdImpl messageId) {
+        consumer.getUnAckedMessageTracker().remove(messageId);
+        if (consumer.getPossibleSendToDeadLetterTopicMessages() != null) {
+            consumer.getPossibleSendToDeadLetterTopicMessages().remove(messageId);
+        }
+    }
+
+    private CompletableFuture<Void> doIndividualAck(MessageIdImpl messageId, Map<String, Long> properties) {
+        if (acknowledgementGroupTimeMicros == 0 || (properties != null && !properties.isEmpty())) {
+            // We cannot group acks if the delay is 0 or when there are properties attached to it. Fortunately that's an
+            // uncommon condition since it's only used for the compaction subscription.
+            return doImmediateAck(messageId, AckType.Individual, properties, null);
+        } else {
+            if (ackResponseEnabled) {
+                // when flush the ack, we should bind the this ack in the currentFuture, during this time we can't
+                // change currentFuture. but we can lock by the read lock, because the currentFuture is not change
+                // any ack operation is allowed.
+                this.lock.readLock().lock();
+                try {
+                    doIndividualAckAsync(messageId);
+                    return this.currentIndividualAckFuture;
+                } finally {
+                    this.lock.readLock().unlock();
+                    if (pendingIndividualAcks.size() >= MAX_ACK_GROUP_SIZE) {
+                        flush();
+                    }
                 }
             } else {
-                bitSet = pendingIndividualBatchIndexAcks.computeIfAbsent(
-                new MessageIdImpl(msgId.getLedgerId(), msgId.getEntryId(), msgId.getPartitionIndex()), (v) -> {
-                            ConcurrentBitSetRecyclable value;
-                            if (msgId.getAcker() != null && !(msgId.getAcker() instanceof BatchMessageAckerDisabled)) {
-                                value = ConcurrentBitSetRecyclable.create(msgId.getAcker().getBitSet());
-                            } else {
-                                value = ConcurrentBitSetRecyclable.create();
-                                value.set(0, batchSize);
-                            }
-                            return value;
-                        });
-                bitSet.clear(batchIndex);
+                doIndividualAckAsync(messageId);
+                if (pendingIndividualAcks.size() >= MAX_ACK_GROUP_SIZE) {
+                    flush();
+                }
+                return CompletableFuture.completedFuture(null);
             }
-            if (pendingIndividualBatchIndexAcks.size() >= MAX_ACK_GROUP_SIZE) {
-                flush();
+        }
+    }
+
+
+    private void doIndividualAckAsync(MessageIdImpl messageId) {
+        pendingIndividualAcks.add(messageId);
+        pendingIndividualBatchIndexAcks.remove(messageId);
+    }
+
+    private CompletableFuture<Void> doIndividualBatchAck(BatchMessageIdImpl batchMessageId,
+                                                         Map<String, Long> properties) {
+        if (acknowledgementGroupTimeMicros == 0 || (properties != null && !properties.isEmpty())) {
+            return doImmediateBatchIndexAck(batchMessageId, batchMessageId.getBatchIndex(),
+                    batchMessageId.getBatchSize(), AckType.Individual, properties);
+        } else {
+            return doIndividualBatchAck(batchMessageId);
+        }
+    }
+
+    private CompletableFuture<Void> doIndividualBatchAck(BatchMessageIdImpl batchMessageId) {
+        if (ackResponseEnabled) {
+            // when flush the ack, we should bind the this ack in the currentFuture, during this time we can't
+            // change currentFuture. but we can lock by the read lock, because the currentFuture is not change
+            // any ack operation is allowed.
+            this.lock.readLock().lock();
+            try {
+                doIndividualBatchAckAsync(batchMessageId);
+                return this.currentIndividualAckFuture;
+            } finally {
+                this.lock.readLock().unlock();
             }
+        } else {
+            doIndividualBatchAckAsync(batchMessageId);
+            return CompletableFuture.completedFuture(null);
         }
     }
 
-    private void doCumulativeAck(MessageIdImpl msgId, BitSetRecyclable bitSet) {
-        // Handle concurrent updates from different threads
-        while (true) {
-            MessageIdImpl lastCumlativeAck = this.lastCumulativeAck;
-            BitSetRecyclable lastBitSet = this.lastCumulativeAckSet;
-            if (msgId.compareTo(lastCumlativeAck) > 0) {
-                if (LAST_CUMULATIVE_ACK_UPDATER.compareAndSet(this, lastCumlativeAck, msgId) && LAST_CUMULATIVE_ACK_SET_UPDATER.compareAndSet(this, lastBitSet, bitSet)) {
-                    if (lastBitSet != null) {
-                        try {
-                            lastBitSet.recycle();
-                        } catch (Exception ignore) {
-                            // no-op
-                        }
+    private CompletableFuture<Void> doCumulativeAck(MessageIdImpl messageId, Map<String, Long> properties,
+                                                    BitSetRecyclable bitSet) {
+        consumer.getStats().incrementNumAcksSent(consumer.getUnAckedMessageTracker().removeMessagesTill(messageId));
+        if (acknowledgementGroupTimeMicros == 0 || (properties != null && !properties.isEmpty())) {
+            // We cannot group acks if the delay is 0 or when there are properties attached to it. Fortunately that's an
+            // uncommon condition since it's only used for the compaction subscription.
+            return doImmediateAck(messageId, AckType.Cumulative, properties, bitSet);
+        } else {
+            if (ackResponseEnabled) {
+                try {
+                    // when flush the ack, we should bind the this ack in the currentFuture, during this time we can't
+                    // change currentFuture. but we can lock by the read lock, because the currentFuture is not change
+                    // any ack operation is allowed.
+                    this.lock.readLock().lock();
+                    doCumulativeAckAsync(messageId, bitSet);
+                    return this.currentCumulativeAckFuture;
+                } finally {
+                    this.lock.readLock().unlock();
+                    if (pendingIndividualBatchIndexAcks.size() >= MAX_ACK_GROUP_SIZE) {
+                        flush();
                     }
-                    // Successfully updated the last cumulative ack. Next flush iteration will send this to broker.
-                    cumulativeAckFlushRequired = true;
-                    return;
                 }
             } else {
-                // message id acknowledging an before the current last cumulative ack
-                return;
+                doCumulativeAckAsync(messageId, bitSet);
+                if (pendingIndividualBatchIndexAcks.size() >= MAX_ACK_GROUP_SIZE) {
+                    flush();
+                }
+                return CompletableFuture.completedFuture(null);
             }
         }
     }
 
-    private void doTransactionCumulativeAck(MessageIdImpl msgId, BitSetRecyclable bitSet) {
+    private void doIndividualBatchAckAsync(BatchMessageIdImpl batchMessageId) {
+        ConcurrentBitSetRecyclable bitSet = pendingIndividualBatchIndexAcks.computeIfAbsent(
+                new MessageIdImpl(batchMessageId.getLedgerId(), batchMessageId.getEntryId(),
+                        batchMessageId.getPartitionIndex()), (v) -> {
+                    ConcurrentBitSetRecyclable value;
+                    if (batchMessageId.getAcker() != null &&
+                            !(batchMessageId.getAcker() instanceof BatchMessageAckerDisabled)) {
+                        value = ConcurrentBitSetRecyclable.create(batchMessageId.getAcker().getBitSet());
+                    } else {
+                        value = ConcurrentBitSetRecyclable.create();
+                        value.set(0, batchMessageId.getBatchIndex());
+                    }
+                    return value;
+                });
+        bitSet.clear(batchMessageId.getBatchIndex());
+    }
+
+    private void doCumulativeAckAsync(MessageIdImpl msgId, BitSetRecyclable bitSet) {
         // Handle concurrent updates from different threads
+        LastCumulativeAck currentCumulativeAck = LastCumulativeAck.create(msgId, bitSet);
         while (true) {
-            MessageIdImpl lastCumlativeAck = this.lastCumulativeAck;
-            BitSetRecyclable lastBitSet = this.lastCumulativeAckSet;
-            if (msgId.compareTo(lastCumlativeAck) > 0) {
-                if (LAST_CUMULATIVE_ACK_UPDATER.compareAndSet(this, lastCumlativeAck, msgId) && LAST_CUMULATIVE_ACK_SET_UPDATER.compareAndSet(this, lastBitSet, bitSet)) {
-                    if (lastBitSet != null) {
+            LastCumulativeAck lastCumulativeAck = this.lastCumulativeAck;
+            if (msgId.compareTo(lastCumulativeAck.messageId) > 0) {
+                if (LAST_CUMULATIVE_ACK_UPDATER.compareAndSet(this, this.lastCumulativeAck, currentCumulativeAck)) {
+                    if (lastCumulativeAck.bitSetRecyclable != null) {
                         try {
-                            lastBitSet.recycle();
+                            lastCumulativeAck.bitSetRecyclable.recycle();
                         } catch (Exception ignore) {
                             // no-op
                         }
+                        lastCumulativeAck.bitSetRecyclable = null;
                     }
+                    lastCumulativeAck.recycle();
                     // Successfully updated the last cumulative ack. Next flush iteration will send this to broker.
                     cumulativeAckFlushRequired = true;
                     return;
                 }
             } else {
+                currentCumulativeAck.recycle();
                 // message id acknowledging an before the current last cumulative ack
                 return;
             }
         }
     }
 
-    private boolean doImmediateAck(MessageIdImpl msgId, AckType ackType, Map<String, Long> properties,
-                                   TransactionImpl transaction) {
+    private CompletableFuture<Void> doCumulativeBatchAck(BatchMessageIdImpl batchMessageId,

Review comment:
       ```suggestion
       private CompletableFuture<Void> doCumulativeBatchIndexAck(BatchMessageIdImpl batchMessageId,
   ```




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [pulsar] congbobo184 commented on pull request #8996: Ack response implementation

Posted by GitBox <gi...@apache.org>.
congbobo184 commented on pull request #8996:
URL: https://github.com/apache/pulsar/pull/8996#issuecomment-757602045


   /pulsarbot run-failure-checks


----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [pulsar] congbobo184 commented on pull request #8996: Ack response implementation

Posted by GitBox <gi...@apache.org>.
congbobo184 commented on pull request #8996:
URL: https://github.com/apache/pulsar/pull/8996#issuecomment-752385565


   /pulsarbot run-failure-checks


----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [pulsar] congbobo184 commented on pull request #8996: Ack response implementation

Posted by GitBox <gi...@apache.org>.
congbobo184 commented on pull request #8996:
URL: https://github.com/apache/pulsar/pull/8996#issuecomment-763411650


   /pulsarbot run-failure-checks


----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [pulsar] congbobo184 commented on pull request #8996: Ack response implementation

Posted by GitBox <gi...@apache.org>.
congbobo184 commented on pull request #8996:
URL: https://github.com/apache/pulsar/pull/8996#issuecomment-763632343


   /pulsarbot run-failure-checks


----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [pulsar] congbobo184 commented on a change in pull request #8996: Ack response implementation

Posted by GitBox <gi...@apache.org>.
congbobo184 commented on a change in pull request #8996:
URL: https://github.com/apache/pulsar/pull/8996#discussion_r558251230



##########
File path: pulsar-broker/src/test/java/org/apache/pulsar/client/api/SimpleProducerConsumerTest.java
##########
@@ -1397,13 +1412,7 @@ public void testConsumerBlockingWithUnAckedMessages() throws Exception {
             assertEquals(messages.size(), unAckedMessagesBufferSize);
 
             // start acknowledging messages
-            messages.forEach(m -> {
-                try {
-                    consumer.acknowledge(m);
-                } catch (PulsarClientException e) {
-                    fail("ack failed", e);
-                }
-            });
+            messages.forEach(consumer::acknowledgeAsync);

Review comment:
       it won't influence the result, if wait it the test will spend a lot of time when enable the ack response.




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [pulsar] congbobo184 commented on a change in pull request #8996: Ack response implementation

Posted by GitBox <gi...@apache.org>.
congbobo184 commented on a change in pull request #8996:
URL: https://github.com/apache/pulsar/pull/8996#discussion_r557839031



##########
File path: pulsar-client/src/main/java/org/apache/pulsar/client/impl/PersistentAcknowledgmentsGroupingTracker.java
##########
@@ -497,46 +586,84 @@ public void close() {
         }
     }
 
-    private void newAckCommand(long consumerId, MessageIdImpl msgId, BitSetRecyclable lastCumulativeAckSet,
-            AckType ackType, ValidationError validationError, Map<String, Long> map, ClientCnx cnx,
-                               boolean flush, long txnidMostBits, long txnidLeastBits) {
-
-        MessageIdImpl[] chunkMsgIds = this.consumer.unAckedChunkedMessageIdSequenceMap.get(msgId);
-        if (chunkMsgIds != null && txnidLeastBits < 0 && txnidMostBits < 0) {
-            if (Commands.peerSupportsMultiMessageAcknowledgment(cnx.getRemoteEndpointProtocolVersion())
-                    && ackType != AckType.Cumulative) {
+    private CompletableFuture<Void> newImmediateAckAndFlush(long consumerId, MessageIdImpl msgId,
+                                                            BitSetRecyclable bitSet, AckType ackType,
+                                                            Map<String, Long> map, ClientCnx cnx) {
+        MessageIdImpl[] chunkMsgIds = this.consumer.unAckedChunkedMessageIdSequenceMap.remove(msgId);
+        final CompletableFuture<Void> completableFuture;
+        // cumulative ack chunk by the last messageId
+        if (chunkMsgIds != null &&  ackType != AckType.Cumulative) {
+            if (Commands.peerSupportsMultiMessageAcknowledgment(cnx.getRemoteEndpointProtocolVersion())) {
                 List<Triple<Long, Long, ConcurrentBitSetRecyclable>> entriesToAck = new ArrayList<>(chunkMsgIds.length);
                 for (MessageIdImpl cMsgId : chunkMsgIds) {
                     if (cMsgId != null && chunkMsgIds.length > 1) {
                         entriesToAck.add(Triple.of(cMsgId.getLedgerId(), cMsgId.getEntryId(), null));
                     }
                 }
-                ByteBuf cmd = Commands.newMultiMessageAck(consumer.consumerId, entriesToAck);
-                if (flush) {
-                    cnx.ctx().writeAndFlush(cmd, cnx.ctx().voidPromise());
+                if (ackResponseEnabled) {
+                    long requestId = consumer.getClient().newRequestId();
+                    ByteBuf cmd = Commands.newMultiMessageAck(consumer.consumerId, entriesToAck, requestId);
+                    completableFuture = cnx.newAckForResponse(cmd, requestId);
                 } else {
-                    cnx.ctx().write(cmd, cnx.ctx().voidPromise());
+                    ByteBuf cmd = Commands.newMultiMessageAck(consumer.consumerId, entriesToAck, -1);
+                    cnx.ctx().writeAndFlush(cmd, cnx.ctx().voidPromise());
+                    completableFuture = CompletableFuture.completedFuture(null);
                 }
             } else {
+                // if don't support multi message ack, it also support ack response, so we should not think about the
+                // ack response in this logic

Review comment:
       this is the chunk message, so if want to use ack response, broker must support multi message ack, we don't need to support broker don't support multi message ack and then we return ack response.




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [pulsar] congbobo184 commented on pull request #8996: Ack response implementation

Posted by GitBox <gi...@apache.org>.
congbobo184 commented on pull request #8996:
URL: https://github.com/apache/pulsar/pull/8996#issuecomment-762048005


   /pulsarbot run-failure-checks


----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [pulsar] codelipenghui merged pull request #8996: Ack response implementation

Posted by GitBox <gi...@apache.org>.
codelipenghui merged pull request #8996:
URL: https://github.com/apache/pulsar/pull/8996


   


----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [pulsar] congbobo184 commented on pull request #8996: Ack response implementation

Posted by GitBox <gi...@apache.org>.
congbobo184 commented on pull request #8996:
URL: https://github.com/apache/pulsar/pull/8996#issuecomment-749858584


   /pulsarbot run-failure-checks


----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [pulsar] codelipenghui commented on pull request #8996: Ack response implementation

Posted by GitBox <gi...@apache.org>.
codelipenghui commented on pull request #8996:
URL: https://github.com/apache/pulsar/pull/8996#issuecomment-767456451


   Removed from the release/2.7.1 since this is a new feature and there are some `.proto` related changes.


----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [pulsar] lhotari edited a comment on pull request #8996: Ack response implementation

Posted by GitBox <gi...@apache.org>.
lhotari edited a comment on pull request #8996:
URL: https://github.com/apache/pulsar/pull/8996#issuecomment-875455717


   @congbobo184 @codelipenghui @Anonymitaet @merlimat Has the "ackReceipt" feature added by this PR been documented? The only documentation that I can find is in the javadoc of the isAckReceiptEnabled method "Ack will return receipt but does not mean that the message will not be resent after get receipt." . 
   Would it be possible to clarify this in the Pulsar documentation?
   Usually PIP documents are created for changes which change the wire protocol. I didn't find and PIP documents or mailing list discussions about this change. What problem is this change solving? The issue and PR description aren't very informative. Please elaborate more about the context behind this change.
   
   Would it make sense to document this change in the "Pulsar binary protocol specification" document,  https://pulsar.apache.org/docs/en/develop-binary-protocol/  https://github.com/apache/pulsar/blob/master/site2/docs/developing-binary-protocol.md ?


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@pulsar.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [pulsar] congbobo184 commented on pull request #8996: Ack response implementation

Posted by GitBox <gi...@apache.org>.
congbobo184 commented on pull request #8996:
URL: https://github.com/apache/pulsar/pull/8996#issuecomment-758380747


   /pulsarbot run-failure-checks


----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org