Posted to commits@pulsar.apache.org by GitBox <gi...@apache.org> on 2022/12/09 08:54:17 UTC

[GitHub] [pulsar] poorbarcode opened a new pull request, #18833: [fix] [tx] Transaction buffer recover can not completed

poorbarcode opened a new pull request, #18833:
URL: https://github.com/apache/pulsar/pull/18833

   Fixes #17040
   
   ### Motivation
   
   #### Context for Transaction Buffer
   - If `transactionCoordinatorEnabled` is turned on, a `TransactionBuffer` is initialized when a user topic is created.
   - The `TransactionBuffer` reads the aborted-transaction logs from the `__transaction_buffer_snapshot` topic; this process is called `recovery`.
   - During recovery, the snapshot topic is read through a `Reader`, which works like this:
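   ```java
   // Simplified recovery loop
   while (reader.hasMessageAvailable()) {
       Message<TransactionBufferSnapshot> message = reader.readNext(); // blocks until a message arrives
       // ... apply the snapshot carried by `message` ...
   }
   ```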
   
   #### Context for Compaction
   - Since [PIP-14](https://github.com/apache/pulsar/wiki/PIP-14:-Topic-compaction), a consumer with the read-compacted feature enabled reads messages from the compacted topic once the compaction task has completed, and from the original topic if it has not.
   - If the last message sent to a topic with key `k` has a null payload, the compactor marks all messages with that key as deleted.
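
   To make the compaction rule concrete, here is a toy in-memory model (a sketch, not Pulsar's compactor): it keeps only the latest message per key and drops any key whose latest payload is null. The `Msg` record and `CompactionSketch` class are hypothetical names used for illustration only.

   ```java
   import java.util.ArrayList;
   import java.util.LinkedHashMap;
   import java.util.List;
   import java.util.Map;

   // Hypothetical message model: a key and a payload that may be null.
   record Msg(String key, String value) {}

   class CompactionSketch {
       // Keeps the latest message per key; a key whose latest payload is null
       // (a tombstone) is dropped entirely.
       static List<Msg> compact(List<Msg> topic) {
           Map<String, Msg> latest = new LinkedHashMap<>();
           for (Msg m : topic) {
               latest.remove(m.key()); // re-insert so iteration order follows the latest write
               latest.put(m.key(), m);
           }
           List<Msg> compacted = new ArrayList<>();
           for (Msg m : latest.values()) {
               if (m.value() != null) {
                   compacted.add(m);
               }
           }
           return compacted;
       }

       public static void main(String[] args) {
           List<Msg> topic = List.of(
                   new Msg("k", "v1"),
                   new Msg("other", "x"),
                   new Msg("k", "v2"),
                   new Msg("k", null)); // null payload: every message with key "k" goes away
           System.out.println(compact(topic)); // [Msg[key=other, value=x]]
       }
   }
   ```

   A reader positioned on one of the earlier `k` messages would find nothing left to read after this compaction, which is the situation described under "Issue".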
   
   #### Issue
   There is a race condition: after `reader.hasMessageAvailable()` returns true, the compaction task may delete the remaining messages, so `reader.readNext()` blocks indefinitely because there are no messages left to read.
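
   The problematic interleaving can be replayed deterministically on one thread. This sketch uses a JDK `BlockingQueue` as a stand-in for the snapshot topic (an assumption; it is not the Pulsar `Reader`): the availability check passes, compaction then empties the queue, and only a timed read returns instead of waiting forever.

   ```java
   import java.util.concurrent.BlockingQueue;
   import java.util.concurrent.LinkedBlockingQueue;
   import java.util.concurrent.TimeUnit;

   class RaceSketch {
       // Replays the race on a single thread; returns the message read, or null on timeout.
       static String replay(long timeoutMillis) {
           BlockingQueue<String> topic = new LinkedBlockingQueue<>();
           topic.add("snapshot-1");

           // Step 1: the availability check succeeds.
           boolean hasMessageAvailable = !topic.isEmpty();

           // Step 2: before the read, compaction deletes the remaining message.
           topic.clear();

           if (!hasMessageAvailable) {
               return null;
           }
           // Step 3: an untimed topic.take() would block forever here;
           // a timed poll gives up and returns null instead.
           try {
               return topic.poll(timeoutMillis, TimeUnit.MILLISECONDS);
           } catch (InterruptedException e) {
               Thread.currentThread().interrupt();
               return null;
           }
       }

       public static void main(String[] args) {
           System.out.println(replay(100)); // null
       }
   }
   ```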
   
   
   ### Modifications
   
   - If this issue occurs, run the recovery again.
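
   A minimal sketch of "run the recovery again", assuming one recovery pass signals a stalled read by throwing `TimeoutException`. `RecoveryAttempt` and `recoverWithRetry` are hypothetical names, and the bounded attempt count is an illustrative choice rather than what this PR implements.

   ```java
   import java.util.concurrent.TimeoutException;

   class RetryRecoverySketch {
       // Hypothetical: one full recovery pass; throws TimeoutException if a read stalls.
       interface RecoveryAttempt {
           long run(int attemptNo) throws TimeoutException;
       }

       // Re-runs the whole recovery pass after a stalled read, up to maxAttempts times.
       static long recoverWithRetry(RecoveryAttempt attempt, int maxAttempts) throws TimeoutException {
           if (maxAttempts < 1) {
               throw new IllegalArgumentException("maxAttempts must be >= 1");
           }
           TimeoutException last = null;
           for (int i = 1; i <= maxAttempts; i++) {
               try {
                   return attempt.run(i); // a fresh pass re-checks which messages still exist
               } catch (TimeoutException e) {
                   last = e;
               }
           }
           throw last;
       }

       public static void main(String[] args) throws TimeoutException {
           long position = recoverWithRetry(n -> {
               if (n == 1) {
                   throw new TimeoutException("read stalled"); // first pass hits the race
               }
               return 42L;
           }, 3);
           System.out.println(position); // 42
       }
   }
   ```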
   
   ### Documentation
   
   <!-- DO NOT REMOVE THIS SECTION. CHECK THE PROPER BOX ONLY. -->
   
   - [ ] `doc` <!-- Your PR contains doc changes. Please attach the local preview screenshots (run `sh start.sh` at `pulsar/site2/website`) to your PR description, or else your PR might not get merged. -->
   - [ ] `doc-required` <!-- Your PR changes impact docs and you will update later -->
   - [x] `doc-not-needed` <!-- Your PR changes do not impact docs -->
   - [ ] `doc-complete` <!-- Docs have been already added -->
   
   ### Matching PR in forked repository
   
   PR in forked repository: 
   - https://github.com/poorbarcode/pulsar/pull/47
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@pulsar.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [pulsar] BewareMyPower commented on a diff in pull request #18833: [fix] [tx] Transaction buffer recover blocked by readNext

Posted by GitBox <gi...@apache.org>.
BewareMyPower commented on code in PR #18833:
URL: https://github.com/apache/pulsar/pull/18833#discussion_r1044314776


##########
pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/buffer/impl/SingleSnapshotAbortedTxnProcessorImpl.java:
##########
@@ -87,7 +90,16 @@ public CompletableFuture<PositionImpl> recoverFromSnapshot() {
                     PositionImpl startReadCursorPosition = null;
                     try {
                         while (reader.hasMoreEvents()) {
-                            Message<TransactionBufferSnapshot> message = reader.readNext();
+                            Message<TransactionBufferSnapshot> message = reader.readNext(2, TimeUnit.SECONDS);

Review Comment:
   It treats a message that cannot be received within 2 seconds as a sign that the message has been deleted. I think this should be handled in `SystemTopicClient#readNext` instead of adding another public API. If other places (e.g. a protocol handler that can access this interface) call `SystemTopicClient#readNext` without the timeout, the error should be handled in the same way.
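
   A sketch of the suggestion to centralize the timeout: a hypothetical `TimedReader` wraps an async read (standing in for `readNextAsync()`) so every caller gets the same timeout and the same `TimeoutException`, instead of each call site picking its own value. Cancelling the stalled future on timeout is an assumed policy, not documented Pulsar behavior.

   ```java
   import java.time.Duration;
   import java.util.concurrent.CompletableFuture;
   import java.util.concurrent.ExecutionException;
   import java.util.concurrent.TimeUnit;
   import java.util.concurrent.TimeoutException;
   import java.util.function.Supplier;

   // Hypothetical wrapper: all callers of readNext() share one timeout policy.
   class TimedReader<T> {
       private final Supplier<CompletableFuture<T>> readNextAsync; // stand-in for the async read
       private final Duration timeout;

       TimedReader(Supplier<CompletableFuture<T>> readNextAsync, Duration timeout) {
           this.readNextAsync = readNextAsync;
           this.timeout = timeout;
       }

       T readNext() throws TimeoutException, ExecutionException, InterruptedException {
           CompletableFuture<T> pending = readNextAsync.get();
           try {
               return pending.get(timeout.toMillis(), TimeUnit.MILLISECONDS);
           } catch (TimeoutException e) {
               pending.cancel(false); // assumed policy: abandon the stalled read
               throw e;
           }
       }

       public static void main(String[] args) throws Exception {
           TimedReader<String> ok =
                   new TimedReader<>(() -> CompletableFuture.completedFuture("m1"), Duration.ofMillis(100));
           System.out.println(ok.readNext()); // m1

           TimedReader<String> stalled =
                   new TimedReader<>(() -> new CompletableFuture<String>(), Duration.ofMillis(100));
           try {
               stalled.readNext();
           } catch (TimeoutException e) {
               System.out.println("timed out");
           }
       }
   }
   ```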





[GitHub] [pulsar] poorbarcode commented on a diff in pull request #18833: [fix] [tx] Transaction buffer recover blocked by readNext

Posted by GitBox <gi...@apache.org>.
poorbarcode commented on code in PR #18833:
URL: https://github.com/apache/pulsar/pull/18833#discussion_r1048123436


##########
pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/buffer/impl/SingleSnapshotAbortedTxnProcessorImpl.java:
##########
@@ -87,7 +88,16 @@ public CompletableFuture<PositionImpl> recoverFromSnapshot() {
                     PositionImpl startReadCursorPosition = null;
                     try {
                         while (reader.hasMoreEvents()) {
-                            Message<TransactionBufferSnapshot> message = reader.readNext();
+                            Message<TransactionBufferSnapshot> message;
+                            try {
+                                message = reader.readNextAsync().get(30, TimeUnit.SECONDS);
+                            } catch (Exception ex) {
+                                Throwable t = FutureUtil.unwrapCompletionException(ex);

Review Comment:
   Already fixed.



##########
pulsar-broker/src/test/java/org/apache/pulsar/broker/transaction/TopicTransactionBufferRecoverTest.java:
##########
@@ -248,6 +258,156 @@ private void recoverTest(String testTopic) throws Exception {
 
     }
 
+    private ProducerAndConsumer makeManyTx(int txCount, String topicName, String subName) throws Exception {
+        Consumer<String> consumer = pulsarClient.newConsumer(Schema.STRING)
+                .subscriptionType(SubscriptionType.Shared)
+                .topic(topicName)
+                .isAckReceiptEnabled(true)
+                .acknowledgmentGroupTime(0, TimeUnit.SECONDS)
+                .subscriptionName(subName)
+                .subscribe();
+        Producer<String> producer = pulsarClient.newProducer(Schema.STRING)
+                .topic(topicName)
+                .sendTimeout(0, TimeUnit.SECONDS)
+                .enableBatching(false)
+                .batchingMaxMessages(2)
+                .create();
+        producer.send("first message");
+        boolean lastTxCommitted = false;
+        Message lastMessage = null;
+        for(int i = 0; i < txCount; i++) {
+            Transaction transaction =
+                    pulsarClient.newTransaction().withTransactionTimeout(10, TimeUnit.SECONDS).build().get();
+            lastMessage = consumer.receive();
+            producer.newMessage(transaction)
+                    .value(new StringBuilder("tx message 0-")
+                            .append(String.valueOf(lastMessage.getMessageId())).toString()).sendAsync();
+            producer.newMessage(transaction)
+                    .value(new StringBuilder("tx message 1-")
+                            .append(String.valueOf(lastMessage.getMessageId())).toString()).sendAsync();
+            consumer.acknowledgeAsync(lastMessage.getMessageId(), transaction);
+            if (i % 2 == 0) {
+                transaction.commit().get();
+                lastTxCommitted = true;
+            } else {
+                transaction.abort().get();
+                lastTxCommitted = false;
+            }
+        }
+        if (lastTxCommitted){
+            Message msg = consumer.receive();
+            consumer.acknowledge(msg);
+        } else {
+            consumer.acknowledge(lastMessage);
+        }
+        return new ProducerAndConsumer(producer, consumer);
+    }
+
+    @AllArgsConstructor
+    private static class ProducerAndConsumer {
+        public Producer<String> producer;
+        public Consumer<String> consumer;
+    }
+
+    private PersistentTopic findPersistentTopic(String topicName){
+        for (PulsarService pulsarService : pulsarServiceList){
+            CompletableFuture<Optional<Topic>> future = pulsarService.getBrokerService().getTopic(topicName, false);
+            if (future == null || !future.isDone() || future.isCompletedExceptionally() || !future.join().isPresent()){
+                continue;
+            }
+            return  (PersistentTopic) future.join().get();
+        }
+        throw new RuntimeException("topic[" + topicName + "] not found.");
+    }
+
+    private void triggerSnapshot(String topicName){
+        PersistentTopic persistentTopic = findPersistentTopic(topicName);
+        TopicTransactionBuffer topicTransactionBuffer =
+                (TopicTransactionBuffer) persistentTopic.getTransactionBuffer();
+        topicTransactionBuffer.run(null);
+    }
+
+    private void triggerCompactAndWait(String topicName) throws Exception {
+        PersistentTopic persistentTopic = findPersistentTopic(topicName);
+        ManagedLedgerImpl managedLedger = (ManagedLedgerImpl) persistentTopic.getManagedLedger();
+        persistentTopic.getBrokerService().getPulsar().getCompactor().compact(topicName);
+        Awaitility.await().untilAsserted(() -> {
+            ManagedCursorImpl compaction = (ManagedCursorImpl) managedLedger.getCursors().get("__compaction");
+            assertEquals(compaction.getMarkDeletedPosition().getLedgerId(),
+                    managedLedger.getLastConfirmedEntry().getLedgerId());
+            assertEquals(compaction.getMarkDeletedPosition().getEntryId(),

Review Comment:
   Already fixed



##########
pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/buffer/impl/SingleSnapshotAbortedTxnProcessorImpl.java:
##########
@@ -90,15 +88,15 @@ public CompletableFuture<PositionImpl> recoverFromSnapshot() {
                     PositionImpl startReadCursorPosition = null;
                     try {
                         while (reader.hasMoreEvents()) {
-                            Message<TransactionBufferSnapshot> message = reader.readNext(2, TimeUnit.SECONDS);
-                            if (message == null){
-                                String warnLog = String.format("[%s] When reading from topic %s,the latest message has"
-                                                + " been deleted by compaction-task or trim ledger.",
-                                        topic.getName(),
-                                        SystemTopicNames.TRANSACTION_BUFFER_SNAPSHOT);
-                                log.warn(warnLog);
-                                return FutureUtil.failedFuture(
-                                        new TransactionBufferException.TBRecoverCantCompletedException(warnLog));
+                            Message<TransactionBufferSnapshot> message;
+                            try {

Review Comment:
   already fixed





[GitHub] [pulsar] poorbarcode commented on a diff in pull request #18833: [fix] [tx] Transaction buffer recover blocked by readNext

Posted by GitBox <gi...@apache.org>.
poorbarcode commented on code in PR #18833:
URL: https://github.com/apache/pulsar/pull/18833#discussion_r1045458756


##########
pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/buffer/impl/SingleSnapshotAbortedTxnProcessorImpl.java:
##########
@@ -87,7 +90,16 @@ public CompletableFuture<PositionImpl> recoverFromSnapshot() {
                     PositionImpl startReadCursorPosition = null;
                     try {
                         while (reader.hasMoreEvents()) {
-                            Message<TransactionBufferSnapshot> message = reader.readNext();
+                            Message<TransactionBufferSnapshot> message = reader.readNext(2, TimeUnit.SECONDS);

Review Comment:
   Hi @BewareMyPower 
   
   I used the simpler logic suggested by @congbobo184. Could you take another look?





[GitHub] [pulsar] codecov-commenter commented on pull request #18833: [fix] [tx] Transaction buffer recover blocked by readNext

Posted by GitBox <gi...@apache.org>.
codecov-commenter commented on PR #18833:
URL: https://github.com/apache/pulsar/pull/18833#issuecomment-1354074244

   # [Codecov](https://codecov.io/gh/apache/pulsar/pull/18833?src=pr&el=h1&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation) Report
   > Merging [#18833](https://codecov.io/gh/apache/pulsar/pull/18833?src=pr&el=desc&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation) (51d1f64) into [master](https://codecov.io/gh/apache/pulsar/commit/3180a4aa04d518fa401a781d646545221c4d1fa6?el=desc&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation) (3180a4a) will **decrease** coverage by `19.33%`.
   > The diff coverage is `47.14%`.
   
   
   ```diff
   @@              Coverage Diff              @@
   ##             master   #18833       +/-   ##
   =============================================
   - Coverage     46.17%   26.83%   -19.34%     
   + Complexity    10359     5039     -5320     
   =============================================
     Files           703      597      -106     
     Lines         68845    56867    -11978     
     Branches       7382     5902     -1480     
   =============================================
   - Hits          31788    15263    -16525     
   - Misses        33448    39425     +5977     
   + Partials       3609     2179     -1430     
   ```
   
   | Flag | Coverage Δ | |
   |---|---|---|
   | unittests | `26.83% <47.14%> (-19.34%)` | :arrow_down: |
   
   Flags with carried forward coverage won't be shown. [Click here](https://docs.codecov.io/docs/carryforward-flags?utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation#carryforward-flags-in-the-pull-request-comment) to find out more.
   
   | [Impacted Files](https://codecov.io/gh/apache/pulsar/pull/18833?src=pr&el=tree&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation) | Coverage Δ | |
   |---|---|---|
   | [.../main/java/org/apache/pulsar/PulsarStandalone.java](https://codecov.io/gh/apache/pulsar/pull/18833/diff?src=pr&el=tree&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation#diff-cHVsc2FyLWJyb2tlci9zcmMvbWFpbi9qYXZhL29yZy9hcGFjaGUvcHVsc2FyL1B1bHNhclN0YW5kYWxvbmUuamF2YQ==) | `0.00% <0.00%> (ø)` | |
   | [...broker/intercept/ManagedLedgerInterceptorImpl.java](https://codecov.io/gh/apache/pulsar/pull/18833/diff?src=pr&el=tree&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation#diff-cHVsc2FyLWJyb2tlci9zcmMvbWFpbi9qYXZhL29yZy9hcGFjaGUvcHVsc2FyL2Jyb2tlci9pbnRlcmNlcHQvTWFuYWdlZExlZGdlckludGVyY2VwdG9ySW1wbC5qYXZh) | `0.00% <0.00%> (ø)` | |
   | [...va/org/apache/pulsar/broker/service/ServerCnx.java](https://codecov.io/gh/apache/pulsar/pull/18833/diff?src=pr&el=tree&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation#diff-cHVsc2FyLWJyb2tlci9zcmMvbWFpbi9qYXZhL29yZy9hcGFjaGUvcHVsc2FyL2Jyb2tlci9zZXJ2aWNlL1NlcnZlckNueC5qYXZh) | `32.49% <ø> (-14.94%)` | :arrow_down: |
   | [...er/service/persistent/GeoPersistentReplicator.java](https://codecov.io/gh/apache/pulsar/pull/18833/diff?src=pr&el=tree&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation#diff-cHVsc2FyLWJyb2tlci9zcmMvbWFpbi9qYXZhL29yZy9hcGFjaGUvcHVsc2FyL2Jyb2tlci9zZXJ2aWNlL3BlcnNpc3RlbnQvR2VvUGVyc2lzdGVudFJlcGxpY2F0b3IuamF2YQ==) | `0.00% <0.00%> (-4.71%)` | :arrow_down: |
   | [...roker/service/persistent/PersistentReplicator.java](https://codecov.io/gh/apache/pulsar/pull/18833/diff?src=pr&el=tree&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation#diff-cHVsc2FyLWJyb2tlci9zcmMvbWFpbi9qYXZhL29yZy9hcGFjaGUvcHVsc2FyL2Jyb2tlci9zZXJ2aWNlL3BlcnNpc3RlbnQvUGVyc2lzdGVudFJlcGxpY2F0b3IuamF2YQ==) | `0.00% <0.00%> (-24.06%)` | :arrow_down: |
   | [...va/org/apache/pulsar/client/impl/ConsumerImpl.java](https://codecov.io/gh/apache/pulsar/pull/18833/diff?src=pr&el=tree&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation#diff-cHVsc2FyLWNsaWVudC9zcmMvbWFpbi9qYXZhL29yZy9hcGFjaGUvcHVsc2FyL2NsaWVudC9pbXBsL0NvbnN1bWVySW1wbC5qYXZh) | `15.09% <0.00%> (-0.04%)` | :arrow_down: |
   | [...he/pulsar/client/impl/MultiTopicsConsumerImpl.java](https://codecov.io/gh/apache/pulsar/pull/18833/diff?src=pr&el=tree&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation#diff-cHVsc2FyLWNsaWVudC9zcmMvbWFpbi9qYXZhL29yZy9hcGFjaGUvcHVsc2FyL2NsaWVudC9pbXBsL011bHRpVG9waWNzQ29uc3VtZXJJbXBsLmphdmE=) | `22.78% <0.00%> (-0.09%)` | :arrow_down: |
   | [...va/org/apache/pulsar/client/impl/ProducerImpl.java](https://codecov.io/gh/apache/pulsar/pull/18833/diff?src=pr&el=tree&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation#diff-cHVsc2FyLWNsaWVudC9zcmMvbWFpbi9qYXZhL29yZy9hcGFjaGUvcHVsc2FyL2NsaWVudC9pbXBsL1Byb2R1Y2VySW1wbC5qYXZh) | `15.66% <0.00%> (-1.34%)` | :arrow_down: |
   | [...er/impl/SingleSnapshotAbortedTxnProcessorImpl.java](https://codecov.io/gh/apache/pulsar/pull/18833/diff?src=pr&el=tree&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation#diff-cHVsc2FyLWJyb2tlci9zcmMvbWFpbi9qYXZhL29yZy9hcGFjaGUvcHVsc2FyL2Jyb2tlci90cmFuc2FjdGlvbi9idWZmZXIvaW1wbC9TaW5nbGVTbmFwc2hvdEFib3J0ZWRUeG5Qcm9jZXNzb3JJbXBsLmphdmE=) | `20.87% <18.18%> (-52.94%)` | :arrow_down: |
   | [...nsaction/pendingack/impl/PendingAckHandleImpl.java](https://codecov.io/gh/apache/pulsar/pull/18833/diff?src=pr&el=tree&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation#diff-cHVsc2FyLWJyb2tlci9zcmMvbWFpbi9qYXZhL29yZy9hcGFjaGUvcHVsc2FyL2Jyb2tlci90cmFuc2FjdGlvbi9wZW5kaW5nYWNrL2ltcGwvUGVuZGluZ0Fja0hhbmRsZUltcGwuamF2YQ==) | `6.19% <50.00%> (-45.65%)` | :arrow_down: |
   | ... and [281 more](https://codecov.io/gh/apache/pulsar/pull/18833/diff?src=pr&el=tree-more&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation) | |
   




[GitHub] [pulsar] poorbarcode closed pull request #18833: [fix] [tx] Transaction buffer recover blocked by readNext

Posted by GitBox <gi...@apache.org>.
poorbarcode closed pull request #18833: [fix] [tx] Transaction buffer recover blocked by readNext
URL: https://github.com/apache/pulsar/pull/18833




[GitHub] [pulsar] liangyepianzhou commented on a diff in pull request #18833: [fix] [tx] Transaction buffer recover blocked by readNext

Posted by GitBox <gi...@apache.org>.
liangyepianzhou commented on code in PR #18833:
URL: https://github.com/apache/pulsar/pull/18833#discussion_r1045874234


##########
pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/buffer/impl/SingleSnapshotAbortedTxnProcessorImpl.java:
##########
@@ -87,7 +90,16 @@ public CompletableFuture<PositionImpl> recoverFromSnapshot() {
                     PositionImpl startReadCursorPosition = null;
                     try {
                         while (reader.hasMoreEvents()) {
-                            Message<TransactionBufferSnapshot> message = reader.readNext();
+                            Message<TransactionBufferSnapshot> message = reader.readNext(2, TimeUnit.SECONDS);

Review Comment:
   IMO, we should add a `TimeoutException` check here.
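
   One way to add that check, sketched with plain JDK futures (the `ReadOutcome` enum and `classify` helper are hypothetical): `get(timeout, unit)` throws `TimeoutException` for a stalled read and `ExecutionException` for a failed one, so the two cases can be handled differently.

   ```java
   import java.util.concurrent.CompletableFuture;
   import java.util.concurrent.ExecutionException;
   import java.util.concurrent.TimeUnit;
   import java.util.concurrent.TimeoutException;

   class TimeoutCheckSketch {
       enum ReadOutcome { MESSAGE, TIMEOUT, FAILED }

       // Waits for one read and reports which of the three cases occurred.
       static ReadOutcome classify(CompletableFuture<?> read, long timeoutMillis) {
           try {
               read.get(timeoutMillis, TimeUnit.MILLISECONDS);
               return ReadOutcome.MESSAGE;
           } catch (TimeoutException e) {
               return ReadOutcome.TIMEOUT; // stalled read: the entry may have been compacted away
           } catch (ExecutionException e) {
               return ReadOutcome.FAILED;  // the read itself failed; e.getCause() has the reason
           } catch (InterruptedException e) {
               Thread.currentThread().interrupt();
               return ReadOutcome.FAILED;
           }
       }

       public static void main(String[] args) {
           System.out.println(classify(CompletableFuture.completedFuture("m"), 50)); // MESSAGE
           System.out.println(classify(new CompletableFuture<String>(), 50));        // TIMEOUT
           System.out.println(classify(
                   CompletableFuture.failedFuture(new IllegalStateException()), 50)); // FAILED
       }
   }
   ```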





[GitHub] [pulsar] poorbarcode commented on a diff in pull request #18833: [fix] [tx] Transaction buffer recover blocked by readNext

Posted by GitBox <gi...@apache.org>.
poorbarcode commented on code in PR #18833:
URL: https://github.com/apache/pulsar/pull/18833#discussion_r1045457850


##########
pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/buffer/impl/SingleSnapshotAbortedTxnProcessorImpl.java:
##########
@@ -87,7 +90,16 @@ public CompletableFuture<PositionImpl> recoverFromSnapshot() {
                     PositionImpl startReadCursorPosition = null;
                     try {
                         while (reader.hasMoreEvents()) {
-                            Message<TransactionBufferSnapshot> message = reader.readNext();
+                            Message<TransactionBufferSnapshot> message = reader.readNext(2, TimeUnit.SECONDS);

Review Comment:
   Good suggestion, already fixed





[GitHub] [pulsar] poorbarcode commented on a diff in pull request #18833: [fix] [tx] Transaction buffer recover blocked by readNext

Posted by GitBox <gi...@apache.org>.
poorbarcode commented on code in PR #18833:
URL: https://github.com/apache/pulsar/pull/18833#discussion_r1044237356


##########
pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/buffer/impl/SingleSnapshotAbortedTxnProcessorImpl.java:
##########
@@ -98,13 +101,20 @@ public CompletableFuture<PositionImpl> recoverFromSnapshot() {
                                 }
                             }
                         }
-                        closeReader(reader);
                         return CompletableFuture.completedFuture(startReadCursorPosition);
+                    } catch (NullPointerException npe) {

Review Comment:
   Already fixed, thanks





[GitHub] [pulsar] congbobo184 commented on a diff in pull request #18833: [fix] [tx] Transaction buffer recover blocked by readNext

Posted by GitBox <gi...@apache.org>.
congbobo184 commented on code in PR #18833:
URL: https://github.com/apache/pulsar/pull/18833#discussion_r1045740009


##########
pulsar-broker/src/test/java/org/apache/pulsar/broker/transaction/TopicTransactionBufferRecoverTest.java:
##########
@@ -248,6 +258,156 @@ private void recoverTest(String testTopic) throws Exception {
 
     }
 
+    private ProducerAndConsumer makeManyTx(int txCount, String topicName, String subName) throws Exception {
+        Consumer<String> consumer = pulsarClient.newConsumer(Schema.STRING)
+                .subscriptionType(SubscriptionType.Shared)
+                .topic(topicName)
+                .isAckReceiptEnabled(true)
+                .acknowledgmentGroupTime(0, TimeUnit.SECONDS)
+                .subscriptionName(subName)
+                .subscribe();
+        Producer<String> producer = pulsarClient.newProducer(Schema.STRING)
+                .topic(topicName)
+                .sendTimeout(0, TimeUnit.SECONDS)
+                .enableBatching(false)
+                .batchingMaxMessages(2)
+                .create();
+        producer.send("first message");
+        boolean lastTxCommitted = false;
+        Message lastMessage = null;
+        for(int i = 0; i < txCount; i++) {
+            Transaction transaction =
+                    pulsarClient.newTransaction().withTransactionTimeout(10, TimeUnit.SECONDS).build().get();
+            lastMessage = consumer.receive();
+            producer.newMessage(transaction)
+                    .value(new StringBuilder("tx message 0-")
+                            .append(String.valueOf(lastMessage.getMessageId())).toString()).sendAsync();
+            producer.newMessage(transaction)
+                    .value(new StringBuilder("tx message 1-")
+                            .append(String.valueOf(lastMessage.getMessageId())).toString()).sendAsync();
+            consumer.acknowledgeAsync(lastMessage.getMessageId(), transaction);
+            if (i % 2 == 0) {
+                transaction.commit().get();
+                lastTxCommitted = true;
+            } else {
+                transaction.abort().get();
+                lastTxCommitted = false;
+            }
+        }
+        if (lastTxCommitted){
+            Message msg = consumer.receive();
+            consumer.acknowledge(msg);
+        } else {
+            consumer.acknowledge(lastMessage);
+        }
+        return new ProducerAndConsumer(producer, consumer);
+    }
+
+    @AllArgsConstructor
+    private static class ProducerAndConsumer {
+        public Producer<String> producer;
+        public Consumer<String> consumer;
+    }
+
+    private PersistentTopic findPersistentTopic(String topicName){
+        for (PulsarService pulsarService : pulsarServiceList){
+            CompletableFuture<Optional<Topic>> future = pulsarService.getBrokerService().getTopic(topicName, false);
+            if (future == null || !future.isDone() || future.isCompletedExceptionally() || !future.join().isPresent()){
+                continue;
+            }
+            return  (PersistentTopic) future.join().get();
+        }
+        throw new RuntimeException("topic[" + topicName + "] not found.");
+    }
+
+    private void triggerSnapshot(String topicName){
+        PersistentTopic persistentTopic = findPersistentTopic(topicName);
+        TopicTransactionBuffer topicTransactionBuffer =
+                (TopicTransactionBuffer) persistentTopic.getTransactionBuffer();
+        topicTransactionBuffer.run(null);
+    }
+
+    private void triggerCompactAndWait(String topicName) throws Exception {
+        PersistentTopic persistentTopic = findPersistentTopic(topicName);
+        ManagedLedgerImpl managedLedger = (ManagedLedgerImpl) persistentTopic.getManagedLedger();
+        persistentTopic.getBrokerService().getPulsar().getCompactor().compact(topicName);
+        Awaitility.await().untilAsserted(() -> {
+            ManagedCursorImpl compaction = (ManagedCursorImpl) managedLedger.getCursors().get("__compaction");
+            assertEquals(compaction.getMarkDeletedPosition().getLedgerId(),
+                    managedLedger.getLastConfirmedEntry().getLedgerId());
+            assertEquals(compaction.getMarkDeletedPosition().getEntryId(),

Review Comment:
   ```suggestion
                          assertEquals(compaction.getMarkDeletedPosition(), managedLedger.getLastConfirmedEntry());
   ```



##########
pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/buffer/impl/SingleSnapshotAbortedTxnProcessorImpl.java:
##########
@@ -90,15 +88,15 @@ public CompletableFuture<PositionImpl> recoverFromSnapshot() {
                     PositionImpl startReadCursorPosition = null;
                     try {
                         while (reader.hasMoreEvents()) {
-                            Message<TransactionBufferSnapshot> message = reader.readNext(2, TimeUnit.SECONDS);
-                            if (message == null){
-                                String warnLog = String.format("[%s] When reading from topic %s,the latest message has"
-                                                + " been deleted by compaction-task or trim ledger.",
-                                        topic.getName(),
-                                        SystemTopicNames.TRANSACTION_BUFFER_SNAPSHOT);
-                                log.warn(warnLog);
-                                return FutureUtil.failedFuture(
-                                        new TransactionBufferException.TBRecoverCantCompletedException(warnLog));
+                            Message<TransactionBufferSnapshot> message;
+                            try {

Review Comment:
   How about moving this try block into the try block at line 89?



##########
pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/buffer/impl/SingleSnapshotAbortedTxnProcessorImpl.java:
##########
@@ -87,7 +88,16 @@ public CompletableFuture<PositionImpl> recoverFromSnapshot() {
                     PositionImpl startReadCursorPosition = null;
                     try {
                         while (reader.hasMoreEvents()) {
-                            Message<TransactionBufferSnapshot> message = reader.readNext();
+                            Message<TransactionBufferSnapshot> message;
+                            try {
+                                message = reader.readNextAsync().get(30, TimeUnit.SECONDS);
+                            } catch (Exception ex) {
+                                Throwable t = FutureUtil.unwrapCompletionException(ex);

Review Comment:
   Please wrap the `TimeoutException`; otherwise, the TC (transaction coordinator) will not retry the end-transaction operation on the TB (transaction buffer), and the client will not reconnect.
   https://github.com/apache/pulsar/blob/1be5a69a079594d8d96d2a0ab7ab8b389da8865e/pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/buffer/impl/TopicTransactionBuffer.java#L182-L187
   
   Please also check the returned error code; the current one does not seem correct.
   https://github.com/apache/pulsar/blob/1be5a69a079594d8d96d2a0ab7ab8b389da8865e/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/ServerCnx.java#L1344-L1352
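
   A sketch of the wrapping being asked for, under assumptions: `RecoverNotCompletedException` and the `ErrorCode` values are hypothetical stand-ins, not Pulsar's exception classes or `ServerError` codes. A raw `TimeoutException` (possibly nested in a `CompletionException`) is wrapped into a dedicated type, which is then mapped to a retryable error code instead of a generic one.

   ```java
   import java.util.concurrent.CompletionException;
   import java.util.concurrent.TimeoutException;

   class ErrorMappingSketch {
       // Hypothetical error codes returned to the client.
       enum ErrorCode { RETRYABLE_NOT_READY, UNKNOWN_ERROR }

       // Hypothetical exception type meaning "recovery not finished, try again".
       static class RecoverNotCompletedException extends RuntimeException {
           RecoverNotCompletedException(String msg, Throwable cause) {
               super(msg, cause);
           }
       }

       // Unwraps a CompletionException, then wraps a raw TimeoutException so
       // upstream code can recognise it as retryable.
       static Throwable wrap(Throwable t) {
           Throwable cause = (t instanceof CompletionException && t.getCause() != null) ? t.getCause() : t;
           if (cause instanceof TimeoutException) {
               return new RecoverNotCompletedException("transaction buffer recovery timed out", cause);
           }
           return cause;
       }

       // Maps the (wrapped) failure to the code sent back to the client.
       static ErrorCode toErrorCode(Throwable t) {
           return (t instanceof RecoverNotCompletedException) ? ErrorCode.RETRYABLE_NOT_READY : ErrorCode.UNKNOWN_ERROR;
       }

       public static void main(String[] args) {
           Throwable wrapped = wrap(new CompletionException(new TimeoutException()));
           System.out.println(toErrorCode(wrapped));                            // RETRYABLE_NOT_READY
           System.out.println(toErrorCode(wrap(new IllegalStateException()))); // UNKNOWN_ERROR
       }
   }
   ```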





[GitHub] [pulsar] BewareMyPower commented on a diff in pull request #18833: [fix] [tx] Transaction buffer recover blocked by readNext

Posted by GitBox <gi...@apache.org>.
BewareMyPower commented on code in PR #18833:
URL: https://github.com/apache/pulsar/pull/18833#discussion_r1044320863


##########
pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/buffer/impl/SingleSnapshotAbortedTxnProcessorImpl.java:
##########
@@ -87,7 +90,16 @@ public CompletableFuture<PositionImpl> recoverFromSnapshot() {
                     PositionImpl startReadCursorPosition = null;
                     try {
                         while (reader.hasMoreEvents()) {
-                            Message<TransactionBufferSnapshot> message = reader.readNext();
+                            Message<TransactionBufferSnapshot> message = reader.readNext(2, TimeUnit.SECONDS);

Review Comment:
   In addition, I'm wondering if the 2 seconds is reasonable. What if the message actually exists and the timeout happened only because of the network issue?





[GitHub] [pulsar] codelipenghui merged pull request #18833: [fix] [tx] Transaction buffer recover blocked by readNext

Posted by GitBox <gi...@apache.org>.
codelipenghui merged PR #18833:
URL: https://github.com/apache/pulsar/pull/18833




[GitHub] [pulsar] Technoboy- commented on pull request #18833: [fix] [tx] Transaction buffer recover blocked by readNext

Posted by "Technoboy- (via GitHub)" <gi...@apache.org>.
Technoboy- commented on PR #18833:
URL: https://github.com/apache/pulsar/pull/18833#issuecomment-1422184374

   Introduced by #17847, so there is no need to release it for other branches.




[GitHub] [pulsar] congbobo184 commented on pull request #18833: [fix] [tx] Transaction buffer recover blocked by readNext

Posted by GitBox <gi...@apache.org>.
congbobo184 commented on PR #18833:
URL: https://github.com/apache/pulsar/pull/18833#issuecomment-1351296232

   @BewareMyPower /cc 




[GitHub] [pulsar] poorbarcode commented on a diff in pull request #18833: [fix] [tx] Transaction buffer recover blocked by readNext

Posted by GitBox <gi...@apache.org>.
poorbarcode commented on code in PR #18833:
URL: https://github.com/apache/pulsar/pull/18833#discussion_r1045120130


##########
pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/buffer/impl/SingleSnapshotAbortedTxnProcessorImpl.java:
##########
@@ -87,7 +90,16 @@ public CompletableFuture<PositionImpl> recoverFromSnapshot() {
                     PositionImpl startReadCursorPosition = null;
                     try {
                         while (reader.hasMoreEvents()) {
-                            Message<TransactionBufferSnapshot> message = reader.readNext();
+                            Message<TransactionBufferSnapshot> message = reader.readNext(2, TimeUnit.SECONDS);

Review Comment:
   >  I think it should be done in SystemTopicClient#readNext instead of adding another public API. If there are other places (e.g. a protocol handler that could access this interface) that calls SystemTopicClient#readNext without the timeout, the error should be processed in the same way.
   
   I think it is a good suggestion, but it also makes the API less flexible. For example, if there is no message at the moment and I want to wait for the next message to be sent, there would be no API for that. Since `SystemTopicClient.Reader` is an internal-only API, I think it is better to favor usability.
   
   > In addition, I'm wondering if the 2 seconds is reasonable. What if the message actually exists and the timeout happened only because of the network issue?
   
   Yes, you are right. We can't tell which of the two is happening.
   
   ----
   
   The current fix is a patch for this existing issue; the root cause will be addressed in a separate discussion:
   - When a client has already fetched and cached the last message ID, the compaction task can still delete the messages at the end of the topic. The proposal is to change the compaction logic so that the last message of the topic is always retained.
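   The proposed root-cause fix can be sketched with a simplified in-memory model of key-based compaction. This is not Pulsar's compactor: `TopicEntry`, `CompactionSketch`, and the `retainLast` flag are hypothetical, and positions are plain `long`s. When `retainLast` is set, the final entry of the topic survives even if it is a null-value tombstone, so a reader that cached its position can still reach it.

   ```java
   import java.util.ArrayList;
   import java.util.HashMap;
   import java.util.List;
   import java.util.Map;

   // Simplified topic entry: position, key, and a value that may be null
   // (a tombstone marking the key as deleted).
   final class TopicEntry {
       final long position;
       final String key;
       final String value;

       TopicEntry(long position, String key, String value) {
           this.position = position;
           this.key = key;
           this.value = value;
       }
   }

   final class CompactionSketch {
       private CompactionSketch() {}

       // Keeps only the latest entry per key and drops keys whose latest value is null.
       // If retainLast is true, the topic's final entry is kept unconditionally.
       static List<TopicEntry> compact(List<TopicEntry> topic, boolean retainLast) {
           Map<String, Long> latestByKey = new HashMap<>();
           for (TopicEntry e : topic) {
               latestByKey.put(e.key, e.position);
           }
           long tail = topic.isEmpty() ? -1 : topic.get(topic.size() - 1).position;
           List<TopicEntry> out = new ArrayList<>();
           for (TopicEntry e : topic) {
               boolean latestForKey = latestByKey.get(e.key) == e.position;
               boolean keepTail = retainLast && e.position == tail;
               if (keepTail || (latestForKey && e.value != null)) {
                   out.add(e);
               }
           }
           return out;
       }
   }
   ```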





[GitHub] [pulsar] liangyepianzhou commented on a diff in pull request #18833: [fix] [tx] Transaction buffer recover blocked by readNext

Posted by GitBox <gi...@apache.org>.
liangyepianzhou commented on code in PR #18833:
URL: https://github.com/apache/pulsar/pull/18833#discussion_r1045863628


##########
pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/buffer/impl/SingleSnapshotAbortedTxnProcessorImpl.java:
##########
@@ -87,7 +90,16 @@ public CompletableFuture<PositionImpl> recoverFromSnapshot() {
                     PositionImpl startReadCursorPosition = null;
                     try {
                         while (reader.hasMoreEvents()) {
-                            Message<TransactionBufferSnapshot> message = reader.readNext();
+                            Message<TransactionBufferSnapshot> message = reader.readNext(2, TimeUnit.SECONDS);

Review Comment:
   How to try recovery again if implemented in this way?





[GitHub] [pulsar] liangyepianzhou commented on a diff in pull request #18833: [fix] [tx] Transaction buffer recover blocked by readNext

Posted by GitBox <gi...@apache.org>.
liangyepianzhou commented on code in PR #18833:
URL: https://github.com/apache/pulsar/pull/18833#discussion_r1045368377


##########
pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/buffer/impl/SingleSnapshotAbortedTxnProcessorImpl.java:
##########
@@ -87,7 +90,16 @@ public CompletableFuture<PositionImpl> recoverFromSnapshot() {
                     PositionImpl startReadCursorPosition = null;
                     try {
                         while (reader.hasMoreEvents()) {
-                            Message<TransactionBufferSnapshot> message = reader.readNext();
+                            Message<TransactionBufferSnapshot> message = reader.readNext(2, TimeUnit.SECONDS);
+                            if (message == null){
+                                String warnLog = String.format("[%s] When reading from topic %s,the latest message has"
+                                                + " been deleted by compaction-task or trim ledger.",
+                                        topic.getName(),
+                                        SystemTopicNames.TRANSACTION_BUFFER_SNAPSHOT);
+                                log.warn(warnLog);

Review Comment:
   Maybe we should use the debug log level here, because this is a normal case.





[GitHub] [pulsar] poorbarcode commented on a diff in pull request #18833: [fix] [tx] Transaction buffer recover blocked by readNext

Posted by GitBox <gi...@apache.org>.
poorbarcode commented on code in PR #18833:
URL: https://github.com/apache/pulsar/pull/18833#discussion_r1045458756


##########
pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/buffer/impl/SingleSnapshotAbortedTxnProcessorImpl.java:
##########
@@ -87,7 +90,16 @@ public CompletableFuture<PositionImpl> recoverFromSnapshot() {
                     PositionImpl startReadCursorPosition = null;
                     try {
                         while (reader.hasMoreEvents()) {
-                            Message<TransactionBufferSnapshot> message = reader.readNext();
+                            Message<TransactionBufferSnapshot> message = reader.readNext(2, TimeUnit.SECONDS);

Review Comment:
   Hi @BewareMyPower 
   
   I used the [simpler logic](https://github.com/apache/pulsar/pull/18833#discussion_r1045360674) suggested by @congbobo184. Could you take another look?





[GitHub] [pulsar] 3286360470 commented on pull request #18833: [fix] [tx] Transaction buffer recover blocked by readNext

Posted by "3286360470 (via GitHub)" <gi...@apache.org>.
3286360470 commented on PR #18833:
URL: https://github.com/apache/pulsar/pull/18833#issuecomment-1501311859

   Are there steps to reproduce this issue reliably? I have restarted the broker many times locally and still haven't reproduced the stuck snapshot recovery.




[GitHub] [pulsar] BewareMyPower commented on a diff in pull request #18833: [fix] [tx] Transaction buffer recover blocked by readNext

Posted by GitBox <gi...@apache.org>.
BewareMyPower commented on code in PR #18833:
URL: https://github.com/apache/pulsar/pull/18833#discussion_r1044319872


##########
pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/buffer/impl/SingleSnapshotAbortedTxnProcessorImpl.java:
##########
@@ -87,7 +90,16 @@ public CompletableFuture<PositionImpl> recoverFromSnapshot() {
                     PositionImpl startReadCursorPosition = null;
                     try {
                         while (reader.hasMoreEvents()) {
-                            Message<TransactionBufferSnapshot> message = reader.readNext();
+                            Message<TransactionBufferSnapshot> message = reader.readNext(2, TimeUnit.SECONDS);

Review Comment:
   I'd rather implement the wrapped `readNext()` like:
   
   ```java
           public Message<T> readNext() throws PulsarClientException {
               Message<T> message = reader.readNext(2, TimeUnit.SECONDS);
               if (message == null) {
                   throw new PulsarClientException.InvalidMessageException(
                           "When reading from topic " + reader.getTopic() + ", the latest message has been deleted");
               }
               return message;
           }
   ```
   
   Then the exception could be handled in the catch case:
   
   ```java
                       try {
                           while (reader.hasMoreEvents()) {
                               Message<TransactionBufferSnapshot> message = reader.readNext();
                               /* ... */
                           }
                           /* ... */
                       } catch (PulsarClientException.InvalidMessageException e) {
                           return FutureUtil.failedFuture(
                                   new TransactionBufferException.TBRecoverCantCompletedException(e.getMessage()));
                       }
   ```
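   The toy model below shows the race. A timed read with a failure path ends the loop, while a plain blocking `readNext()` would hang. `ToyReader` stands in for the system-topic reader: its `hasMoreEvents()` relies on a cached last position that may be stale. A `LinkedBlockingQueue` stands in for the backlog, and entries missing from the queue play the role of messages deleted by compaction. All names are hypothetical.

   ```java
   import java.util.concurrent.LinkedBlockingQueue;
   import java.util.concurrent.TimeUnit;

   // Toy reader: hasMoreEvents() trusts a cached last position, while the backlog
   // itself may have lost entries (standing in for compaction deleting them).
   final class ToyReader {
       final LinkedBlockingQueue<Long> backlog = new LinkedBlockingQueue<>();
       long cachedLastPosition = -1;
       long readPosition = -1;

       boolean hasMoreEvents() {
           return readPosition < cachedLastPosition;
       }

       // Returns null if nothing arrives within the timeout.
       Long readNext(long timeout, TimeUnit unit) throws InterruptedException {
           Long pos = backlog.poll(timeout, unit);
           if (pos != null) {
               readPosition = pos;
           }
           return pos;
       }
   }

   final class RecoverLoop {
       private RecoverLoop() {}

       // Hypothetical exception for the "latest message has been deleted" case.
       static final class SnapshotMessageMissingException extends Exception {
           SnapshotMessageMissingException(String msg) {
               super(msg);
           }
       }

       static long recover(ToyReader reader, long timeoutMs)
               throws InterruptedException, SnapshotMessageMissingException {
           long lastSeen = -1;
           while (reader.hasMoreEvents()) {
               Long pos = reader.readNext(timeoutMs, TimeUnit.MILLISECONDS);
               if (pos == null) {
                   // An untimed blocking read would wait here forever.
                   throw new SnapshotMessageMissingException(
                           "latest message may have been deleted by compaction");
               }
               lastSeen = pos;
           }
           return lastSeen;
       }
   }
   ```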





[GitHub] [pulsar] gaoran10 commented on a diff in pull request #18833: [fix] [tx] Transaction buffer recover blocked by readNext

Posted by GitBox <gi...@apache.org>.
gaoran10 commented on code in PR #18833:
URL: https://github.com/apache/pulsar/pull/18833#discussion_r1044224377


##########
pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/buffer/impl/SingleSnapshotAbortedTxnProcessorImpl.java:
##########
@@ -98,13 +101,20 @@ public CompletableFuture<PositionImpl> recoverFromSnapshot() {
                                 }
                             }
                         }
-                        closeReader(reader);
                         return CompletableFuture.completedFuture(startReadCursorPosition);
+                    } catch (NullPointerException npe) {

Review Comment:
   Could we check if the message is null or not directly?





[GitHub] [pulsar] poorbarcode commented on a diff in pull request #18833: [fix] [tx] Transaction buffer recover blocked by readNext

Posted by GitBox <gi...@apache.org>.
poorbarcode commented on code in PR #18833:
URL: https://github.com/apache/pulsar/pull/18833#discussion_r1045459813


##########
pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/buffer/impl/SingleSnapshotAbortedTxnProcessorImpl.java:
##########
@@ -87,7 +90,16 @@ public CompletableFuture<PositionImpl> recoverFromSnapshot() {
                     PositionImpl startReadCursorPosition = null;
                     try {
                         while (reader.hasMoreEvents()) {
-                            Message<TransactionBufferSnapshot> message = reader.readNext();
+                            Message<TransactionBufferSnapshot> message = reader.readNext(2, TimeUnit.SECONDS);
+                            if (message == null){
+                                String warnLog = String.format("[%s] When reading from topic %s,the latest message has"
+                                                + " been deleted by compaction-task or trim ledger.",
+                                        topic.getName(),
+                                        SystemTopicNames.TRANSACTION_BUFFER_SNAPSHOT);
+                                log.warn(warnLog);

Review Comment:
   Hi @liangyepianzhou 
   
   I used the [simpler logic](https://github.com/apache/pulsar/pull/18833#discussion_r1045360674) suggested by @congbobo184. Could you take another look?





[GitHub] [pulsar] BewareMyPower commented on a diff in pull request #18833: [fix] [tx] Transaction buffer recover blocked by readNext

Posted by GitBox <gi...@apache.org>.
BewareMyPower commented on code in PR #18833:
URL: https://github.com/apache/pulsar/pull/18833#discussion_r1045811151


##########
pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/buffer/impl/SingleSnapshotAbortedTxnProcessorImpl.java:
##########
@@ -90,15 +88,15 @@ public CompletableFuture<PositionImpl> recoverFromSnapshot() {
                     PositionImpl startReadCursorPosition = null;
                     try {
                         while (reader.hasMoreEvents()) {
-                            Message<TransactionBufferSnapshot> message = reader.readNext(2, TimeUnit.SECONDS);
-                            if (message == null){
-                                String warnLog = String.format("[%s] When reading from topic %s,the latest message has"
-                                                + " been deleted by compaction-task or trim ledger.",
-                                        topic.getName(),
-                                        SystemTopicNames.TRANSACTION_BUFFER_SNAPSHOT);
-                                log.warn(warnLog);
-                                return FutureUtil.failedFuture(
-                                        new TransactionBufferException.TBRecoverCantCompletedException(warnLog));
+                            Message<TransactionBufferSnapshot> message;
+                            try {

Review Comment:
   +1





[GitHub] [pulsar] poorbarcode commented on a diff in pull request #18833: [fix] [tx] Transaction buffer recover blocked by readNext

Posted by GitBox <gi...@apache.org>.
poorbarcode commented on code in PR #18833:
URL: https://github.com/apache/pulsar/pull/18833#discussion_r1048125148


##########
pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/buffer/impl/SingleSnapshotAbortedTxnProcessorImpl.java:
##########
@@ -87,7 +90,16 @@ public CompletableFuture<PositionImpl> recoverFromSnapshot() {
                     PositionImpl startReadCursorPosition = null;
                     try {
                         while (reader.hasMoreEvents()) {
-                            Message<TransactionBufferSnapshot> message = reader.readNext();
+                            Message<TransactionBufferSnapshot> message = reader.readNext(2, TimeUnit.SECONDS);

Review Comment:
   > IMO, we should add a TimeoutException check here.
   
   Yes, you are correct. Already fixed.
   
   > How to try recovery again if implemented in this way?
   
   The next recovery will be triggered by the client.
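   One way to read "the next recovery will be triggered by the client" is that a failed recovery future is not cached forever, so the next request starts a fresh attempt. Below is a minimal sketch of that pattern, assuming recovery is exposed as a `Supplier<CompletableFuture<T>>`. `RetryOnNextRequest` is a hypothetical helper, not the actual `TopicTransactionBuffer` logic.

   ```java
   import java.util.concurrent.CompletableFuture;
   import java.util.concurrent.atomic.AtomicReference;
   import java.util.function.Supplier;

   // Hypothetical helper: caches the recovery future, but replaces it if it failed,
   // so the next caller (e.g. the next client request) triggers a new attempt.
   final class RetryOnNextRequest<T> {
       private final Supplier<CompletableFuture<T>> recover;
       private final AtomicReference<CompletableFuture<T>> current = new AtomicReference<>();

       RetryOnNextRequest(Supplier<CompletableFuture<T>> recover) {
           this.recover = recover;
       }

       CompletableFuture<T> get() {
           while (true) {
               CompletableFuture<T> existing = current.get();
               if (existing != null && !existing.isCompletedExceptionally()) {
                   return existing; // still in progress, or already recovered
               }
               CompletableFuture<T> fresh = new CompletableFuture<>();
               if (current.compareAndSet(existing, fresh)) {
                   try {
                       recover.get().whenComplete((v, ex) -> {
                           if (ex != null) {
                               fresh.completeExceptionally(ex);
                           } else {
                               fresh.complete(v);
                           }
                       });
                   } catch (RuntimeException e) {
                       fresh.completeExceptionally(e); // recovery failed synchronously
                   }
                   return fresh;
               }
               // Another caller swapped in a new attempt first; loop and re-read it.
           }
       }
   }
   ```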





[GitHub] [pulsar] poorbarcode commented on a diff in pull request #18833: [fix] [tx] Transaction buffer recover blocked by readNext

Posted by GitBox <gi...@apache.org>.
poorbarcode commented on code in PR #18833:
URL: https://github.com/apache/pulsar/pull/18833#discussion_r1045120346


##########
pulsar-broker/src/test/java/org/apache/pulsar/broker/transaction/TopicTransactionBufferRecoverTest.java:
##########
@@ -119,18 +129,18 @@ protected void cleanup() throws Exception {
     }
 
     @DataProvider(name = "testTopic")
-    public Object[] testTopic() {
-        return new Object[] {
-                RECOVER_ABORT,
-                RECOVER_COMMIT
+    public Object[][] testTopic() {
+        return new Object[][] {
+                {RECOVER_ABORT},
+                {RECOVER_COMMIT}
         };
     }
 
     @DataProvider(name = "enableSnapshotSegment")
-    public Object[] testSnapshot() {
-        return new Boolean[] {
-                true,
-                false
+    public Object[][] testSnapshot() {
+        return new Object[][] {
+                {true},
+                {false}
         };
     }

Review Comment:
   Already reverted these unnecessary changes.





[GitHub] [pulsar] congbobo184 commented on a diff in pull request #18833: [fix] [tx] Transaction buffer recover blocked by readNext

Posted by GitBox <gi...@apache.org>.
congbobo184 commented on code in PR #18833:
URL: https://github.com/apache/pulsar/pull/18833#discussion_r1045360674


##########
pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/buffer/impl/SingleSnapshotAbortedTxnProcessorImpl.java:
##########
@@ -87,7 +90,16 @@ public CompletableFuture<PositionImpl> recoverFromSnapshot() {
                     PositionImpl startReadCursorPosition = null;
                     try {
                         while (reader.hasMoreEvents()) {
-                            Message<TransactionBufferSnapshot> message = reader.readNext();
+                            Message<TransactionBufferSnapshot> message = reader.readNext(2, TimeUnit.SECONDS);

Review Comment:
   1. 
   ```java
   Message<TransactionBufferSnapshot> message;
   try {
       message = reader.readNextAsync().get(30, TimeUnit.SECONDS);
   } catch (Exception ex) {
       Throwable t = FutureUtil.unwrapCompletionException(ex);
       log.error("[{}] Transaction buffer recover fail when read "
               + "transactionBufferSnapshot!", topic.getName(), t);
       closeReader(reader);
       return FutureUtil.failedFuture(t);
   }
   ```
   It is better to use this logic: `TopicTransactionBuffer` recovers on a thread from `getTransactionExecutorProvider`, so we don't need to worry about deadlocks, and we don't need to add any new API.
   2. It's better to use the client's `operationTimeoutMs` as the timeout; 2 seconds is too short.
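   Below is a self-contained sketch of point 1 combined with point 2. It assumes the timeout is passed in from configuration (for example, the client's `operationTimeoutMs`) instead of being hard-coded. `SnapshotRead` and its `unwrap` helper are hypothetical. `unwrap` is only in the spirit of `FutureUtil.unwrapCompletionException` and may not match its exact behavior. Blocking in `get` is tolerable only because, as noted above, recovery runs on a dedicated executor thread.

   ```java
   import java.util.concurrent.CompletableFuture;
   import java.util.concurrent.CompletionException;
   import java.util.concurrent.ExecutionException;
   import java.util.concurrent.TimeUnit;

   final class SnapshotRead {
       private SnapshotRead() {}

       // Peels CompletionException/ExecutionException wrappers to reach the root cause.
       static Throwable unwrap(Throwable t) {
           while ((t instanceof CompletionException || t instanceof ExecutionException)
                   && t.getCause() != null) {
               t = t.getCause();
           }
           return t;
       }

       // Waits for the pending read using a configured timeout and converts any
       // failure (including TimeoutException) into a failed future.
       static <T> CompletableFuture<T> readOnce(CompletableFuture<T> pendingRead,
                                                long operationTimeoutMs) {
           try {
               return CompletableFuture.completedFuture(
                       pendingRead.get(operationTimeoutMs, TimeUnit.MILLISECONDS));
           } catch (InterruptedException e) {
               Thread.currentThread().interrupt();
               return CompletableFuture.failedFuture(e);
           } catch (Exception e) {
               return CompletableFuture.failedFuture(unwrap(e));
           }
       }
   }
   ```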





[GitHub] [pulsar] congbobo184 commented on a diff in pull request #18833: [fix] [tx] Transaction buffer recover blocked by readNext

Posted by GitBox <gi...@apache.org>.
congbobo184 commented on code in PR #18833:
URL: https://github.com/apache/pulsar/pull/18833#discussion_r1045756434


##########
pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/buffer/impl/SingleSnapshotAbortedTxnProcessorImpl.java:
##########
@@ -87,7 +88,16 @@ public CompletableFuture<PositionImpl> recoverFromSnapshot() {
                     PositionImpl startReadCursorPosition = null;
                     try {
                         while (reader.hasMoreEvents()) {
-                            Message<TransactionBufferSnapshot> message = reader.readNext();
+                            Message<TransactionBufferSnapshot> message;
+                            try {
+                                message = reader.readNextAsync().get(30, TimeUnit.SECONDS);
+                            } catch (Exception ex) {
+                                Throwable t = FutureUtil.unwrapCompletionException(ex);

Review Comment:
   Please wrap the TimeoutException; otherwise, the TC (transaction coordinator) will not retry the end-TB operation and the client will not reconnect.
   https://github.com/apache/pulsar/blob/1be5a69a079594d8d96d2a0ab7ab8b389da8865e/pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/buffer/impl/TopicTransactionBuffer.java#L182-L187
   
   Please check the returned error code; the current one does not seem correct.
   https://github.com/apache/pulsar/blob/1be5a69a079594d8d96d2a0ab7ab8b389da8865e/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/ServerCnx.java#L1405-L1412





[GitHub] [pulsar] poorbarcode commented on pull request #18833: [fix] [tx] Transaction buffer recover blocked by readNext

Posted by GitBox <gi...@apache.org>.
poorbarcode commented on PR #18833:
URL: https://github.com/apache/pulsar/pull/18833#issuecomment-1344050400

   This PR should be merged into the following branches:
   
   - `branch-2.9` @congbobo184 
   - `branch-2.10`
   - `branch-2.11` @liangyepianzhou 




[GitHub] [pulsar] BewareMyPower commented on a diff in pull request #18833: [fix] [tx] Transaction buffer recover blocked by readNext

Posted by GitBox <gi...@apache.org>.
BewareMyPower commented on code in PR #18833:
URL: https://github.com/apache/pulsar/pull/18833#discussion_r1044312124


##########
pulsar-broker/src/test/java/org/apache/pulsar/broker/transaction/TopicTransactionBufferRecoverTest.java:
##########
@@ -119,18 +129,18 @@ protected void cleanup() throws Exception {
     }
 
     @DataProvider(name = "testTopic")
-    public Object[] testTopic() {
-        return new Object[] {
-                RECOVER_ABORT,
-                RECOVER_COMMIT
+    public Object[][] testTopic() {
+        return new Object[][] {
+                {RECOVER_ABORT},
+                {RECOVER_COMMIT}
         };
     }
 
     @DataProvider(name = "enableSnapshotSegment")
-    public Object[] testSnapshot() {
-        return new Boolean[] {
-                true,
-                false
+    public Object[][] testSnapshot() {
+        return new Object[][] {
+                {true},
+                {false}
         };
     }

Review Comment:
   Please avoid unnecessary changes.





[GitHub] [pulsar] congbobo184 commented on pull request #18833: [fix] [tx] Transaction buffer recover blocked by readNext

Posted by "congbobo184 (via GitHub)" <gi...@apache.org>.
congbobo184 commented on PR #18833:
URL: https://github.com/apache/pulsar/pull/18833#issuecomment-1501374389

   @3286360470 Reproducing this requires deleting topics; otherwise, the issue cannot be reproduced.

