Posted to commits@pulsar.apache.org by GitBox <gi...@apache.org> on 2022/09/10 10:48:12 UTC

[GitHub] [pulsar] eolivelli commented on a diff in pull request #17577: [feat][pulsar-client-api] support to Cumulative Acknowledge for multiple partitions or topics in user friendly manner

eolivelli commented on code in PR #17577:
URL: https://github.com/apache/pulsar/pull/17577#discussion_r967633588


##########
pulsar-client-api/src/main/java/org/apache/pulsar/client/api/Consumer.java:
##########
@@ -390,6 +390,21 @@ void reconsumeLater(Message<?> message,
      */
     void acknowledgeCumulative(MessageId messageId) throws PulsarClientException;
 
+    /**
+     * Acknowledge the reception of all the messages in the stream up to (and including) the provided message for each topic or partition.
+     *
+     * <p>This method will block until the acknowledge has been sent to the brokers of each topic or partition.

Review Comment:
   This is not correct.
   Please remove this sentence entirely.
   Acks are always sent on a best-effort basis.
   Only if you do something in a transaction can you have such guarantees.



##########
pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerBase.java:
##########
@@ -616,6 +642,25 @@ public CompletableFuture<Void> acknowledgeCumulativeAsync(MessageId messageId, T
         return doAcknowledgeWithTxn(messageId, AckType.Cumulative, Collections.emptyMap(), txnImpl);
     }
 
+    protected CompletableFuture<Void> acknowledgeCumulativeAsync(Map<String, MessageId> topicToMessageIdMap) throws PulsarClientException {
+        return acknowledgeCumulativeAsync(topicToMessageIdMap, null);
+    }
+
+    protected CompletableFuture<Void> acknowledgeCumulativeAsync(Map<String, MessageId> topicToMessageIdMap, Transaction txn) throws PulsarClientException {

Review Comment:
   It looks like the keys are useless.
   So why do we need this API?



##########
pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerBase.java:
##########
@@ -616,6 +642,25 @@ public CompletableFuture<Void> acknowledgeCumulativeAsync(MessageId messageId, T
         return doAcknowledgeWithTxn(messageId, AckType.Cumulative, Collections.emptyMap(), txnImpl);
     }
 
+    protected CompletableFuture<Void> acknowledgeCumulativeAsync(Map<String, MessageId> topicToMessageIdMap) throws PulsarClientException {
+        return acknowledgeCumulativeAsync(topicToMessageIdMap, null);
+    }
+
+    protected CompletableFuture<Void> acknowledgeCumulativeAsync(Map<String, MessageId> topicToMessageIdMap, Transaction txn) throws PulsarClientException {
+        validateTopicToMessageIdMap(topicToMessageIdMap);
+        if (!isCumulativeAcknowledgementAllowed(conf.getSubscriptionType())) {
+            return FutureUtil.failedFuture(new PulsarClientException.InvalidConfigurationException(
+                    "Cannot use cumulative acks on a non-exclusive/non-failover subscription"));
+        }
+
+        TransactionImpl txnImpl = null;

Review Comment:
   It looks like the new tests do not cover the TC branch.


