You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by pe...@apache.org on 2022/06/28 15:13:15 UTC

[pulsar] 10/29: [Transaction] Set TC state is Ready after open MLTransactionMetadataStore completely. (#13957)

This is an automated email from the ASF dual-hosted git repository.

penghui pushed a commit to branch branch-2.10
in repository https://gitbox.apache.org/repos/asf/pulsar.git

commit 3c0063b4800c322d3ad12bc1a5d174c59b15746f
Author: Xiangying Meng <55...@users.noreply.github.com>
AuthorDate: Wed Jun 15 14:17:44 2022 +0800

    [Transaction] Set TC state is Ready after open MLTransactionMetadataStore completely. (#13957)
    
    [Transaction] Set TC state is Ready after open MLTransactionMetadataStore completely.
    ### Motivation
    The MLTransactionMetadataStore constructor and the openTransactionMetadataStore method are asynchronous, so a store that is still in the Initializing state may be put into `stores`.
    ### Modification
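    Move the transaction log replay out of the constructor into an `init(recoverTracker)` method that returns a future, and have the provider chain on that future (`thenCompose`) so that it waits for MLTransactionMetadataStore initialization to complete.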
    
    (cherry picked from commit 0fe8ac0d2c1bb909729580e5456b4d57c2a00346)
---
 .../pulsar/broker/transaction/TransactionTest.java |  13 +-
 .../impl/MLTransactionMetadataStore.java           | 174 +++++++++++----------
 .../impl/MLTransactionMetadataStoreProvider.java   |   4 +-
 .../MLTransactionMetadataStoreTest.java            |  38 ++---
 4 files changed, 121 insertions(+), 108 deletions(-)

diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/transaction/TransactionTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/transaction/TransactionTest.java
index 0b5cab28540..3f6ea313652 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/transaction/TransactionTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/transaction/TransactionTest.java
@@ -691,9 +691,8 @@ public class TransactionTest extends TransactionTestBase {
         doNothing().when(timeoutTracker).start();
         MLTransactionMetadataStore metadataStore1 =
                 new MLTransactionMetadataStore(new TransactionCoordinatorID(1),
-                        mlTransactionLog, timeoutTracker, transactionRecoverTracker,
-                        mlTransactionSequenceIdGenerator);
-
+                        mlTransactionLog, timeoutTracker, mlTransactionSequenceIdGenerator);
+        metadataStore1.init(transactionRecoverTracker).get();
         Awaitility.await().untilAsserted(() ->
                 assertEquals(metadataStore1.getCoordinatorStats().state, "Ready"));
 
@@ -705,8 +704,8 @@ public class TransactionTest extends TransactionTestBase {
 
         MLTransactionMetadataStore metadataStore2 =
                 new MLTransactionMetadataStore(new TransactionCoordinatorID(1),
-                        mlTransactionLog, timeoutTracker, transactionRecoverTracker,
-                        mlTransactionSequenceIdGenerator);
+                        mlTransactionLog, timeoutTracker, mlTransactionSequenceIdGenerator);
+        metadataStore2.init(transactionRecoverTracker).get();
         Awaitility.await().untilAsserted(() ->
                 assertEquals(metadataStore2.getCoordinatorStats().state, "Ready"));
 
@@ -718,8 +717,8 @@ public class TransactionTest extends TransactionTestBase {
 
         MLTransactionMetadataStore metadataStore3 =
                 new MLTransactionMetadataStore(new TransactionCoordinatorID(1),
-                        mlTransactionLog, timeoutTracker, transactionRecoverTracker,
-                        mlTransactionSequenceIdGenerator);
+                        mlTransactionLog, timeoutTracker, mlTransactionSequenceIdGenerator);
+        metadataStore3.init(transactionRecoverTracker).get();
         Awaitility.await().untilAsserted(() ->
                 assertEquals(metadataStore3.getCoordinatorStats().state, "Ready"));
     }
diff --git a/pulsar-transaction/coordinator/src/main/java/org/apache/pulsar/transaction/coordinator/impl/MLTransactionMetadataStore.java b/pulsar-transaction/coordinator/src/main/java/org/apache/pulsar/transaction/coordinator/impl/MLTransactionMetadataStore.java
index 685d57e664e..6c88d27cc22 100644
--- a/pulsar-transaction/coordinator/src/main/java/org/apache/pulsar/transaction/coordinator/impl/MLTransactionMetadataStore.java
+++ b/pulsar-transaction/coordinator/src/main/java/org/apache/pulsar/transaction/coordinator/impl/MLTransactionMetadataStore.java
@@ -34,6 +34,7 @@ import org.apache.bookkeeper.mledger.ManagedLedger;
 import org.apache.bookkeeper.mledger.Position;
 import org.apache.commons.lang3.tuple.MutablePair;
 import org.apache.commons.lang3.tuple.Pair;
+import org.apache.pulsar.client.api.transaction.TransactionCoordinatorClientException;
 import org.apache.pulsar.client.api.transaction.TxnID;
 import org.apache.pulsar.common.api.proto.Subscription;
 import org.apache.pulsar.common.policies.data.TransactionCoordinatorStats;
@@ -79,7 +80,6 @@ public class MLTransactionMetadataStore
     public MLTransactionMetadataStore(TransactionCoordinatorID tcID,
                                       MLTransactionLogImpl mlTransactionLog,
                                       TransactionTimeoutTracker timeoutTracker,
-                                      TransactionRecoverTracker recoverTracker,
                                       MLTransactionSequenceIdGenerator sequenceIdGenerator) {
         super(State.None);
         this.sequenceIdGenerator = sequenceIdGenerator;
@@ -96,96 +96,108 @@ public class MLTransactionMetadataStore
         DefaultThreadFactory threadFactory = new DefaultThreadFactory("transaction_coordinator_"
                 + tcID.toString() + "thread_factory");
         this.internalPinnedExecutor = Executors.newSingleThreadScheduledExecutor(threadFactory);
+    }
 
+    public CompletableFuture<TransactionMetadataStore> init(TransactionRecoverTracker recoverTracker) {
+        CompletableFuture<TransactionMetadataStore> completableFuture = new CompletableFuture<>();
         if (!changeToInitializingState()) {
             log.error("Managed ledger transaction metadata store change state error when init it");
-            return;
-        }
-
-        internalPinnedExecutor.execute(() -> transactionLog.replayAsync(new TransactionLogReplayCallback() {
-
-            @Override
-            public void replayComplete() {
-                recoverTracker.appendOpenTransactionToTimeoutTracker();
-                if (!changeToReadyState()) {
-                    log.error("Managed ledger transaction metadata store change state error when replay complete");
-                } else {
-                    recoverTracker.handleCommittingAndAbortingTransaction();
-                    timeoutTracker.start();
+            completableFuture
+                    .completeExceptionally(new TransactionCoordinatorClientException
+                    .CoordinatorNotFoundException("transaction metadata store with tcId "
+                            + tcID.toString() + " change state to Initializing error when init it"));
+        } else {
+            internalPinnedExecutor.execute(() -> transactionLog.replayAsync(new TransactionLogReplayCallback() {
+
+                @Override
+                public void replayComplete() {
+                    recoverTracker.appendOpenTransactionToTimeoutTracker();
+                    if (!changeToReadyState()) {
+                        log.error("Managed ledger transaction metadata store change state error when replay complete");
+                        completableFuture
+                                .completeExceptionally(new TransactionCoordinatorClientException
+                                        .CoordinatorNotFoundException("transaction metadata store with tcId "
+                                        + tcID.toString() + " change state to Ready error when init it"));
+
+                    } else {
+                        recoverTracker.handleCommittingAndAbortingTransaction();
+                        timeoutTracker.start();
+                        completableFuture.complete(MLTransactionMetadataStore.this);
+                    }
                 }
-            }
-
-            @Override
-            public void handleMetadataEntry(Position position, TransactionMetadataEntry transactionMetadataEntry) {
 
-                try {
+                @Override
+                public void handleMetadataEntry(Position position, TransactionMetadataEntry transactionMetadataEntry) {
 
-                    TxnID txnID = new TxnID(transactionMetadataEntry.getTxnidMostBits(),
+                    try {
+                        TxnID txnID = new TxnID(transactionMetadataEntry.getTxnidMostBits(),
                             transactionMetadataEntry.getTxnidLeastBits());
-                    long transactionId = transactionMetadataEntry.getTxnidLeastBits();
-                    switch (transactionMetadataEntry.getMetadataOp()) {
-                        case NEW:
-                            long txnSequenceId = transactionMetadataEntry.getTxnidLeastBits();
-                            if (txnMetaMap.containsKey(transactionId)) {
-                                txnMetaMap.get(transactionId).getRight().add(position);
-                            } else {
-                                List<Position> positions = new ArrayList<>();
-                                positions.add(position);
-                                long openTimestamp = transactionMetadataEntry.getStartTime();
-                                long timeoutAt = transactionMetadataEntry.getTimeoutMs();
-                                txnMetaMap.put(transactionId, MutablePair.of(new TxnMetaImpl(txnID,
-                                        openTimestamp, timeoutAt), positions));
-                                recoverTracker.handleOpenStatusTransaction(txnSequenceId,
-                                        timeoutAt + openTimestamp);
-                            }
-                            break;
-                        case ADD_PARTITION:
-                            if (!txnMetaMap.containsKey(transactionId)) {
-                                transactionLog.deletePosition(Collections.singletonList(position));
-                            } else {
-                                txnMetaMap.get(transactionId).getLeft()
-                                        .addProducedPartitions(transactionMetadataEntry.getPartitionsList());
-                                txnMetaMap.get(transactionId).getRight().add(position);
-                            }
-                            break;
-                        case ADD_SUBSCRIPTION:
-                            if (!txnMetaMap.containsKey(transactionId)) {
-                                transactionLog.deletePosition(Collections.singletonList(position));
-                            } else {
-                                txnMetaMap.get(transactionId).getLeft()
-                                        .addAckedPartitions(subscriptionToTxnSubscription(
-                                                transactionMetadataEntry.getSubscriptionsList()));
-                                txnMetaMap.get(transactionId).getRight().add(position);
-                            }
-                            break;
-                        case UPDATE:
-                            if (!txnMetaMap.containsKey(transactionId)) {
-                                transactionLog.deletePosition(Collections.singletonList(position));
-                            } else {
-                                TxnStatus newStatus = transactionMetadataEntry.getNewStatus();
-                                txnMetaMap.get(transactionId).getLeft()
-                                        .updateTxnStatus(transactionMetadataEntry.getNewStatus(),
-                                                transactionMetadataEntry.getExpectedStatus());
-                                txnMetaMap.get(transactionId).getRight().add(position);
-                                recoverTracker.updateTransactionStatus(txnID.getLeastSigBits(), newStatus);
-                                if (newStatus == TxnStatus.COMMITTED || newStatus == TxnStatus.ABORTED) {
-                                    transactionLog.deletePosition(txnMetaMap
-                                            .get(transactionId).getRight()).thenAccept(v ->
-                                            txnMetaMap.remove(transactionId).getLeft());
+                        long transactionId = transactionMetadataEntry.getTxnidLeastBits();
+                        switch (transactionMetadataEntry.getMetadataOp()) {
+                            case NEW:
+                                long txnSequenceId = transactionMetadataEntry.getTxnidLeastBits();
+                                if (txnMetaMap.containsKey(transactionId)) {
+                                    txnMetaMap.get(transactionId).getRight().add(position);
+                                } else {
+                                    List<Position> positions = new ArrayList<>();
+                                    positions.add(position);
+                                    long openTimestamp = transactionMetadataEntry.getStartTime();
+                                    long timeoutAt = transactionMetadataEntry.getTimeoutMs();
+                                    txnMetaMap.put(transactionId, MutablePair.of(new TxnMetaImpl(txnID,
+                                            openTimestamp, timeoutAt), positions));
+                                    recoverTracker.handleOpenStatusTransaction(txnSequenceId,
+                                            timeoutAt + openTimestamp);
                                 }
-                            }
-                            break;
-                        default:
-                            throw new InvalidTxnStatusException("Transaction `"
-                                    + txnID + "` load replay metadata operation "
-                                    + "from transaction log with unknown operation");
+                                break;
+                            case ADD_PARTITION:
+                                if (!txnMetaMap.containsKey(transactionId)) {
+                                    transactionLog.deletePosition(Collections.singletonList(position));
+                                } else {
+                                    txnMetaMap.get(transactionId).getLeft()
+                                            .addProducedPartitions(transactionMetadataEntry.getPartitionsList());
+                                    txnMetaMap.get(transactionId).getRight().add(position);
+                                }
+                                break;
+                            case ADD_SUBSCRIPTION:
+                                if (!txnMetaMap.containsKey(transactionId)) {
+                                    transactionLog.deletePosition(Collections.singletonList(position));
+                                } else {
+                                    txnMetaMap.get(transactionId).getLeft()
+                                            .addAckedPartitions(subscriptionToTxnSubscription(
+                                                    transactionMetadataEntry.getSubscriptionsList()));
+                                    txnMetaMap.get(transactionId).getRight().add(position);
+                                }
+                                break;
+                            case UPDATE:
+                                if (!txnMetaMap.containsKey(transactionId)) {
+                                    transactionLog.deletePosition(Collections.singletonList(position));
+                                } else {
+                                    TxnStatus newStatus = transactionMetadataEntry.getNewStatus();
+                                    txnMetaMap.get(transactionId).getLeft()
+                                            .updateTxnStatus(transactionMetadataEntry.getNewStatus(),
+                                                    transactionMetadataEntry.getExpectedStatus());
+                                    txnMetaMap.get(transactionId).getRight().add(position);
+                                    recoverTracker.updateTransactionStatus(txnID.getLeastSigBits(), newStatus);
+                                    if (newStatus == TxnStatus.COMMITTED || newStatus == TxnStatus.ABORTED) {
+                                        transactionLog.deletePosition(txnMetaMap
+                                                .get(transactionId).getRight()).thenAccept(v ->
+                                                txnMetaMap.remove(transactionId).getLeft());
+                                    }
+                                }
+                                break;
+                            default:
+                                throw new InvalidTxnStatusException("Transaction `"
+                                        + txnID + "` load replay metadata operation "
+                                        + "from transaction log with unknown operation");
+                        }
+                    } catch (InvalidTxnStatusException  e) {
+                        transactionLog.deletePosition(Collections.singletonList(position));
+                        log.error(e.getMessage(), e);
                     }
-                } catch (InvalidTxnStatusException  e) {
-                    transactionLog.deletePosition(Collections.singletonList(position));
-                    log.error(e.getMessage(), e);
                 }
-            }
-        }));
+            }));
+        }
+        return completableFuture;
     }
 
     @Override
diff --git a/pulsar-transaction/coordinator/src/main/java/org/apache/pulsar/transaction/coordinator/impl/MLTransactionMetadataStoreProvider.java b/pulsar-transaction/coordinator/src/main/java/org/apache/pulsar/transaction/coordinator/impl/MLTransactionMetadataStoreProvider.java
index 0711f00ac70..20df6439827 100644
--- a/pulsar-transaction/coordinator/src/main/java/org/apache/pulsar/transaction/coordinator/impl/MLTransactionMetadataStoreProvider.java
+++ b/pulsar-transaction/coordinator/src/main/java/org/apache/pulsar/transaction/coordinator/impl/MLTransactionMetadataStoreProvider.java
@@ -48,8 +48,8 @@ public class MLTransactionMetadataStoreProvider implements TransactionMetadataSt
                 managedLedgerFactory, managedLedgerConfig);
 
         // MLTransactionLogInterceptor will init sequenceId and update the sequenceId to managedLedger properties.
-        return txnLog.initialize().thenApply(__ ->
+        return txnLog.initialize().thenCompose(__ ->
                 new MLTransactionMetadataStore(transactionCoordinatorId, txnLog, timeoutTracker,
-                        recoverTracker, mlTransactionSequenceIdGenerator));
+                        mlTransactionSequenceIdGenerator).init(recoverTracker));
     }
 }
\ No newline at end of file
diff --git a/pulsar-transaction/coordinator/src/test/java/org/apache/pulsar/transaction/coordinator/MLTransactionMetadataStoreTest.java b/pulsar-transaction/coordinator/src/test/java/org/apache/pulsar/transaction/coordinator/MLTransactionMetadataStoreTest.java
index a06bf9e6dea..aafe54e6069 100644
--- a/pulsar-transaction/coordinator/src/test/java/org/apache/pulsar/transaction/coordinator/MLTransactionMetadataStoreTest.java
+++ b/pulsar-transaction/coordinator/src/test/java/org/apache/pulsar/transaction/coordinator/MLTransactionMetadataStoreTest.java
@@ -74,8 +74,9 @@ public class MLTransactionMetadataStoreTest extends MockedBookKeeperTestCase {
         mlTransactionLog.initialize().join();
         MLTransactionMetadataStore transactionMetadataStore =
                 new MLTransactionMetadataStore(transactionCoordinatorID, mlTransactionLog,
-                        new TransactionTimeoutTrackerImpl(), new TransactionRecoverTrackerImpl(),
+                        new TransactionTimeoutTrackerImpl(),
                         mlTransactionSequenceIdGenerator);
+        transactionMetadataStore.init(new TransactionRecoverTrackerImpl()).get();
         int checkReplayRetryCount = 0;
         while (true) {
             checkReplayRetryCount++;
@@ -149,8 +150,8 @@ public class MLTransactionMetadataStoreTest extends MockedBookKeeperTestCase {
         mlTransactionLog.initialize().join();
         MLTransactionMetadataStore transactionMetadataStore =
                 new MLTransactionMetadataStore(transactionCoordinatorID, mlTransactionLog,
-                        new TransactionTimeoutTrackerImpl(), new TransactionRecoverTrackerImpl(),
-                        mlTransactionSequenceIdGenerator);
+                        new TransactionTimeoutTrackerImpl(), mlTransactionSequenceIdGenerator);
+        transactionMetadataStore.init(new TransactionRecoverTrackerImpl()).get();
 
         Awaitility.await().until(transactionMetadataStore::checkIfReady);
         TxnID txnID = transactionMetadataStore.newTransaction(20000).get();
@@ -178,8 +179,8 @@ public class MLTransactionMetadataStoreTest extends MockedBookKeeperTestCase {
         mlTransactionLog.initialize().join();
         transactionMetadataStore =
                 new MLTransactionMetadataStore(transactionCoordinatorID, mlTransactionLog,
-                        new TransactionTimeoutTrackerImpl(), new TransactionRecoverTrackerImpl(),
-                        mlTransactionSequenceIdGenerator);
+                        new TransactionTimeoutTrackerImpl(), mlTransactionSequenceIdGenerator);
+        transactionMetadataStore.init(new TransactionRecoverTrackerImpl()).get();
 
         Awaitility.await().until(transactionMetadataStore::checkIfReady);
         txnID = transactionMetadataStore.newTransaction(100000).get();
@@ -201,10 +202,11 @@ public class MLTransactionMetadataStoreTest extends MockedBookKeeperTestCase {
         MLTransactionLogImpl mlTransactionLog = new MLTransactionLogImpl(transactionCoordinatorID, factory,
                 managedLedgerConfig);
         mlTransactionLog.initialize().join();
+
         MLTransactionMetadataStore transactionMetadataStore =
                 new MLTransactionMetadataStore(transactionCoordinatorID, mlTransactionLog,
-                        new TransactionTimeoutTrackerImpl(), new TransactionRecoverTrackerImpl(),
-                        mlTransactionSequenceIdGenerator);
+                        new TransactionTimeoutTrackerImpl(), mlTransactionSequenceIdGenerator);
+        transactionMetadataStore.init(new TransactionRecoverTrackerImpl()).get();
         int checkReplayRetryCount = 0;
         while (true) {
             if (checkReplayRetryCount > 3) {
@@ -244,10 +246,11 @@ public class MLTransactionMetadataStoreTest extends MockedBookKeeperTestCase {
                 MLTransactionLogImpl txnLog2 = new MLTransactionLogImpl(transactionCoordinatorID, factory,
                         managedLedgerConfig);
                 txnLog2.initialize().join();
+
                 MLTransactionMetadataStore transactionMetadataStoreTest =
                         new MLTransactionMetadataStore(transactionCoordinatorID,
-                                txnLog2, new TransactionTimeoutTrackerImpl(), new TransactionRecoverTrackerImpl(),
-                                mlTransactionSequenceIdGenerator);
+                                txnLog2, new TransactionTimeoutTrackerImpl(), mlTransactionSequenceIdGenerator);
+                transactionMetadataStoreTest.init(new TransactionRecoverTrackerImpl()).get();
 
                 while (true) {
                     if (checkReplayRetryCount > 6) {
@@ -315,8 +318,8 @@ public class MLTransactionMetadataStoreTest extends MockedBookKeeperTestCase {
         mlTransactionLog.initialize().join();
         MLTransactionMetadataStore transactionMetadataStore =
                 new MLTransactionMetadataStore(transactionCoordinatorID, mlTransactionLog,
-                        new TransactionTimeoutTrackerImpl(), new TransactionRecoverTrackerImpl(),
-                        mlTransactionSequenceIdGenerator);
+                        new TransactionTimeoutTrackerImpl(), mlTransactionSequenceIdGenerator);
+        transactionMetadataStore.init(new TransactionRecoverTrackerImpl()).get();
         int checkReplayRetryCount = 0;
         while (true) {
             if (checkReplayRetryCount > 3) {
@@ -382,9 +385,8 @@ public class MLTransactionMetadataStoreTest extends MockedBookKeeperTestCase {
         mlTransactionLog.initialize().join();
         MLTransactionMetadataStore transactionMetadataStore =
                 new MLTransactionMetadataStore(transactionCoordinatorID, mlTransactionLog,
-                        new TransactionTimeoutTrackerImpl(), new TransactionRecoverTrackerImpl(),
-                        mlTransactionSequenceIdGenerator);
-
+                        new TransactionTimeoutTrackerImpl(), mlTransactionSequenceIdGenerator);
+        transactionMetadataStore.init(new TransactionRecoverTrackerImpl()).get();
 
         Awaitility.await().until(transactionMetadataStore::checkIfReady);
 
@@ -401,8 +403,8 @@ public class MLTransactionMetadataStoreTest extends MockedBookKeeperTestCase {
         mlTransactionLog.initialize().join();
         transactionMetadataStore =
                 new MLTransactionMetadataStore(transactionCoordinatorID, mlTransactionLog,
-                        new TransactionTimeoutTrackerImpl(), new TransactionRecoverTrackerImpl(),
-                        mlTransactionSequenceIdGenerator);
+                        new TransactionTimeoutTrackerImpl(), mlTransactionSequenceIdGenerator);
+        transactionMetadataStore.init(new TransactionRecoverTrackerImpl()).get();
 
         Awaitility.await().until(transactionMetadataStore::checkIfReady);
     }
@@ -423,8 +425,8 @@ public class MLTransactionMetadataStoreTest extends MockedBookKeeperTestCase {
         mlTransactionLog.initialize().join();
         MLTransactionMetadataStore transactionMetadataStore =
                 new MLTransactionMetadataStore(transactionCoordinatorID, mlTransactionLog,
-                        new TransactionTimeoutTrackerImpl(), new TransactionRecoverTrackerImpl(),
-                        mlTransactionSequenceIdGenerator);
+                        new TransactionTimeoutTrackerImpl(), mlTransactionSequenceIdGenerator);
+        transactionMetadataStore.init(new TransactionRecoverTrackerImpl()).get();
 
         Awaitility.await().until(transactionMetadataStore::checkIfReady);
         transactionMetadataStore.newTransaction(5000).get();