You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by pe...@apache.org on 2021/04/19 00:33:28 UTC

[pulsar] branch master updated: [Transaction] Tc recover handle transaction in committing and aborting status . (#10179)

This is an automated email from the ASF dual-hosted git repository.

penghui pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git


The following commit(s) were added to refs/heads/master by this push:
     new 36c3bc3  [Transaction] Tc recover handle transaction in committing and aborting status . (#10179)
36c3bc3 is described below

commit 36c3bc3178a2e7b5d05b8eb0148242379896c63a
Author: congbo <39...@users.noreply.github.com>
AuthorDate: Mon Apr 19 08:32:48 2021 +0800

    [Transaction] Tc recover handle transaction in committing and aborting status . (#10179)
    
    ## Motivation
    Now recover don't handle transaction in committing or aborting status, it only add to ```transactionTimeOutTracker```.
    
    ## implement
    Add ```TransactionRecoverTracker``` to handle different status transaction.
    
    ```
        /**
         * Handle recover transaction update status.
         * @param sequenceId {@link long} the sequenceId of this transaction.
         * @param txnStatus {@link long} the txn status of this operation.
         */
        void updateTransactionStatus(long sequenceId, TxnStatus txnStatus) throws CoordinatorException.InvalidTxnStatusException;
    
        /**
         * Handle recover transaction in open status.
         * @param sequenceId {@link Long} the sequenceId of this transaction.
         * @param timeout {@link long} the timeout time of this transaction.
         */
        void handleOpenStatusTransaction(long sequenceId, long timeout);
    
        /**
         * Handle the transaction in open status append to transaction timeout tracker.
         */
        void appendOpenTransactionToTimeoutTracker();
    
        /**
         * Handle the transaction in committing and aborting.
         */
        void handleCommittingAndAbortingTransaction();
    ```
    ### Verifying this change
    Add the tests for it
    
    Does this pull request potentially affect one of the following parts:
    If yes was chosen, please highlight the changes
    
    Dependencies (does it add or upgrade a dependency): (no)
    The public API: (no)
    The schema: (no)
    The default values of configurations: (no)
    The wire protocol: (no)
    The rest endpoints: (no)
    The admin cli options: (no)
    Anything that affects deployment: (no)
---
 .../broker/TransactionMetadataStoreService.java    |   9 +-
 .../recover/TransactionRecoverTrackerImpl.java     | 131 +++++++++++++++++++++
 .../broker/transaction/recover/package-info.java   |  22 ++++
 .../recover/TransactionRecoverTrackerTest.java     | 117 ++++++++++++++++++
 .../TransactionMetadataStoreProvider.java          |   4 +-
 .../coordinator/TransactionRecoverTracker.java     |  52 ++++++++
 .../InMemTransactionMetadataStoreProvider.java     |   5 +-
 .../impl/MLTransactionMetadataStore.java           |  29 +++--
 .../impl/MLTransactionMetadataStoreProvider.java   |   6 +-
 .../MLTransactionMetadataStoreTest.java            |  36 +++++-
 .../TransactionMetadataStoreProviderTest.java      |   3 +-
 11 files changed, 390 insertions(+), 24 deletions(-)

diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/TransactionMetadataStoreService.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/TransactionMetadataStoreService.java
index 8b1d14e..d1a39c4 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/TransactionMetadataStoreService.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/TransactionMetadataStoreService.java
@@ -34,6 +34,7 @@ import java.util.concurrent.TimeUnit;
 import org.apache.bookkeeper.mledger.ManagedLedgerException;
 import org.apache.pulsar.broker.namespace.NamespaceBundleOwnershipListener;
 import org.apache.pulsar.broker.transaction.buffer.exceptions.UnsupportedTxnActionException;
+import org.apache.pulsar.broker.transaction.recover.TransactionRecoverTrackerImpl;
 import org.apache.pulsar.broker.transaction.timeout.TransactionTimeoutTrackerFactoryImpl;
 import org.apache.pulsar.client.api.PulsarClientException;
 import org.apache.pulsar.client.api.transaction.TransactionBufferClient;
@@ -47,7 +48,9 @@ import org.apache.pulsar.common.util.FutureUtil;
 import org.apache.pulsar.transaction.coordinator.TransactionCoordinatorID;
 import org.apache.pulsar.transaction.coordinator.TransactionMetadataStore;
 import org.apache.pulsar.transaction.coordinator.TransactionMetadataStoreProvider;
+import org.apache.pulsar.transaction.coordinator.TransactionRecoverTracker;
 import org.apache.pulsar.transaction.coordinator.TransactionSubscription;
+import org.apache.pulsar.transaction.coordinator.TransactionTimeoutTracker;
 import org.apache.pulsar.transaction.coordinator.TransactionTimeoutTrackerFactory;
 import org.apache.pulsar.transaction.coordinator.TxnMeta;
 import org.apache.pulsar.transaction.coordinator.exceptions.CoordinatorException.CoordinatorNotFoundException;
@@ -137,8 +140,12 @@ public class TransactionMetadataStoreService {
                     if (e != null) {
                         LOG.error("Add transaction metadata store with id {} error", tcId.getId(), e);
                     } else {
+                        TransactionTimeoutTracker timeoutTracker = timeoutTrackerFactory.newTracker(tcId);
+                        TransactionRecoverTracker recoverTracker =
+                                new TransactionRecoverTrackerImpl(TransactionMetadataStoreService.this,
+                                        timeoutTracker, tcId.getId());
                         transactionMetadataStoreProvider.openStore(tcId, pulsarService.getManagedLedgerFactory(), v,
-                                timeoutTrackerFactory.newTracker(tcId))
+                                timeoutTracker, recoverTracker)
                                 .whenComplete((store, ex) -> {
                                     if (ex != null) {
                                         LOG.error("Add transaction metadata store with id {} error", tcId.getId(), ex);
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/recover/TransactionRecoverTrackerImpl.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/recover/TransactionRecoverTrackerImpl.java
new file mode 100644
index 0000000..dc10162
--- /dev/null
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/recover/TransactionRecoverTrackerImpl.java
@@ -0,0 +1,131 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.broker.transaction.recover;
+
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Map;
+import java.util.Set;
+import lombok.extern.slf4j.Slf4j;
+import org.apache.pulsar.broker.TransactionMetadataStoreService;
+import org.apache.pulsar.client.api.transaction.TxnID;
+import org.apache.pulsar.common.api.proto.TxnAction;
+import org.apache.pulsar.transaction.coordinator.TransactionRecoverTracker;
+import org.apache.pulsar.transaction.coordinator.TransactionTimeoutTracker;
+import org.apache.pulsar.transaction.coordinator.exceptions.CoordinatorException.InvalidTxnStatusException;
+import org.apache.pulsar.transaction.coordinator.proto.TxnStatus;
+
+
+/**
+ * The transaction recover tracker implementation {@link TransactionRecoverTracker}.
+ */
+@Slf4j
+public class TransactionRecoverTrackerImpl implements TransactionRecoverTracker {
+
+    private final long tcId;
+    private final TransactionMetadataStoreService transactionMetadataStoreService;
+    private final TransactionTimeoutTracker timeoutTracker;
+
+    /**
+     * This is for recover open status transaction. The key is this transaction's sequenceId, the value is this
+     * transaction timeout time.
+     * <p>
+     *     When transaction update status to committing or aborting, it will be remove form this.
+     * <p>
+     *     When transactionMetadataStore recover complete, the transaction don't update status, it will send all
+     *     transaction to transactionTimeoutTracker.
+     *
+     */
+    private final Map<Long, Long> openTransactions;
+
+    /**
+     * Update transaction to committing status.
+     * <p>
+     *     When transaction update status to committing, it will be add in.
+     * <p>
+     *     When transaction update status to committed status, the transaction will remove from it.
+     * <p>
+     *     When transactionMetadataStore recover complete, all transaction in this will endTransaction by commit action.
+     */
+    private final Set<Long> committingTransactions;
+
+    /**
+     * Update transaction to aborting status.
+     * <p>
+     *     When transaction update status to aborting, it will be add in.
+     * <p>
+     *     When transaction update status to aborted status, the transaction will remove from it.
+     * <p>
+     *     When transactionMetadataStore recover complete, all transaction in this will endTransaction by abort action.
+     */
+    private final Set<Long> abortingTransactions;
+
+    public TransactionRecoverTrackerImpl(TransactionMetadataStoreService transactionMetadataStoreService,
+                                  TransactionTimeoutTracker timeoutTracker, long tcId) {
+        this.tcId = tcId;
+        this.transactionMetadataStoreService = transactionMetadataStoreService;
+        this.openTransactions = new HashMap<>();
+        this.committingTransactions = new HashSet<>();
+        this.abortingTransactions = new HashSet<>();
+        this.timeoutTracker = timeoutTracker;
+    }
+
+    @Override
+    public void updateTransactionStatus(long sequenceId, TxnStatus txnStatus) throws InvalidTxnStatusException {
+        switch (txnStatus) {
+            case COMMITTING:
+                openTransactions.remove(sequenceId);
+                committingTransactions.add(sequenceId);
+                break;
+            case ABORTING:
+                openTransactions.remove(sequenceId);
+                abortingTransactions.add(sequenceId);
+                break;
+            case ABORTED:
+                abortingTransactions.remove(sequenceId);
+                break;
+            case COMMITTED:
+                committingTransactions.remove(sequenceId);
+                break;
+            default:
+                throw new InvalidTxnStatusException("Transaction recover tracker`"
+                        + new TxnID(tcId, sequenceId) + "` load replay metadata operation "
+                        + "from transaction log with unknown operation");
+        }
+    }
+
+    @Override
+    public void handleOpenStatusTransaction(long sequenceId, long timeout) {
+        openTransactions.put(sequenceId, timeout);
+    }
+
+    @Override
+    public void appendOpenTransactionToTimeoutTracker() {
+        openTransactions.forEach(timeoutTracker::replayAddTransaction);
+    }
+
+    @Override
+    public void handleCommittingAndAbortingTransaction() {
+        committingTransactions.forEach(k ->
+                transactionMetadataStoreService.endTransaction(new TxnID(tcId, k), TxnAction.COMMIT_VALUE));
+
+        abortingTransactions.forEach(k ->
+                transactionMetadataStoreService.endTransaction(new TxnID(tcId, k), TxnAction.ABORT_VALUE));
+    }
+}
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/recover/package-info.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/recover/package-info.java
new file mode 100644
index 0000000..9b99bb6
--- /dev/null
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/recover/package-info.java
@@ -0,0 +1,22 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+/**
+ * Implementation of a transaction recover tracker.
+ */
+package org.apache.pulsar.broker.transaction.recover;
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/transaction/recover/TransactionRecoverTrackerTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/transaction/recover/TransactionRecoverTrackerTest.java
new file mode 100644
index 0000000..dddb10c
--- /dev/null
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/transaction/recover/TransactionRecoverTrackerTest.java
@@ -0,0 +1,117 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.broker.transaction.recover;
+
+import io.netty.util.HashedWheelTimer;
+import org.apache.pulsar.broker.TransactionMetadataStoreService;
+import org.apache.pulsar.broker.transaction.timeout.TransactionTimeoutTrackerFactoryImpl;
+import org.apache.pulsar.broker.transaction.timeout.TransactionTimeoutTrackerImpl;
+import org.apache.pulsar.common.util.collections.TripleLongPriorityQueue;
+import org.apache.pulsar.transaction.coordinator.TransactionCoordinatorID;
+import org.apache.pulsar.transaction.coordinator.TransactionTimeoutTracker;
+import org.apache.pulsar.transaction.coordinator.proto.TxnStatus;
+import org.testng.annotations.Test;
+
+import java.lang.reflect.Field;
+import java.util.Map;
+import java.util.Set;
+
+import static org.mockito.Mockito.mock;
+import static org.testng.AssertJUnit.assertEquals;
+import static org.testng.AssertJUnit.assertFalse;
+import static org.testng.AssertJUnit.assertTrue;
+
+public class TransactionRecoverTrackerTest {
+
+    @Test
+    public void openStatusRecoverTrackerTest() throws Exception {
+        TransactionMetadataStoreService transactionMetadataStoreService = mock(TransactionMetadataStoreService.class);
+        TransactionTimeoutTracker timeoutTracker = new TransactionTimeoutTrackerFactoryImpl(
+                transactionMetadataStoreService, new HashedWheelTimer()).newTracker(TransactionCoordinatorID.get(1));
+        TransactionRecoverTrackerImpl recoverTracker =
+                new TransactionRecoverTrackerImpl(transactionMetadataStoreService, timeoutTracker, 1);
+
+        recoverTracker.handleOpenStatusTransaction(1, 200);
+        recoverTracker.handleOpenStatusTransaction(2, 300);
+
+        Field field = TransactionRecoverTrackerImpl.class.getDeclaredField("openTransactions");
+        field.setAccessible(true);
+        Map<Long, Long> map = (Map<Long, Long>) field.get(recoverTracker);
+
+        assertEquals(map.size(), 2);
+        assertEquals(map.get(1L).longValue(), 200L);
+        assertEquals(map.get(2L).longValue(), 300L);
+
+        field = TransactionTimeoutTrackerImpl.class.getDeclaredField("priorityQueue");
+        field.setAccessible(true);
+        TripleLongPriorityQueue priorityQueue = (TripleLongPriorityQueue) field.get(timeoutTracker);
+        assertEquals(priorityQueue.size(), 0);
+
+        recoverTracker.appendOpenTransactionToTimeoutTracker();
+        assertEquals(priorityQueue.size(), 2);
+    }
+
+    @Test
+    public void updateStatusRecoverTest() throws Exception {
+        TransactionRecoverTrackerImpl recoverTracker =
+                new TransactionRecoverTrackerImpl(mock(TransactionMetadataStoreService.class),
+                        mock(TransactionTimeoutTrackerImpl.class), 1);
+        long committingSequenceId = 1L;
+        long committedSequenceId = 2L;
+        long abortingSequenceId = 3L;
+        long abortedSequenceId = 4L;
+        recoverTracker.handleOpenStatusTransaction(committingSequenceId, 100);
+        recoverTracker.handleOpenStatusTransaction(committedSequenceId, 100);
+        recoverTracker.handleOpenStatusTransaction(abortingSequenceId, 100);
+        recoverTracker.handleOpenStatusTransaction(abortedSequenceId, 100);
+
+        Field field = TransactionRecoverTrackerImpl.class.getDeclaredField("openTransactions");
+        field.setAccessible(true);
+        Map<Long, Long> openMap = (Map<Long, Long>) field.get(recoverTracker);
+        assertEquals(4, openMap.size());
+
+        recoverTracker.updateTransactionStatus(committingSequenceId, TxnStatus.COMMITTING);
+        assertEquals(3, openMap.size());
+        recoverTracker.updateTransactionStatus(committedSequenceId, TxnStatus.COMMITTING);
+        assertEquals(2, openMap.size());
+        recoverTracker.updateTransactionStatus(committedSequenceId, TxnStatus.COMMITTED);
+
+        recoverTracker.updateTransactionStatus(abortingSequenceId, TxnStatus.ABORTING);
+        assertEquals(1, openMap.size());
+        recoverTracker.updateTransactionStatus(abortedSequenceId, TxnStatus.ABORTING);
+        assertEquals(0, openMap.size());
+        recoverTracker.updateTransactionStatus(abortedSequenceId, TxnStatus.ABORTED);
+
+        field = TransactionRecoverTrackerImpl.class.getDeclaredField("committingTransactions");
+        field.setAccessible(true);
+        Set<Long> commitSet = (Set<Long>) field.get(recoverTracker);
+
+        assertEquals(commitSet.size(), 1);
+        assertTrue(commitSet.contains(committingSequenceId));
+        assertFalse(commitSet.contains(committedSequenceId));
+
+        field = TransactionRecoverTrackerImpl.class.getDeclaredField("abortingTransactions");
+        field.setAccessible(true);
+        Set<Long> abortSet = (Set<Long>) field.get(recoverTracker);
+
+        assertEquals(1, abortSet.size());
+        assertTrue(abortSet.contains(abortingSequenceId));
+        assertFalse(abortSet.contains(abortedSequenceId));
+    }
+}
diff --git a/pulsar-transaction/coordinator/src/main/java/org/apache/pulsar/transaction/coordinator/TransactionMetadataStoreProvider.java b/pulsar-transaction/coordinator/src/main/java/org/apache/pulsar/transaction/coordinator/TransactionMetadataStoreProvider.java
index c723cf2..4a39824 100644
--- a/pulsar-transaction/coordinator/src/main/java/org/apache/pulsar/transaction/coordinator/TransactionMetadataStoreProvider.java
+++ b/pulsar-transaction/coordinator/src/main/java/org/apache/pulsar/transaction/coordinator/TransactionMetadataStoreProvider.java
@@ -60,11 +60,13 @@ public interface TransactionMetadataStoreProvider {
      * @param managedLedgerFactory {@link ManagedLedgerFactory} the managedLedgerFactory to create managedLedger.
      * @param managedLedgerConfig {@link ManagedLedgerConfig} the managedLedgerConfig to create managedLedger.
      * @param timeoutTracker {@link TransactionTimeoutTracker} the timeoutTracker to handle transaction time out.
+     * @param recoverTracker {@link TransactionRecoverTracker} the recoverTracker to handle transaction recover.
      * @return a future represents the result of the operation.
      *         an instance of {@link TransactionMetadataStore} is returned
      *         if the operation succeeds.
      */
     CompletableFuture<TransactionMetadataStore> openStore(
             TransactionCoordinatorID transactionCoordinatorId, ManagedLedgerFactory managedLedgerFactory,
-            ManagedLedgerConfig managedLedgerConfig, TransactionTimeoutTracker timeoutTracker);
+            ManagedLedgerConfig managedLedgerConfig, TransactionTimeoutTracker timeoutTracker,
+            TransactionRecoverTracker recoverTracker);
 }
diff --git a/pulsar-transaction/coordinator/src/main/java/org/apache/pulsar/transaction/coordinator/TransactionRecoverTracker.java b/pulsar-transaction/coordinator/src/main/java/org/apache/pulsar/transaction/coordinator/TransactionRecoverTracker.java
new file mode 100644
index 0000000..6598625
--- /dev/null
+++ b/pulsar-transaction/coordinator/src/main/java/org/apache/pulsar/transaction/coordinator/TransactionRecoverTracker.java
@@ -0,0 +1,52 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.transaction.coordinator;
+
+import org.apache.pulsar.transaction.coordinator.exceptions.CoordinatorException;
+import org.apache.pulsar.transaction.coordinator.proto.TxnStatus;
+
+/**
+ * This tracker is for transaction metadata store recover handle the different status transaction.
+ */
+public interface TransactionRecoverTracker {
+
+    /**
+     * Handle recover transaction update status.
+     * @param sequenceId {@link long} the sequenceId of this transaction.
+     * @param txnStatus {@link long} the txn status of this operation.
+     */
+    void updateTransactionStatus(long sequenceId, TxnStatus txnStatus) throws CoordinatorException.InvalidTxnStatusException;
+
+    /**
+     * Handle recover transaction in open status.
+     * @param sequenceId {@link Long} the sequenceId of this transaction.
+     * @param timeout {@link long} the timeout time of this transaction.
+     */
+    void handleOpenStatusTransaction(long sequenceId, long timeout);
+
+    /**
+     * Handle the transaction in open status append to transaction timeout tracker.
+     */
+    void appendOpenTransactionToTimeoutTracker();
+
+    /**
+     * Handle the transaction in committing and aborting status.
+     */
+    void handleCommittingAndAbortingTransaction();
+}
diff --git a/pulsar-transaction/coordinator/src/main/java/org/apache/pulsar/transaction/coordinator/impl/InMemTransactionMetadataStoreProvider.java b/pulsar-transaction/coordinator/src/main/java/org/apache/pulsar/transaction/coordinator/impl/InMemTransactionMetadataStoreProvider.java
index 152d8fd..4c4c04d 100644
--- a/pulsar-transaction/coordinator/src/main/java/org/apache/pulsar/transaction/coordinator/impl/InMemTransactionMetadataStoreProvider.java
+++ b/pulsar-transaction/coordinator/src/main/java/org/apache/pulsar/transaction/coordinator/impl/InMemTransactionMetadataStoreProvider.java
@@ -24,8 +24,8 @@ import org.apache.bookkeeper.mledger.ManagedLedgerFactory;
 import org.apache.pulsar.transaction.coordinator.TransactionCoordinatorID;
 import org.apache.pulsar.transaction.coordinator.TransactionMetadataStore;
 import org.apache.pulsar.transaction.coordinator.TransactionMetadataStoreProvider;
+import org.apache.pulsar.transaction.coordinator.TransactionRecoverTracker;
 import org.apache.pulsar.transaction.coordinator.TransactionTimeoutTracker;
-import org.apache.pulsar.transaction.coordinator.TransactionTimeoutTrackerFactory;
 
 /**
  * The provider that offers in-memory implementation of {@link TransactionMetadataStore}.
@@ -36,7 +36,8 @@ public class InMemTransactionMetadataStoreProvider implements TransactionMetadat
     public CompletableFuture<TransactionMetadataStore> openStore(TransactionCoordinatorID transactionCoordinatorId,
                                                                  ManagedLedgerFactory managedLedgerFactory,
                                                                  ManagedLedgerConfig managedLedgerConfig,
-                                                                 TransactionTimeoutTracker timeoutTracker) {
+                                                                 TransactionTimeoutTracker timeoutTracker,
+                                                                 TransactionRecoverTracker recoverTracker) {
         return CompletableFuture.completedFuture(
             new InMemTransactionMetadataStore(transactionCoordinatorId));
     }
diff --git a/pulsar-transaction/coordinator/src/main/java/org/apache/pulsar/transaction/coordinator/impl/MLTransactionMetadataStore.java b/pulsar-transaction/coordinator/src/main/java/org/apache/pulsar/transaction/coordinator/impl/MLTransactionMetadataStore.java
index f86e566..8d2e220 100644
--- a/pulsar-transaction/coordinator/src/main/java/org/apache/pulsar/transaction/coordinator/impl/MLTransactionMetadataStore.java
+++ b/pulsar-transaction/coordinator/src/main/java/org/apache/pulsar/transaction/coordinator/impl/MLTransactionMetadataStore.java
@@ -37,6 +37,7 @@ import org.apache.pulsar.transaction.coordinator.TransactionCoordinatorID;
 import org.apache.pulsar.transaction.coordinator.TransactionLogReplayCallback;
 import org.apache.pulsar.transaction.coordinator.TransactionMetadataStore;
 import org.apache.pulsar.transaction.coordinator.TransactionMetadataStoreState;
+import org.apache.pulsar.transaction.coordinator.TransactionRecoverTracker;
 import org.apache.pulsar.transaction.coordinator.TransactionSubscription;
 import org.apache.pulsar.transaction.coordinator.TransactionTimeoutTracker;
 import org.apache.pulsar.transaction.coordinator.TxnMeta;
@@ -68,7 +69,8 @@ public class MLTransactionMetadataStore
 
     public MLTransactionMetadataStore(TransactionCoordinatorID tcID,
                                       MLTransactionLogImpl mlTransactionLog,
-                                      TransactionTimeoutTracker timeoutTracker) {
+                                      TransactionTimeoutTracker timeoutTracker,
+                                      TransactionRecoverTracker recoverTracker) {
         super(State.None);
         this.tcID = tcID;
         this.transactionLog = mlTransactionLog;
@@ -82,9 +84,11 @@ public class MLTransactionMetadataStore
 
             @Override
             public void replayComplete() {
+                recoverTracker.appendOpenTransactionToTimeoutTracker();
                 if (!changeToReadyState()) {
                     log.error("Managed ledger transaction metadata store change state error when replay complete");
                 } else {
+                    recoverTracker.handleCommittingAndAbortingTransaction();
                     timeoutTracker.start();
                 }
             }
@@ -98,8 +102,9 @@ public class MLTransactionMetadataStore
                             transactionMetadataEntry.getTxnidLeastBits());
                     switch (transactionMetadataEntry.getMetadataOp()) {
                         case NEW:
-                            if (sequenceId.get() < transactionMetadataEntry.getTxnidLeastBits()) {
-                                sequenceId.set(transactionMetadataEntry.getTxnidLeastBits());
+                            long txnSequenceId = transactionMetadataEntry.getTxnidLeastBits();
+                            if (sequenceId.get() < txnSequenceId) {
+                                sequenceId.set(txnSequenceId);
                             }
                             if (txnMetaMap.containsKey(txnID)) {
                                 txnMetaMap.get(txnID).getRight().add(position);
@@ -108,8 +113,9 @@ public class MLTransactionMetadataStore
                                 positions.add(position);
                                 txnMetaMap.put(txnID, MutablePair.of(new TxnMetaImpl(txnID), positions));
                                 txnIdSortedSet.add(transactionMetadataEntry.getTxnidLeastBits());
-                                timeoutTracker.replayAddTransaction(transactionMetadataEntry.getTxnidLeastBits(),
-                                        transactionMetadataEntry.getTimeoutMs());
+                                recoverTracker.handleOpenStatusTransaction(txnSequenceId,
+                                        transactionMetadataEntry.getTimeoutMs()
+                                                + transactionMetadataEntry.getStartTime());
                             }
                             break;
                         case ADD_PARTITION:
@@ -136,17 +142,17 @@ public class MLTransactionMetadataStore
                                 transactionLog.deletePosition(Collections.singletonList(position));
                             } else {
                                 TxnStatus newStatus = transactionMetadataEntry.getNewStatus();
+                                txnMetaMap.get(txnID).getLeft()
+                                        .updateTxnStatus(transactionMetadataEntry.getNewStatus(),
+                                                transactionMetadataEntry.getExpectedStatus());
+                                txnMetaMap.get(txnID).getRight().add(position);
+                                recoverTracker.updateTransactionStatus(txnID.getLeastSigBits(), newStatus);
                                 if (newStatus == TxnStatus.COMMITTED || newStatus == TxnStatus.ABORTED) {
                                     transactionLog.deletePosition(txnMetaMap.get(txnID).getRight()).thenAccept(v -> {
-                                        TxnMeta txnMeta = txnMetaMap.remove(txnID).getLeft();
+                                        txnMetaMap.remove(txnID).getLeft();
                                         txnIdSortedSet.remove(transactionMetadataEntry.getTxnidLeastBits());
                                     });
-                                } else {
-                                    txnMetaMap.get(txnID).getLeft()
-                                            .updateTxnStatus(transactionMetadataEntry.getNewStatus(),
-                                                    transactionMetadataEntry.getExpectedStatus());
                                 }
-                                txnMetaMap.get(txnID).getRight().add(position);
                             }
                             break;
                         default:
@@ -155,6 +161,7 @@ public class MLTransactionMetadataStore
                                     + "from transaction log with unknown operation");
                     }
                 } catch (InvalidTxnStatusException  e) {
+                    transactionLog.deletePosition(Collections.singletonList(position));
                     log.error(e.getMessage(), e);
                 }
             }
diff --git a/pulsar-transaction/coordinator/src/main/java/org/apache/pulsar/transaction/coordinator/impl/MLTransactionMetadataStoreProvider.java b/pulsar-transaction/coordinator/src/main/java/org/apache/pulsar/transaction/coordinator/impl/MLTransactionMetadataStoreProvider.java
index b8a3055..bdf0d56 100644
--- a/pulsar-transaction/coordinator/src/main/java/org/apache/pulsar/transaction/coordinator/impl/MLTransactionMetadataStoreProvider.java
+++ b/pulsar-transaction/coordinator/src/main/java/org/apache/pulsar/transaction/coordinator/impl/MLTransactionMetadataStoreProvider.java
@@ -25,6 +25,7 @@ import org.apache.pulsar.common.util.FutureUtil;
 import org.apache.pulsar.transaction.coordinator.TransactionCoordinatorID;
 import org.apache.pulsar.transaction.coordinator.TransactionMetadataStore;
 import org.apache.pulsar.transaction.coordinator.TransactionMetadataStoreProvider;
+import org.apache.pulsar.transaction.coordinator.TransactionRecoverTracker;
 import org.apache.pulsar.transaction.coordinator.TransactionTimeoutTracker;
 import org.apache.pulsar.transaction.coordinator.TransactionTimeoutTrackerFactory;
 import org.slf4j.Logger;
@@ -41,13 +42,14 @@ public class MLTransactionMetadataStoreProvider implements TransactionMetadataSt
     public CompletableFuture<TransactionMetadataStore> openStore(TransactionCoordinatorID transactionCoordinatorId,
                                                                  ManagedLedgerFactory managedLedgerFactory,
                                                                  ManagedLedgerConfig managedLedgerConfig,
-                                                                 TransactionTimeoutTracker timeoutTracker) {
+                                                                 TransactionTimeoutTracker timeoutTracker,
+                                                                 TransactionRecoverTracker recoverTracker) {
         TransactionMetadataStore transactionMetadataStore;
         try {
             transactionMetadataStore =
                     new MLTransactionMetadataStore(transactionCoordinatorId,
                             new MLTransactionLogImpl(transactionCoordinatorId,
-                                    managedLedgerFactory, managedLedgerConfig), timeoutTracker);
+                                    managedLedgerFactory, managedLedgerConfig), timeoutTracker, recoverTracker);
         } catch (Exception e) {
             log.error("MLTransactionMetadataStore init fail", e);
             return FutureUtil.failedFuture(e);
diff --git a/pulsar-transaction/coordinator/src/test/java/org/apache/pulsar/transaction/coordinator/MLTransactionMetadataStoreTest.java b/pulsar-transaction/coordinator/src/test/java/org/apache/pulsar/transaction/coordinator/MLTransactionMetadataStoreTest.java
index b885e78..b6f2702 100644
--- a/pulsar-transaction/coordinator/src/test/java/org/apache/pulsar/transaction/coordinator/MLTransactionMetadataStoreTest.java
+++ b/pulsar-transaction/coordinator/src/test/java/org/apache/pulsar/transaction/coordinator/MLTransactionMetadataStoreTest.java
@@ -24,6 +24,7 @@ import org.apache.bookkeeper.mledger.ManagedLedgerFactory;
 import org.apache.bookkeeper.mledger.ManagedLedgerFactoryConfig;
 import org.apache.bookkeeper.mledger.impl.ManagedLedgerFactoryImpl;
 import org.apache.pulsar.client.api.transaction.TxnID;
+import org.apache.pulsar.transaction.coordinator.exceptions.CoordinatorException;
 import org.apache.pulsar.transaction.coordinator.exceptions.CoordinatorException.TransactionNotFoundException;
 import org.apache.pulsar.transaction.coordinator.impl.MLTransactionLogImpl;
 import org.apache.pulsar.transaction.coordinator.impl.MLTransactionMetadataStore;
@@ -56,7 +57,7 @@ public class MLTransactionMetadataStoreTest extends MockedBookKeeperTestCase {
                 new ManagedLedgerConfig());
         MLTransactionMetadataStore transactionMetadataStore =
                 new MLTransactionMetadataStore(transactionCoordinatorID, mlTransactionLog,
-                        new TransactionTimeoutTrackerImpl());
+                        new TransactionTimeoutTrackerImpl(), new TransactionRecoverTrackerImpl());
         int checkReplayRetryCount = 0;
         while (true) {
             checkReplayRetryCount++;
@@ -120,7 +121,7 @@ public class MLTransactionMetadataStoreTest extends MockedBookKeeperTestCase {
                 managedLedgerConfig);
         MLTransactionMetadataStore transactionMetadataStore =
                 new MLTransactionMetadataStore(transactionCoordinatorID, mlTransactionLog,
-                        new TransactionTimeoutTrackerImpl());
+                        new TransactionTimeoutTrackerImpl(), new TransactionRecoverTrackerImpl());
         int checkReplayRetryCount = 0;
         while (true) {
             if (checkReplayRetryCount > 3) {
@@ -160,7 +161,7 @@ public class MLTransactionMetadataStoreTest extends MockedBookKeeperTestCase {
                 MLTransactionMetadataStore transactionMetadataStoreTest =
                         new MLTransactionMetadataStore(transactionCoordinatorID,
                                 new MLTransactionLogImpl(transactionCoordinatorID, factory,
-                                        new ManagedLedgerConfig()), new TransactionTimeoutTrackerImpl());
+                                        new ManagedLedgerConfig()), new TransactionTimeoutTrackerImpl(), new TransactionRecoverTrackerImpl());
 
                 while (true) {
                     if (checkReplayRetryCount > 6) {
@@ -222,7 +223,7 @@ public class MLTransactionMetadataStoreTest extends MockedBookKeeperTestCase {
                 new ManagedLedgerConfig());
         MLTransactionMetadataStore transactionMetadataStore =
                 new MLTransactionMetadataStore(transactionCoordinatorID, mlTransactionLog,
-                        new TransactionTimeoutTrackerImpl());
+                        new TransactionTimeoutTrackerImpl(), new TransactionRecoverTrackerImpl());
         int checkReplayRetryCount = 0;
         while (true) {
             if (checkReplayRetryCount > 3) {
@@ -282,7 +283,7 @@ public class MLTransactionMetadataStoreTest extends MockedBookKeeperTestCase {
                 new ManagedLedgerConfig());
         MLTransactionMetadataStore transactionMetadataStore =
                 new MLTransactionMetadataStore(transactionCoordinatorID, mlTransactionLog,
-                        new TransactionTimeoutTrackerImpl());
+                        new TransactionTimeoutTrackerImpl(), new TransactionRecoverTrackerImpl());
 
 
         Awaitility.await().atMost(3000, TimeUnit.MILLISECONDS).until(transactionMetadataStore::checkIfReady);
@@ -299,7 +300,7 @@ public class MLTransactionMetadataStoreTest extends MockedBookKeeperTestCase {
                 new ManagedLedgerConfig());
         transactionMetadataStore =
                 new MLTransactionMetadataStore(transactionCoordinatorID, mlTransactionLog,
-                        new TransactionTimeoutTrackerImpl());
+                        new TransactionTimeoutTrackerImpl(), new TransactionRecoverTrackerImpl());
 
         Awaitility.await().atMost(3000, TimeUnit.MILLISECONDS).until(transactionMetadataStore::checkIfReady);
     }
@@ -326,4 +327,27 @@ public class MLTransactionMetadataStoreTest extends MockedBookKeeperTestCase {
 
         }
     }
+
+    public static class TransactionRecoverTrackerImpl implements TransactionRecoverTracker {
+
+        @Override
+        public void updateTransactionStatus(long sequenceId, TxnStatus txnStatus) throws CoordinatorException.InvalidTxnStatusException {
+
+        }
+
+        @Override
+        public void handleOpenStatusTransaction(long sequenceId, long timeout) {
+
+        }
+
+        @Override
+        public void appendOpenTransactionToTimeoutTracker() {
+
+        }
+
+        @Override
+        public void handleCommittingAndAbortingTransaction() {
+
+        }
+    }
 }
\ No newline at end of file
diff --git a/pulsar-transaction/coordinator/src/test/java/org/apache/pulsar/transaction/coordinator/TransactionMetadataStoreProviderTest.java b/pulsar-transaction/coordinator/src/test/java/org/apache/pulsar/transaction/coordinator/TransactionMetadataStoreProviderTest.java
index 349bcba..26ced4c 100644
--- a/pulsar-transaction/coordinator/src/test/java/org/apache/pulsar/transaction/coordinator/TransactionMetadataStoreProviderTest.java
+++ b/pulsar-transaction/coordinator/src/test/java/org/apache/pulsar/transaction/coordinator/TransactionMetadataStoreProviderTest.java
@@ -62,7 +62,8 @@ public class TransactionMetadataStoreProviderTest {
     @BeforeMethod
     public void setup() throws Exception {
         this.tcId = new TransactionCoordinatorID(1L);
-        this.store = this.provider.openStore(tcId, null, null, null).get();
+        this.store = this.provider.openStore(tcId, null, null,
+                null, new MLTransactionMetadataStoreTest.TransactionRecoverTrackerImpl()).get();
     }
 
     @Test