You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by GitBox <gi...@apache.org> on 2021/12/30 10:25:40 UTC

[GitHub] [pulsar] codelipenghui commented on a change in pull request #13348: [Transaction] Do not abort when commit a problematic txn, but throw an exception

codelipenghui commented on a change in pull request #13348:
URL: https://github.com/apache/pulsar/pull/13348#discussion_r776656469



##########
File path: pulsar-client/src/main/java/org/apache/pulsar/client/impl/transaction/TransactionImpl.java
##########
@@ -63,8 +63,14 @@
     private final TransactionCoordinatorClientImpl tcClient;
     private Map<ConsumerImpl<?>, Integer> cumulativeAckConsumers;
 
-    private final ArrayList<CompletableFuture<MessageId>> sendFutureList;
-    private final ArrayList<CompletableFuture<Void>> ackFutureList;
+    /**
+     *  The number of operations are executing  in this transaction.
+     */
+    private final AtomicLong opsExecutingInTxn = new AtomicLong(0);

Review comment:
       Using the AtomicUpdater to avoid creating many AtomicLong instances.

##########
File path: pulsar-client/src/main/java/org/apache/pulsar/client/impl/transaction/TransactionImpl.java
##########
@@ -148,8 +152,32 @@ public synchronized void registerSendOp(CompletableFuture<MessageId> sendFuture)
         }
     }
 
+    public synchronized <T> void registerInternal(CompletableFuture<T> completableFuture) {
+        //There have been an exception, it means this transaction should not be commit,
+        //so there is no need to record the execution of other operations
+        if (!executedFuture.isCompletedExceptionally()) {

Review comment:
       all the completed futures should not increase the `opsExecutingInTxn`

##########
File path: pulsar-client/src/main/java/org/apache/pulsar/client/impl/transaction/TransactionImpl.java
##########
@@ -148,8 +152,32 @@ public synchronized void registerSendOp(CompletableFuture<MessageId> sendFuture)
         }
     }
 
