You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by GitBox <gi...@apache.org> on 2022/06/02 07:41:40 UTC

[GitHub] [pulsar] congbobo184 commented on a diff in pull request #15729: [improve][txn] Support ack message list for transaction

congbobo184 commented on code in PR #15729:
URL: https://github.com/apache/pulsar/pull/15729#discussion_r887634742


##########
pulsar-client-api/src/main/java/org/apache/pulsar/client/api/Consumer.java:
##########
@@ -414,13 +414,6 @@ void reconsumeLater(Message<?> message,
      * @param messageId
      *            The {@code MessageId} to be cumulatively acknowledged
      * @param txn {@link Transaction} the transaction to cumulative ack
-     * @throws PulsarClientException.AlreadyClosedException
-     *             if the consumer was already closed
-     * @throws org.apache.pulsar.client.api.PulsarClientException.TransactionConflictException
-     *             if the ack with messageId is less than the messageId in pending ack state or ack with transaction is
-     *             different from the transaction in pending ack.
-     * @throws org.apache.pulsar.client.api.PulsarClientException.NotAllowedException
-     *             broker don't support transaction

Review Comment:
   don't delete this



##########
pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java:
##########
@@ -578,11 +579,32 @@ protected CompletableFuture<Void> doAcknowledge(MessageId messageId, AckType ack
     }
 
     @Override
-    protected CompletableFuture<Void> doAcknowledge(List<MessageId> messageIdList,
-                                                    AckType ackType,
-                                                    Map<String, Long> properties,
-                                                    TransactionImpl txn) {
-        return this.acknowledgmentsGroupingTracker.addListAcknowledgment(messageIdList, ackType, properties);
+    protected CompletableFuture<Void> doAcknowledge(List<MessageId> messageIdList, AckType ackType,
+                                                    Map<String, Long> properties, TransactionImpl txn) {
+
+        for (MessageId messageId : messageIdList) {
+            checkArgument(messageId instanceof MessageIdImpl);
+        }
+        if (getState() != State.Ready && getState() != State.Connecting) {
+            stats.incrementNumAcksFailed();
+            PulsarClientException exception = new PulsarClientException("Consumer not ready. State: " + getState());
+            if (AckType.Individual.equals(ackType)) {
+                onAcknowledge(messageIdList, exception);
+            } else if (AckType.Cumulative.equals(ackType)) {
+                onAcknowledgeCumulative(messageIdList, exception);
+            }
+            return FutureUtil.failedFuture(exception);
+        }
+        if (txn != null) {
+            List<CompletableFuture<Void>> completableFutures = new LinkedList<>();
+            for (MessageId messageId : messageIdList) {
+                completableFutures.add(doTransactionAcknowledgeForResponse(messageId, ackType, null,

Review Comment:
   the proto support send messgeIdList, so you should rewrite a command is better.
   https://github.com/apache/pulsar/blob/9b01d1c472aa5cde5205ad86fa3b8f998da95f35/pulsar-common/src/main/proto/PulsarApi.proto#L553



##########
pulsar-client-api/src/main/java/org/apache/pulsar/client/api/Consumer.java:
##########
@@ -469,13 +462,6 @@ CompletableFuture<Void> acknowledgeCumulativeAsync(MessageId messageId,
      *
      * @param messageId {@link MessageId} to be individual acknowledged
      * @param txn {@link Transaction} the transaction to cumulative ack
-     * @throws PulsarClientException.AlreadyClosedException
-     *             if the consumer was already closed
-     * @throws org.apache.pulsar.client.api.PulsarClientException.TransactionConflictException
-     *             if the ack with messageId has been acked by another transaction
-     * @throws org.apache.pulsar.client.api.PulsarClientException.NotAllowedException
-     *             broker don't support transaction
-     *             don't find batch size in consumer pending ack

Review Comment:
   if you want to delete this, please add the sync method in the consumer API



##########
pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerBase.java:
##########
@@ -606,12 +625,21 @@ protected CompletableFuture<Void> doAcknowledgeWithTxn(List<MessageId> messageId
                                                            Map<String, Long> properties,
                                                            TransactionImpl txn) {
         CompletableFuture<Void> ackFuture;
-        if (txn != null) {
+        if (txn != null && this instanceof ConsumerImpl) {
+
+            // it is okay that we register acked topic after sending the acknowledgements. because
+            // the transactional ack will not be visiable for consumers until the transaction is
+            // committed
+            if (ackType == AckType.Cumulative) {
+                txn.registerCumulativeAckConsumer((ConsumerImpl<?>) this);
+            }
+
             ackFuture = txn.registerAckedTopic(getTopic(), subscription)
                     .thenCompose(ignored -> doAcknowledge(messageIdList, ackType, properties, txn));

Review Comment:
   we don't need regist ack in this code



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@pulsar.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org