Posted to commits@pulsar.apache.org by GitBox <gi...@apache.org> on 2022/06/13 04:36:51 UTC

[GitHub] [pulsar] congbobo184 opened a new pull request, #16032: [fix][txn] Ack the same batch message different batchIndex with transaction

congbobo184 opened a new pull request, #16032:
URL: https://github.com/apache/pulsar/pull/16032

   ### Motivation
   Fix a bug that occurs when different transactions ack the same batch message with different batch indexes.
   
   Currently, the first transactional ack of a batch message puts the position from the current request into the pending ack map:
   https://github.com/apache/pulsar/blob/9f40cc1d1104900c450a599676ca446b1f096a00/pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/pendingack/impl/PendingAckHandleImpl.java#L772
   
   This produces errors in some cases.
   
   For example, with a batch size of 3:
   1. The first txn acks the message with batch index 0; the ack set is `0 1 1`.
   2. The second txn acks the message with batch index 1; the ack set is `0 0 1`.
   3. Committing the first txn acks batch indexes 0 and 1, so the cursor's batch index ack set becomes `0 0 1`.
   4. Aborting the second txn redelivers the message.
   5. A third txn acks the message with batch index 1 again, because the second txn was aborted.
   6. The third txn's ack does not succeed, because the first txn has already changed the ack set to `0 0 1` in the cursor's batch index ack map.
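
The scenario above can be modeled with plain `java.util.BitSet` objects. This is only a simplified sketch, not the code in `PendingAckHandleImpl`: the class and method names are hypothetical, and it assumes that the pending ack entry is merged with a bitwise AND and that the second request on its own would carry `1 0 1`. As in the example, a set bit means "not yet acked".

```java
import java.util.BitSet;

public class AckSetAliasingSketch {

    // Ack set for a batch of `size` messages in which only `ackedIndex` is acked.
    // A set bit (1) means "not yet acked", matching the example in the description.
    public static BitSet ackSetFor(int size, int ackedIndex) {
        BitSet bits = new BitSet(size);
        bits.set(0, size);
        bits.clear(ackedIndex);
        return bits;
    }

    // Render the first `size` bits as "0 1 1"-style text.
    public static String render(BitSet bits, int size) {
        StringBuilder sb = new StringBuilder();
        for (int i = 0; i < size; i++) {
            if (i > 0) {
                sb.append(' ');
            }
            sb.append(bits.get(i) ? '1' : '0');
        }
        return sb.toString();
    }

    // Returns the ack set the first transaction holds after the second
    // transaction's ack has been merged into the pending ack entry.
    public static String firstTxnAckSetAfterSecondAck(boolean copyOnStore) {
        int size = 3;
        BitSet firstTxnRequest = ackSetFor(size, 0);  // 0 1 1
        BitSet secondTxnRequest = ackSetFor(size, 1); // 1 0 1 (assumed)

        // Hypothetical stand-in for the entry kept in the pending ack map:
        // either the request's own object (aliasing) or a fresh copy.
        BitSet pendingEntry = copyOnStore
                ? (BitSet) firstTxnRequest.clone()
                : firstTxnRequest;

        // Merging the second ack mutates whatever object the entry points at.
        pendingEntry.and(secondTxnRequest);

        return render(firstTxnRequest, size);
    }

    public static void main(String[] args) {
        System.out.println("shared reference: " + firstTxnAckSetAfterSecondAck(false));
        System.out.println("separate copy:    " + firstTxnAckSetAfterSecondAck(true));
    }
}
```

With a shared reference, the first txn ends up holding `0 0 1`, so committing it would also ack batch index 1. With a separate copy, it keeps `0 1 1`, leaving index 1 available for a later txn.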
   ### Modifications
   Store the position from the first ack as a new position object, so that a commit does not ack with the wrong ack set.
   ### Verifying this change
   Added a test for it.
   
   In the test, the third txn acks the second message successfully.
   
   ### Does this pull request potentially affect one of the following parts:
   
   *If `yes` was chosen, please highlight the changes*
   
     - Dependencies (does it add or upgrade a dependency): (no)
     - The public API: (no)
     - The schema: (no)
     - The default values of configurations: (no)
     - The wire protocol: (no)
     - The rest endpoints: (no)
     - The admin cli options: (no)
     - Anything that affects deployment: (no)
   
   ### Documentation
   
     - Does this pull request introduce a new feature? (yes)
     - If yes, how is the feature documented? (not applicable / docs / JavaDocs / not documented)
     - If a feature is not applicable for documentation, explain why?
     - If a feature is not documented yet in this PR, please create a follow-up issue for adding the documentation
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@pulsar.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [pulsar] gaoran10 commented on pull request #16032: [fix][txn] Ack the same batch message different batchIndex with transaction

Posted by GitBox <gi...@apache.org>.
gaoran10 commented on PR #16032:
URL: https://github.com/apache/pulsar/pull/16032#issuecomment-1154146512

   There is a checkstyle error.
   
   ```
   Error:  src/main/java/org/apache/pulsar/broker/transaction/pendingack/impl/PendingAckHandleImpl.java:[775] (sizes) LineLength: Line is longer than 120 characters (found 140).
   ```




[GitHub] [pulsar] codelipenghui merged pull request #16032: [fix][txn] Ack the same batch message different batchIndex with transaction

Posted by GitBox <gi...@apache.org>.
codelipenghui merged PR #16032:
URL: https://github.com/apache/pulsar/pull/16032




[GitHub] [pulsar] codelipenghui commented on a diff in pull request #16032: [fix][txn] Ack the same batch message different batchIndex with transaction

Posted by GitBox <gi...@apache.org>.
codelipenghui commented on code in PR #16032:
URL: https://github.com/apache/pulsar/pull/16032#discussion_r895601353


##########
pulsar-broker/src/test/java/org/apache/pulsar/broker/transaction/TransactionTest.java:
##########
@@ -997,4 +997,55 @@ public void testConsistencyOfTransactionStatsAtEndTxn() throws Exception {
 
         transaction.commit().get();
     }
+
+    @Test
+    public void testPendingAckBatchMessageCommit() throws Exception {
+        String topic = NAMESPACE1 + "/testPendingAckBatchMessageCommit";
+
+        // enable batch index ack
+        conf.setAcknowledgmentAtBatchIndexLevelEnabled(true);
+
+        @Cleanup
+        Producer<byte[]> producer = pulsarClient
+                .newProducer(Schema.BYTES)
+                .topic(topic)
+                .enableBatching(true)
+                // ensure that batch message is sent
+                .batchingMaxPublishDelay(3, TimeUnit.SECONDS)
+                .sendTimeout(0, TimeUnit.SECONDS)
+                .create();
+
+        @Cleanup
+        Consumer<byte[]> consumer = pulsarClient
+                .newConsumer()
+                .subscriptionType(SubscriptionType.Shared)
+                .topic(topic)
+                .subscriptionName("sub")
+                .subscribe();
+
+        // send batch message, the size is 5
+        for (int i = 0; i < 5; i++) {
+            producer.sendAsync(("test" + i).getBytes());
+        }
+

Review Comment:
   Add `producer.flush()` here so the test does not always wait out the 3-second `batchingMaxPublishDelay`.


