Posted to commits@pulsar.apache.org by GitBox <gi...@apache.org> on 2021/02/22 14:01:07 UTC

[GitHub] [pulsar] codelipenghui commented on a change in pull request #9236: [Transaction] TC end transaction retry.

codelipenghui commented on a change in pull request #9236:
URL: https://github.com/apache/pulsar/pull/9236#discussion_r580264385



##########
File path: pulsar-broker-common/src/main/java/org/apache/pulsar/broker/ServiceConfiguration.java
##########
@@ -1931,6 +1931,12 @@
     )
     private boolean transactionCoordinatorEnabled = false;
 
+    @FieldContext(
+            category = CATEGORY_TRANSACTION,
+            doc = "End transaction operation retry interval time. (unit:second)"
+    )
+    private long endTxnOpRetryIntervalTime = 5;

Review comment:
       If this is internal retry logic in the TransactionMetadataStoreService, we don't need to expose the configuration here.

##########
File path: pulsar-broker/src/main/java/org/apache/pulsar/broker/TransactionMetadataStoreService.java
##########
@@ -305,7 +320,18 @@ public long getLowWaterMark(TxnID txnID) {
                 resultFuture.completeExceptionally(e);
             }
         });
-        return resultFuture;
+
+        return resultFuture.thenCompose((future) -> endTxnInTransactionMetadataStore(txnID, txnAction));
+    }
+
+    private CompletableFuture<Void> endTxnInTransactionMetadataStore(TxnID txnID, int txnAction) {
+        if (TxnAction.COMMIT.getValue() == txnAction) {
+            return updateTxnStatus(txnID, TxnStatus.COMMITTED, TxnStatus.COMMITTING);
+        } else if (TxnAction.ABORT.getValue() == txnAction) {
+            return updateTxnStatus(txnID, TxnStatus.ABORTED, TxnStatus.ABORTING);
+        } else {
+            return FutureUtil.failedFuture(new Throwable("Unsupported txnAction " + txnAction));

Review comment:
       Why complete the future with a bare Throwable? It should be a transaction exception.
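
       A minimal sketch of the idea, not code from this PR: the unsupported-action branch fails the future with a dedicated exception type, so callers can match on it rather than on Throwable. `UnsupportedTxnActionException`, `EndTxnSketch`, and the `COMMIT`/`ABORT` codes are hypothetical names made up for illustration, and a plain `CompletableFuture` stands in for `FutureUtil.failedFuture`.

```java
import java.util.concurrent.CompletableFuture;

// Hypothetical exception type for this sketch; the real project may already
// have a suitable transaction exception to use instead.
class UnsupportedTxnActionException extends Exception {
    UnsupportedTxnActionException(int txnAction) {
        super("Unsupported txnAction " + txnAction);
    }
}

class EndTxnSketch {
    // Assumed action codes, standing in for TxnAction.COMMIT / TxnAction.ABORT.
    static final int COMMIT = 0;
    static final int ABORT = 1;

    static CompletableFuture<Void> endTxn(int txnAction) {
        if (txnAction == COMMIT || txnAction == ABORT) {
            // The real code would update the transaction status here.
            return CompletableFuture.completedFuture(null);
        }
        // Fail with a specific, catchable type instead of a bare Throwable.
        CompletableFuture<Void> failed = new CompletableFuture<>();
        failed.completeExceptionally(new UnsupportedTxnActionException(txnAction));
        return failed;
    }
}
```

       With a specific type, a caller can inspect the cause of the failed future and decide how to report it to the client.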

##########
File path: pulsar-broker/src/main/java/org/apache/pulsar/broker/TransactionMetadataStoreService.java
##########
@@ -224,15 +231,23 @@ public long getLowWaterMark(TxnID txnID) {
         }
 
         completableFuture = updateTxnStatus(txnID, newStatus, TxnStatus.OPEN)
-                .thenCompose(ignored -> endTxnInTransactionBuffer(txnID, txnAction));
-        if (TxnStatus.COMMITTING.equals(newStatus)) {
-            completableFuture = completableFuture
-                    .thenCompose(ignored -> updateTxnStatus(txnID, TxnStatus.COMMITTED, TxnStatus.COMMITTING));
-        } else if (TxnStatus.ABORTING.equals(newStatus)) {
-            completableFuture = completableFuture
-                    .thenCompose(ignored -> updateTxnStatus(txnID, TxnStatus.ABORTED, TxnStatus.ABORTING));
-        }
-        return completableFuture;
+                .thenCompose(ignored -> endTxnInTransactionBuffer(txnID, txnAction))
+                .exceptionally(e -> {
+                    if (e.getCause() instanceof TransactionNotFoundException
+                            || e.getCause() instanceof InvalidTxnStatusException) {

Review comment:
       It's better to create a method `isRetryableException()` that explicitly lists the exceptions that should be retried. This will avoid getting stuck in an infinite retry loop.
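
       A rough sketch of such a helper, under stated assumptions: it unwraps the `CompletionException` that `CompletableFuture` stages add, then checks the cause against an explicit allow-list. The class name `RetryPolicySketch` is hypothetical, and `TimeoutException` and `ConnectException` are only placeholder examples of transient failures. Which exceptions are actually retryable is for the PR to decide.

```java
import java.net.ConnectException;
import java.util.concurrent.CompletionException;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeoutException;

class RetryPolicySketch {
    // Strip the wrapper exceptions added by CompletableFuture / Future.
    static Throwable unwrap(Throwable e) {
        while ((e instanceof CompletionException || e instanceof ExecutionException)
                && e.getCause() != null) {
            e = e.getCause();
        }
        return e;
    }

    // Explicit allow-list: anything not named here is treated as
    // non-retryable, so an unexpected error cannot trigger endless retries.
    // The two types below are assumptions chosen for illustration only.
    static boolean isRetryableException(Throwable e) {
        Throwable cause = unwrap(e);
        return cause instanceof TimeoutException
                || cause instanceof ConnectException;
    }
}
```

       Defaulting to "not retryable" means a new exception type has to be added to the list deliberately before it can cause a retry.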



