You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by bo...@apache.org on 2022/04/02 09:40:46 UTC

[pulsar] branch branch-2.9 updated: [fix][transaction] Fix potentially unfinished CompletableFuture. (#14973)

This is an automated email from the ASF dual-hosted git repository.

bogong pushed a commit to branch branch-2.9
in repository https://gitbox.apache.org/repos/asf/pulsar.git


The following commit(s) were added to refs/heads/branch-2.9 by this push:
     new 6cd01a2  [fix][transaction] Fix potentially unfinished CompletableFuture. (#14973)
6cd01a2 is described below

commit 6cd01a22ac4ae8fdccc61f1cade6fae63036befd
Author: Jiwei Guo <te...@apache.org>
AuthorDate: Sat Apr 2 17:36:44 2022 +0800

    [fix][transaction] Fix potentially unfinished CompletableFuture. (#14973)
    
    ### Motivation
    
    In MLTransactionMetadataStore#addProducedPartitionToTxn, the call to getTxnPositionPair(txnID) may fail with TransactionNotFoundException, but that failure is not handled. As a result, the CompletableFuture returned to the caller may never be completed.
    https://github.com/apache/pulsar/blob/02eb31b372b2bf72350e8f6cbab552e1627e6197/pulsar-transaction/coordinator/src/main/java/org/apache/pulsar/transaction/coordinator/impl/MLTransactionMetadataStore.java#L260-L290
    
    https://github.com/apache/pulsar/blob/02eb31b372b2bf72350e8f6cbab552e1627e6197/pulsar-transaction/coordinator/src/main/java/org/apache/pulsar/transaction/coordinator/impl/MLTransactionMetadataStore.java#L435-L444
    (cherry picked from commit 1544375667f8c724ff9098244f47bb07871c8151)
---
 .../broker/TransactionMetadataStoreService.java    |   3 +-
 .../apache/pulsar/broker/service/ServerCnx.java    |  15 ++-
 .../buffer/impl/TopicTransactionBuffer.java        |   2 +-
 .../impl/MLTransactionMetadataStore.java           | 138 ++++++++++-----------
 4 files changed, 77 insertions(+), 81 deletions(-)

diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/TransactionMetadataStoreService.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/TransactionMetadataStoreService.java
index 28bef1f..97fae3b 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/TransactionMetadataStoreService.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/TransactionMetadataStoreService.java
@@ -442,8 +442,7 @@ public class TransactionMetadataStoreService {
 
     // when managedLedger fence will remove this tc and reload
     public void handleOpFail(Throwable e, TransactionCoordinatorID tcId) {
-        if (e.getCause() instanceof ManagedLedgerException.ManagedLedgerFencedException
-                || e instanceof ManagedLedgerException.ManagedLedgerFencedException) {
+        if (e instanceof ManagedLedgerException.ManagedLedgerFencedException) {
             removeTransactionMetadataStore(tcId);
         }
     }
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/ServerCnx.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/ServerCnx.java
index 89541a4..07d898f 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/ServerCnx.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/ServerCnx.java
@@ -2032,24 +2032,23 @@ public class ServerCnx extends PulsarHandler implements TransportCnx {
         }
     }
     private Throwable handleTxnException(Throwable ex, String op, long requestId) {
-        if (ex instanceof CoordinatorException.CoordinatorNotFoundException || ex != null
-                && ex.getCause() instanceof CoordinatorException.CoordinatorNotFoundException) {
+        Throwable cause = FutureUtil.unwrapCompletionException(ex);
+        if (cause instanceof CoordinatorException.CoordinatorNotFoundException) {
             if (log.isDebugEnabled()) {
                 log.debug("The Coordinator was not found for the request {}", op);
             }
-            return ex;
+            return cause;
         }
-        if (ex instanceof ManagedLedgerException.ManagedLedgerFencedException || ex != null
-                && ex.getCause() instanceof ManagedLedgerException.ManagedLedgerFencedException) {
+        if (cause instanceof ManagedLedgerException.ManagedLedgerFencedException) {
             if (log.isDebugEnabled()) {
                 log.debug("Throw a CoordinatorNotFoundException to client "
                         + "with the message got from a ManagedLedgerFencedException for the request {}", op);
             }
-            return new CoordinatorException.CoordinatorNotFoundException(ex.getMessage());
+            return new CoordinatorException.CoordinatorNotFoundException(cause.getMessage());
 
         }
-        log.error("Send response error for {} request {}.", op, requestId, ex);
-        return ex;
+        log.error("Send response error for {} request {}.", op, requestId, cause);
+        return cause;
     }
     @Override
     protected void handleNewTxn(CommandNewTxn command) {
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/buffer/impl/TopicTransactionBuffer.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/buffer/impl/TopicTransactionBuffer.java
index e2888d9..40b327d 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/buffer/impl/TopicTransactionBuffer.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/buffer/impl/TopicTransactionBuffer.java
@@ -202,7 +202,7 @@ public class TopicTransactionBuffer extends TopicTransactionBufferState implemen
 
     @Override
     public CompletableFuture<TransactionMeta> getTransactionMeta(TxnID txnID) {
-        return null;
+        return CompletableFuture.completedFuture(null);
     }
 
     @Override
diff --git a/pulsar-transaction/coordinator/src/main/java/org/apache/pulsar/transaction/coordinator/impl/MLTransactionMetadataStore.java b/pulsar-transaction/coordinator/src/main/java/org/apache/pulsar/transaction/coordinator/impl/MLTransactionMetadataStore.java
index 19d651c..f93de8b 100644
--- a/pulsar-transaction/coordinator/src/main/java/org/apache/pulsar/transaction/coordinator/impl/MLTransactionMetadataStore.java
+++ b/pulsar-transaction/coordinator/src/main/java/org/apache/pulsar/transaction/coordinator/impl/MLTransactionMetadataStore.java
@@ -249,15 +249,15 @@ public class MLTransactionMetadataStore
 
     @Override
     public CompletableFuture<Void> addProducedPartitionToTxn(TxnID txnID, List<String> partitions) {
-        CompletableFuture<Void> completableFuture = new CompletableFuture<>();
+        CompletableFuture<Void> promise = new CompletableFuture<>();
         internalPinnedExecutor.execute(() -> {
             if (!checkIfReady()) {
-                completableFuture
+                promise
                         .completeExceptionally(new CoordinatorException.TransactionMetadataStoreStateException(tcID,
                         State.Ready, getState(), "add produced partition"));
                 return;
             }
-            getTxnPositionPair(txnID).thenAccept(txnMetaListPair -> {
+            getTxnPositionPair(txnID).thenCompose(txnMetaListPair -> {
                 TransactionMetadataEntry transactionMetadataEntry = new TransactionMetadataEntry()
                         .setTxnidMostBits(txnID.getMostSigBits())
                         .setTxnidLeastBits(txnID.getLeastSigBits())
@@ -266,43 +266,42 @@ public class MLTransactionMetadataStore
                         .setLastModificationTime(System.currentTimeMillis())
                         .setMaxLocalTxnId(sequenceIdGenerator.getCurrentSequenceId());
 
-                transactionLog.append(transactionMetadataEntry)
-                        .whenComplete((position, exception) -> {
-                            if (exception != null) {
-                                completableFuture.completeExceptionally(exception);
-                                return;
-                            }
+                return transactionLog.append(transactionMetadataEntry)
+                        .thenAccept(position -> {
                             appendLogCount.increment();
                             try {
                                 synchronized (txnMetaListPair.getLeft()) {
                                     txnMetaListPair.getLeft().addProducedPartitions(partitions);
                                     txnMetaMap.get(txnID.getLeastSigBits()).getRight().add(position);
                                 }
-                                completableFuture.complete(null);
+                                promise.complete(null);
                             } catch (InvalidTxnStatusException e) {
                                 transactionLog.deletePosition(Collections.singletonList(position));
                                 log.error("TxnID : " + txnMetaListPair.getLeft().id().toString()
                                         + " add produced partition error with TxnStatus : "
                                         + txnMetaListPair.getLeft().status().name(), e);
-                                completableFuture.completeExceptionally(e);
+                                promise.completeExceptionally(e);
                             }
                         });
+            }).exceptionally(ex -> {
+                promise.completeExceptionally(ex);
+                return null;
             });
         });
-        return completableFuture;
+        return promise;
     }
 
     @Override
     public CompletableFuture<Void> addAckedPartitionToTxn(TxnID txnID,
                                                           List<TransactionSubscription> txnSubscriptions) {
-        CompletableFuture<Void> completableFuture = new CompletableFuture<>();
+        CompletableFuture<Void> promise = new CompletableFuture<>();
         internalPinnedExecutor.execute(() -> {
             if (!checkIfReady()) {
-                completableFuture.completeExceptionally(new CoordinatorException
+                promise.completeExceptionally(new CoordinatorException
                         .TransactionMetadataStoreStateException(tcID, State.Ready, getState(), "add acked partition"));
                 return;
             }
-            getTxnPositionPair(txnID).thenAccept(txnMetaListPair -> {
+            getTxnPositionPair(txnID).thenCompose(txnMetaListPair -> {
                 TransactionMetadataEntry transactionMetadataEntry = new TransactionMetadataEntry()
                         .setTxnidMostBits(txnID.getMostSigBits())
                         .setTxnidLeastBits(txnID.getLeastSigBits())
@@ -311,47 +310,46 @@ public class MLTransactionMetadataStore
                         .setLastModificationTime(System.currentTimeMillis())
                         .setMaxLocalTxnId(sequenceIdGenerator.getCurrentSequenceId());
 
-                transactionLog.append(transactionMetadataEntry)
-                        .whenComplete((position, exception) -> {
-                            if (exception != null) {
-                                completableFuture.completeExceptionally(exception);
-                                return;
-                            }
+                return transactionLog.append(transactionMetadataEntry)
+                        .thenAccept(position -> {
                             appendLogCount.increment();
                             try {
                                 synchronized (txnMetaListPair.getLeft()) {
                                     txnMetaListPair.getLeft().addAckedPartitions(txnSubscriptions);
                                     txnMetaMap.get(txnID.getLeastSigBits()).getRight().add(position);
                                 }
-                                completableFuture.complete(null);
+                                promise.complete(null);
                             } catch (InvalidTxnStatusException e) {
                                 transactionLog.deletePosition(Collections.singletonList(position));
                                 log.error("TxnID : " + txnMetaListPair.getLeft().id().toString()
                                         + " add acked subscription error with TxnStatus : "
                                         + txnMetaListPair.getLeft().status().name(), e);
-                                completableFuture.completeExceptionally(e);
+                                promise.completeExceptionally(e);
                             }
                         });
+            }).exceptionally(ex -> {
+                promise.completeExceptionally(ex);
+                return null;
             });
         });
-        return completableFuture;
+        return promise;
     }
 
     @Override
     public CompletableFuture<Void> updateTxnStatus(TxnID txnID, TxnStatus newStatus,
                                                                 TxnStatus expectedStatus, boolean isTimeout) {
-        CompletableFuture<Void> completableFuture = new CompletableFuture<>();
+        CompletableFuture<Void> promise = new CompletableFuture<>();
         internalPinnedExecutor.execute(() -> {
             if (!checkIfReady()) {
-                completableFuture.completeExceptionally(new CoordinatorException
+                promise.completeExceptionally(new CoordinatorException
                         .TransactionMetadataStoreStateException(tcID,
                         State.Ready, getState(), "update transaction status"));
                 return;
             }
-            getTxnPositionPair(txnID).thenAccept(txnMetaListPair -> {
+            getTxnPositionPair(txnID).thenCompose(txnMetaListPair -> {
                 if (txnMetaListPair.getLeft().status() == newStatus) {
-                    completableFuture.complete(null);
-                    return;
+                    promise.complete(null);
+                    return promise;
                 }
                 TransactionMetadataEntry transactionMetadataEntry = new TransactionMetadataEntry()
                         .setTxnidMostBits(txnID.getMostSigBits())
@@ -362,51 +360,51 @@ public class MLTransactionMetadataStore
                         .setNewStatus(newStatus)
                         .setMaxLocalTxnId(sequenceIdGenerator.getCurrentSequenceId());
 
-                transactionLog.append(transactionMetadataEntry).whenComplete((position, throwable) -> {
-                    if (throwable != null) {
-                        completableFuture.completeExceptionally(throwable);
-                        return;
-                    }
-                    appendLogCount.increment();
-                    try {
-                        synchronized (txnMetaListPair.getLeft()) {
-                            txnMetaListPair.getLeft().updateTxnStatus(newStatus, expectedStatus);
-                            txnMetaListPair.getRight().add(position);
-                        }
-                        if (newStatus == TxnStatus.ABORTING && isTimeout) {
-                            this.transactionTimeoutCount.increment();
-                        }
-                        if (newStatus == TxnStatus.COMMITTED || newStatus == TxnStatus.ABORTED) {
-                            transactionLog.deletePosition(txnMetaListPair.getRight()).whenComplete((v, exception) -> {
-                                if (exception != null) {
-                                    completableFuture.completeExceptionally(exception);
-                                    return;
+                return transactionLog.append(transactionMetadataEntry)
+                        .thenAccept(position -> {
+                            appendLogCount.increment();
+                            try {
+                                synchronized (txnMetaListPair.getLeft()) {
+                                    txnMetaListPair.getLeft().updateTxnStatus(newStatus, expectedStatus);
+                                    txnMetaListPair.getRight().add(position);
                                 }
-                                this.transactionMetadataStoreStats
-                                        .addTransactionExecutionLatencySample(System.currentTimeMillis()
-                                                - txnMetaListPair.getLeft().getOpenTimestamp());
-                                if (newStatus == TxnStatus.COMMITTED) {
-                                    committedTransactionCount.increment();
-                                } else {
-                                    abortedTransactionCount.increment();
+                                if (newStatus == TxnStatus.ABORTING && isTimeout) {
+                                    this.transactionTimeoutCount.increment();
                                 }
-                                txnMetaMap.remove(txnID.getLeastSigBits());
-                                completableFuture.complete(null);
-                            });
-                            return;
-                        }
-                        completableFuture.complete(null);
-                    } catch (InvalidTxnStatusException e) {
-                        transactionLog.deletePosition(Collections.singletonList(position));
-                        log.error("TxnID : " + txnMetaListPair.getLeft().id().toString()
-                                + " add update txn status error with TxnStatus : "
-                                + txnMetaListPair.getLeft().status().name(), e);
-                        completableFuture.completeExceptionally(e);
-                    }
-                });
+                                if (newStatus == TxnStatus.COMMITTED || newStatus == TxnStatus.ABORTED) {
+                                    transactionLog.deletePosition(txnMetaListPair.getRight()).whenComplete((v, ex) -> {
+                                        if (ex != null) {
+                                            promise.completeExceptionally(ex);
+                                            return;
+                                        }
+                                        this.transactionMetadataStoreStats
+                                                .addTransactionExecutionLatencySample(System.currentTimeMillis()
+                                                        - txnMetaListPair.getLeft().getOpenTimestamp());
+                                        if (newStatus == TxnStatus.COMMITTED) {
+                                            committedTransactionCount.increment();
+                                        } else {
+                                            abortedTransactionCount.increment();
+                                        }
+                                        txnMetaMap.remove(txnID.getLeastSigBits());
+                                        promise.complete(null);
+                                    });
+                                    return;
+                                }
+                                promise.complete(null);
+                            } catch (InvalidTxnStatusException e) {
+                                transactionLog.deletePosition(Collections.singletonList(position));
+                                log.error("TxnID : " + txnMetaListPair.getLeft().id().toString()
+                                        + " add update txn status error with TxnStatus : "
+                                        + txnMetaListPair.getLeft().status().name(), e);
+                                promise.completeExceptionally(e);
+                            }
+                        });
+            }).exceptionally(ex -> {
+                promise.completeExceptionally(ex);
+                return null;
             });
         });
-       return completableFuture;
+       return promise;
     }
 
     @Override