You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by pe...@apache.org on 2021/12/11 09:03:37 UTC
[pulsar] branch master updated: [Transaction] Fix generate transactionId some comment. (#13234)
This is an automated email from the ASF dual-hosted git repository.
penghui pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git
The following commit(s) were added to refs/heads/master by this push:
new 4ecb874 [Transaction] Fix generate transactionId some comment. (#13234)
4ecb874 is described below
commit 4ecb874b71e70b23f2f5310317b3c741007ed61b
Author: congbo <39...@users.noreply.github.com>
AuthorDate: Sat Dec 11 17:02:27 2021 +0800
[Transaction] Fix generate transactionId some comment. (#13234)
---
.../intercept/ManagedLedgerInterceptorImpl.java | 4 +-
.../pulsar/broker/transaction/TransactionTest.java | 12 +++---
.../impl/MLTransactionMetadataStore.java | 19 +++++----
.../impl/MLTransactionMetadataStoreProvider.java | 6 +--
....java => MLTransactionSequenceIdGenerator.java} | 15 ++++++--
.../MLTransactionMetadataStoreTest.java | 45 +++++++++++-----------
6 files changed, 52 insertions(+), 49 deletions(-)
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/intercept/ManagedLedgerInterceptorImpl.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/intercept/ManagedLedgerInterceptorImpl.java
index bbab84b..424797f 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/intercept/ManagedLedgerInterceptorImpl.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/intercept/ManagedLedgerInterceptorImpl.java
@@ -35,11 +35,8 @@ import org.slf4j.LoggerFactory;
public class ManagedLedgerInterceptorImpl implements ManagedLedgerInterceptor {
private static final Logger log = LoggerFactory.getLogger(ManagedLedgerInterceptorImpl.class);
private static final String INDEX = "index";
-
-
private final Set<BrokerEntryMetadataInterceptor> brokerEntryMetadataInterceptors;
-
public ManagedLedgerInterceptorImpl(Set<BrokerEntryMetadataInterceptor> brokerEntryMetadataInterceptors) {
this.brokerEntryMetadataInterceptors = brokerEntryMetadataInterceptors;
}
@@ -108,6 +105,7 @@ public class ManagedLedgerInterceptorImpl implements ManagedLedgerInterceptor {
entries.close();
promise.complete(null);
} catch (Exception e) {
+ entries.close();
log.error("[{}] Failed to recover the index generator from the last add confirmed entry.",
name, e);
promise.completeExceptionally(e);
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/transaction/TransactionTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/transaction/TransactionTest.java
index def6d71..994a5f6 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/transaction/TransactionTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/transaction/TransactionTest.java
@@ -89,7 +89,7 @@ import org.apache.pulsar.transaction.coordinator.TransactionCoordinatorID;
import org.apache.pulsar.transaction.coordinator.TransactionRecoverTracker;
import org.apache.pulsar.transaction.coordinator.TransactionTimeoutTracker;
import org.apache.pulsar.transaction.coordinator.impl.MLTransactionLogImpl;
-import org.apache.pulsar.transaction.coordinator.impl.MLTransactionLogInterceptor;
+import org.apache.pulsar.transaction.coordinator.impl.MLTransactionSequenceIdGenerator;
import org.apache.pulsar.transaction.coordinator.impl.MLTransactionMetadataStore;
import org.awaitility.Awaitility;
import org.testng.Assert;
@@ -559,7 +559,7 @@ public class TransactionTest extends TransactionTestBase {
.getTopic(topic, false).get().get();
persistentTopic.getManagedLedger().getConfig().setAutoSkipNonRecoverableData(true);
Map<String, String> map = new HashMap<>();
- map.put(MLTransactionLogInterceptor.MAX_LOCAL_TXN_ID, "1");
+ map.put(MLTransactionSequenceIdGenerator.MAX_LOCAL_TXN_ID, "1");
persistentTopic.getManagedLedger().setProperties(map);
ManagedCursor managedCursor = mock(ManagedCursor.class);
@@ -570,8 +570,8 @@ public class TransactionTest extends TransactionTestBase {
null);
return null;
}).when(managedCursor).asyncReadEntries(anyInt(), any(), any(), any());
- MLTransactionLogInterceptor mlTransactionLogInterceptor = new MLTransactionLogInterceptor();
- persistentTopic.getManagedLedger().getConfig().setManagedLedgerInterceptor(mlTransactionLogInterceptor);
+ MLTransactionSequenceIdGenerator mlTransactionSequenceIdGenerator = new MLTransactionSequenceIdGenerator();
+ persistentTopic.getManagedLedger().getConfig().setManagedLedgerInterceptor(mlTransactionSequenceIdGenerator);
MLTransactionLogImpl mlTransactionLog =
new MLTransactionLogImpl(new TransactionCoordinatorID(1), null,
persistentTopic.getManagedLedger().getConfig());
@@ -591,7 +591,7 @@ public class TransactionTest extends TransactionTestBase {
MLTransactionMetadataStore metadataStore1 =
new MLTransactionMetadataStore(new TransactionCoordinatorID(1),
mlTransactionLog, timeoutTracker, transactionRecoverTracker,
- mlTransactionLogInterceptor.getSequenceId());
+ mlTransactionSequenceIdGenerator);
Awaitility.await().untilAsserted(() ->
assertEquals(metadataStore1.getCoordinatorStats().state, "Ready"));
@@ -605,7 +605,7 @@ public class TransactionTest extends TransactionTestBase {
MLTransactionMetadataStore metadataStore2 =
new MLTransactionMetadataStore(new TransactionCoordinatorID(1),
mlTransactionLog, timeoutTracker, transactionRecoverTracker,
- mlTransactionLogInterceptor.getSequenceId());
+ mlTransactionSequenceIdGenerator);
Awaitility.await().untilAsserted(() ->
assertEquals(metadataStore2.getCoordinatorStats().state, "Ready"));
}
diff --git a/pulsar-transaction/coordinator/src/main/java/org/apache/pulsar/transaction/coordinator/impl/MLTransactionMetadataStore.java b/pulsar-transaction/coordinator/src/main/java/org/apache/pulsar/transaction/coordinator/impl/MLTransactionMetadataStore.java
index 6ef4f17..a71d203 100644
--- a/pulsar-transaction/coordinator/src/main/java/org/apache/pulsar/transaction/coordinator/impl/MLTransactionMetadataStore.java
+++ b/pulsar-transaction/coordinator/src/main/java/org/apache/pulsar/transaction/coordinator/impl/MLTransactionMetadataStore.java
@@ -24,7 +24,6 @@ import java.util.List;
import java.util.NoSuchElementException;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentSkipListMap;
-import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.atomic.LongAdder;
import org.apache.bookkeeper.mledger.ManagedLedger;
import org.apache.bookkeeper.mledger.Position;
@@ -60,7 +59,6 @@ public class MLTransactionMetadataStore
private static final Logger log = LoggerFactory.getLogger(MLTransactionMetadataStore.class);
private final TransactionCoordinatorID tcID;
- private final AtomicLong sequenceId;
private final MLTransactionLogImpl transactionLog;
private final ConcurrentSkipListMap<Long, Pair<TxnMeta, List<Position>>> txnMetaMap = new ConcurrentSkipListMap<>();
private final TransactionTimeoutTracker timeoutTracker;
@@ -70,14 +68,15 @@ public class MLTransactionMetadataStore
private final LongAdder abortedTransactionCount;
private final LongAdder transactionTimeoutCount;
private final LongAdder appendLogCount;
+ private final MLTransactionSequenceIdGenerator sequenceIdGenerator;
public MLTransactionMetadataStore(TransactionCoordinatorID tcID,
MLTransactionLogImpl mlTransactionLog,
TransactionTimeoutTracker timeoutTracker,
TransactionRecoverTracker recoverTracker,
- AtomicLong sequenceId) {
+ MLTransactionSequenceIdGenerator sequenceIdGenerator) {
super(State.None);
- this.sequenceId = sequenceId;
+ this.sequenceIdGenerator = sequenceIdGenerator;
this.tcID = tcID;
this.transactionLog = mlTransactionLog;
this.timeoutTracker = timeoutTracker;
@@ -204,7 +203,7 @@ public class MLTransactionMetadataStore
}
long mostSigBits = tcID.getId();
- long leastSigBits = sequenceId.incrementAndGet();
+ long leastSigBits = sequenceIdGenerator.generateSequenceId();
TxnID txnID = new TxnID(mostSigBits, leastSigBits);
long currentTimeMillis = System.currentTimeMillis();
TransactionMetadataEntry transactionMetadataEntry = new TransactionMetadataEntry()
@@ -214,7 +213,7 @@ public class MLTransactionMetadataStore
.setTimeoutMs(timeOut)
.setMetadataOp(TransactionMetadataEntry.TransactionMetadataOp.NEW)
.setLastModificationTime(currentTimeMillis)
- .setMaxLocalTxnId(sequenceId.get());
+ .setMaxLocalTxnId(sequenceIdGenerator.getCurrentSequenceId());
return transactionLog.append(transactionMetadataEntry)
.thenCompose(position -> {
appendLogCount.increment();
@@ -243,7 +242,7 @@ public class MLTransactionMetadataStore
.setMetadataOp(TransactionMetadataOp.ADD_PARTITION)
.addAllPartitions(partitions)
.setLastModificationTime(System.currentTimeMillis())
- .setMaxLocalTxnId(sequenceId.get());
+ .setMaxLocalTxnId(sequenceIdGenerator.getCurrentSequenceId());
return transactionLog.append(transactionMetadataEntry)
.thenCompose(position -> {
@@ -280,7 +279,7 @@ public class MLTransactionMetadataStore
.setMetadataOp(TransactionMetadataOp.ADD_SUBSCRIPTION)
.addAllSubscriptions(txnSubscriptionToSubscription(txnSubscriptions))
.setLastModificationTime(System.currentTimeMillis())
- .setMaxLocalTxnId(sequenceId.get());
+ .setMaxLocalTxnId(sequenceIdGenerator.getCurrentSequenceId());
return transactionLog.append(transactionMetadataEntry)
.thenCompose(position -> {
@@ -321,7 +320,7 @@ public class MLTransactionMetadataStore
.setMetadataOp(TransactionMetadataOp.UPDATE)
.setLastModificationTime(System.currentTimeMillis())
.setNewStatus(newStatus)
- .setMaxLocalTxnId(sequenceId.get());
+ .setMaxLocalTxnId(sequenceIdGenerator.getCurrentSequenceId());
return transactionLog.append(transactionMetadataEntry).thenCompose(position -> {
appendLogCount.increment();
@@ -378,7 +377,7 @@ public class MLTransactionMetadataStore
TransactionCoordinatorStats transactionCoordinatorstats = new TransactionCoordinatorStats();
transactionCoordinatorstats.setLowWaterMark(getLowWaterMark());
transactionCoordinatorstats.setState(getState().name());
- transactionCoordinatorstats.setLeastSigBits(sequenceId.get());
+ transactionCoordinatorstats.setLeastSigBits(sequenceIdGenerator.getCurrentSequenceId());
return transactionCoordinatorstats;
}
diff --git a/pulsar-transaction/coordinator/src/main/java/org/apache/pulsar/transaction/coordinator/impl/MLTransactionMetadataStoreProvider.java b/pulsar-transaction/coordinator/src/main/java/org/apache/pulsar/transaction/coordinator/impl/MLTransactionMetadataStoreProvider.java
index 8018a21..0711f00 100644
--- a/pulsar-transaction/coordinator/src/main/java/org/apache/pulsar/transaction/coordinator/impl/MLTransactionMetadataStoreProvider.java
+++ b/pulsar-transaction/coordinator/src/main/java/org/apache/pulsar/transaction/coordinator/impl/MLTransactionMetadataStoreProvider.java
@@ -42,14 +42,14 @@ public class MLTransactionMetadataStoreProvider implements TransactionMetadataSt
ManagedLedgerConfig managedLedgerConfig,
TransactionTimeoutTracker timeoutTracker,
TransactionRecoverTracker recoverTracker) {
- MLTransactionLogInterceptor mlTransactionLogInterceptor = new MLTransactionLogInterceptor();
- managedLedgerConfig.setManagedLedgerInterceptor(new MLTransactionLogInterceptor());
+ MLTransactionSequenceIdGenerator mlTransactionSequenceIdGenerator = new MLTransactionSequenceIdGenerator();
+ managedLedgerConfig.setManagedLedgerInterceptor(mlTransactionSequenceIdGenerator);
MLTransactionLogImpl txnLog = new MLTransactionLogImpl(transactionCoordinatorId,
managedLedgerFactory, managedLedgerConfig);
// MLTransactionLogInterceptor will init sequenceId and update the sequenceId to managedLedger properties.
return txnLog.initialize().thenApply(__ ->
new MLTransactionMetadataStore(transactionCoordinatorId, txnLog, timeoutTracker,
- recoverTracker, mlTransactionLogInterceptor.getSequenceId()));
+ recoverTracker, mlTransactionSequenceIdGenerator));
}
}
\ No newline at end of file
diff --git a/pulsar-transaction/coordinator/src/main/java/org/apache/pulsar/transaction/coordinator/impl/MLTransactionLogInterceptor.java b/pulsar-transaction/coordinator/src/main/java/org/apache/pulsar/transaction/coordinator/impl/MLTransactionSequenceIdGenerator.java
similarity index 93%
rename from pulsar-transaction/coordinator/src/main/java/org/apache/pulsar/transaction/coordinator/impl/MLTransactionLogInterceptor.java
rename to pulsar-transaction/coordinator/src/main/java/org/apache/pulsar/transaction/coordinator/impl/MLTransactionSequenceIdGenerator.java
index 68add4a..c68997b 100644
--- a/pulsar-transaction/coordinator/src/main/java/org/apache/pulsar/transaction/coordinator/impl/MLTransactionLogInterceptor.java
+++ b/pulsar-transaction/coordinator/src/main/java/org/apache/pulsar/transaction/coordinator/impl/MLTransactionSequenceIdGenerator.java
@@ -19,7 +19,6 @@
package org.apache.pulsar.transaction.coordinator.impl;
import io.netty.buffer.ByteBuf;
-import lombok.Getter;
import org.apache.bookkeeper.client.LedgerHandle;
import org.apache.bookkeeper.client.api.LedgerEntry;
import org.apache.bookkeeper.mledger.impl.OpAddEntry;
@@ -34,12 +33,11 @@ import java.util.concurrent.atomic.AtomicLong;
/**
* Store max sequenceID in ManagedLedger properties, in order to recover transaction log.
*/
-public class MLTransactionLogInterceptor implements ManagedLedgerInterceptor {
+public class MLTransactionSequenceIdGenerator implements ManagedLedgerInterceptor {
- private static final Logger log = LoggerFactory.getLogger(MLTransactionLogInterceptor.class);
+ private static final Logger log = LoggerFactory.getLogger(MLTransactionSequenceIdGenerator.class);
private static final long TC_ID_NOT_USED = -1L;
public static final String MAX_LOCAL_TXN_ID = "max_local_txn_id";
- @Getter
private final AtomicLong sequenceId = new AtomicLong(TC_ID_NOT_USED);
@Override
@@ -81,6 +79,7 @@ public class MLTransactionLogInterceptor implements ManagedLedgerInterceptor {
entries.close();
promise.complete(null);
} catch (Exception e) {
+ entries.close();
log.error("[{}] Failed to recover the tc sequenceId from the last add confirmed entry.",
name, e);
promise.completeExceptionally(e);
@@ -101,4 +100,12 @@ public class MLTransactionLogInterceptor implements ManagedLedgerInterceptor {
public void onUpdateManagedLedgerInfo(Map<String, String> propertiesMap) {
propertiesMap.put(MAX_LOCAL_TXN_ID, sequenceId.get() + "");
}
+
+ long generateSequenceId() {
+ return sequenceId.incrementAndGet();
+ }
+
+ long getCurrentSequenceId() {
+ return sequenceId.get();
+ }
}
diff --git a/pulsar-transaction/coordinator/src/test/java/org/apache/pulsar/transaction/coordinator/MLTransactionMetadataStoreTest.java b/pulsar-transaction/coordinator/src/test/java/org/apache/pulsar/transaction/coordinator/MLTransactionMetadataStoreTest.java
index 03aa1be..7fa3c08 100644
--- a/pulsar-transaction/coordinator/src/test/java/org/apache/pulsar/transaction/coordinator/MLTransactionMetadataStoreTest.java
+++ b/pulsar-transaction/coordinator/src/test/java/org/apache/pulsar/transaction/coordinator/MLTransactionMetadataStoreTest.java
@@ -31,7 +31,7 @@ import org.apache.pulsar.client.api.transaction.TxnID;
import org.apache.pulsar.transaction.coordinator.exceptions.CoordinatorException;
import org.apache.pulsar.transaction.coordinator.exceptions.CoordinatorException.TransactionNotFoundException;
import org.apache.pulsar.transaction.coordinator.impl.MLTransactionLogImpl;
-import org.apache.pulsar.transaction.coordinator.impl.MLTransactionLogInterceptor;
+import org.apache.pulsar.transaction.coordinator.impl.MLTransactionSequenceIdGenerator;
import org.apache.pulsar.transaction.coordinator.impl.MLTransactionMetadataStore;
import org.apache.pulsar.transaction.coordinator.proto.TxnStatus;
import org.apache.pulsar.transaction.coordinator.test.MockedBookKeeperTestCase;
@@ -42,7 +42,6 @@ import org.testng.annotations.Test;
import java.lang.reflect.Field;
import java.util.ArrayList;
-import java.util.Collections;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
@@ -68,15 +67,15 @@ public class MLTransactionMetadataStoreTest extends MockedBookKeeperTestCase {
ManagedLedgerFactory factory = new ManagedLedgerFactoryImpl(metadataStore, bkc, factoryConf);
TransactionCoordinatorID transactionCoordinatorID = new TransactionCoordinatorID(1);
ManagedLedgerConfig managedLedgerConfig = new ManagedLedgerConfig();
- MLTransactionLogInterceptor mlTransactionLogInterceptor = new MLTransactionLogInterceptor();
- managedLedgerConfig.setManagedLedgerInterceptor(mlTransactionLogInterceptor);
+ MLTransactionSequenceIdGenerator mlTransactionSequenceIdGenerator = new MLTransactionSequenceIdGenerator();
+ managedLedgerConfig.setManagedLedgerInterceptor(mlTransactionSequenceIdGenerator);
MLTransactionLogImpl mlTransactionLog = new MLTransactionLogImpl(transactionCoordinatorID, factory,
managedLedgerConfig);
mlTransactionLog.initialize().join();
MLTransactionMetadataStore transactionMetadataStore =
new MLTransactionMetadataStore(transactionCoordinatorID, mlTransactionLog,
new TransactionTimeoutTrackerImpl(), new TransactionRecoverTrackerImpl(),
- mlTransactionLogInterceptor.getSequenceId());
+ mlTransactionSequenceIdGenerator);
int checkReplayRetryCount = 0;
while (true) {
checkReplayRetryCount++;
@@ -142,8 +141,8 @@ public class MLTransactionMetadataStoreTest extends MockedBookKeeperTestCase {
ManagedLedgerFactory factory = new ManagedLedgerFactoryImpl(metadataStore, bkc, factoryConf);
TransactionCoordinatorID transactionCoordinatorID = new TransactionCoordinatorID(1);
ManagedLedgerConfig managedLedgerConfig = new ManagedLedgerConfig();
- MLTransactionLogInterceptor mlTransactionLogInterceptor = new MLTransactionLogInterceptor();
- managedLedgerConfig.setManagedLedgerInterceptor(mlTransactionLogInterceptor);
+ MLTransactionSequenceIdGenerator mlTransactionSequenceIdGenerator = new MLTransactionSequenceIdGenerator();
+ managedLedgerConfig.setManagedLedgerInterceptor(mlTransactionSequenceIdGenerator);
managedLedgerConfig.setMaxEntriesPerLedger(3);
MLTransactionLogImpl mlTransactionLog = new MLTransactionLogImpl(transactionCoordinatorID, factory,
managedLedgerConfig);
@@ -151,7 +150,7 @@ public class MLTransactionMetadataStoreTest extends MockedBookKeeperTestCase {
MLTransactionMetadataStore transactionMetadataStore =
new MLTransactionMetadataStore(transactionCoordinatorID, mlTransactionLog,
new TransactionTimeoutTrackerImpl(), new TransactionRecoverTrackerImpl(),
- mlTransactionLogInterceptor.getSequenceId());
+ mlTransactionSequenceIdGenerator);
Awaitility.await().until(transactionMetadataStore::checkIfReady);
TxnID txnID = transactionMetadataStore.newTransaction(20000).get();
@@ -177,7 +176,7 @@ public class MLTransactionMetadataStoreTest extends MockedBookKeeperTestCase {
transactionMetadataStore =
new MLTransactionMetadataStore(transactionCoordinatorID, mlTransactionLog,
new TransactionTimeoutTrackerImpl(), new TransactionRecoverTrackerImpl(),
- mlTransactionLogInterceptor.getSequenceId());
+ mlTransactionSequenceIdGenerator);
Awaitility.await().until(transactionMetadataStore::checkIfReady);
txnID = transactionMetadataStore.newTransaction(100000).get();
@@ -194,15 +193,15 @@ public class MLTransactionMetadataStoreTest extends MockedBookKeeperTestCase {
TransactionCoordinatorID transactionCoordinatorID = new TransactionCoordinatorID(1);
ManagedLedgerConfig managedLedgerConfig = new ManagedLedgerConfig();
managedLedgerConfig.setMaxEntriesPerLedger(2);
- MLTransactionLogInterceptor mlTransactionLogInterceptor = new MLTransactionLogInterceptor();
- managedLedgerConfig.setManagedLedgerInterceptor(mlTransactionLogInterceptor);
+ MLTransactionSequenceIdGenerator mlTransactionSequenceIdGenerator = new MLTransactionSequenceIdGenerator();
+ managedLedgerConfig.setManagedLedgerInterceptor(mlTransactionSequenceIdGenerator);
MLTransactionLogImpl mlTransactionLog = new MLTransactionLogImpl(transactionCoordinatorID, factory,
managedLedgerConfig);
mlTransactionLog.initialize().join();
MLTransactionMetadataStore transactionMetadataStore =
new MLTransactionMetadataStore(transactionCoordinatorID, mlTransactionLog,
new TransactionTimeoutTrackerImpl(), new TransactionRecoverTrackerImpl(),
- mlTransactionLogInterceptor.getSequenceId());
+ mlTransactionSequenceIdGenerator);
int checkReplayRetryCount = 0;
while (true) {
if (checkReplayRetryCount > 3) {
@@ -245,7 +244,7 @@ public class MLTransactionMetadataStoreTest extends MockedBookKeeperTestCase {
MLTransactionMetadataStore transactionMetadataStoreTest =
new MLTransactionMetadataStore(transactionCoordinatorID,
txnLog2, new TransactionTimeoutTrackerImpl(), new TransactionRecoverTrackerImpl(),
- mlTransactionLogInterceptor.getSequenceId());
+ mlTransactionSequenceIdGenerator);
while (true) {
if (checkReplayRetryCount > 6) {
@@ -306,15 +305,15 @@ public class MLTransactionMetadataStoreTest extends MockedBookKeeperTestCase {
ManagedLedgerFactory factory = new ManagedLedgerFactoryImpl(metadataStore, bkc, factoryConf);
TransactionCoordinatorID transactionCoordinatorID = new TransactionCoordinatorID(1);
ManagedLedgerConfig managedLedgerConfig = new ManagedLedgerConfig();
- MLTransactionLogInterceptor mlTransactionLogInterceptor = new MLTransactionLogInterceptor();
- managedLedgerConfig.setManagedLedgerInterceptor(mlTransactionLogInterceptor);
+ MLTransactionSequenceIdGenerator mlTransactionSequenceIdGenerator = new MLTransactionSequenceIdGenerator();
+ managedLedgerConfig.setManagedLedgerInterceptor(mlTransactionSequenceIdGenerator);
MLTransactionLogImpl mlTransactionLog = new MLTransactionLogImpl(transactionCoordinatorID, factory,
managedLedgerConfig);
mlTransactionLog.initialize().join();
MLTransactionMetadataStore transactionMetadataStore =
new MLTransactionMetadataStore(transactionCoordinatorID, mlTransactionLog,
new TransactionTimeoutTrackerImpl(), new TransactionRecoverTrackerImpl(),
- mlTransactionLogInterceptor.getSequenceId());
+ mlTransactionSequenceIdGenerator);
int checkReplayRetryCount = 0;
while (true) {
if (checkReplayRetryCount > 3) {
@@ -373,15 +372,15 @@ public class MLTransactionMetadataStoreTest extends MockedBookKeeperTestCase {
ManagedLedgerFactory factory = new ManagedLedgerFactoryImpl(metadataStore, bkc, factoryConf);
TransactionCoordinatorID transactionCoordinatorID = new TransactionCoordinatorID(1);
ManagedLedgerConfig managedLedgerConfig = new ManagedLedgerConfig();
- MLTransactionLogInterceptor mlTransactionLogInterceptor = new MLTransactionLogInterceptor();
- managedLedgerConfig.setManagedLedgerInterceptor(mlTransactionLogInterceptor);
+ MLTransactionSequenceIdGenerator mlTransactionSequenceIdGenerator = new MLTransactionSequenceIdGenerator();
+ managedLedgerConfig.setManagedLedgerInterceptor(mlTransactionSequenceIdGenerator);
MLTransactionLogImpl mlTransactionLog = new MLTransactionLogImpl(transactionCoordinatorID, factory,
managedLedgerConfig);
mlTransactionLog.initialize().join();
MLTransactionMetadataStore transactionMetadataStore =
new MLTransactionMetadataStore(transactionCoordinatorID, mlTransactionLog,
new TransactionTimeoutTrackerImpl(), new TransactionRecoverTrackerImpl(),
- mlTransactionLogInterceptor.getSequenceId());
+ mlTransactionSequenceIdGenerator);
Awaitility.await().until(transactionMetadataStore::checkIfReady);
@@ -400,7 +399,7 @@ public class MLTransactionMetadataStoreTest extends MockedBookKeeperTestCase {
transactionMetadataStore =
new MLTransactionMetadataStore(transactionCoordinatorID, mlTransactionLog,
new TransactionTimeoutTrackerImpl(), new TransactionRecoverTrackerImpl(),
- mlTransactionLogInterceptor.getSequenceId());
+ mlTransactionSequenceIdGenerator);
Awaitility.await().until(transactionMetadataStore::checkIfReady);
}
@@ -414,15 +413,15 @@ public class MLTransactionMetadataStoreTest extends MockedBookKeeperTestCase {
ManagedLedgerFactory factory = new ManagedLedgerFactoryImpl(metadataStore, bkc, factoryConf);
TransactionCoordinatorID transactionCoordinatorID = new TransactionCoordinatorID(1);
ManagedLedgerConfig managedLedgerConfig = new ManagedLedgerConfig();
- MLTransactionLogInterceptor mlTransactionLogInterceptor = new MLTransactionLogInterceptor();
- managedLedgerConfig.setManagedLedgerInterceptor(mlTransactionLogInterceptor);
+ MLTransactionSequenceIdGenerator mlTransactionSequenceIdGenerator = new MLTransactionSequenceIdGenerator();
+ managedLedgerConfig.setManagedLedgerInterceptor(mlTransactionSequenceIdGenerator);
MLTransactionLogImpl mlTransactionLog = new MLTransactionLogImpl(transactionCoordinatorID, factory,
managedLedgerConfig);
mlTransactionLog.initialize().join();
MLTransactionMetadataStore transactionMetadataStore =
new MLTransactionMetadataStore(transactionCoordinatorID, mlTransactionLog,
new TransactionTimeoutTrackerImpl(), new TransactionRecoverTrackerImpl(),
- mlTransactionLogInterceptor.getSequenceId());
+ mlTransactionSequenceIdGenerator);
Awaitility.await().until(transactionMetadataStore::checkIfReady);
transactionMetadataStore.newTransaction(5000).get();