+    public synchronized <T> void registerInternal(CompletableFuture<T> completableFuture) {
+        //There have been an exception, it means this transaction should not be commit,
+        //so there is no need to record the execution of other operations
+        if (!executedFuture.isCompletedExceptionally()) {
+            opsExecutingInTxn.incrementAndGet();
+            executedFuture = new CompletableFuture<>();

Review comment:
       Can achieve the purpose by only using one instance? Looks like,
   
   - A new future was created when the transaction opened.
   - when all the ongoing requests are done, the opsExecutingInTxn change to 0, and then compete the future

##########
File path: pulsar-client/src/main/java/org/apache/pulsar/client/impl/transaction/TransactionImpl.java
##########
@@ -163,25 +191,25 @@ public synchronized void registerCumulativeAckConsumer(ConsumerImpl<?> consumer)
     public CompletableFuture<Void> commit() {
         return checkIfOpenOrCommitting().thenCompose((value) -> {
             CompletableFuture<Void> commitFuture = new CompletableFuture<>();
-            this.state = State.COMMITTING;
-            allOpComplete().whenComplete((v, e) -> {
-                if (e != null) {
-                    abort().whenComplete((vx, ex) -> commitFuture.completeExceptionally(e));
-                } else {
-                    tcClient.commitAsync(new TxnID(txnIdMostBits, txnIdLeastBits))
-                            .whenComplete((vx, ex) -> {
-                                if (ex != null) {
-                                    if (ex instanceof TransactionNotFoundException
-                                            || ex instanceof InvalidTxnStatusException) {
-                                        this.state = State.ERROR;
-                                    }
-                                    commitFuture.completeExceptionally(ex);
-                                } else {
-                                    this.state = State.COMMITTED;
-                                    commitFuture.complete(vx);
+            executedFuture.thenRun(() -> {

Review comment:
       Should check if the `executedFuture` is completed? If false, we should give feedback to the caller?

##########
File path: pulsar-client/src/main/java/org/apache/pulsar/client/impl/transaction/TransactionImpl.java
##########
@@ -148,8 +152,32 @@ public synchronized void registerSendOp(CompletableFuture<MessageId> sendFuture)
         }
     }
 
+    public synchronized <T> void registerInternal(CompletableFuture<T> completableFuture) {
+        //There have been an exception, it means this transaction should not be commit,
+        //so there is no need to record the execution of other operations
+        if (!executedFuture.isCompletedExceptionally()) {
+            opsExecutingInTxn.incrementAndGet();
+            executedFuture = new CompletableFuture<>();
+            completableFuture.thenRun(() -> {
+                synchronized (this) {

Review comment:
       Why need `synchronized` here?

##########
File path: pulsar-client/src/main/java/org/apache/pulsar/client/impl/transaction/TransactionImpl.java
##########
@@ -163,25 +191,25 @@ public synchronized void registerCumulativeAckConsumer(ConsumerImpl<?> consumer)
     public CompletableFuture<Void> commit() {
         return checkIfOpenOrCommitting().thenCompose((value) -> {
             CompletableFuture<Void> commitFuture = new CompletableFuture<>();
-            this.state = State.COMMITTING;
-            allOpComplete().whenComplete((v, e) -> {
-                if (e != null) {
-                    abort().whenComplete((vx, ex) -> commitFuture.completeExceptionally(e));
-                } else {
-                    tcClient.commitAsync(new TxnID(txnIdMostBits, txnIdLeastBits))
-                            .whenComplete((vx, ex) -> {
-                                if (ex != null) {
-                                    if (ex instanceof TransactionNotFoundException
-                                            || ex instanceof InvalidTxnStatusException) {
-                                        this.state = State.ERROR;
-                                    }
-                                    commitFuture.completeExceptionally(ex);
-                                } else {
-                                    this.state = State.COMMITTED;
-                                    commitFuture.complete(vx);
+            executedFuture.thenRun(() -> {
+                this.state = State.COMMITTING;
+                tcClient.commitAsync(new TxnID(txnIdMostBits, txnIdLeastBits))
+                        .whenComplete((vx, ex) -> {
+                            if (ex != null) {
+                                if (ex instanceof TransactionNotFoundException
+                                        || ex instanceof InvalidTxnStatusException) {
+                                    this.state = State.ERROR;
                                 }
-                            });
-                }
+                                commitFuture.completeExceptionally(ex);
+                            } else {
+                                this.state = State.COMMITTED;
+                                commitFuture.complete(vx);
+                            }
+                        });
+            }).exceptionally(e -> {
+                commitFuture.completeExceptionally(new PulsarClientException
+                        .TransactionCanNotCommitException(this.txnIdMostBits, this.txnIdLeastBits, e));

Review comment:
       ```suggestion
                   commitFuture.completeExceptionally(e.getCause());
   ```

##########
File path: pulsar-client-api/src/main/java/org/apache/pulsar/client/api/PulsarClientException.java
##########
@@ -910,6 +910,28 @@ public TransactionConflictException(String msg) {
         }
     }
 
+    public static class TransactionCanNotCommitException extends PulsarClientException {

Review comment:
       Since both commit or abort might get this exception if users did not handle send message futures or ack message futures properly, I think we should change it to `TransactionOngoingRequestsNotCompleteException`?

##########
File path: pulsar-client/src/main/java/org/apache/pulsar/client/impl/transaction/TransactionImpl.java
##########
@@ -148,8 +152,32 @@ public synchronized void registerSendOp(CompletableFuture<MessageId> sendFuture)
         }
     }
 
+    public synchronized <T> void registerInternal(CompletableFuture<T> completableFuture) {
+        //There have been an exception, it means this transaction should not be commit,
+        //so there is no need to record the execution of other operations
+        if (!executedFuture.isCompletedExceptionally()) {
+            opsExecutingInTxn.incrementAndGet();
+            executedFuture = new CompletableFuture<>();
+            completableFuture.thenRun(() -> {
+                synchronized (this) {
+                    opsExecutingInTxn.decrementAndGet();
+                    // This is the last operation so far.
+                    if (opsExecutingInTxn.get() == 0) {
+                        executedFuture.complete(null);
+                    }

Review comment:
       ```suggestion
                       // This is the last operation so far.
                       if (opsExecutingInTxn.decrementAndGet() == 0) {
                           executedFuture.complete(null);
                       }
   ```

##########
File path: pulsar-client/src/main/java/org/apache/pulsar/client/impl/transaction/TransactionImpl.java
##########
@@ -148,8 +152,32 @@ public synchronized void registerSendOp(CompletableFuture<MessageId> sendFuture)
         }
     }
 
+    public synchronized <T> void registerInternal(CompletableFuture<T> completableFuture) {
+        //There have been an exception, it means this transaction should not be commit,
+        //so there is no need to record the execution of other operations
+        if (!executedFuture.isCompletedExceptionally()) {
+            opsExecutingInTxn.incrementAndGet();
+            executedFuture = new CompletableFuture<>();
+            completableFuture.thenRun(() -> {
+                synchronized (this) {
+                    opsExecutingInTxn.decrementAndGet();
+                    // This is the last operation so far.
+                    if (opsExecutingInTxn.get() == 0) {
+                        executedFuture.complete(null);
+                    }
+                }
+            }).exceptionally(e -> {
+                //Complete this future exceptionally and there is no need to executed this method again.
+                synchronized (this) {

Review comment:
       Why need `synchronized` here.




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@pulsar.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org