You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by pe...@apache.org on 2022/04/13 13:44:01 UTC
[pulsar] branch master updated: [improve][txn] Avoid create multiple future and exception handler. (#15089)
This is an automated email from the ASF dual-hosted git repository.
penghui pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git
The following commit(s) were added to refs/heads/master by this push:
new 4aeeed5dab9 [improve][txn] Avoid create multiple future and exception handler. (#15089)
4aeeed5dab9 is described below
commit 4aeeed5dab9dfe9493526f36d539b3ef29cf6fe5
Author: Qiang Zhao <74...@users.noreply.github.com>
AuthorDate: Wed Apr 13 21:43:47 2022 +0800
[improve][txn] Avoid create multiple future and exception handler. (#15089)
---
.../broker/TransactionMetadataStoreService.java | 228 +++++++++------------
1 file changed, 92 insertions(+), 136 deletions(-)
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/TransactionMetadataStoreService.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/TransactionMetadataStoreService.java
index 4edd35943e6..cd188397989 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/TransactionMetadataStoreService.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/TransactionMetadataStoreService.java
@@ -25,12 +25,12 @@ import com.google.common.annotations.VisibleForTesting;
import io.netty.util.HashedWheelTimer;
import io.netty.util.Timer;
import io.netty.util.concurrent.DefaultThreadFactory;
-import java.util.ArrayList;
import java.util.Collections;
import java.util.Deque;
import java.util.List;
import java.util.Map;
import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.CompletionStage;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentLinkedDeque;
import java.util.concurrent.ExecutorService;
@@ -38,6 +38,8 @@ import java.util.concurrent.Executors;
import java.util.concurrent.Semaphore;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.TimeUnit;
+import java.util.stream.Collectors;
+import java.util.stream.Stream;
import org.apache.bookkeeper.mledger.ManagedLedgerException;
import org.apache.pulsar.broker.namespace.NamespaceBundleOwnershipListener;
import org.apache.pulsar.broker.service.BrokerServiceException.ServiceUnitNotReadyException;
@@ -339,12 +341,13 @@ public class TransactionMetadataStoreService {
}
public CompletableFuture<Void> endTransaction(TxnID txnID, int txnAction, boolean isTimeout) {
- CompletableFuture<Void> completableFuture = new CompletableFuture<>();
- return endTransaction(txnID, txnAction, isTimeout, completableFuture);
+ CompletableFuture<Void> future = new CompletableFuture<>();
+ endTransaction(txnID, txnAction, isTimeout, future);
+ return future;
}
- public CompletableFuture<Void> endTransaction(TxnID txnID, int txnAction, boolean isTimeout,
- CompletableFuture<Void> completableFuture) {
+ public void endTransaction(TxnID txnID, int txnAction, boolean isTimeout,
+ CompletableFuture<Void> future) {
TxnStatus newStatus;
switch (txnAction) {
case TxnAction.COMMIT_VALUE:
@@ -357,90 +360,60 @@ public class TransactionMetadataStoreService {
TransactionCoordinatorException.UnsupportedTxnActionException exception =
new TransactionCoordinatorException.UnsupportedTxnActionException(txnID, txnAction);
LOG.error(exception.getMessage());
- completableFuture.completeExceptionally(exception);
- return completableFuture;
+ future.completeExceptionally(exception);
+ return;
}
-
- getTxnMeta(txnID).thenAccept(txnMeta -> {
- TxnStatus txnStatus = txnMeta.status();
- if (txnStatus == TxnStatus.OPEN) {
- updateTxnStatus(txnID, newStatus, TxnStatus.OPEN, isTimeout).thenAccept(v ->
- endTxnInTransactionBuffer(txnID, txnAction).thenAccept(a ->
- completableFuture.complete(null)).exceptionally(e -> {
- if (!isRetryableException(e.getCause())) {
- LOG.error("EndTxnInTransactionBuffer fail! TxnId : {}, "
- + "TxnAction : {}", txnID, txnAction, e);
- } else {
- if (LOG.isDebugEnabled()) {
- LOG.debug("EndTxnInTransactionBuffer retry! TxnId : {}, "
- + "TxnAction : {}", txnID, txnAction, e);
- }
- transactionOpRetryTimer.newTimeout(timeout ->
- endTransaction(txnID, txnAction, isTimeout, completableFuture),
- endTransactionRetryIntervalTime, TimeUnit.MILLISECONDS);
- return null;
-
- }
- completableFuture.completeExceptionally(e.getCause());
- return null;
- })).exceptionally(e -> {
- if (!isRetryableException(e.getCause())) {
- LOG.error("EndTransaction UpdateTxnStatus fail! TxnId : {}, "
- + "TxnAction : {}", txnID, txnAction, e);
- } else {
- if (LOG.isDebugEnabled()) {
- LOG.debug("EndTransaction UpdateTxnStatus op retry! TxnId : {}, "
- + "TxnAction : {}", txnID, txnAction, e);
- }
- transactionOpRetryTimer.newTimeout(timeout -> endTransaction(txnID, txnAction,
- isTimeout, completableFuture), endTransactionRetryIntervalTime, TimeUnit.MILLISECONDS);
- return null;
-
+ getTxnMeta(txnID)
+ .thenCompose(txnMeta -> {
+ if (txnMeta.status() == TxnStatus.OPEN) {
+ return updateTxnStatus(txnID, newStatus, TxnStatus.OPEN, isTimeout)
+ .thenCompose(__ -> endTxnInTransactionBuffer(txnID, txnAction));
+ }
+ return fakeAsyncCheckTxnStatus(txnMeta.status(), txnAction, txnID, newStatus)
+ .thenCompose(__ -> endTxnInTransactionBuffer(txnID, txnAction));
+ }).whenComplete((__, ex)-> {
+ if (ex == null) {
+ future.complete(null);
+ return;
+ }
+ Throwable realCause = FutureUtil.unwrapCompletionException(ex);
+ if (!isRetryableException(realCause)) {
+ LOG.error("End transaction fail! TxnId : {}, "
+ + "TxnAction : {}", txnID, txnAction, realCause);
+ future.completeExceptionally(ex);
+ return;
}
- completableFuture.completeExceptionally(e.getCause());
- return null;
- });
- } else {
- if ((txnStatus == COMMITTING && txnAction == TxnAction.COMMIT.getValue())
- || (txnStatus == ABORTING && txnAction == TxnAction.ABORT.getValue())) {
- endTxnInTransactionBuffer(txnID, txnAction).thenAccept(k ->
- completableFuture.complete(null)).exceptionally(e -> {
- if (isRetryableException(e.getCause())) {
- if (LOG.isDebugEnabled()) {
- LOG.debug("EndTxnInTransactionBuffer retry! TxnId : {}, "
- + "TxnAction : {}", txnID, txnAction, e);
- }
- transactionOpRetryTimer.newTimeout(timeout ->
- endTransaction(txnID, txnAction, isTimeout, completableFuture),
- endTransactionRetryIntervalTime, TimeUnit.MILLISECONDS);
- return null;
- } else {
- LOG.error("EndTxnInTransactionBuffer fail! TxnId : {}, "
- + "TxnAction : {}", txnID, txnAction, e);
- }
- completableFuture.completeExceptionally(e.getCause());
- return null;
- });
- } else {
if (LOG.isDebugEnabled()) {
- LOG.debug("EndTxnInTransactionBuffer op retry! TxnId : {}, TxnAction : {}", txnID, txnAction);
+ LOG.debug("EndTxnInTransactionBuffer retry! TxnId : {}, "
+ + "TxnAction : {}", txnID, txnAction, realCause);
}
- completableFuture.completeExceptionally(new InvalidTxnStatusException(txnID, newStatus, txnStatus));
- }
- }
- }).exceptionally(e -> {
- if (isRetryableException(e.getCause())) {
- if (LOG.isDebugEnabled()) {
- LOG.debug("End transaction op retry! TxnId : {}, TxnAction : {}", txnID, txnAction, e);
- }
- transactionOpRetryTimer.newTimeout(timeout -> endTransaction(txnID, txnAction, isTimeout,
- completableFuture), endTransactionRetryIntervalTime, TimeUnit.MILLISECONDS);
- return null;
+ transactionOpRetryTimer.newTimeout(timeout ->
+ endTransaction(txnID, txnAction, isTimeout, future),
+ endTransactionRetryIntervalTime, TimeUnit.MILLISECONDS);
+ });
+ }
+
+ private CompletionStage<Void> fakeAsyncCheckTxnStatus(TxnStatus txnStatus, int txnAction,
+ TxnID txnID, TxnStatus expectStatus) {
+ boolean isLegal;
+ switch (txnStatus) {
+ case COMMITTING:
+ isLegal = (txnAction == TxnAction.COMMIT.getValue());
+ break;
+ case ABORTING:
+ isLegal = (txnAction == TxnAction.ABORT.getValue());
+ break;
+ default:
+ isLegal = false;
+ }
+ if (!isLegal) {
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("EndTxnInTransactionBuffer op retry! TxnId : {}, TxnAction : {}", txnID, txnAction);
}
- completableFuture.completeExceptionally(e.getCause());
- return null;
- });
- return completableFuture;
+ return FutureUtil.failedFuture(
+ new InvalidTxnStatusException(txnID, expectStatus, txnStatus));
+ }
+ return CompletableFuture.completedFuture(null);
}
// when managedLedger fence will remove this tc and reload
@@ -471,59 +444,42 @@ public class TransactionMetadataStoreService {
}
private CompletableFuture<Void> endTxnInTransactionBuffer(TxnID txnID, int txnAction) {
- CompletableFuture<Void> resultFuture = new CompletableFuture<>();
- List<CompletableFuture<TxnID>> completableFutureList = new ArrayList<>();
- this.getTxnMeta(txnID).whenComplete((txnMeta, throwable) -> {
- if (throwable != null) {
- resultFuture.completeExceptionally(throwable);
- return;
- }
- long lowWaterMark = getLowWaterMark(txnID);
-
- txnMeta.ackedPartitions().forEach(tbSub -> {
- CompletableFuture<TxnID> actionFuture = new CompletableFuture<>();
- if (TxnAction.COMMIT_VALUE == txnAction) {
- actionFuture = tbClient.commitTxnOnSubscription(
- tbSub.getTopic(), tbSub.getSubscription(), txnID.getMostSigBits(),
- txnID.getLeastSigBits(), lowWaterMark);
- } else if (TxnAction.ABORT_VALUE == txnAction) {
- actionFuture = tbClient.abortTxnOnSubscription(
- tbSub.getTopic(), tbSub.getSubscription(), txnID.getMostSigBits(),
- txnID.getLeastSigBits(), lowWaterMark);
- } else {
- actionFuture.completeExceptionally(new Throwable("Unsupported txnAction " + txnAction));
- }
- completableFutureList.add(actionFuture);
- });
-
- txnMeta.producedPartitions().forEach(partition -> {
- CompletableFuture<TxnID> actionFuture = new CompletableFuture<>();
- if (TxnAction.COMMIT_VALUE == txnAction) {
- actionFuture = tbClient.commitTxnOnTopic(partition, txnID.getMostSigBits(),
- txnID.getLeastSigBits(), lowWaterMark);
- } else if (TxnAction.ABORT_VALUE == txnAction) {
- actionFuture = tbClient.abortTxnOnTopic(partition, txnID.getMostSigBits(),
- txnID.getLeastSigBits(), lowWaterMark);
- } else {
- actionFuture.completeExceptionally(new Throwable("Unsupported txnAction " + txnAction));
- }
- completableFutureList.add(actionFuture);
- });
-
- try {
- FutureUtil.waitForAll(completableFutureList).whenComplete((ignored, waitThrowable) -> {
- if (waitThrowable != null) {
- resultFuture.completeExceptionally(waitThrowable);
- return;
- }
- resultFuture.complete(null);
+ return getTxnMeta(txnID)
+ .thenCompose(txnMeta -> {
+ long lowWaterMark = getLowWaterMark(txnID);
+ Stream<CompletableFuture<?>> onSubFutureStream = txnMeta.ackedPartitions().stream().map(tbSub -> {
+ switch (txnAction) {
+ case TxnAction.COMMIT_VALUE:
+ return tbClient.commitTxnOnSubscription(
+ tbSub.getTopic(), tbSub.getSubscription(), txnID.getMostSigBits(),
+ txnID.getLeastSigBits(), lowWaterMark);
+ case TxnAction.ABORT_VALUE:
+ return tbClient.abortTxnOnSubscription(
+ tbSub.getTopic(), tbSub.getSubscription(), txnID.getMostSigBits(),
+ txnID.getLeastSigBits(), lowWaterMark);
+ default:
+ return FutureUtil.failedFuture(
+ new IllegalStateException("Unsupported txnAction " + txnAction));
+ }
+ });
+ Stream<CompletableFuture<?>> onTopicFutureStream =
+ txnMeta.producedPartitions().stream().map(partition -> {
+ switch (txnAction) {
+ case TxnAction.COMMIT_VALUE:
+ return tbClient.commitTxnOnTopic(partition, txnID.getMostSigBits(),
+ txnID.getLeastSigBits(), lowWaterMark);
+ case TxnAction.ABORT_VALUE:
+ return tbClient.abortTxnOnTopic(partition, txnID.getMostSigBits(),
+ txnID.getLeastSigBits(), lowWaterMark);
+ default:
+ return FutureUtil.failedFuture(
+ new IllegalStateException("Unsupported txnAction " + txnAction));
+ }
+ });
+ return FutureUtil.waitForAll(Stream.concat(onSubFutureStream, onTopicFutureStream)
+ .collect(Collectors.toList()))
+ .thenCompose(__ -> endTxnInTransactionMetadataStore(txnID, txnAction));
});
- } catch (Exception e) {
- resultFuture.completeExceptionally(e);
- }
- });
-
- return resultFuture.thenCompose((future) -> endTxnInTransactionMetadataStore(txnID, txnAction));
}
private static boolean isRetryableException(Throwable e) {