You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by pe...@apache.org on 2021/04/19 00:33:28 UTC
[pulsar] branch master updated: [Transaction] Tc recover handle
transaction in committing and aborting status . (#10179)
This is an automated email from the ASF dual-hosted git repository.
penghui pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git
The following commit(s) were added to refs/heads/master by this push:
new 36c3bc3 [Transaction] Tc recover handle transaction in committing and aborting status . (#10179)
36c3bc3 is described below
commit 36c3bc3178a2e7b5d05b8eb0148242379896c63a
Author: congbo <39...@users.noreply.github.com>
AuthorDate: Mon Apr 19 08:32:48 2021 +0800
[Transaction] Tc recover handle transaction in committing and aborting status . (#10179)
## Motivation
Now recover don't handle transaction in committing or aborting status, it only add to ```transactionTimeOutTracker```.
## implement
Add ```TransactionRecoverTracker``` to handle different status transaction.
```
/**
* Handle recover transaction update status.
* @param sequenceId {@link long} the sequenceId of this transaction.
* @param txnStatus {@link long} the txn status of this operation.
*/
void updateTransactionStatus(long sequenceId, TxnStatus txnStatus) throws CoordinatorException.InvalidTxnStatusException;
/**
* Handle recover transaction in open status.
* @param sequenceId {@link Long} the sequenceId of this transaction.
* @param timeout {@link long} the timeout time of this transaction.
*/
void handleOpenStatusTransaction(long sequenceId, long timeout);
/**
* Handle the transaction in open status append to transaction timeout tracker.
*/
void appendOpenTransactionToTimeoutTracker();
/**
* Handle the transaction in committing and aborting.
*/
void handleCommittingAndAbortingTransaction();
```
### Verifying this change
Add the tests for it
Does this pull request potentially affect one of the following parts:
If yes was chosen, please highlight the changes
Dependencies (does it add or upgrade a dependency): (no)
The public API: (no)
The schema: (no)
The default values of configurations: (no)
The wire protocol: (no)
The rest endpoints: (no)
The admin cli options: (no)
Anything that affects deployment: (no)
---
.../broker/TransactionMetadataStoreService.java | 9 +-
.../recover/TransactionRecoverTrackerImpl.java | 131 +++++++++++++++++++++
.../broker/transaction/recover/package-info.java | 22 ++++
.../recover/TransactionRecoverTrackerTest.java | 117 ++++++++++++++++++
.../TransactionMetadataStoreProvider.java | 4 +-
.../coordinator/TransactionRecoverTracker.java | 52 ++++++++
.../InMemTransactionMetadataStoreProvider.java | 5 +-
.../impl/MLTransactionMetadataStore.java | 29 +++--
.../impl/MLTransactionMetadataStoreProvider.java | 6 +-
.../MLTransactionMetadataStoreTest.java | 36 +++++-
.../TransactionMetadataStoreProviderTest.java | 3 +-
11 files changed, 390 insertions(+), 24 deletions(-)
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/TransactionMetadataStoreService.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/TransactionMetadataStoreService.java
index 8b1d14e..d1a39c4 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/TransactionMetadataStoreService.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/TransactionMetadataStoreService.java
@@ -34,6 +34,7 @@ import java.util.concurrent.TimeUnit;
import org.apache.bookkeeper.mledger.ManagedLedgerException;
import org.apache.pulsar.broker.namespace.NamespaceBundleOwnershipListener;
import org.apache.pulsar.broker.transaction.buffer.exceptions.UnsupportedTxnActionException;
+import org.apache.pulsar.broker.transaction.recover.TransactionRecoverTrackerImpl;
import org.apache.pulsar.broker.transaction.timeout.TransactionTimeoutTrackerFactoryImpl;
import org.apache.pulsar.client.api.PulsarClientException;
import org.apache.pulsar.client.api.transaction.TransactionBufferClient;
@@ -47,7 +48,9 @@ import org.apache.pulsar.common.util.FutureUtil;
import org.apache.pulsar.transaction.coordinator.TransactionCoordinatorID;
import org.apache.pulsar.transaction.coordinator.TransactionMetadataStore;
import org.apache.pulsar.transaction.coordinator.TransactionMetadataStoreProvider;
+import org.apache.pulsar.transaction.coordinator.TransactionRecoverTracker;
import org.apache.pulsar.transaction.coordinator.TransactionSubscription;
+import org.apache.pulsar.transaction.coordinator.TransactionTimeoutTracker;
import org.apache.pulsar.transaction.coordinator.TransactionTimeoutTrackerFactory;
import org.apache.pulsar.transaction.coordinator.TxnMeta;
import org.apache.pulsar.transaction.coordinator.exceptions.CoordinatorException.CoordinatorNotFoundException;
@@ -137,8 +140,12 @@ public class TransactionMetadataStoreService {
if (e != null) {
LOG.error("Add transaction metadata store with id {} error", tcId.getId(), e);
} else {
+ TransactionTimeoutTracker timeoutTracker = timeoutTrackerFactory.newTracker(tcId);
+ TransactionRecoverTracker recoverTracker =
+ new TransactionRecoverTrackerImpl(TransactionMetadataStoreService.this,
+ timeoutTracker, tcId.getId());
transactionMetadataStoreProvider.openStore(tcId, pulsarService.getManagedLedgerFactory(), v,
- timeoutTrackerFactory.newTracker(tcId))
+ timeoutTracker, recoverTracker)
.whenComplete((store, ex) -> {
if (ex != null) {
LOG.error("Add transaction metadata store with id {} error", tcId.getId(), ex);
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/recover/TransactionRecoverTrackerImpl.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/recover/TransactionRecoverTrackerImpl.java
new file mode 100644
index 0000000..dc10162
--- /dev/null
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/recover/TransactionRecoverTrackerImpl.java
@@ -0,0 +1,131 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.broker.transaction.recover;
+
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Map;
+import java.util.Set;
+import lombok.extern.slf4j.Slf4j;
+import org.apache.pulsar.broker.TransactionMetadataStoreService;
+import org.apache.pulsar.client.api.transaction.TxnID;
+import org.apache.pulsar.common.api.proto.TxnAction;
+import org.apache.pulsar.transaction.coordinator.TransactionRecoverTracker;
+import org.apache.pulsar.transaction.coordinator.TransactionTimeoutTracker;
+import org.apache.pulsar.transaction.coordinator.exceptions.CoordinatorException.InvalidTxnStatusException;
+import org.apache.pulsar.transaction.coordinator.proto.TxnStatus;
+
+
+/**
+ * The transaction recover tracker implementation {@link TransactionRecoverTracker}.
+ */
+@Slf4j
+public class TransactionRecoverTrackerImpl implements TransactionRecoverTracker {
+
+ private final long tcId;
+ private final TransactionMetadataStoreService transactionMetadataStoreService;
+ private final TransactionTimeoutTracker timeoutTracker;
+
+ /**
+ * This is for recover open status transaction. The key is this transaction's sequenceId, the value is this
+ * transaction timeout time.
+ * <p>
+ * When transaction update status to committing or aborting, it will be remove form this.
+ * <p>
+ * When transactionMetadataStore recover complete, the transaction don't update status, it will send all
+ * transaction to transactionTimeoutTracker.
+ *
+ */
+ private final Map<Long, Long> openTransactions;
+
+ /**
+ * Update transaction to committing status.
+ * <p>
+ * When transaction update status to committing, it will be add in.
+ * <p>
+ * When transaction update status to committed status, the transaction will remove from it.
+ * <p>
+ * When transactionMetadataStore recover complete, all transaction in this will endTransaction by commit action.
+ */
+ private final Set<Long> committingTransactions;
+
+ /**
+ * Update transaction to aborting status.
+ * <p>
+ * When transaction update status to aborting, it will be add in.
+ * <p>
+ * When transaction update status to aborted status, the transaction will remove from it.
+ * <p>
+ * When transactionMetadataStore recover complete, all transaction in this will endTransaction by abort action.
+ */
+ private final Set<Long> abortingTransactions;
+
+ public TransactionRecoverTrackerImpl(TransactionMetadataStoreService transactionMetadataStoreService,
+ TransactionTimeoutTracker timeoutTracker, long tcId) {
+ this.tcId = tcId;
+ this.transactionMetadataStoreService = transactionMetadataStoreService;
+ this.openTransactions = new HashMap<>();
+ this.committingTransactions = new HashSet<>();
+ this.abortingTransactions = new HashSet<>();
+ this.timeoutTracker = timeoutTracker;
+ }
+
+ @Override
+ public void updateTransactionStatus(long sequenceId, TxnStatus txnStatus) throws InvalidTxnStatusException {
+ switch (txnStatus) {
+ case COMMITTING:
+ openTransactions.remove(sequenceId);
+ committingTransactions.add(sequenceId);
+ break;
+ case ABORTING:
+ openTransactions.remove(sequenceId);
+ abortingTransactions.add(sequenceId);
+ break;
+ case ABORTED:
+ abortingTransactions.remove(sequenceId);
+ break;
+ case COMMITTED:
+ committingTransactions.remove(sequenceId);
+ break;
+ default:
+ throw new InvalidTxnStatusException("Transaction recover tracker`"
+ + new TxnID(tcId, sequenceId) + "` load replay metadata operation "
+ + "from transaction log with unknown operation");
+ }
+ }
+
+ @Override
+ public void handleOpenStatusTransaction(long sequenceId, long timeout) {
+ openTransactions.put(sequenceId, timeout);
+ }
+
+ @Override
+ public void appendOpenTransactionToTimeoutTracker() {
+ openTransactions.forEach(timeoutTracker::replayAddTransaction);
+ }
+
+ @Override
+ public void handleCommittingAndAbortingTransaction() {
+ committingTransactions.forEach(k ->
+ transactionMetadataStoreService.endTransaction(new TxnID(tcId, k), TxnAction.COMMIT_VALUE));
+
+ abortingTransactions.forEach(k ->
+ transactionMetadataStoreService.endTransaction(new TxnID(tcId, k), TxnAction.ABORT_VALUE));
+ }
+}
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/recover/package-info.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/recover/package-info.java
new file mode 100644
index 0000000..9b99bb6
--- /dev/null
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/recover/package-info.java
@@ -0,0 +1,22 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+/**
+ * Implementation of a transaction recover tracker.
+ */
+package org.apache.pulsar.broker.transaction.recover;
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/transaction/recover/TransactionRecoverTrackerTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/transaction/recover/TransactionRecoverTrackerTest.java
new file mode 100644
index 0000000..dddb10c
--- /dev/null
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/transaction/recover/TransactionRecoverTrackerTest.java
@@ -0,0 +1,117 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.broker.transaction.recover;
+
+import io.netty.util.HashedWheelTimer;
+import org.apache.pulsar.broker.TransactionMetadataStoreService;
+import org.apache.pulsar.broker.transaction.timeout.TransactionTimeoutTrackerFactoryImpl;
+import org.apache.pulsar.broker.transaction.timeout.TransactionTimeoutTrackerImpl;
+import org.apache.pulsar.common.util.collections.TripleLongPriorityQueue;
+import org.apache.pulsar.transaction.coordinator.TransactionCoordinatorID;
+import org.apache.pulsar.transaction.coordinator.TransactionTimeoutTracker;
+import org.apache.pulsar.transaction.coordinator.proto.TxnStatus;
+import org.testng.annotations.Test;
+
+import java.lang.reflect.Field;
+import java.util.Map;
+import java.util.Set;
+
+import static org.mockito.Mockito.mock;
+import static org.testng.AssertJUnit.assertEquals;
+import static org.testng.AssertJUnit.assertFalse;
+import static org.testng.AssertJUnit.assertTrue;
+
+public class TransactionRecoverTrackerTest {
+
+ @Test
+ public void openStatusRecoverTrackerTest() throws Exception {
+ TransactionMetadataStoreService transactionMetadataStoreService = mock(TransactionMetadataStoreService.class);
+ TransactionTimeoutTracker timeoutTracker = new TransactionTimeoutTrackerFactoryImpl(
+ transactionMetadataStoreService, new HashedWheelTimer()).newTracker(TransactionCoordinatorID.get(1));
+ TransactionRecoverTrackerImpl recoverTracker =
+ new TransactionRecoverTrackerImpl(transactionMetadataStoreService, timeoutTracker, 1);
+
+ recoverTracker.handleOpenStatusTransaction(1, 200);
+ recoverTracker.handleOpenStatusTransaction(2, 300);
+
+ Field field = TransactionRecoverTrackerImpl.class.getDeclaredField("openTransactions");
+ field.setAccessible(true);
+ Map<Long, Long> map = (Map<Long, Long>) field.get(recoverTracker);
+
+ assertEquals(map.size(), 2);
+ assertEquals(map.get(1L).longValue(), 200L);
+ assertEquals(map.get(2L).longValue(), 300L);
+
+ field = TransactionTimeoutTrackerImpl.class.getDeclaredField("priorityQueue");
+ field.setAccessible(true);
+ TripleLongPriorityQueue priorityQueue = (TripleLongPriorityQueue) field.get(timeoutTracker);
+ assertEquals(priorityQueue.size(), 0);
+
+ recoverTracker.appendOpenTransactionToTimeoutTracker();
+ assertEquals(priorityQueue.size(), 2);
+ }
+
+ @Test
+ public void updateStatusRecoverTest() throws Exception {
+ TransactionRecoverTrackerImpl recoverTracker =
+ new TransactionRecoverTrackerImpl(mock(TransactionMetadataStoreService.class),
+ mock(TransactionTimeoutTrackerImpl.class), 1);
+ long committingSequenceId = 1L;
+ long committedSequenceId = 2L;
+ long abortingSequenceId = 3L;
+ long abortedSequenceId = 4L;
+ recoverTracker.handleOpenStatusTransaction(committingSequenceId, 100);
+ recoverTracker.handleOpenStatusTransaction(committedSequenceId, 100);
+ recoverTracker.handleOpenStatusTransaction(abortingSequenceId, 100);
+ recoverTracker.handleOpenStatusTransaction(abortedSequenceId, 100);
+
+ Field field = TransactionRecoverTrackerImpl.class.getDeclaredField("openTransactions");
+ field.setAccessible(true);
+ Map<Long, Long> openMap = (Map<Long, Long>) field.get(recoverTracker);
+ assertEquals(4, openMap.size());
+
+ recoverTracker.updateTransactionStatus(committingSequenceId, TxnStatus.COMMITTING);
+ assertEquals(3, openMap.size());
+ recoverTracker.updateTransactionStatus(committedSequenceId, TxnStatus.COMMITTING);
+ assertEquals(2, openMap.size());
+ recoverTracker.updateTransactionStatus(committedSequenceId, TxnStatus.COMMITTED);
+
+ recoverTracker.updateTransactionStatus(abortingSequenceId, TxnStatus.ABORTING);
+ assertEquals(1, openMap.size());
+ recoverTracker.updateTransactionStatus(abortedSequenceId, TxnStatus.ABORTING);
+ assertEquals(0, openMap.size());
+ recoverTracker.updateTransactionStatus(abortedSequenceId, TxnStatus.ABORTED);
+
+ field = TransactionRecoverTrackerImpl.class.getDeclaredField("committingTransactions");
+ field.setAccessible(true);
+ Set<Long> commitSet = (Set<Long>) field.get(recoverTracker);
+
+ assertEquals(commitSet.size(), 1);
+ assertTrue(commitSet.contains(committingSequenceId));
+ assertFalse(commitSet.contains(committedSequenceId));
+
+ field = TransactionRecoverTrackerImpl.class.getDeclaredField("abortingTransactions");
+ field.setAccessible(true);
+ Set<Long> abortSet = (Set<Long>) field.get(recoverTracker);
+
+ assertEquals(1, abortSet.size());
+ assertTrue(abortSet.contains(abortingSequenceId));
+ assertFalse(abortSet.contains(abortedSequenceId));
+ }
+}
diff --git a/pulsar-transaction/coordinator/src/main/java/org/apache/pulsar/transaction/coordinator/TransactionMetadataStoreProvider.java b/pulsar-transaction/coordinator/src/main/java/org/apache/pulsar/transaction/coordinator/TransactionMetadataStoreProvider.java
index c723cf2..4a39824 100644
--- a/pulsar-transaction/coordinator/src/main/java/org/apache/pulsar/transaction/coordinator/TransactionMetadataStoreProvider.java
+++ b/pulsar-transaction/coordinator/src/main/java/org/apache/pulsar/transaction/coordinator/TransactionMetadataStoreProvider.java
@@ -60,11 +60,13 @@ public interface TransactionMetadataStoreProvider {
* @param managedLedgerFactory {@link ManagedLedgerFactory} the managedLedgerFactory to create managedLedger.
* @param managedLedgerConfig {@link ManagedLedgerConfig} the managedLedgerConfig to create managedLedger.
* @param timeoutTracker {@link TransactionTimeoutTracker} the timeoutTracker to handle transaction time out.
+ * @param recoverTracker {@link TransactionRecoverTracker} the recoverTracker to handle transaction recover.
* @return a future represents the result of the operation.
* an instance of {@link TransactionMetadataStore} is returned
* if the operation succeeds.
*/
CompletableFuture<TransactionMetadataStore> openStore(
TransactionCoordinatorID transactionCoordinatorId, ManagedLedgerFactory managedLedgerFactory,
- ManagedLedgerConfig managedLedgerConfig, TransactionTimeoutTracker timeoutTracker);
+ ManagedLedgerConfig managedLedgerConfig, TransactionTimeoutTracker timeoutTracker,
+ TransactionRecoverTracker recoverTracker);
}
diff --git a/pulsar-transaction/coordinator/src/main/java/org/apache/pulsar/transaction/coordinator/TransactionRecoverTracker.java b/pulsar-transaction/coordinator/src/main/java/org/apache/pulsar/transaction/coordinator/TransactionRecoverTracker.java
new file mode 100644
index 0000000..6598625
--- /dev/null
+++ b/pulsar-transaction/coordinator/src/main/java/org/apache/pulsar/transaction/coordinator/TransactionRecoverTracker.java
@@ -0,0 +1,52 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.transaction.coordinator;
+
+import org.apache.pulsar.transaction.coordinator.exceptions.CoordinatorException;
+import org.apache.pulsar.transaction.coordinator.proto.TxnStatus;
+
+/**
+ * This tracker is for transaction metadata store recover handle the different status transaction.
+ */
+public interface TransactionRecoverTracker {
+
+ /**
+ * Handle recover transaction update status.
+ * @param sequenceId {@link long} the sequenceId of this transaction.
+ * @param txnStatus {@link long} the txn status of this operation.
+ */
+ void updateTransactionStatus(long sequenceId, TxnStatus txnStatus) throws CoordinatorException.InvalidTxnStatusException;
+
+ /**
+ * Handle recover transaction in open status.
+ * @param sequenceId {@link Long} the sequenceId of this transaction.
+ * @param timeout {@link long} the timeout time of this transaction.
+ */
+ void handleOpenStatusTransaction(long sequenceId, long timeout);
+
+ /**
+ * Handle the transaction in open status append to transaction timeout tracker.
+ */
+ void appendOpenTransactionToTimeoutTracker();
+
+ /**
+ * Handle the transaction in committing and aborting status.
+ */
+ void handleCommittingAndAbortingTransaction();
+}
diff --git a/pulsar-transaction/coordinator/src/main/java/org/apache/pulsar/transaction/coordinator/impl/InMemTransactionMetadataStoreProvider.java b/pulsar-transaction/coordinator/src/main/java/org/apache/pulsar/transaction/coordinator/impl/InMemTransactionMetadataStoreProvider.java
index 152d8fd..4c4c04d 100644
--- a/pulsar-transaction/coordinator/src/main/java/org/apache/pulsar/transaction/coordinator/impl/InMemTransactionMetadataStoreProvider.java
+++ b/pulsar-transaction/coordinator/src/main/java/org/apache/pulsar/transaction/coordinator/impl/InMemTransactionMetadataStoreProvider.java
@@ -24,8 +24,8 @@ import org.apache.bookkeeper.mledger.ManagedLedgerFactory;
import org.apache.pulsar.transaction.coordinator.TransactionCoordinatorID;
import org.apache.pulsar.transaction.coordinator.TransactionMetadataStore;
import org.apache.pulsar.transaction.coordinator.TransactionMetadataStoreProvider;
+import org.apache.pulsar.transaction.coordinator.TransactionRecoverTracker;
import org.apache.pulsar.transaction.coordinator.TransactionTimeoutTracker;
-import org.apache.pulsar.transaction.coordinator.TransactionTimeoutTrackerFactory;
/**
* The provider that offers in-memory implementation of {@link TransactionMetadataStore}.
@@ -36,7 +36,8 @@ public class InMemTransactionMetadataStoreProvider implements TransactionMetadat
public CompletableFuture<TransactionMetadataStore> openStore(TransactionCoordinatorID transactionCoordinatorId,
ManagedLedgerFactory managedLedgerFactory,
ManagedLedgerConfig managedLedgerConfig,
- TransactionTimeoutTracker timeoutTracker) {
+ TransactionTimeoutTracker timeoutTracker,
+ TransactionRecoverTracker recoverTracker) {
return CompletableFuture.completedFuture(
new InMemTransactionMetadataStore(transactionCoordinatorId));
}
diff --git a/pulsar-transaction/coordinator/src/main/java/org/apache/pulsar/transaction/coordinator/impl/MLTransactionMetadataStore.java b/pulsar-transaction/coordinator/src/main/java/org/apache/pulsar/transaction/coordinator/impl/MLTransactionMetadataStore.java
index f86e566..8d2e220 100644
--- a/pulsar-transaction/coordinator/src/main/java/org/apache/pulsar/transaction/coordinator/impl/MLTransactionMetadataStore.java
+++ b/pulsar-transaction/coordinator/src/main/java/org/apache/pulsar/transaction/coordinator/impl/MLTransactionMetadataStore.java
@@ -37,6 +37,7 @@ import org.apache.pulsar.transaction.coordinator.TransactionCoordinatorID;
import org.apache.pulsar.transaction.coordinator.TransactionLogReplayCallback;
import org.apache.pulsar.transaction.coordinator.TransactionMetadataStore;
import org.apache.pulsar.transaction.coordinator.TransactionMetadataStoreState;
+import org.apache.pulsar.transaction.coordinator.TransactionRecoverTracker;
import org.apache.pulsar.transaction.coordinator.TransactionSubscription;
import org.apache.pulsar.transaction.coordinator.TransactionTimeoutTracker;
import org.apache.pulsar.transaction.coordinator.TxnMeta;
@@ -68,7 +69,8 @@ public class MLTransactionMetadataStore
public MLTransactionMetadataStore(TransactionCoordinatorID tcID,
MLTransactionLogImpl mlTransactionLog,
- TransactionTimeoutTracker timeoutTracker) {
+ TransactionTimeoutTracker timeoutTracker,
+ TransactionRecoverTracker recoverTracker) {
super(State.None);
this.tcID = tcID;
this.transactionLog = mlTransactionLog;
@@ -82,9 +84,11 @@ public class MLTransactionMetadataStore
@Override
public void replayComplete() {
+ recoverTracker.appendOpenTransactionToTimeoutTracker();
if (!changeToReadyState()) {
log.error("Managed ledger transaction metadata store change state error when replay complete");
} else {
+ recoverTracker.handleCommittingAndAbortingTransaction();
timeoutTracker.start();
}
}
@@ -98,8 +102,9 @@ public class MLTransactionMetadataStore
transactionMetadataEntry.getTxnidLeastBits());
switch (transactionMetadataEntry.getMetadataOp()) {
case NEW:
- if (sequenceId.get() < transactionMetadataEntry.getTxnidLeastBits()) {
- sequenceId.set(transactionMetadataEntry.getTxnidLeastBits());
+ long txnSequenceId = transactionMetadataEntry.getTxnidLeastBits();
+ if (sequenceId.get() < txnSequenceId) {
+ sequenceId.set(txnSequenceId);
}
if (txnMetaMap.containsKey(txnID)) {
txnMetaMap.get(txnID).getRight().add(position);
@@ -108,8 +113,9 @@ public class MLTransactionMetadataStore
positions.add(position);
txnMetaMap.put(txnID, MutablePair.of(new TxnMetaImpl(txnID), positions));
txnIdSortedSet.add(transactionMetadataEntry.getTxnidLeastBits());
- timeoutTracker.replayAddTransaction(transactionMetadataEntry.getTxnidLeastBits(),
- transactionMetadataEntry.getTimeoutMs());
+ recoverTracker.handleOpenStatusTransaction(txnSequenceId,
+ transactionMetadataEntry.getTimeoutMs()
+ + transactionMetadataEntry.getStartTime());
}
break;
case ADD_PARTITION:
@@ -136,17 +142,17 @@ public class MLTransactionMetadataStore
transactionLog.deletePosition(Collections.singletonList(position));
} else {
TxnStatus newStatus = transactionMetadataEntry.getNewStatus();
+ txnMetaMap.get(txnID).getLeft()
+ .updateTxnStatus(transactionMetadataEntry.getNewStatus(),
+ transactionMetadataEntry.getExpectedStatus());
+ txnMetaMap.get(txnID).getRight().add(position);
+ recoverTracker.updateTransactionStatus(txnID.getLeastSigBits(), newStatus);
if (newStatus == TxnStatus.COMMITTED || newStatus == TxnStatus.ABORTED) {
transactionLog.deletePosition(txnMetaMap.get(txnID).getRight()).thenAccept(v -> {
- TxnMeta txnMeta = txnMetaMap.remove(txnID).getLeft();
+ txnMetaMap.remove(txnID).getLeft();
txnIdSortedSet.remove(transactionMetadataEntry.getTxnidLeastBits());
});
- } else {
- txnMetaMap.get(txnID).getLeft()
- .updateTxnStatus(transactionMetadataEntry.getNewStatus(),
- transactionMetadataEntry.getExpectedStatus());
}
- txnMetaMap.get(txnID).getRight().add(position);
}
break;
default:
@@ -155,6 +161,7 @@ public class MLTransactionMetadataStore
+ "from transaction log with unknown operation");
}
} catch (InvalidTxnStatusException e) {
+ transactionLog.deletePosition(Collections.singletonList(position));
log.error(e.getMessage(), e);
}
}
diff --git a/pulsar-transaction/coordinator/src/main/java/org/apache/pulsar/transaction/coordinator/impl/MLTransactionMetadataStoreProvider.java b/pulsar-transaction/coordinator/src/main/java/org/apache/pulsar/transaction/coordinator/impl/MLTransactionMetadataStoreProvider.java
index b8a3055..bdf0d56 100644
--- a/pulsar-transaction/coordinator/src/main/java/org/apache/pulsar/transaction/coordinator/impl/MLTransactionMetadataStoreProvider.java
+++ b/pulsar-transaction/coordinator/src/main/java/org/apache/pulsar/transaction/coordinator/impl/MLTransactionMetadataStoreProvider.java
@@ -25,6 +25,7 @@ import org.apache.pulsar.common.util.FutureUtil;
import org.apache.pulsar.transaction.coordinator.TransactionCoordinatorID;
import org.apache.pulsar.transaction.coordinator.TransactionMetadataStore;
import org.apache.pulsar.transaction.coordinator.TransactionMetadataStoreProvider;
+import org.apache.pulsar.transaction.coordinator.TransactionRecoverTracker;
import org.apache.pulsar.transaction.coordinator.TransactionTimeoutTracker;
import org.apache.pulsar.transaction.coordinator.TransactionTimeoutTrackerFactory;
import org.slf4j.Logger;
@@ -41,13 +42,14 @@ public class MLTransactionMetadataStoreProvider implements TransactionMetadataSt
public CompletableFuture<TransactionMetadataStore> openStore(TransactionCoordinatorID transactionCoordinatorId,
ManagedLedgerFactory managedLedgerFactory,
ManagedLedgerConfig managedLedgerConfig,
- TransactionTimeoutTracker timeoutTracker) {
+ TransactionTimeoutTracker timeoutTracker,
+ TransactionRecoverTracker recoverTracker) {
TransactionMetadataStore transactionMetadataStore;
try {
transactionMetadataStore =
new MLTransactionMetadataStore(transactionCoordinatorId,
new MLTransactionLogImpl(transactionCoordinatorId,
- managedLedgerFactory, managedLedgerConfig), timeoutTracker);
+ managedLedgerFactory, managedLedgerConfig), timeoutTracker, recoverTracker);
} catch (Exception e) {
log.error("MLTransactionMetadataStore init fail", e);
return FutureUtil.failedFuture(e);
diff --git a/pulsar-transaction/coordinator/src/test/java/org/apache/pulsar/transaction/coordinator/MLTransactionMetadataStoreTest.java b/pulsar-transaction/coordinator/src/test/java/org/apache/pulsar/transaction/coordinator/MLTransactionMetadataStoreTest.java
index b885e78..b6f2702 100644
--- a/pulsar-transaction/coordinator/src/test/java/org/apache/pulsar/transaction/coordinator/MLTransactionMetadataStoreTest.java
+++ b/pulsar-transaction/coordinator/src/test/java/org/apache/pulsar/transaction/coordinator/MLTransactionMetadataStoreTest.java
@@ -24,6 +24,7 @@ import org.apache.bookkeeper.mledger.ManagedLedgerFactory;
import org.apache.bookkeeper.mledger.ManagedLedgerFactoryConfig;
import org.apache.bookkeeper.mledger.impl.ManagedLedgerFactoryImpl;
import org.apache.pulsar.client.api.transaction.TxnID;
+import org.apache.pulsar.transaction.coordinator.exceptions.CoordinatorException;
import org.apache.pulsar.transaction.coordinator.exceptions.CoordinatorException.TransactionNotFoundException;
import org.apache.pulsar.transaction.coordinator.impl.MLTransactionLogImpl;
import org.apache.pulsar.transaction.coordinator.impl.MLTransactionMetadataStore;
@@ -56,7 +57,7 @@ public class MLTransactionMetadataStoreTest extends MockedBookKeeperTestCase {
new ManagedLedgerConfig());
MLTransactionMetadataStore transactionMetadataStore =
new MLTransactionMetadataStore(transactionCoordinatorID, mlTransactionLog,
- new TransactionTimeoutTrackerImpl());
+ new TransactionTimeoutTrackerImpl(), new TransactionRecoverTrackerImpl());
int checkReplayRetryCount = 0;
while (true) {
checkReplayRetryCount++;
@@ -120,7 +121,7 @@ public class MLTransactionMetadataStoreTest extends MockedBookKeeperTestCase {
managedLedgerConfig);
MLTransactionMetadataStore transactionMetadataStore =
new MLTransactionMetadataStore(transactionCoordinatorID, mlTransactionLog,
- new TransactionTimeoutTrackerImpl());
+ new TransactionTimeoutTrackerImpl(), new TransactionRecoverTrackerImpl());
int checkReplayRetryCount = 0;
while (true) {
if (checkReplayRetryCount > 3) {
@@ -160,7 +161,7 @@ public class MLTransactionMetadataStoreTest extends MockedBookKeeperTestCase {
MLTransactionMetadataStore transactionMetadataStoreTest =
new MLTransactionMetadataStore(transactionCoordinatorID,
new MLTransactionLogImpl(transactionCoordinatorID, factory,
- new ManagedLedgerConfig()), new TransactionTimeoutTrackerImpl());
+ new ManagedLedgerConfig()), new TransactionTimeoutTrackerImpl(), new TransactionRecoverTrackerImpl());
while (true) {
if (checkReplayRetryCount > 6) {
@@ -222,7 +223,7 @@ public class MLTransactionMetadataStoreTest extends MockedBookKeeperTestCase {
new ManagedLedgerConfig());
MLTransactionMetadataStore transactionMetadataStore =
new MLTransactionMetadataStore(transactionCoordinatorID, mlTransactionLog,
- new TransactionTimeoutTrackerImpl());
+ new TransactionTimeoutTrackerImpl(), new TransactionRecoverTrackerImpl());
int checkReplayRetryCount = 0;
while (true) {
if (checkReplayRetryCount > 3) {
@@ -282,7 +283,7 @@ public class MLTransactionMetadataStoreTest extends MockedBookKeeperTestCase {
new ManagedLedgerConfig());
MLTransactionMetadataStore transactionMetadataStore =
new MLTransactionMetadataStore(transactionCoordinatorID, mlTransactionLog,
- new TransactionTimeoutTrackerImpl());
+ new TransactionTimeoutTrackerImpl(), new TransactionRecoverTrackerImpl());
Awaitility.await().atMost(3000, TimeUnit.MILLISECONDS).until(transactionMetadataStore::checkIfReady);
@@ -299,7 +300,7 @@ public class MLTransactionMetadataStoreTest extends MockedBookKeeperTestCase {
new ManagedLedgerConfig());
transactionMetadataStore =
new MLTransactionMetadataStore(transactionCoordinatorID, mlTransactionLog,
- new TransactionTimeoutTrackerImpl());
+ new TransactionTimeoutTrackerImpl(), new TransactionRecoverTrackerImpl());
Awaitility.await().atMost(3000, TimeUnit.MILLISECONDS).until(transactionMetadataStore::checkIfReady);
}
@@ -326,4 +327,27 @@ public class MLTransactionMetadataStoreTest extends MockedBookKeeperTestCase {
}
}
+
+ public static class TransactionRecoverTrackerImpl implements TransactionRecoverTracker {
+
+ @Override
+ public void updateTransactionStatus(long sequenceId, TxnStatus txnStatus) throws CoordinatorException.InvalidTxnStatusException {
+
+ }
+
+ @Override
+ public void handleOpenStatusTransaction(long sequenceId, long timeout) {
+
+ }
+
+ @Override
+ public void appendOpenTransactionToTimeoutTracker() {
+
+ }
+
+ @Override
+ public void handleCommittingAndAbortingTransaction() {
+
+ }
+ }
}
\ No newline at end of file
diff --git a/pulsar-transaction/coordinator/src/test/java/org/apache/pulsar/transaction/coordinator/TransactionMetadataStoreProviderTest.java b/pulsar-transaction/coordinator/src/test/java/org/apache/pulsar/transaction/coordinator/TransactionMetadataStoreProviderTest.java
index 349bcba..26ced4c 100644
--- a/pulsar-transaction/coordinator/src/test/java/org/apache/pulsar/transaction/coordinator/TransactionMetadataStoreProviderTest.java
+++ b/pulsar-transaction/coordinator/src/test/java/org/apache/pulsar/transaction/coordinator/TransactionMetadataStoreProviderTest.java
@@ -62,7 +62,8 @@ public class TransactionMetadataStoreProviderTest {
@BeforeMethod
public void setup() throws Exception {
this.tcId = new TransactionCoordinatorID(1L);
- this.store = this.provider.openStore(tcId, null, null, null).get();
+ this.store = this.provider.openStore(tcId, null, null,
+ null, new MLTransactionMetadataStoreTest.TransactionRecoverTrackerImpl()).get();
}
@Test