You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by bo...@apache.org on 2022/04/02 09:40:46 UTC
[pulsar] branch branch-2.9 updated: [fix][transaction] Fix potentially unfinished CompletableFuture. (#14973)
This is an automated email from the ASF dual-hosted git repository.
bogong pushed a commit to branch branch-2.9
in repository https://gitbox.apache.org/repos/asf/pulsar.git
The following commit(s) were added to refs/heads/branch-2.9 by this push:
new 6cd01a2 [fix][transaction] Fix potentially unfinished CompletableFuture. (#14973)
6cd01a2 is described below
commit 6cd01a22ac4ae8fdccc61f1cade6fae63036befd
Author: Jiwei Guo <te...@apache.org>
AuthorDate: Sat Apr 2 17:36:44 2022 +0800
[fix][transaction] Fix potentially unfinished CompletableFuture. (#14973)
### Motivation
In MLTransactionMetadataStore#addProducedPartitionToTxn, the method getTxnPositionPair(txnID) may throw TransactionNotFoundException, but the exception is not caught. This can leave the returned CompletableFuture uncompleted.
https://github.com/apache/pulsar/blob/02eb31b372b2bf72350e8f6cbab552e1627e6197/pulsar-transaction/coordinator/src/main/java/org/apache/pulsar/transaction/coordinator/impl/MLTransactionMetadataStore.java#L260-L290
https://github.com/apache/pulsar/blob/02eb31b372b2bf72350e8f6cbab552e1627e6197/pulsar-transaction/coordinator/src/main/java/org/apache/pulsar/transaction/coordinator/impl/MLTransactionMetadataStore.java#L435-L444
(cherry picked from commit 1544375667f8c724ff9098244f47bb07871c8151)
---
.../broker/TransactionMetadataStoreService.java | 3 +-
.../apache/pulsar/broker/service/ServerCnx.java | 15 ++-
.../buffer/impl/TopicTransactionBuffer.java | 2 +-
.../impl/MLTransactionMetadataStore.java | 138 ++++++++++-----------
4 files changed, 77 insertions(+), 81 deletions(-)
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/TransactionMetadataStoreService.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/TransactionMetadataStoreService.java
index 28bef1f..97fae3b 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/TransactionMetadataStoreService.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/TransactionMetadataStoreService.java
@@ -442,8 +442,7 @@ public class TransactionMetadataStoreService {
// when managedLedger fence will remove this tc and reload
public void handleOpFail(Throwable e, TransactionCoordinatorID tcId) {
- if (e.getCause() instanceof ManagedLedgerException.ManagedLedgerFencedException
- || e instanceof ManagedLedgerException.ManagedLedgerFencedException) {
+ if (e instanceof ManagedLedgerException.ManagedLedgerFencedException) {
removeTransactionMetadataStore(tcId);
}
}
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/ServerCnx.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/ServerCnx.java
index 89541a4..07d898f 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/ServerCnx.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/ServerCnx.java
@@ -2032,24 +2032,23 @@ public class ServerCnx extends PulsarHandler implements TransportCnx {
}
}
private Throwable handleTxnException(Throwable ex, String op, long requestId) {
- if (ex instanceof CoordinatorException.CoordinatorNotFoundException || ex != null
- && ex.getCause() instanceof CoordinatorException.CoordinatorNotFoundException) {
+ Throwable cause = FutureUtil.unwrapCompletionException(ex);
+ if (cause instanceof CoordinatorException.CoordinatorNotFoundException) {
if (log.isDebugEnabled()) {
log.debug("The Coordinator was not found for the request {}", op);
}
- return ex;
+ return cause;
}
- if (ex instanceof ManagedLedgerException.ManagedLedgerFencedException || ex != null
- && ex.getCause() instanceof ManagedLedgerException.ManagedLedgerFencedException) {
+ if (cause instanceof ManagedLedgerException.ManagedLedgerFencedException) {
if (log.isDebugEnabled()) {
log.debug("Throw a CoordinatorNotFoundException to client "
+ "with the message got from a ManagedLedgerFencedException for the request {}", op);
}
- return new CoordinatorException.CoordinatorNotFoundException(ex.getMessage());
+ return new CoordinatorException.CoordinatorNotFoundException(cause.getMessage());
}
- log.error("Send response error for {} request {}.", op, requestId, ex);
- return ex;
+ log.error("Send response error for {} request {}.", op, requestId, cause);
+ return cause;
}
@Override
protected void handleNewTxn(CommandNewTxn command) {
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/buffer/impl/TopicTransactionBuffer.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/buffer/impl/TopicTransactionBuffer.java
index e2888d9..40b327d 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/buffer/impl/TopicTransactionBuffer.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/buffer/impl/TopicTransactionBuffer.java
@@ -202,7 +202,7 @@ public class TopicTransactionBuffer extends TopicTransactionBufferState implemen
@Override
public CompletableFuture<TransactionMeta> getTransactionMeta(TxnID txnID) {
- return null;
+ return CompletableFuture.completedFuture(null);
}
@Override
diff --git a/pulsar-transaction/coordinator/src/main/java/org/apache/pulsar/transaction/coordinator/impl/MLTransactionMetadataStore.java b/pulsar-transaction/coordinator/src/main/java/org/apache/pulsar/transaction/coordinator/impl/MLTransactionMetadataStore.java
index 19d651c..f93de8b 100644
--- a/pulsar-transaction/coordinator/src/main/java/org/apache/pulsar/transaction/coordinator/impl/MLTransactionMetadataStore.java
+++ b/pulsar-transaction/coordinator/src/main/java/org/apache/pulsar/transaction/coordinator/impl/MLTransactionMetadataStore.java
@@ -249,15 +249,15 @@ public class MLTransactionMetadataStore
@Override
public CompletableFuture<Void> addProducedPartitionToTxn(TxnID txnID, List<String> partitions) {
- CompletableFuture<Void> completableFuture = new CompletableFuture<>();
+ CompletableFuture<Void> promise = new CompletableFuture<>();
internalPinnedExecutor.execute(() -> {
if (!checkIfReady()) {
- completableFuture
+ promise
.completeExceptionally(new CoordinatorException.TransactionMetadataStoreStateException(tcID,
State.Ready, getState(), "add produced partition"));
return;
}
- getTxnPositionPair(txnID).thenAccept(txnMetaListPair -> {
+ getTxnPositionPair(txnID).thenCompose(txnMetaListPair -> {
TransactionMetadataEntry transactionMetadataEntry = new TransactionMetadataEntry()
.setTxnidMostBits(txnID.getMostSigBits())
.setTxnidLeastBits(txnID.getLeastSigBits())
@@ -266,43 +266,42 @@ public class MLTransactionMetadataStore
.setLastModificationTime(System.currentTimeMillis())
.setMaxLocalTxnId(sequenceIdGenerator.getCurrentSequenceId());
- transactionLog.append(transactionMetadataEntry)
- .whenComplete((position, exception) -> {
- if (exception != null) {
- completableFuture.completeExceptionally(exception);
- return;
- }
+ return transactionLog.append(transactionMetadataEntry)
+ .thenAccept(position -> {
appendLogCount.increment();
try {
synchronized (txnMetaListPair.getLeft()) {
txnMetaListPair.getLeft().addProducedPartitions(partitions);
txnMetaMap.get(txnID.getLeastSigBits()).getRight().add(position);
}
- completableFuture.complete(null);
+ promise.complete(null);
} catch (InvalidTxnStatusException e) {
transactionLog.deletePosition(Collections.singletonList(position));
log.error("TxnID : " + txnMetaListPair.getLeft().id().toString()
+ " add produced partition error with TxnStatus : "
+ txnMetaListPair.getLeft().status().name(), e);
- completableFuture.completeExceptionally(e);
+ promise.completeExceptionally(e);
}
});
+ }).exceptionally(ex -> {
+ promise.completeExceptionally(ex);
+ return null;
});
});
- return completableFuture;
+ return promise;
}
@Override
public CompletableFuture<Void> addAckedPartitionToTxn(TxnID txnID,
List<TransactionSubscription> txnSubscriptions) {
- CompletableFuture<Void> completableFuture = new CompletableFuture<>();
+ CompletableFuture<Void> promise = new CompletableFuture<>();
internalPinnedExecutor.execute(() -> {
if (!checkIfReady()) {
- completableFuture.completeExceptionally(new CoordinatorException
+ promise.completeExceptionally(new CoordinatorException
.TransactionMetadataStoreStateException(tcID, State.Ready, getState(), "add acked partition"));
return;
}
- getTxnPositionPair(txnID).thenAccept(txnMetaListPair -> {
+ getTxnPositionPair(txnID).thenCompose(txnMetaListPair -> {
TransactionMetadataEntry transactionMetadataEntry = new TransactionMetadataEntry()
.setTxnidMostBits(txnID.getMostSigBits())
.setTxnidLeastBits(txnID.getLeastSigBits())
@@ -311,47 +310,46 @@ public class MLTransactionMetadataStore
.setLastModificationTime(System.currentTimeMillis())
.setMaxLocalTxnId(sequenceIdGenerator.getCurrentSequenceId());
- transactionLog.append(transactionMetadataEntry)
- .whenComplete((position, exception) -> {
- if (exception != null) {
- completableFuture.completeExceptionally(exception);
- return;
- }
+ return transactionLog.append(transactionMetadataEntry)
+ .thenAccept(position -> {
appendLogCount.increment();
try {
synchronized (txnMetaListPair.getLeft()) {
txnMetaListPair.getLeft().addAckedPartitions(txnSubscriptions);
txnMetaMap.get(txnID.getLeastSigBits()).getRight().add(position);
}
- completableFuture.complete(null);
+ promise.complete(null);
} catch (InvalidTxnStatusException e) {
transactionLog.deletePosition(Collections.singletonList(position));
log.error("TxnID : " + txnMetaListPair.getLeft().id().toString()
+ " add acked subscription error with TxnStatus : "
+ txnMetaListPair.getLeft().status().name(), e);
- completableFuture.completeExceptionally(e);
+ promise.completeExceptionally(e);
}
});
+ }).exceptionally(ex -> {
+ promise.completeExceptionally(ex);
+ return null;
});
});
- return completableFuture;
+ return promise;
}
@Override
public CompletableFuture<Void> updateTxnStatus(TxnID txnID, TxnStatus newStatus,
TxnStatus expectedStatus, boolean isTimeout) {
- CompletableFuture<Void> completableFuture = new CompletableFuture<>();
+ CompletableFuture<Void> promise = new CompletableFuture<>();
internalPinnedExecutor.execute(() -> {
if (!checkIfReady()) {
- completableFuture.completeExceptionally(new CoordinatorException
+ promise.completeExceptionally(new CoordinatorException
.TransactionMetadataStoreStateException(tcID,
State.Ready, getState(), "update transaction status"));
return;
}
- getTxnPositionPair(txnID).thenAccept(txnMetaListPair -> {
+ getTxnPositionPair(txnID).thenCompose(txnMetaListPair -> {
if (txnMetaListPair.getLeft().status() == newStatus) {
- completableFuture.complete(null);
- return;
+ promise.complete(null);
+ return promise;
}
TransactionMetadataEntry transactionMetadataEntry = new TransactionMetadataEntry()
.setTxnidMostBits(txnID.getMostSigBits())
@@ -362,51 +360,51 @@ public class MLTransactionMetadataStore
.setNewStatus(newStatus)
.setMaxLocalTxnId(sequenceIdGenerator.getCurrentSequenceId());
- transactionLog.append(transactionMetadataEntry).whenComplete((position, throwable) -> {
- if (throwable != null) {
- completableFuture.completeExceptionally(throwable);
- return;
- }
- appendLogCount.increment();
- try {
- synchronized (txnMetaListPair.getLeft()) {
- txnMetaListPair.getLeft().updateTxnStatus(newStatus, expectedStatus);
- txnMetaListPair.getRight().add(position);
- }
- if (newStatus == TxnStatus.ABORTING && isTimeout) {
- this.transactionTimeoutCount.increment();
- }
- if (newStatus == TxnStatus.COMMITTED || newStatus == TxnStatus.ABORTED) {
- transactionLog.deletePosition(txnMetaListPair.getRight()).whenComplete((v, exception) -> {
- if (exception != null) {
- completableFuture.completeExceptionally(exception);
- return;
+ return transactionLog.append(transactionMetadataEntry)
+ .thenAccept(position -> {
+ appendLogCount.increment();
+ try {
+ synchronized (txnMetaListPair.getLeft()) {
+ txnMetaListPair.getLeft().updateTxnStatus(newStatus, expectedStatus);
+ txnMetaListPair.getRight().add(position);
}
- this.transactionMetadataStoreStats
- .addTransactionExecutionLatencySample(System.currentTimeMillis()
- - txnMetaListPair.getLeft().getOpenTimestamp());
- if (newStatus == TxnStatus.COMMITTED) {
- committedTransactionCount.increment();
- } else {
- abortedTransactionCount.increment();
+ if (newStatus == TxnStatus.ABORTING && isTimeout) {
+ this.transactionTimeoutCount.increment();
}
- txnMetaMap.remove(txnID.getLeastSigBits());
- completableFuture.complete(null);
- });
- return;
- }
- completableFuture.complete(null);
- } catch (InvalidTxnStatusException e) {
- transactionLog.deletePosition(Collections.singletonList(position));
- log.error("TxnID : " + txnMetaListPair.getLeft().id().toString()
- + " add update txn status error with TxnStatus : "
- + txnMetaListPair.getLeft().status().name(), e);
- completableFuture.completeExceptionally(e);
- }
- });
+ if (newStatus == TxnStatus.COMMITTED || newStatus == TxnStatus.ABORTED) {
+ transactionLog.deletePosition(txnMetaListPair.getRight()).whenComplete((v, ex) -> {
+ if (ex != null) {
+ promise.completeExceptionally(ex);
+ return;
+ }
+ this.transactionMetadataStoreStats
+ .addTransactionExecutionLatencySample(System.currentTimeMillis()
+ - txnMetaListPair.getLeft().getOpenTimestamp());
+ if (newStatus == TxnStatus.COMMITTED) {
+ committedTransactionCount.increment();
+ } else {
+ abortedTransactionCount.increment();
+ }
+ txnMetaMap.remove(txnID.getLeastSigBits());
+ promise.complete(null);
+ });
+ return;
+ }
+ promise.complete(null);
+ } catch (InvalidTxnStatusException e) {
+ transactionLog.deletePosition(Collections.singletonList(position));
+ log.error("TxnID : " + txnMetaListPair.getLeft().id().toString()
+ + " add update txn status error with TxnStatus : "
+ + txnMetaListPair.getLeft().status().name(), e);
+ promise.completeExceptionally(e);
+ }
+ });
+ }).exceptionally(ex -> {
+ promise.completeExceptionally(ex);
+ return null;
});
});
- return completableFuture;
+ return promise;
}
@Override