You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by xi...@apache.org on 2022/06/15 06:17:59 UTC
[pulsar] branch master updated: [Transaction] Set TC state is Ready after open MLTransactionMetadataStore completely. (#13957)
This is an automated email from the ASF dual-hosted git repository.
xiangying pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git
The following commit(s) were added to refs/heads/master by this push:
new 0fe8ac0d2c1 [Transaction] Set TC state is Ready after open MLTransactionMetadataStore completely. (#13957)
0fe8ac0d2c1 is described below
commit 0fe8ac0d2c1bb909729580e5456b4d57c2a00346
Author: Xiangying Meng <55...@users.noreply.github.com>
AuthorDate: Wed Jun 15 14:17:44 2022 +0800
[Transaction] Set TC state is Ready after open MLTransactionMetadataStore completely. (#13957)
[Transaction] Set TC state is Ready after open MLTransactionMetadataStore completely.
### Motivation
The MLTransactionMetadataStore constructor and openTransactionMetadataStore method are asynchronous. So there may be situations where the store in the Initializing state was put into stores
### Modification
Pass in the future to wait for MLTransactionMetadataStore initialization to complete
---
.../pulsar/broker/transaction/TransactionTest.java | 13 +-
.../impl/MLTransactionMetadataStore.java | 174 +++++++++++----------
.../impl/MLTransactionMetadataStoreProvider.java | 4 +-
.../MLTransactionMetadataStoreTest.java | 38 ++---
4 files changed, 121 insertions(+), 108 deletions(-)
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/transaction/TransactionTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/transaction/TransactionTest.java
index f01503d125e..700536c9f8b 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/transaction/TransactionTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/transaction/TransactionTest.java
@@ -692,9 +692,8 @@ public class TransactionTest extends TransactionTestBase {
doNothing().when(timeoutTracker).start();
MLTransactionMetadataStore metadataStore1 =
new MLTransactionMetadataStore(new TransactionCoordinatorID(1),
- mlTransactionLog, timeoutTracker, transactionRecoverTracker,
- mlTransactionSequenceIdGenerator);
-
+ mlTransactionLog, timeoutTracker, mlTransactionSequenceIdGenerator);
+ metadataStore1.init(transactionRecoverTracker).get();
Awaitility.await().untilAsserted(() ->
assertEquals(metadataStore1.getCoordinatorStats().state, "Ready"));
@@ -706,8 +705,8 @@ public class TransactionTest extends TransactionTestBase {
MLTransactionMetadataStore metadataStore2 =
new MLTransactionMetadataStore(new TransactionCoordinatorID(1),
- mlTransactionLog, timeoutTracker, transactionRecoverTracker,
- mlTransactionSequenceIdGenerator);
+ mlTransactionLog, timeoutTracker, mlTransactionSequenceIdGenerator);
+ metadataStore2.init(transactionRecoverTracker).get();
Awaitility.await().untilAsserted(() ->
assertEquals(metadataStore2.getCoordinatorStats().state, "Ready"));
@@ -719,8 +718,8 @@ public class TransactionTest extends TransactionTestBase {
MLTransactionMetadataStore metadataStore3 =
new MLTransactionMetadataStore(new TransactionCoordinatorID(1),
- mlTransactionLog, timeoutTracker, transactionRecoverTracker,
- mlTransactionSequenceIdGenerator);
+ mlTransactionLog, timeoutTracker, mlTransactionSequenceIdGenerator);
+ metadataStore3.init(transactionRecoverTracker).get();
Awaitility.await().untilAsserted(() ->
assertEquals(metadataStore3.getCoordinatorStats().state, "Ready"));
}
diff --git a/pulsar-transaction/coordinator/src/main/java/org/apache/pulsar/transaction/coordinator/impl/MLTransactionMetadataStore.java b/pulsar-transaction/coordinator/src/main/java/org/apache/pulsar/transaction/coordinator/impl/MLTransactionMetadataStore.java
index 685d57e664e..6c88d27cc22 100644
--- a/pulsar-transaction/coordinator/src/main/java/org/apache/pulsar/transaction/coordinator/impl/MLTransactionMetadataStore.java
+++ b/pulsar-transaction/coordinator/src/main/java/org/apache/pulsar/transaction/coordinator/impl/MLTransactionMetadataStore.java
@@ -34,6 +34,7 @@ import org.apache.bookkeeper.mledger.ManagedLedger;
import org.apache.bookkeeper.mledger.Position;
import org.apache.commons.lang3.tuple.MutablePair;
import org.apache.commons.lang3.tuple.Pair;
+import org.apache.pulsar.client.api.transaction.TransactionCoordinatorClientException;
import org.apache.pulsar.client.api.transaction.TxnID;
import org.apache.pulsar.common.api.proto.Subscription;
import org.apache.pulsar.common.policies.data.TransactionCoordinatorStats;
@@ -79,7 +80,6 @@ public class MLTransactionMetadataStore
public MLTransactionMetadataStore(TransactionCoordinatorID tcID,
MLTransactionLogImpl mlTransactionLog,
TransactionTimeoutTracker timeoutTracker,
- TransactionRecoverTracker recoverTracker,
MLTransactionSequenceIdGenerator sequenceIdGenerator) {
super(State.None);
this.sequenceIdGenerator = sequenceIdGenerator;
@@ -96,96 +96,108 @@ public class MLTransactionMetadataStore
DefaultThreadFactory threadFactory = new DefaultThreadFactory("transaction_coordinator_"
+ tcID.toString() + "thread_factory");
this.internalPinnedExecutor = Executors.newSingleThreadScheduledExecutor(threadFactory);
+ }
+ public CompletableFuture<TransactionMetadataStore> init(TransactionRecoverTracker recoverTracker) {
+ CompletableFuture<TransactionMetadataStore> completableFuture = new CompletableFuture<>();
if (!changeToInitializingState()) {
log.error("Managed ledger transaction metadata store change state error when init it");
- return;
- }
-
- internalPinnedExecutor.execute(() -> transactionLog.replayAsync(new TransactionLogReplayCallback() {
-
- @Override
- public void replayComplete() {
- recoverTracker.appendOpenTransactionToTimeoutTracker();
- if (!changeToReadyState()) {
- log.error("Managed ledger transaction metadata store change state error when replay complete");
- } else {
- recoverTracker.handleCommittingAndAbortingTransaction();
- timeoutTracker.start();
+ completableFuture
+ .completeExceptionally(new TransactionCoordinatorClientException
+ .CoordinatorNotFoundException("transaction metadata store with tcId "
+ + tcID.toString() + " change state to Initializing error when init it"));
+ } else {
+ internalPinnedExecutor.execute(() -> transactionLog.replayAsync(new TransactionLogReplayCallback() {
+
+ @Override
+ public void replayComplete() {
+ recoverTracker.appendOpenTransactionToTimeoutTracker();
+ if (!changeToReadyState()) {
+ log.error("Managed ledger transaction metadata store change state error when replay complete");
+ completableFuture
+ .completeExceptionally(new TransactionCoordinatorClientException
+ .CoordinatorNotFoundException("transaction metadata store with tcId "
+ + tcID.toString() + " change state to Ready error when init it"));
+
+ } else {
+ recoverTracker.handleCommittingAndAbortingTransaction();
+ timeoutTracker.start();
+ completableFuture.complete(MLTransactionMetadataStore.this);
+ }
}
- }
-
- @Override
- public void handleMetadataEntry(Position position, TransactionMetadataEntry transactionMetadataEntry) {
- try {
+ @Override
+ public void handleMetadataEntry(Position position, TransactionMetadataEntry transactionMetadataEntry) {
- TxnID txnID = new TxnID(transactionMetadataEntry.getTxnidMostBits(),
+ try {
+ TxnID txnID = new TxnID(transactionMetadataEntry.getTxnidMostBits(),
transactionMetadataEntry.getTxnidLeastBits());
- long transactionId = transactionMetadataEntry.getTxnidLeastBits();
- switch (transactionMetadataEntry.getMetadataOp()) {
- case NEW:
- long txnSequenceId = transactionMetadataEntry.getTxnidLeastBits();
- if (txnMetaMap.containsKey(transactionId)) {
- txnMetaMap.get(transactionId).getRight().add(position);
- } else {
- List<Position> positions = new ArrayList<>();
- positions.add(position);
- long openTimestamp = transactionMetadataEntry.getStartTime();
- long timeoutAt = transactionMetadataEntry.getTimeoutMs();
- txnMetaMap.put(transactionId, MutablePair.of(new TxnMetaImpl(txnID,
- openTimestamp, timeoutAt), positions));
- recoverTracker.handleOpenStatusTransaction(txnSequenceId,
- timeoutAt + openTimestamp);
- }
- break;
- case ADD_PARTITION:
- if (!txnMetaMap.containsKey(transactionId)) {
- transactionLog.deletePosition(Collections.singletonList(position));
- } else {
- txnMetaMap.get(transactionId).getLeft()
- .addProducedPartitions(transactionMetadataEntry.getPartitionsList());
- txnMetaMap.get(transactionId).getRight().add(position);
- }
- break;
- case ADD_SUBSCRIPTION:
- if (!txnMetaMap.containsKey(transactionId)) {
- transactionLog.deletePosition(Collections.singletonList(position));
- } else {
- txnMetaMap.get(transactionId).getLeft()
- .addAckedPartitions(subscriptionToTxnSubscription(
- transactionMetadataEntry.getSubscriptionsList()));
- txnMetaMap.get(transactionId).getRight().add(position);
- }
- break;
- case UPDATE:
- if (!txnMetaMap.containsKey(transactionId)) {
- transactionLog.deletePosition(Collections.singletonList(position));
- } else {
- TxnStatus newStatus = transactionMetadataEntry.getNewStatus();
- txnMetaMap.get(transactionId).getLeft()
- .updateTxnStatus(transactionMetadataEntry.getNewStatus(),
- transactionMetadataEntry.getExpectedStatus());
- txnMetaMap.get(transactionId).getRight().add(position);
- recoverTracker.updateTransactionStatus(txnID.getLeastSigBits(), newStatus);
- if (newStatus == TxnStatus.COMMITTED || newStatus == TxnStatus.ABORTED) {
- transactionLog.deletePosition(txnMetaMap
- .get(transactionId).getRight()).thenAccept(v ->
- txnMetaMap.remove(transactionId).getLeft());
+ long transactionId = transactionMetadataEntry.getTxnidLeastBits();
+ switch (transactionMetadataEntry.getMetadataOp()) {
+ case NEW:
+ long txnSequenceId = transactionMetadataEntry.getTxnidLeastBits();
+ if (txnMetaMap.containsKey(transactionId)) {
+ txnMetaMap.get(transactionId).getRight().add(position);
+ } else {
+ List<Position> positions = new ArrayList<>();
+ positions.add(position);
+ long openTimestamp = transactionMetadataEntry.getStartTime();
+ long timeoutAt = transactionMetadataEntry.getTimeoutMs();
+ txnMetaMap.put(transactionId, MutablePair.of(new TxnMetaImpl(txnID,
+ openTimestamp, timeoutAt), positions));
+ recoverTracker.handleOpenStatusTransaction(txnSequenceId,
+ timeoutAt + openTimestamp);
}
- }
- break;
- default:
- throw new InvalidTxnStatusException("Transaction `"
- + txnID + "` load replay metadata operation "
- + "from transaction log with unknown operation");
+ break;
+ case ADD_PARTITION:
+ if (!txnMetaMap.containsKey(transactionId)) {
+ transactionLog.deletePosition(Collections.singletonList(position));
+ } else {
+ txnMetaMap.get(transactionId).getLeft()
+ .addProducedPartitions(transactionMetadataEntry.getPartitionsList());
+ txnMetaMap.get(transactionId).getRight().add(position);
+ }
+ break;
+ case ADD_SUBSCRIPTION:
+ if (!txnMetaMap.containsKey(transactionId)) {
+ transactionLog.deletePosition(Collections.singletonList(position));
+ } else {
+ txnMetaMap.get(transactionId).getLeft()
+ .addAckedPartitions(subscriptionToTxnSubscription(
+ transactionMetadataEntry.getSubscriptionsList()));
+ txnMetaMap.get(transactionId).getRight().add(position);
+ }
+ break;
+ case UPDATE:
+ if (!txnMetaMap.containsKey(transactionId)) {
+ transactionLog.deletePosition(Collections.singletonList(position));
+ } else {
+ TxnStatus newStatus = transactionMetadataEntry.getNewStatus();
+ txnMetaMap.get(transactionId).getLeft()
+ .updateTxnStatus(transactionMetadataEntry.getNewStatus(),
+ transactionMetadataEntry.getExpectedStatus());
+ txnMetaMap.get(transactionId).getRight().add(position);
+ recoverTracker.updateTransactionStatus(txnID.getLeastSigBits(), newStatus);
+ if (newStatus == TxnStatus.COMMITTED || newStatus == TxnStatus.ABORTED) {
+ transactionLog.deletePosition(txnMetaMap
+ .get(transactionId).getRight()).thenAccept(v ->
+ txnMetaMap.remove(transactionId).getLeft());
+ }
+ }
+ break;
+ default:
+ throw new InvalidTxnStatusException("Transaction `"
+ + txnID + "` load replay metadata operation "
+ + "from transaction log with unknown operation");
+ }
+ } catch (InvalidTxnStatusException e) {
+ transactionLog.deletePosition(Collections.singletonList(position));
+ log.error(e.getMessage(), e);
}
- } catch (InvalidTxnStatusException e) {
- transactionLog.deletePosition(Collections.singletonList(position));
- log.error(e.getMessage(), e);
}
- }
- }));
+ }));
+ }
+ return completableFuture;
}
@Override
diff --git a/pulsar-transaction/coordinator/src/main/java/org/apache/pulsar/transaction/coordinator/impl/MLTransactionMetadataStoreProvider.java b/pulsar-transaction/coordinator/src/main/java/org/apache/pulsar/transaction/coordinator/impl/MLTransactionMetadataStoreProvider.java
index 0711f00ac70..20df6439827 100644
--- a/pulsar-transaction/coordinator/src/main/java/org/apache/pulsar/transaction/coordinator/impl/MLTransactionMetadataStoreProvider.java
+++ b/pulsar-transaction/coordinator/src/main/java/org/apache/pulsar/transaction/coordinator/impl/MLTransactionMetadataStoreProvider.java
@@ -48,8 +48,8 @@ public class MLTransactionMetadataStoreProvider implements TransactionMetadataSt
managedLedgerFactory, managedLedgerConfig);
// MLTransactionLogInterceptor will init sequenceId and update the sequenceId to managedLedger properties.
- return txnLog.initialize().thenApply(__ ->
+ return txnLog.initialize().thenCompose(__ ->
new MLTransactionMetadataStore(transactionCoordinatorId, txnLog, timeoutTracker,
- recoverTracker, mlTransactionSequenceIdGenerator));
+ mlTransactionSequenceIdGenerator).init(recoverTracker));
}
}
\ No newline at end of file
diff --git a/pulsar-transaction/coordinator/src/test/java/org/apache/pulsar/transaction/coordinator/MLTransactionMetadataStoreTest.java b/pulsar-transaction/coordinator/src/test/java/org/apache/pulsar/transaction/coordinator/MLTransactionMetadataStoreTest.java
index 2aa678059d4..bad29053850 100644
--- a/pulsar-transaction/coordinator/src/test/java/org/apache/pulsar/transaction/coordinator/MLTransactionMetadataStoreTest.java
+++ b/pulsar-transaction/coordinator/src/test/java/org/apache/pulsar/transaction/coordinator/MLTransactionMetadataStoreTest.java
@@ -75,8 +75,9 @@ public class MLTransactionMetadataStoreTest extends MockedBookKeeperTestCase {
mlTransactionLog.initialize().join();
MLTransactionMetadataStore transactionMetadataStore =
new MLTransactionMetadataStore(transactionCoordinatorID, mlTransactionLog,
- new TransactionTimeoutTrackerImpl(), new TransactionRecoverTrackerImpl(),
+ new TransactionTimeoutTrackerImpl(),
mlTransactionSequenceIdGenerator);
+ transactionMetadataStore.init(new TransactionRecoverTrackerImpl()).get();
int checkReplayRetryCount = 0;
while (true) {
checkReplayRetryCount++;
@@ -150,8 +151,8 @@ public class MLTransactionMetadataStoreTest extends MockedBookKeeperTestCase {
mlTransactionLog.initialize().join();
MLTransactionMetadataStore transactionMetadataStore =
new MLTransactionMetadataStore(transactionCoordinatorID, mlTransactionLog,
- new TransactionTimeoutTrackerImpl(), new TransactionRecoverTrackerImpl(),
- mlTransactionSequenceIdGenerator);
+ new TransactionTimeoutTrackerImpl(), mlTransactionSequenceIdGenerator);
+ transactionMetadataStore.init(new TransactionRecoverTrackerImpl()).get();
Awaitility.await().until(transactionMetadataStore::checkIfReady);
TxnID txnID = transactionMetadataStore.newTransaction(20000).get();
@@ -180,8 +181,8 @@ public class MLTransactionMetadataStoreTest extends MockedBookKeeperTestCase {
mlTransactionLog.initialize().join();
transactionMetadataStore =
new MLTransactionMetadataStore(transactionCoordinatorID, mlTransactionLog,
- new TransactionTimeoutTrackerImpl(), new TransactionRecoverTrackerImpl(),
- mlTransactionSequenceIdGenerator);
+ new TransactionTimeoutTrackerImpl(), mlTransactionSequenceIdGenerator);
+ transactionMetadataStore.init(new TransactionRecoverTrackerImpl()).get();
Awaitility.await().until(transactionMetadataStore::checkIfReady);
txnID = transactionMetadataStore.newTransaction(100000).get();
@@ -203,10 +204,11 @@ public class MLTransactionMetadataStoreTest extends MockedBookKeeperTestCase {
MLTransactionLogImpl mlTransactionLog = new MLTransactionLogImpl(transactionCoordinatorID, factory,
managedLedgerConfig);
mlTransactionLog.initialize().join();
+
MLTransactionMetadataStore transactionMetadataStore =
new MLTransactionMetadataStore(transactionCoordinatorID, mlTransactionLog,
- new TransactionTimeoutTrackerImpl(), new TransactionRecoverTrackerImpl(),
- mlTransactionSequenceIdGenerator);
+ new TransactionTimeoutTrackerImpl(), mlTransactionSequenceIdGenerator);
+ transactionMetadataStore.init(new TransactionRecoverTrackerImpl()).get();
int checkReplayRetryCount = 0;
while (true) {
if (checkReplayRetryCount > 3) {
@@ -246,10 +248,11 @@ public class MLTransactionMetadataStoreTest extends MockedBookKeeperTestCase {
MLTransactionLogImpl txnLog2 = new MLTransactionLogImpl(transactionCoordinatorID, factory,
managedLedgerConfig);
txnLog2.initialize().join();
+
MLTransactionMetadataStore transactionMetadataStoreTest =
new MLTransactionMetadataStore(transactionCoordinatorID,
- txnLog2, new TransactionTimeoutTrackerImpl(), new TransactionRecoverTrackerImpl(),
- mlTransactionSequenceIdGenerator);
+ txnLog2, new TransactionTimeoutTrackerImpl(), mlTransactionSequenceIdGenerator);
+ transactionMetadataStoreTest.init(new TransactionRecoverTrackerImpl()).get();
while (true) {
if (checkReplayRetryCount > 6) {
@@ -317,8 +320,8 @@ public class MLTransactionMetadataStoreTest extends MockedBookKeeperTestCase {
mlTransactionLog.initialize().join();
MLTransactionMetadataStore transactionMetadataStore =
new MLTransactionMetadataStore(transactionCoordinatorID, mlTransactionLog,
- new TransactionTimeoutTrackerImpl(), new TransactionRecoverTrackerImpl(),
- mlTransactionSequenceIdGenerator);
+ new TransactionTimeoutTrackerImpl(), mlTransactionSequenceIdGenerator);
+ transactionMetadataStore.init(new TransactionRecoverTrackerImpl()).get();
int checkReplayRetryCount = 0;
while (true) {
if (checkReplayRetryCount > 3) {
@@ -384,9 +387,8 @@ public class MLTransactionMetadataStoreTest extends MockedBookKeeperTestCase {
mlTransactionLog.initialize().join();
MLTransactionMetadataStore transactionMetadataStore =
new MLTransactionMetadataStore(transactionCoordinatorID, mlTransactionLog,
- new TransactionTimeoutTrackerImpl(), new TransactionRecoverTrackerImpl(),
- mlTransactionSequenceIdGenerator);
-
+ new TransactionTimeoutTrackerImpl(), mlTransactionSequenceIdGenerator);
+ transactionMetadataStore.init(new TransactionRecoverTrackerImpl()).get();
Awaitility.await().until(transactionMetadataStore::checkIfReady);
@@ -403,8 +405,8 @@ public class MLTransactionMetadataStoreTest extends MockedBookKeeperTestCase {
mlTransactionLog.initialize().join();
transactionMetadataStore =
new MLTransactionMetadataStore(transactionCoordinatorID, mlTransactionLog,
- new TransactionTimeoutTrackerImpl(), new TransactionRecoverTrackerImpl(),
- mlTransactionSequenceIdGenerator);
+ new TransactionTimeoutTrackerImpl(), mlTransactionSequenceIdGenerator);
+ transactionMetadataStore.init(new TransactionRecoverTrackerImpl()).get();
Awaitility.await().until(transactionMetadataStore::checkIfReady);
}
@@ -425,8 +427,8 @@ public class MLTransactionMetadataStoreTest extends MockedBookKeeperTestCase {
mlTransactionLog.initialize().join();
MLTransactionMetadataStore transactionMetadataStore =
new MLTransactionMetadataStore(transactionCoordinatorID, mlTransactionLog,
- new TransactionTimeoutTrackerImpl(), new TransactionRecoverTrackerImpl(),
- mlTransactionSequenceIdGenerator);
+ new TransactionTimeoutTrackerImpl(), mlTransactionSequenceIdGenerator);
+ transactionMetadataStore.init(new TransactionRecoverTrackerImpl()).get();
Awaitility.await().until(transactionMetadataStore::checkIfReady);
transactionMetadataStore.newTransaction(5000).get();