You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by pe...@apache.org on 2022/04/13 13:44:01 UTC

[pulsar] branch master updated: [improve][txn] Avoid create multiple future and exception handler. (#15089)

This is an automated email from the ASF dual-hosted git repository.

penghui pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git


The following commit(s) were added to refs/heads/master by this push:
     new 4aeeed5dab9 [improve][txn] Avoid create multiple future and exception handler. (#15089)
4aeeed5dab9 is described below

commit 4aeeed5dab9dfe9493526f36d539b3ef29cf6fe5
Author: Qiang Zhao <74...@users.noreply.github.com>
AuthorDate: Wed Apr 13 21:43:47 2022 +0800

    [improve][txn] Avoid create multiple future and exception handler. (#15089)
---
 .../broker/TransactionMetadataStoreService.java    | 228 +++++++++------------
 1 file changed, 92 insertions(+), 136 deletions(-)

diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/TransactionMetadataStoreService.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/TransactionMetadataStoreService.java
index 4edd35943e6..cd188397989 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/TransactionMetadataStoreService.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/TransactionMetadataStoreService.java
@@ -25,12 +25,12 @@ import com.google.common.annotations.VisibleForTesting;
 import io.netty.util.HashedWheelTimer;
 import io.netty.util.Timer;
 import io.netty.util.concurrent.DefaultThreadFactory;
-import java.util.ArrayList;
 import java.util.Collections;
 import java.util.Deque;
 import java.util.List;
 import java.util.Map;
 import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.CompletionStage;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.ConcurrentLinkedDeque;
 import java.util.concurrent.ExecutorService;
@@ -38,6 +38,8 @@ import java.util.concurrent.Executors;
 import java.util.concurrent.Semaphore;
 import java.util.concurrent.ThreadFactory;
 import java.util.concurrent.TimeUnit;
+import java.util.stream.Collectors;
+import java.util.stream.Stream;
 import org.apache.bookkeeper.mledger.ManagedLedgerException;
 import org.apache.pulsar.broker.namespace.NamespaceBundleOwnershipListener;
 import org.apache.pulsar.broker.service.BrokerServiceException.ServiceUnitNotReadyException;
@@ -339,12 +341,13 @@ public class TransactionMetadataStoreService {
     }
 
     public CompletableFuture<Void> endTransaction(TxnID txnID, int txnAction, boolean isTimeout) {
-        CompletableFuture<Void> completableFuture = new CompletableFuture<>();
-        return endTransaction(txnID, txnAction, isTimeout, completableFuture);
+        CompletableFuture<Void> future = new CompletableFuture<>();
+        endTransaction(txnID, txnAction, isTimeout, future);
+        return future;
     }
 
-    public CompletableFuture<Void> endTransaction(TxnID txnID, int txnAction, boolean isTimeout,
-                                                  CompletableFuture<Void> completableFuture) {
+    public void endTransaction(TxnID txnID, int txnAction, boolean isTimeout,
+                                                  CompletableFuture<Void> future) {
         TxnStatus newStatus;
         switch (txnAction) {
             case TxnAction.COMMIT_VALUE:
@@ -357,90 +360,60 @@ public class TransactionMetadataStoreService {
                 TransactionCoordinatorException.UnsupportedTxnActionException exception =
                         new TransactionCoordinatorException.UnsupportedTxnActionException(txnID, txnAction);
                 LOG.error(exception.getMessage());
-                completableFuture.completeExceptionally(exception);
-                return completableFuture;
+                future.completeExceptionally(exception);
+                return;
         }
-
-        getTxnMeta(txnID).thenAccept(txnMeta -> {
-            TxnStatus txnStatus = txnMeta.status();
-            if (txnStatus == TxnStatus.OPEN) {
-                updateTxnStatus(txnID, newStatus, TxnStatus.OPEN, isTimeout).thenAccept(v ->
-                        endTxnInTransactionBuffer(txnID, txnAction).thenAccept(a ->
-                                completableFuture.complete(null)).exceptionally(e -> {
-                            if (!isRetryableException(e.getCause())) {
-                                LOG.error("EndTxnInTransactionBuffer fail! TxnId : {}, "
-                                        + "TxnAction : {}", txnID, txnAction, e);
-                            } else {
-                                if (LOG.isDebugEnabled()) {
-                                    LOG.debug("EndTxnInTransactionBuffer retry! TxnId : {}, "
-                                            + "TxnAction : {}", txnID, txnAction, e);
-                                }
-                                transactionOpRetryTimer.newTimeout(timeout ->
-                                        endTransaction(txnID, txnAction, isTimeout, completableFuture),
-                                        endTransactionRetryIntervalTime, TimeUnit.MILLISECONDS);
-                                return null;
-
-                            }
-                            completableFuture.completeExceptionally(e.getCause());
-                            return null;
-                        })).exceptionally(e -> {
-                    if (!isRetryableException(e.getCause())) {
-                        LOG.error("EndTransaction UpdateTxnStatus fail! TxnId : {}, "
-                                + "TxnAction : {}", txnID, txnAction, e);
-                    } else {
-                        if (LOG.isDebugEnabled()) {
-                            LOG.debug("EndTransaction UpdateTxnStatus op retry! TxnId : {}, "
-                                    + "TxnAction : {}", txnID, txnAction, e);
-                        }
-                        transactionOpRetryTimer.newTimeout(timeout -> endTransaction(txnID, txnAction,
-                                isTimeout, completableFuture), endTransactionRetryIntervalTime, TimeUnit.MILLISECONDS);
-                        return null;
-
+        getTxnMeta(txnID)
+                .thenCompose(txnMeta -> {
+                    if (txnMeta.status() == TxnStatus.OPEN) {
+                        return updateTxnStatus(txnID, newStatus, TxnStatus.OPEN, isTimeout)
+                                .thenCompose(__ -> endTxnInTransactionBuffer(txnID, txnAction));
+                    }
+                    return fakeAsyncCheckTxnStatus(txnMeta.status(), txnAction, txnID, newStatus)
+                            .thenCompose(__ -> endTxnInTransactionBuffer(txnID, txnAction));
+                }).whenComplete((__, ex)-> {
+                    if (ex == null) {
+                        future.complete(null);
+                        return;
+                    }
+                    Throwable realCause = FutureUtil.unwrapCompletionException(ex);
+                    if (!isRetryableException(realCause)) {
+                        LOG.error("End transaction fail! TxnId : {}, "
+                                + "TxnAction : {}", txnID, txnAction, realCause);
+                        future.completeExceptionally(ex);
+                        return;
                     }
-                    completableFuture.completeExceptionally(e.getCause());
-                    return null;
-                });
-            } else {
-                if ((txnStatus == COMMITTING && txnAction == TxnAction.COMMIT.getValue())
-                        || (txnStatus == ABORTING && txnAction == TxnAction.ABORT.getValue())) {
-                    endTxnInTransactionBuffer(txnID, txnAction).thenAccept(k ->
-                            completableFuture.complete(null)).exceptionally(e -> {
-                        if (isRetryableException(e.getCause())) {
-                            if (LOG.isDebugEnabled()) {
-                                LOG.debug("EndTxnInTransactionBuffer retry! TxnId : {}, "
-                                        + "TxnAction : {}", txnID, txnAction, e);
-                            }
-                            transactionOpRetryTimer.newTimeout(timeout ->
-                                            endTransaction(txnID, txnAction, isTimeout, completableFuture),
-                                    endTransactionRetryIntervalTime, TimeUnit.MILLISECONDS);
-                            return null;
-                        } else {
-                            LOG.error("EndTxnInTransactionBuffer fail! TxnId : {}, "
-                                    + "TxnAction : {}", txnID, txnAction, e);
-                        }
-                        completableFuture.completeExceptionally(e.getCause());
-                        return null;
-                    });
-                } else {
                     if (LOG.isDebugEnabled()) {
-                        LOG.debug("EndTxnInTransactionBuffer op retry! TxnId : {}, TxnAction : {}", txnID, txnAction);
+                        LOG.debug("EndTxnInTransactionBuffer retry! TxnId : {}, "
+                                + "TxnAction : {}", txnID, txnAction, realCause);
                     }
-                    completableFuture.completeExceptionally(new InvalidTxnStatusException(txnID, newStatus, txnStatus));
-                }
-            }
-        }).exceptionally(e -> {
-            if (isRetryableException(e.getCause())) {
-                if (LOG.isDebugEnabled()) {
-                    LOG.debug("End transaction op retry! TxnId : {}, TxnAction : {}", txnID, txnAction, e);
-                }
-                transactionOpRetryTimer.newTimeout(timeout -> endTransaction(txnID, txnAction, isTimeout,
-                        completableFuture), endTransactionRetryIntervalTime, TimeUnit.MILLISECONDS);
-                return null;
+                    transactionOpRetryTimer.newTimeout(timeout ->
+                                    endTransaction(txnID, txnAction, isTimeout, future),
+                            endTransactionRetryIntervalTime, TimeUnit.MILLISECONDS);
+                });
+    }
+
+    private CompletionStage<Void> fakeAsyncCheckTxnStatus(TxnStatus txnStatus, int txnAction,
+                                                          TxnID txnID, TxnStatus expectStatus) {
+        boolean isLegal;
+        switch (txnStatus) {
+            case COMMITTING:
+                isLegal =  (txnAction == TxnAction.COMMIT.getValue());
+                break;
+            case ABORTING:
+                isLegal =  (txnAction == TxnAction.ABORT.getValue());
+                break;
+            default:
+                isLegal = false;
+        }
+        if (!isLegal) {
+            if (LOG.isDebugEnabled()) {
+                LOG.debug("EndTxnInTransactionBuffer op retry! TxnId : {}, TxnAction : {}", txnID, txnAction);
             }
-            completableFuture.completeExceptionally(e.getCause());
-            return null;
-        });
-        return completableFuture;
+            return FutureUtil.failedFuture(
+                    new InvalidTxnStatusException(txnID, expectStatus, txnStatus));
+        }
+       return CompletableFuture.completedFuture(null);
     }
 
     // when managedLedger fence will remove this tc and reload
@@ -471,59 +444,42 @@ public class TransactionMetadataStoreService {
     }
 
     private CompletableFuture<Void> endTxnInTransactionBuffer(TxnID txnID, int txnAction) {
-        CompletableFuture<Void> resultFuture = new CompletableFuture<>();
-        List<CompletableFuture<TxnID>> completableFutureList = new ArrayList<>();
-        this.getTxnMeta(txnID).whenComplete((txnMeta, throwable) -> {
-            if (throwable != null) {
-                resultFuture.completeExceptionally(throwable);
-                return;
-            }
-            long lowWaterMark = getLowWaterMark(txnID);
-
-            txnMeta.ackedPartitions().forEach(tbSub -> {
-                CompletableFuture<TxnID> actionFuture = new CompletableFuture<>();
-                if (TxnAction.COMMIT_VALUE == txnAction) {
-                    actionFuture = tbClient.commitTxnOnSubscription(
-                            tbSub.getTopic(), tbSub.getSubscription(), txnID.getMostSigBits(),
-                            txnID.getLeastSigBits(), lowWaterMark);
-                } else if (TxnAction.ABORT_VALUE == txnAction) {
-                    actionFuture = tbClient.abortTxnOnSubscription(
-                            tbSub.getTopic(), tbSub.getSubscription(), txnID.getMostSigBits(),
-                            txnID.getLeastSigBits(), lowWaterMark);
-                } else {
-                    actionFuture.completeExceptionally(new Throwable("Unsupported txnAction " + txnAction));
-                }
-                completableFutureList.add(actionFuture);
-            });
-
-            txnMeta.producedPartitions().forEach(partition -> {
-                CompletableFuture<TxnID> actionFuture = new CompletableFuture<>();
-                if (TxnAction.COMMIT_VALUE == txnAction) {
-                    actionFuture = tbClient.commitTxnOnTopic(partition, txnID.getMostSigBits(),
-                            txnID.getLeastSigBits(), lowWaterMark);
-                } else if (TxnAction.ABORT_VALUE == txnAction) {
-                    actionFuture = tbClient.abortTxnOnTopic(partition, txnID.getMostSigBits(),
-                            txnID.getLeastSigBits(), lowWaterMark);
-                } else {
-                    actionFuture.completeExceptionally(new Throwable("Unsupported txnAction " + txnAction));
-                }
-                completableFutureList.add(actionFuture);
-            });
-
-            try {
-                FutureUtil.waitForAll(completableFutureList).whenComplete((ignored, waitThrowable) -> {
-                    if (waitThrowable != null) {
-                        resultFuture.completeExceptionally(waitThrowable);
-                        return;
-                    }
-                    resultFuture.complete(null);
+        return getTxnMeta(txnID)
+                .thenCompose(txnMeta -> {
+                    long lowWaterMark = getLowWaterMark(txnID);
+                    Stream<CompletableFuture<?>> onSubFutureStream = txnMeta.ackedPartitions().stream().map(tbSub -> {
+                        switch (txnAction) {
+                            case TxnAction.COMMIT_VALUE:
+                                return tbClient.commitTxnOnSubscription(
+                                        tbSub.getTopic(), tbSub.getSubscription(), txnID.getMostSigBits(),
+                                        txnID.getLeastSigBits(), lowWaterMark);
+                            case TxnAction.ABORT_VALUE:
+                                return tbClient.abortTxnOnSubscription(
+                                        tbSub.getTopic(), tbSub.getSubscription(), txnID.getMostSigBits(),
+                                        txnID.getLeastSigBits(), lowWaterMark);
+                            default:
+                                return FutureUtil.failedFuture(
+                                        new IllegalStateException("Unsupported txnAction " + txnAction));
+                        }
+                    });
+                    Stream<CompletableFuture<?>> onTopicFutureStream =
+                            txnMeta.producedPartitions().stream().map(partition -> {
+                                switch (txnAction) {
+                                    case TxnAction.COMMIT_VALUE:
+                                        return tbClient.commitTxnOnTopic(partition, txnID.getMostSigBits(),
+                                                txnID.getLeastSigBits(), lowWaterMark);
+                                    case TxnAction.ABORT_VALUE:
+                                        return tbClient.abortTxnOnTopic(partition, txnID.getMostSigBits(),
+                                            txnID.getLeastSigBits(), lowWaterMark);
+                                    default:
+                                        return FutureUtil.failedFuture(
+                                                new IllegalStateException("Unsupported txnAction " + txnAction));
+                        }
+                    });
+                    return FutureUtil.waitForAll(Stream.concat(onSubFutureStream, onTopicFutureStream)
+                                    .collect(Collectors.toList()))
+                            .thenCompose(__ -> endTxnInTransactionMetadataStore(txnID, txnAction));
                 });
-            } catch (Exception e) {
-                resultFuture.completeExceptionally(e);
-            }
-        });
-
-        return resultFuture.thenCompose((future) -> endTxnInTransactionMetadataStore(txnID, txnAction));
     }
 
     private static boolean isRetryableException(Throwable e) {