You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by pe...@apache.org on 2021/12/11 09:03:37 UTC

[pulsar] branch master updated: [Transaction] Fix generate transactionId some comment. (#13234)

This is an automated email from the ASF dual-hosted git repository.

penghui pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git


The following commit(s) were added to refs/heads/master by this push:
     new 4ecb874  [Transaction] Fix generate transactionId some comment. (#13234)
4ecb874 is described below

commit 4ecb874b71e70b23f2f5310317b3c741007ed61b
Author: congbo <39...@users.noreply.github.com>
AuthorDate: Sat Dec 11 17:02:27 2021 +0800

    [Transaction] Fix generate transactionId some comment. (#13234)
---
 .../intercept/ManagedLedgerInterceptorImpl.java    |  4 +-
 .../pulsar/broker/transaction/TransactionTest.java | 12 +++---
 .../impl/MLTransactionMetadataStore.java           | 19 +++++----
 .../impl/MLTransactionMetadataStoreProvider.java   |  6 +--
 ....java => MLTransactionSequenceIdGenerator.java} | 15 ++++++--
 .../MLTransactionMetadataStoreTest.java            | 45 +++++++++++-----------
 6 files changed, 52 insertions(+), 49 deletions(-)

diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/intercept/ManagedLedgerInterceptorImpl.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/intercept/ManagedLedgerInterceptorImpl.java
index bbab84b..424797f 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/intercept/ManagedLedgerInterceptorImpl.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/intercept/ManagedLedgerInterceptorImpl.java
@@ -35,11 +35,8 @@ import org.slf4j.LoggerFactory;
 public class ManagedLedgerInterceptorImpl implements ManagedLedgerInterceptor {
     private static final Logger log = LoggerFactory.getLogger(ManagedLedgerInterceptorImpl.class);
     private static final String INDEX = "index";
-
-
     private final Set<BrokerEntryMetadataInterceptor> brokerEntryMetadataInterceptors;
 
-
     public ManagedLedgerInterceptorImpl(Set<BrokerEntryMetadataInterceptor> brokerEntryMetadataInterceptors) {
         this.brokerEntryMetadataInterceptors = brokerEntryMetadataInterceptors;
     }
@@ -108,6 +105,7 @@ public class ManagedLedgerInterceptorImpl implements ManagedLedgerInterceptor {
                             entries.close();
                             promise.complete(null);
                         } catch (Exception e) {
+                            entries.close();
                             log.error("[{}] Failed to recover the index generator from the last add confirmed entry.",
                                     name, e);
                             promise.completeExceptionally(e);
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/transaction/TransactionTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/transaction/TransactionTest.java
index def6d71..994a5f6 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/transaction/TransactionTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/transaction/TransactionTest.java
@@ -89,7 +89,7 @@ import org.apache.pulsar.transaction.coordinator.TransactionCoordinatorID;
 import org.apache.pulsar.transaction.coordinator.TransactionRecoverTracker;
 import org.apache.pulsar.transaction.coordinator.TransactionTimeoutTracker;
 import org.apache.pulsar.transaction.coordinator.impl.MLTransactionLogImpl;
-import org.apache.pulsar.transaction.coordinator.impl.MLTransactionLogInterceptor;
+import org.apache.pulsar.transaction.coordinator.impl.MLTransactionSequenceIdGenerator;
 import org.apache.pulsar.transaction.coordinator.impl.MLTransactionMetadataStore;
 import org.awaitility.Awaitility;
 import org.testng.Assert;
@@ -559,7 +559,7 @@ public class TransactionTest extends TransactionTestBase {
                 .getTopic(topic, false).get().get();
         persistentTopic.getManagedLedger().getConfig().setAutoSkipNonRecoverableData(true);
         Map<String, String> map = new HashMap<>();
-        map.put(MLTransactionLogInterceptor.MAX_LOCAL_TXN_ID, "1");
+        map.put(MLTransactionSequenceIdGenerator.MAX_LOCAL_TXN_ID, "1");
         persistentTopic.getManagedLedger().setProperties(map);
 
         ManagedCursor managedCursor = mock(ManagedCursor.class);
@@ -570,8 +570,8 @@ public class TransactionTest extends TransactionTestBase {
                     null);
             return null;
         }).when(managedCursor).asyncReadEntries(anyInt(), any(), any(), any());
-        MLTransactionLogInterceptor mlTransactionLogInterceptor = new MLTransactionLogInterceptor();
-        persistentTopic.getManagedLedger().getConfig().setManagedLedgerInterceptor(mlTransactionLogInterceptor);
+        MLTransactionSequenceIdGenerator mlTransactionSequenceIdGenerator = new MLTransactionSequenceIdGenerator();
+        persistentTopic.getManagedLedger().getConfig().setManagedLedgerInterceptor(mlTransactionSequenceIdGenerator);
         MLTransactionLogImpl mlTransactionLog =
                 new MLTransactionLogImpl(new TransactionCoordinatorID(1), null,
                         persistentTopic.getManagedLedger().getConfig());
@@ -591,7 +591,7 @@ public class TransactionTest extends TransactionTestBase {
         MLTransactionMetadataStore metadataStore1 =
                 new MLTransactionMetadataStore(new TransactionCoordinatorID(1),
                         mlTransactionLog, timeoutTracker, transactionRecoverTracker,
-                        mlTransactionLogInterceptor.getSequenceId());
+                        mlTransactionSequenceIdGenerator);
 
         Awaitility.await().untilAsserted(() ->
                 assertEquals(metadataStore1.getCoordinatorStats().state, "Ready"));
@@ -605,7 +605,7 @@ public class TransactionTest extends TransactionTestBase {
         MLTransactionMetadataStore metadataStore2 =
                 new MLTransactionMetadataStore(new TransactionCoordinatorID(1),
                         mlTransactionLog, timeoutTracker, transactionRecoverTracker,
-                        mlTransactionLogInterceptor.getSequenceId());
+                        mlTransactionSequenceIdGenerator);
         Awaitility.await().untilAsserted(() ->
                 assertEquals(metadataStore2.getCoordinatorStats().state, "Ready"));
     }
diff --git a/pulsar-transaction/coordinator/src/main/java/org/apache/pulsar/transaction/coordinator/impl/MLTransactionMetadataStore.java b/pulsar-transaction/coordinator/src/main/java/org/apache/pulsar/transaction/coordinator/impl/MLTransactionMetadataStore.java
index 6ef4f17..a71d203 100644
--- a/pulsar-transaction/coordinator/src/main/java/org/apache/pulsar/transaction/coordinator/impl/MLTransactionMetadataStore.java
+++ b/pulsar-transaction/coordinator/src/main/java/org/apache/pulsar/transaction/coordinator/impl/MLTransactionMetadataStore.java
@@ -24,7 +24,6 @@ import java.util.List;
 import java.util.NoSuchElementException;
 import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.ConcurrentSkipListMap;
-import java.util.concurrent.atomic.AtomicLong;
 import java.util.concurrent.atomic.LongAdder;
 import org.apache.bookkeeper.mledger.ManagedLedger;
 import org.apache.bookkeeper.mledger.Position;
@@ -60,7 +59,6 @@ public class MLTransactionMetadataStore
     private static final Logger log = LoggerFactory.getLogger(MLTransactionMetadataStore.class);
 
     private final TransactionCoordinatorID tcID;
-    private final AtomicLong sequenceId;
     private final MLTransactionLogImpl transactionLog;
     private final ConcurrentSkipListMap<Long, Pair<TxnMeta, List<Position>>> txnMetaMap = new ConcurrentSkipListMap<>();
     private final TransactionTimeoutTracker timeoutTracker;
@@ -70,14 +68,15 @@ public class MLTransactionMetadataStore
     private final LongAdder abortedTransactionCount;
     private final LongAdder transactionTimeoutCount;
     private final LongAdder appendLogCount;
+    private final MLTransactionSequenceIdGenerator sequenceIdGenerator;
 
     public MLTransactionMetadataStore(TransactionCoordinatorID tcID,
                                       MLTransactionLogImpl mlTransactionLog,
                                       TransactionTimeoutTracker timeoutTracker,
                                       TransactionRecoverTracker recoverTracker,
-                                      AtomicLong sequenceId) {
+                                      MLTransactionSequenceIdGenerator sequenceIdGenerator) {
         super(State.None);
-        this.sequenceId = sequenceId;
+        this.sequenceIdGenerator = sequenceIdGenerator;
         this.tcID = tcID;
         this.transactionLog = mlTransactionLog;
         this.timeoutTracker = timeoutTracker;
@@ -204,7 +203,7 @@ public class MLTransactionMetadataStore
         }
 
         long mostSigBits = tcID.getId();
-        long leastSigBits = sequenceId.incrementAndGet();
+        long leastSigBits = sequenceIdGenerator.generateSequenceId();
         TxnID txnID = new TxnID(mostSigBits, leastSigBits);
         long currentTimeMillis = System.currentTimeMillis();
         TransactionMetadataEntry transactionMetadataEntry = new TransactionMetadataEntry()
@@ -214,7 +213,7 @@ public class MLTransactionMetadataStore
                 .setTimeoutMs(timeOut)
                 .setMetadataOp(TransactionMetadataEntry.TransactionMetadataOp.NEW)
                 .setLastModificationTime(currentTimeMillis)
-                .setMaxLocalTxnId(sequenceId.get());
+                .setMaxLocalTxnId(sequenceIdGenerator.getCurrentSequenceId());
         return transactionLog.append(transactionMetadataEntry)
                 .thenCompose(position -> {
                     appendLogCount.increment();
@@ -243,7 +242,7 @@ public class MLTransactionMetadataStore
                     .setMetadataOp(TransactionMetadataOp.ADD_PARTITION)
                     .addAllPartitions(partitions)
                     .setLastModificationTime(System.currentTimeMillis())
-                    .setMaxLocalTxnId(sequenceId.get());
+                    .setMaxLocalTxnId(sequenceIdGenerator.getCurrentSequenceId());
 
             return transactionLog.append(transactionMetadataEntry)
                     .thenCompose(position -> {
@@ -280,7 +279,7 @@ public class MLTransactionMetadataStore
                     .setMetadataOp(TransactionMetadataOp.ADD_SUBSCRIPTION)
                     .addAllSubscriptions(txnSubscriptionToSubscription(txnSubscriptions))
                     .setLastModificationTime(System.currentTimeMillis())
-                    .setMaxLocalTxnId(sequenceId.get());
+                    .setMaxLocalTxnId(sequenceIdGenerator.getCurrentSequenceId());
 
             return transactionLog.append(transactionMetadataEntry)
                     .thenCompose(position -> {
@@ -321,7 +320,7 @@ public class MLTransactionMetadataStore
                     .setMetadataOp(TransactionMetadataOp.UPDATE)
                     .setLastModificationTime(System.currentTimeMillis())
                     .setNewStatus(newStatus)
-                    .setMaxLocalTxnId(sequenceId.get());
+                    .setMaxLocalTxnId(sequenceIdGenerator.getCurrentSequenceId());
 
             return transactionLog.append(transactionMetadataEntry).thenCompose(position -> {
                 appendLogCount.increment();
@@ -378,7 +377,7 @@ public class MLTransactionMetadataStore
         TransactionCoordinatorStats transactionCoordinatorstats = new TransactionCoordinatorStats();
         transactionCoordinatorstats.setLowWaterMark(getLowWaterMark());
         transactionCoordinatorstats.setState(getState().name());
-        transactionCoordinatorstats.setLeastSigBits(sequenceId.get());
+        transactionCoordinatorstats.setLeastSigBits(sequenceIdGenerator.getCurrentSequenceId());
         return transactionCoordinatorstats;
     }
 
diff --git a/pulsar-transaction/coordinator/src/main/java/org/apache/pulsar/transaction/coordinator/impl/MLTransactionMetadataStoreProvider.java b/pulsar-transaction/coordinator/src/main/java/org/apache/pulsar/transaction/coordinator/impl/MLTransactionMetadataStoreProvider.java
index 8018a21..0711f00 100644
--- a/pulsar-transaction/coordinator/src/main/java/org/apache/pulsar/transaction/coordinator/impl/MLTransactionMetadataStoreProvider.java
+++ b/pulsar-transaction/coordinator/src/main/java/org/apache/pulsar/transaction/coordinator/impl/MLTransactionMetadataStoreProvider.java
@@ -42,14 +42,14 @@ public class MLTransactionMetadataStoreProvider implements TransactionMetadataSt
                                                                  ManagedLedgerConfig managedLedgerConfig,
                                                                  TransactionTimeoutTracker timeoutTracker,
                                                                  TransactionRecoverTracker recoverTracker) {
-        MLTransactionLogInterceptor mlTransactionLogInterceptor = new MLTransactionLogInterceptor();
-        managedLedgerConfig.setManagedLedgerInterceptor(new MLTransactionLogInterceptor());
+        MLTransactionSequenceIdGenerator mlTransactionSequenceIdGenerator = new MLTransactionSequenceIdGenerator();
+        managedLedgerConfig.setManagedLedgerInterceptor(mlTransactionSequenceIdGenerator);
         MLTransactionLogImpl txnLog = new MLTransactionLogImpl(transactionCoordinatorId,
                 managedLedgerFactory, managedLedgerConfig);
 
         // MLTransactionLogInterceptor will init sequenceId and update the sequenceId to managedLedger properties.
         return txnLog.initialize().thenApply(__ ->
                 new MLTransactionMetadataStore(transactionCoordinatorId, txnLog, timeoutTracker,
-                        recoverTracker, mlTransactionLogInterceptor.getSequenceId()));
+                        recoverTracker, mlTransactionSequenceIdGenerator));
     }
 }
\ No newline at end of file
diff --git a/pulsar-transaction/coordinator/src/main/java/org/apache/pulsar/transaction/coordinator/impl/MLTransactionLogInterceptor.java b/pulsar-transaction/coordinator/src/main/java/org/apache/pulsar/transaction/coordinator/impl/MLTransactionSequenceIdGenerator.java
similarity index 93%
rename from pulsar-transaction/coordinator/src/main/java/org/apache/pulsar/transaction/coordinator/impl/MLTransactionLogInterceptor.java
rename to pulsar-transaction/coordinator/src/main/java/org/apache/pulsar/transaction/coordinator/impl/MLTransactionSequenceIdGenerator.java
index 68add4a..c68997b 100644
--- a/pulsar-transaction/coordinator/src/main/java/org/apache/pulsar/transaction/coordinator/impl/MLTransactionLogInterceptor.java
+++ b/pulsar-transaction/coordinator/src/main/java/org/apache/pulsar/transaction/coordinator/impl/MLTransactionSequenceIdGenerator.java
@@ -19,7 +19,6 @@
 package org.apache.pulsar.transaction.coordinator.impl;
 
 import io.netty.buffer.ByteBuf;
-import lombok.Getter;
 import org.apache.bookkeeper.client.LedgerHandle;
 import org.apache.bookkeeper.client.api.LedgerEntry;
 import org.apache.bookkeeper.mledger.impl.OpAddEntry;
@@ -34,12 +33,11 @@ import java.util.concurrent.atomic.AtomicLong;
 /**
  * Store max sequenceID in ManagedLedger properties, in order to recover transaction log.
  */
-public class MLTransactionLogInterceptor implements ManagedLedgerInterceptor {
+public class MLTransactionSequenceIdGenerator implements ManagedLedgerInterceptor {
 
-    private static final Logger log = LoggerFactory.getLogger(MLTransactionLogInterceptor.class);
+    private static final Logger log = LoggerFactory.getLogger(MLTransactionSequenceIdGenerator.class);
     private static final long TC_ID_NOT_USED = -1L;
     public static final String MAX_LOCAL_TXN_ID = "max_local_txn_id";
-    @Getter
     private final AtomicLong sequenceId = new AtomicLong(TC_ID_NOT_USED);
 
     @Override
@@ -81,6 +79,7 @@ public class MLTransactionLogInterceptor implements ManagedLedgerInterceptor {
                             entries.close();
                             promise.complete(null);
                         } catch (Exception e) {
+                            entries.close();
                             log.error("[{}] Failed to recover the tc sequenceId from the last add confirmed entry.",
                                     name, e);
                             promise.completeExceptionally(e);
@@ -101,4 +100,12 @@ public class MLTransactionLogInterceptor implements ManagedLedgerInterceptor {
     public void onUpdateManagedLedgerInfo(Map<String, String> propertiesMap) {
         propertiesMap.put(MAX_LOCAL_TXN_ID, sequenceId.get() + "");
     }
+
+    long generateSequenceId() {
+        return sequenceId.incrementAndGet();
+    }
+
+    long getCurrentSequenceId() {
+        return sequenceId.get();
+    }
 }
diff --git a/pulsar-transaction/coordinator/src/test/java/org/apache/pulsar/transaction/coordinator/MLTransactionMetadataStoreTest.java b/pulsar-transaction/coordinator/src/test/java/org/apache/pulsar/transaction/coordinator/MLTransactionMetadataStoreTest.java
index 03aa1be..7fa3c08 100644
--- a/pulsar-transaction/coordinator/src/test/java/org/apache/pulsar/transaction/coordinator/MLTransactionMetadataStoreTest.java
+++ b/pulsar-transaction/coordinator/src/test/java/org/apache/pulsar/transaction/coordinator/MLTransactionMetadataStoreTest.java
@@ -31,7 +31,7 @@ import org.apache.pulsar.client.api.transaction.TxnID;
 import org.apache.pulsar.transaction.coordinator.exceptions.CoordinatorException;
 import org.apache.pulsar.transaction.coordinator.exceptions.CoordinatorException.TransactionNotFoundException;
 import org.apache.pulsar.transaction.coordinator.impl.MLTransactionLogImpl;
-import org.apache.pulsar.transaction.coordinator.impl.MLTransactionLogInterceptor;
+import org.apache.pulsar.transaction.coordinator.impl.MLTransactionSequenceIdGenerator;
 import org.apache.pulsar.transaction.coordinator.impl.MLTransactionMetadataStore;
 import org.apache.pulsar.transaction.coordinator.proto.TxnStatus;
 import org.apache.pulsar.transaction.coordinator.test.MockedBookKeeperTestCase;
@@ -42,7 +42,6 @@ import org.testng.annotations.Test;
 
 import java.lang.reflect.Field;
 import java.util.ArrayList;
-import java.util.Collections;
 import java.util.List;
 import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.ExecutionException;
@@ -68,15 +67,15 @@ public class MLTransactionMetadataStoreTest extends MockedBookKeeperTestCase {
         ManagedLedgerFactory factory = new ManagedLedgerFactoryImpl(metadataStore, bkc, factoryConf);
         TransactionCoordinatorID transactionCoordinatorID = new TransactionCoordinatorID(1);
         ManagedLedgerConfig managedLedgerConfig = new ManagedLedgerConfig();
-        MLTransactionLogInterceptor mlTransactionLogInterceptor = new MLTransactionLogInterceptor();
-        managedLedgerConfig.setManagedLedgerInterceptor(mlTransactionLogInterceptor);
+        MLTransactionSequenceIdGenerator mlTransactionSequenceIdGenerator = new MLTransactionSequenceIdGenerator();
+        managedLedgerConfig.setManagedLedgerInterceptor(mlTransactionSequenceIdGenerator);
         MLTransactionLogImpl mlTransactionLog = new MLTransactionLogImpl(transactionCoordinatorID, factory,
                 managedLedgerConfig);
         mlTransactionLog.initialize().join();
         MLTransactionMetadataStore transactionMetadataStore =
                 new MLTransactionMetadataStore(transactionCoordinatorID, mlTransactionLog,
                         new TransactionTimeoutTrackerImpl(), new TransactionRecoverTrackerImpl(),
-                        mlTransactionLogInterceptor.getSequenceId());
+                        mlTransactionSequenceIdGenerator);
         int checkReplayRetryCount = 0;
         while (true) {
             checkReplayRetryCount++;
@@ -142,8 +141,8 @@ public class MLTransactionMetadataStoreTest extends MockedBookKeeperTestCase {
         ManagedLedgerFactory factory = new ManagedLedgerFactoryImpl(metadataStore, bkc, factoryConf);
         TransactionCoordinatorID transactionCoordinatorID = new TransactionCoordinatorID(1);
         ManagedLedgerConfig managedLedgerConfig = new ManagedLedgerConfig();
-        MLTransactionLogInterceptor mlTransactionLogInterceptor = new MLTransactionLogInterceptor();
-        managedLedgerConfig.setManagedLedgerInterceptor(mlTransactionLogInterceptor);
+        MLTransactionSequenceIdGenerator mlTransactionSequenceIdGenerator = new MLTransactionSequenceIdGenerator();
+        managedLedgerConfig.setManagedLedgerInterceptor(mlTransactionSequenceIdGenerator);
         managedLedgerConfig.setMaxEntriesPerLedger(3);
         MLTransactionLogImpl mlTransactionLog = new MLTransactionLogImpl(transactionCoordinatorID, factory,
                 managedLedgerConfig);
@@ -151,7 +150,7 @@ public class MLTransactionMetadataStoreTest extends MockedBookKeeperTestCase {
         MLTransactionMetadataStore transactionMetadataStore =
                 new MLTransactionMetadataStore(transactionCoordinatorID, mlTransactionLog,
                         new TransactionTimeoutTrackerImpl(), new TransactionRecoverTrackerImpl(),
-                        mlTransactionLogInterceptor.getSequenceId());
+                        mlTransactionSequenceIdGenerator);
 
         Awaitility.await().until(transactionMetadataStore::checkIfReady);
         TxnID txnID = transactionMetadataStore.newTransaction(20000).get();
@@ -177,7 +176,7 @@ public class MLTransactionMetadataStoreTest extends MockedBookKeeperTestCase {
         transactionMetadataStore =
                 new MLTransactionMetadataStore(transactionCoordinatorID, mlTransactionLog,
                         new TransactionTimeoutTrackerImpl(), new TransactionRecoverTrackerImpl(),
-                        mlTransactionLogInterceptor.getSequenceId());
+                        mlTransactionSequenceIdGenerator);
 
         Awaitility.await().until(transactionMetadataStore::checkIfReady);
         txnID = transactionMetadataStore.newTransaction(100000).get();
@@ -194,15 +193,15 @@ public class MLTransactionMetadataStoreTest extends MockedBookKeeperTestCase {
         TransactionCoordinatorID transactionCoordinatorID = new TransactionCoordinatorID(1);
         ManagedLedgerConfig managedLedgerConfig = new ManagedLedgerConfig();
         managedLedgerConfig.setMaxEntriesPerLedger(2);
-        MLTransactionLogInterceptor mlTransactionLogInterceptor = new MLTransactionLogInterceptor();
-        managedLedgerConfig.setManagedLedgerInterceptor(mlTransactionLogInterceptor);
+        MLTransactionSequenceIdGenerator mlTransactionSequenceIdGenerator = new MLTransactionSequenceIdGenerator();
+        managedLedgerConfig.setManagedLedgerInterceptor(mlTransactionSequenceIdGenerator);
         MLTransactionLogImpl mlTransactionLog = new MLTransactionLogImpl(transactionCoordinatorID, factory,
                 managedLedgerConfig);
         mlTransactionLog.initialize().join();
         MLTransactionMetadataStore transactionMetadataStore =
                 new MLTransactionMetadataStore(transactionCoordinatorID, mlTransactionLog,
                         new TransactionTimeoutTrackerImpl(), new TransactionRecoverTrackerImpl(),
-                        mlTransactionLogInterceptor.getSequenceId());
+                        mlTransactionSequenceIdGenerator);
         int checkReplayRetryCount = 0;
         while (true) {
             if (checkReplayRetryCount > 3) {
@@ -245,7 +244,7 @@ public class MLTransactionMetadataStoreTest extends MockedBookKeeperTestCase {
                 MLTransactionMetadataStore transactionMetadataStoreTest =
                         new MLTransactionMetadataStore(transactionCoordinatorID,
                                 txnLog2, new TransactionTimeoutTrackerImpl(), new TransactionRecoverTrackerImpl(),
-                                mlTransactionLogInterceptor.getSequenceId());
+                                mlTransactionSequenceIdGenerator);
 
                 while (true) {
                     if (checkReplayRetryCount > 6) {
@@ -306,15 +305,15 @@ public class MLTransactionMetadataStoreTest extends MockedBookKeeperTestCase {
         ManagedLedgerFactory factory = new ManagedLedgerFactoryImpl(metadataStore, bkc, factoryConf);
         TransactionCoordinatorID transactionCoordinatorID = new TransactionCoordinatorID(1);
         ManagedLedgerConfig managedLedgerConfig = new ManagedLedgerConfig();
-        MLTransactionLogInterceptor mlTransactionLogInterceptor = new MLTransactionLogInterceptor();
-        managedLedgerConfig.setManagedLedgerInterceptor(mlTransactionLogInterceptor);
+        MLTransactionSequenceIdGenerator mlTransactionSequenceIdGenerator = new MLTransactionSequenceIdGenerator();
+        managedLedgerConfig.setManagedLedgerInterceptor(mlTransactionSequenceIdGenerator);
         MLTransactionLogImpl mlTransactionLog = new MLTransactionLogImpl(transactionCoordinatorID, factory,
                 managedLedgerConfig);
         mlTransactionLog.initialize().join();
         MLTransactionMetadataStore transactionMetadataStore =
                 new MLTransactionMetadataStore(transactionCoordinatorID, mlTransactionLog,
                         new TransactionTimeoutTrackerImpl(), new TransactionRecoverTrackerImpl(),
-                        mlTransactionLogInterceptor.getSequenceId());
+                        mlTransactionSequenceIdGenerator);
         int checkReplayRetryCount = 0;
         while (true) {
             if (checkReplayRetryCount > 3) {
@@ -373,15 +372,15 @@ public class MLTransactionMetadataStoreTest extends MockedBookKeeperTestCase {
         ManagedLedgerFactory factory = new ManagedLedgerFactoryImpl(metadataStore, bkc, factoryConf);
         TransactionCoordinatorID transactionCoordinatorID = new TransactionCoordinatorID(1);
         ManagedLedgerConfig managedLedgerConfig = new ManagedLedgerConfig();
-        MLTransactionLogInterceptor mlTransactionLogInterceptor = new MLTransactionLogInterceptor();
-        managedLedgerConfig.setManagedLedgerInterceptor(mlTransactionLogInterceptor);
+        MLTransactionSequenceIdGenerator mlTransactionSequenceIdGenerator = new MLTransactionSequenceIdGenerator();
+        managedLedgerConfig.setManagedLedgerInterceptor(mlTransactionSequenceIdGenerator);
         MLTransactionLogImpl mlTransactionLog = new MLTransactionLogImpl(transactionCoordinatorID, factory,
                 managedLedgerConfig);
         mlTransactionLog.initialize().join();
         MLTransactionMetadataStore transactionMetadataStore =
                 new MLTransactionMetadataStore(transactionCoordinatorID, mlTransactionLog,
                         new TransactionTimeoutTrackerImpl(), new TransactionRecoverTrackerImpl(),
-                        mlTransactionLogInterceptor.getSequenceId());
+                        mlTransactionSequenceIdGenerator);
 
 
         Awaitility.await().until(transactionMetadataStore::checkIfReady);
@@ -400,7 +399,7 @@ public class MLTransactionMetadataStoreTest extends MockedBookKeeperTestCase {
         transactionMetadataStore =
                 new MLTransactionMetadataStore(transactionCoordinatorID, mlTransactionLog,
                         new TransactionTimeoutTrackerImpl(), new TransactionRecoverTrackerImpl(),
-                        mlTransactionLogInterceptor.getSequenceId());
+                        mlTransactionSequenceIdGenerator);
 
         Awaitility.await().until(transactionMetadataStore::checkIfReady);
     }
@@ -414,15 +413,15 @@ public class MLTransactionMetadataStoreTest extends MockedBookKeeperTestCase {
         ManagedLedgerFactory factory = new ManagedLedgerFactoryImpl(metadataStore, bkc, factoryConf);
         TransactionCoordinatorID transactionCoordinatorID = new TransactionCoordinatorID(1);
         ManagedLedgerConfig managedLedgerConfig = new ManagedLedgerConfig();
-        MLTransactionLogInterceptor mlTransactionLogInterceptor = new MLTransactionLogInterceptor();
-        managedLedgerConfig.setManagedLedgerInterceptor(mlTransactionLogInterceptor);
+        MLTransactionSequenceIdGenerator mlTransactionSequenceIdGenerator = new MLTransactionSequenceIdGenerator();
+        managedLedgerConfig.setManagedLedgerInterceptor(mlTransactionSequenceIdGenerator);
         MLTransactionLogImpl mlTransactionLog = new MLTransactionLogImpl(transactionCoordinatorID, factory,
                 managedLedgerConfig);
         mlTransactionLog.initialize().join();
         MLTransactionMetadataStore transactionMetadataStore =
                 new MLTransactionMetadataStore(transactionCoordinatorID, mlTransactionLog,
                         new TransactionTimeoutTrackerImpl(), new TransactionRecoverTrackerImpl(),
-                        mlTransactionLogInterceptor.getSequenceId());
+                        mlTransactionSequenceIdGenerator);
 
         Awaitility.await().until(transactionMetadataStore::checkIfReady);
         transactionMetadataStore.newTransaction(5000).get();