You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by pe...@apache.org on 2020/12/01 04:56:54 UTC

[pulsar] branch master updated: [Transaction] Fix transaction log replay not handle right. (#8723)

This is an automated email from the ASF dual-hosted git repository.

penghui pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git


The following commit(s) were added to refs/heads/master by this push:
     new 411bf71  [Transaction] Fix transaction log replay not handle right. (#8723)
411bf71 is described below

commit 411bf71006039af661be13c54dda4ce074f97679
Author: congbo <39...@users.noreply.github.com>
AuthorDate: Tue Dec 1 12:56:27 2020 +0800

    [Transaction] Fix transaction log replay not handle right. (#8723)
    
    ## Motivation
    1. Transaction log replay don't handle right, because when replay compare lastConfirm.entryId, it is not right to replay.
    2. Transaction handle proto is async, but recycle the command will happen before send response, we use requestId or others will not right.
    3. We should afferent the managedLedgerConfig when we use MLTransactionLog, we can config the TransactionLog manageLedger.
    4. add the TransactionNotFound exception in proto, it will return when transaction commit or abort and others transaction operation repeat or error handle.
    ## implement
    1. we compare the markerDeletePosition and lastConfirmPosition to replay.
    2. use local variable replace command get method.
    3. afferent the managedLedgerConfig.
---
 conf/broker.conf                                   |  2 +-
 .../apache/pulsar/broker/ServiceConfiguration.java |  2 +-
 .../broker/TransactionMetadataStoreService.java    | 27 +++++---
 .../broker/service/BrokerServiceException.java     |  2 +
 .../apache/pulsar/broker/service/ServerCnx.java    | 73 ++++++++++++----------
 .../TransactionMetadataStoreServiceTest.java       | 17 +++--
 .../TransactionCoordinatorClientTest.java          |  2 +-
 .../client/impl/TransactionEndToEndTest.java       |  8 +--
 .../TransactionCoordinatorClientException.java     |  9 +++
 pulsar-client-cpp/include/pulsar/Result.h          |  1 +
 pulsar-client-cpp/lib/ClientConnection.cc          |  3 +
 pulsar-client-cpp/lib/Result.cc                    |  3 +
 .../client/impl/TransactionMetaStoreHandler.java   |  2 +
 .../apache/pulsar/common/api/proto/PulsarApi.java  |  3 +
 pulsar-common/src/main/proto/PulsarApi.proto       |  1 +
 .../TransactionMetadataStoreProvider.java          |  8 ++-
 .../InMemTransactionMetadataStoreProvider.java     |  4 +-
 .../coordinator/impl/MLTransactionLogImpl.java     | 27 ++++----
 .../impl/MLTransactionMetadataStoreProvider.java   |  9 ++-
 .../MLTransactionMetadataStoreTest.java            | 14 +++--
 .../TransactionMetadataStoreProviderTest.java      |  2 +-
 21 files changed, 142 insertions(+), 77 deletions(-)

diff --git a/conf/broker.conf b/conf/broker.conf
index f22fb3c..97d4ff0 100644
--- a/conf/broker.conf
+++ b/conf/broker.conf
@@ -1175,4 +1175,4 @@ brokerServicePurgeInactiveFrequencyInSeconds=60
 
 # Enable transaction coordinator in broker
 transactionCoordinatorEnabled=false
-transactionMetadataStoreProviderClassName=org.apache.pulsar.transaction.coordinator.impl.InMemTransactionMetadataStoreProvider
+transactionMetadataStoreProviderClassName=org.apache.pulsar.transaction.coordinator.impl.MLTransactionMetadataStoreProvider
diff --git a/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/ServiceConfiguration.java b/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/ServiceConfiguration.java
index f6e3d35..f4a8b75 100644
--- a/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/ServiceConfiguration.java
+++ b/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/ServiceConfiguration.java
@@ -1860,7 +1860,7 @@ public class ServiceConfiguration implements PulsarConfiguration {
             doc = "Class name for transaction metadata store provider"
     )
     private String transactionMetadataStoreProviderClassName =
-            "org.apache.pulsar.transaction.coordinator.impl.InMemTransactionMetadataStoreProvider";
+            "org.apache.pulsar.transaction.coordinator.impl.MLTransactionMetadataStoreProvider";
 
     @FieldContext(
             category = CATEGORY_TRANSACTION,
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/TransactionMetadataStoreService.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/TransactionMetadataStoreService.java
index 3fbf2d9..3531d9d 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/TransactionMetadataStoreService.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/TransactionMetadataStoreService.java
@@ -44,6 +44,7 @@ import org.apache.pulsar.transaction.coordinator.TransactionMetadataStoreProvide
 import org.apache.pulsar.transaction.coordinator.TransactionSubscription;
 import org.apache.pulsar.transaction.coordinator.TxnMeta;
 import org.apache.pulsar.transaction.coordinator.exceptions.CoordinatorException.CoordinatorNotFoundException;
+import org.apache.pulsar.transaction.coordinator.impl.MLTransactionLogImpl;
 import org.apache.pulsar.transaction.coordinator.proto.PulsarTransactionMetadata.TxnStatus;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -111,15 +112,23 @@ public class TransactionMetadataStoreService {
     }
 
     public void addTransactionMetadataStore(TransactionCoordinatorID tcId) {
-        transactionMetadataStoreProvider.openStore(tcId, pulsarService.getManagedLedgerFactory())
-            .whenComplete((store, ex) -> {
-                if (ex != null) {
-                    LOG.error("Add transaction metadata store with id {} error", tcId.getId(), ex);
-                } else {
-                    stores.put(tcId, store);
-                    LOG.info("Added new transaction meta store {}", tcId);
-                }
-            });
+        pulsarService.getBrokerService()
+                .getManagedLedgerConfig(TopicName.get(MLTransactionLogImpl.TRANSACTION_LOG_PREFIX + tcId))
+                .whenComplete((v ,e) -> {
+                    if (e != null) {
+                        LOG.error("Add transaction metadata store with id {} error", tcId.getId(), e);
+                    } else {
+                        transactionMetadataStoreProvider.openStore(tcId, pulsarService.getManagedLedgerFactory(), v)
+                                .whenComplete((store, ex) -> {
+                                    if (ex != null) {
+                                        LOG.error("Add transaction metadata store with id {} error", tcId.getId(), ex);
+                                    } else {
+                                        stores.put(tcId, store);
+                                        LOG.info("Added new transaction meta store {}", tcId);
+                                    }
+                                });
+                    }
+        });
     }
 
     public void removeTransactionMetadataStore(TransactionCoordinatorID tcId) {
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerServiceException.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerServiceException.java
index 4564617..9c6b2d1 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerServiceException.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerServiceException.java
@@ -219,6 +219,8 @@ public class BrokerServiceException extends Exception {
             return ServerError.NotAllowedError;
         } else if (t instanceof TransactionConflictException) {
             return ServerError.TransactionConflict;
+        } else if (t instanceof CoordinatorException.TransactionNotFoundException) {
+            return ServerError.TransactionNotFound;
         } else {
             if (checkCauseIfUnknown) {
                 return getClientErrorCode(t.getCause(), false);
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/ServerCnx.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/ServerCnx.java
index cb25e3b..b5bbb84 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/ServerCnx.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/ServerCnx.java
@@ -34,6 +34,7 @@ import io.netty.handler.codec.haproxy.HAProxyMessage;
 import io.netty.handler.ssl.SslHandler;
 import io.netty.util.concurrent.Promise;
 import java.net.SocketAddress;
+import java.util.List;
 import java.util.Map;
 import java.util.NoSuchElementException;
 import java.util.Set;
@@ -1262,18 +1263,21 @@ public class ServerCnx extends PulsarHandler implements TransportCnx {
     protected void handleAck(CommandAck ack) {
         checkArgument(state == State.Connected);
         CompletableFuture<Consumer> consumerFuture = consumers.get(ack.getConsumerId());
+        final long requestId = ack.getRequestId();
+        final boolean hasRequestId = ack.hasRequestId();
+        final long consumerId = ack.getConsumerId();
 
         if (consumerFuture != null && consumerFuture.isDone() && !consumerFuture.isCompletedExceptionally()) {
             consumerFuture.getNow(null).messageAcked(ack).thenRun(() -> {
-                        if (ack.hasRequestId()) {
+                        if (hasRequestId) {
                             ctx.writeAndFlush(Commands.newAckResponse(
-                                    ack.getRequestId(), null, null, ack.getConsumerId()));
+                                    requestId, null, null, consumerId));
                         }
                     }).exceptionally(e -> {
-                        if (ack.hasRequestId()) {
-                            ctx.writeAndFlush(Commands.newAckResponse(ack.getRequestId(),
+                        if (hasRequestId) {
+                            ctx.writeAndFlush(Commands.newAckResponse(requestId,
                                     BrokerServiceException.getClientErrorCode(e),
-                                    e.getMessage(), ack.getConsumerId()));
+                                    e.getMessage(), consumerId));
                         }
                         return null;
                     });
@@ -1660,45 +1664,47 @@ public class ServerCnx extends PulsarHandler implements TransportCnx {
 
     @Override
     protected void handleNewTxn(CommandNewTxn command) {
+        final long requestId = command.getRequestId();
+        final TransactionCoordinatorID tcId = TransactionCoordinatorID.get(command.getTcId());
         if (log.isDebugEnabled()) {
-            log.debug("Receive new txn request {} to transaction meta store {} from {}.", command.getRequestId(), command.getTcId(), remoteAddress);
+            log.debug("Receive new txn request {} to transaction meta store {} from {}.", requestId, tcId, remoteAddress);
         }
-        TransactionCoordinatorID tcId = TransactionCoordinatorID.get(command.getTcId());
         service.pulsar().getTransactionMetadataStoreService().newTransaction(tcId, command.getTxnTtlSeconds())
             .whenComplete(((txnID, ex) -> {
                 if (ex == null) {
                     if (log.isDebugEnabled()) {
-                        log.debug("Send response {} for new txn request {}", tcId.getId(),  command.getRequestId());
+                        log.debug("Send response {} for new txn request {}", tcId.getId(), requestId);
                     }
-                    ctx.writeAndFlush(Commands.newTxnResponse(command.getRequestId(), txnID.getLeastSigBits(), txnID.getMostSigBits()));
+                    ctx.writeAndFlush(Commands.newTxnResponse(requestId, txnID.getLeastSigBits(), txnID.getMostSigBits()));
                 } else {
                     if (log.isDebugEnabled()) {
-                        log.debug("Send response error for new txn request {}", command.getRequestId(), ex);
+                        log.debug("Send response error for new txn request {}", requestId, ex);
                     }
-                    ctx.writeAndFlush(Commands.newTxnResponse(command.getRequestId(), tcId.getId(), BrokerServiceException.getClientErrorCode(ex), ex.getMessage()));
+                    ctx.writeAndFlush(Commands.newTxnResponse(requestId, tcId.getId(), BrokerServiceException.getClientErrorCode(ex), ex.getMessage()));
                 }
             }));
     }
 
     @Override
     protected void handleAddPartitionToTxn(PulsarApi.CommandAddPartitionToTxn command) {
-            TxnID txnID = new TxnID(command.getTxnidMostBits(), command.getTxnidLeastBits());
+        final TxnID txnID = new TxnID(command.getTxnidMostBits(), command.getTxnidLeastBits());
+        final long requestId = command.getRequestId();
         if (log.isDebugEnabled()) {
-            log.debug("Receive add published partition to txn request {} from {} with txnId {}", command.getRequestId(), remoteAddress, txnID);
+            log.debug("Receive add published partition to txn request {} from {} with txnId {}", requestId, remoteAddress, txnID);
         }
         service.pulsar().getTransactionMetadataStoreService().addProducedPartitionToTxn(txnID, command.getPartitionsList())
             .whenComplete(((v, ex) -> {
                 if (ex == null) {
                     if (log.isDebugEnabled()) {
-                        log.debug("Send response success for add published partition to txn request {}",  command.getRequestId());
+                        log.debug("Send response success for add published partition to txn request {}",  requestId);
                     }
-                    ctx.writeAndFlush(Commands.newAddPartitionToTxnResponse(command.getRequestId(),
+                    ctx.writeAndFlush(Commands.newAddPartitionToTxnResponse(requestId,
                             txnID.getLeastSigBits(), txnID.getMostSigBits()));
                 } else {
                     if (log.isDebugEnabled()) {
-                        log.debug("Send response error for add published partition to txn request {}",  command.getRequestId(), ex);
+                        log.debug("Send response error for add published partition to txn request {}",  requestId, ex);
                     }
-                    ctx.writeAndFlush(Commands.newAddPartitionToTxnResponse(command.getRequestId(), txnID.getMostSigBits(),
+                    ctx.writeAndFlush(Commands.newAddPartitionToTxnResponse(requestId, txnID.getMostSigBits(),
                             BrokerServiceException.getClientErrorCode(ex), ex.getMessage()));
                 }
             }));
@@ -1717,7 +1723,7 @@ public class ServerCnx extends PulsarHandler implements TransportCnx {
                             txnID.getLeastSigBits(), txnID.getMostSigBits()));
                 }).exceptionally(throwable -> {
                     log.error("Send response error for end txn request.", throwable);
-                    ctx.writeAndFlush(Commands.newEndTxnResponse(command.getRequestId(), txnID.getMostSigBits(),
+                    ctx.writeAndFlush(Commands.newEndTxnResponse(requestId, txnID.getMostSigBits(),
                             BrokerServiceException.getClientErrorCode(throwable), throwable.getMessage()));
                     return null; });
     }
@@ -1725,20 +1731,22 @@ public class ServerCnx extends PulsarHandler implements TransportCnx {
     @Override
     protected void handleEndTxnOnPartition(PulsarApi.CommandEndTxnOnPartition command) {
         final long requestId = command.getRequestId();
+        final String topic = command.getTopic();
+        final List<MessageIdData> messageIdDataList = command.getMessageIdList();
         final int txnAction = command.getTxnAction().getNumber();
         TxnID txnID = new TxnID(command.getTxnidMostBits(), command.getTxnidLeastBits());
 
-        service.getTopics().get(TopicName.get(command.getTopic()).toString()).whenComplete((topic, t) -> {
-            if (!topic.isPresent()) {
+        service.getTopics().get(TopicName.get(topic).toString()).whenComplete((optionalTopic, t) -> {
+            if (!optionalTopic.isPresent()) {
                 ctx.writeAndFlush(Commands.newEndTxnOnPartitionResponse(
-                        command.getRequestId(), ServerError.TopicNotFound,
-                        "Topic " + command.getTopic() + " is not found."));
+                        requestId, ServerError.TopicNotFound,
+                        "Topic " + topic + " is not found."));
                 return;
             }
-            topic.get().endTxn(txnID, txnAction, command.getMessageIdList())
+            optionalTopic.get().endTxn(txnID, txnAction, messageIdDataList)
                 .whenComplete((ignored, throwable) -> {
                     if (throwable != null) {
-                        log.error("Handle endTxnOnPartition {} failed.", command.getTopic(), throwable);
+                        log.error("Handle endTxnOnPartition {} failed.", topic, throwable);
                         ctx.writeAndFlush(Commands.newEndTxnOnPartitionResponse(
                                 requestId, ServerError.UnknownError, throwable.getMessage()));
                         return;
@@ -1758,10 +1766,10 @@ public class ServerCnx extends PulsarHandler implements TransportCnx {
         final String subName = command.getSubscription().getSubscription();
         final int txnAction = command.getTxnAction().getNumber();
 
-        service.getTopics().get(TopicName.get(command.getSubscription().getTopic()).toString())
+        service.getTopics().get(TopicName.get(topic).toString())
             .thenAccept(optionalTopic -> {
                 if (!optionalTopic.isPresent()) {
-                    log.error("The topic {} is not exist in broker.", command.getSubscription().getTopic());
+                    log.error("The topic {} is not exist in broker.", topic);
                     ctx.writeAndFlush(Commands.newEndTxnOnSubscriptionResponse(
                             requestId, txnidLeastBits, txnidMostBits,
                             ServerError.UnknownError,
@@ -1817,10 +1825,11 @@ public class ServerCnx extends PulsarHandler implements TransportCnx {
 
     @Override
     protected void handleAddSubscriptionToTxn(PulsarApi.CommandAddSubscriptionToTxn command) {
-        TxnID txnID = new TxnID(command.getTxnidMostBits(), command.getTxnidLeastBits());
+        final TxnID txnID = new TxnID(command.getTxnidMostBits(), command.getTxnidLeastBits());
+        final long requestId = command.getRequestId();
         if (log.isDebugEnabled()) {
             log.debug("Receive add published partition to txn request {} from {} with txnId {}",
-                    command.getRequestId(), remoteAddress, txnID);
+                    requestId, remoteAddress, txnID);
         }
 
         service.pulsar().getTransactionMetadataStoreService().addAckedPartitionToTxn(txnID,
@@ -1829,17 +1838,17 @@ public class ServerCnx extends PulsarHandler implements TransportCnx {
                     if (ex == null) {
                         if (log.isDebugEnabled()) {
                             log.debug("Send response success for add published partition to txn request {}",
-                                    command.getRequestId());
+                                    requestId);
                         }
-                        ctx.writeAndFlush(Commands.newAddSubscriptionToTxnResponse(command.getRequestId(),
+                        ctx.writeAndFlush(Commands.newAddSubscriptionToTxnResponse(requestId,
                                 txnID.getLeastSigBits(), txnID.getMostSigBits()));
                         log.info("handle add partition to txn finish.");
                     } else {
                         if (log.isDebugEnabled()) {
                             log.debug("Send response error for add published partition to txn request {}",
-                                    command.getRequestId(), ex);
+                                    requestId, ex);
                         }
-                        ctx.writeAndFlush(Commands.newAddSubscriptionToTxnResponse(command.getRequestId(),
+                        ctx.writeAndFlush(Commands.newAddSubscriptionToTxnResponse(requestId,
                                 txnID.getMostSigBits(), BrokerServiceException.getClientErrorCode(ex),
                                 ex.getMessage()));
                     }
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/TransactionMetadataStoreServiceTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/TransactionMetadataStoreServiceTest.java
index 8b91aea..99fb073 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/TransactionMetadataStoreServiceTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/TransactionMetadataStoreServiceTest.java
@@ -25,6 +25,7 @@ import org.apache.pulsar.transaction.coordinator.TransactionCoordinatorID;
 import org.apache.pulsar.transaction.coordinator.TransactionSubscription;
 import org.apache.pulsar.transaction.coordinator.TxnMeta;
 import org.apache.pulsar.transaction.coordinator.proto.PulsarTransactionMetadata.TxnStatus;
+import org.awaitility.Awaitility;
 import org.junit.Assert;
 import org.testng.annotations.AfterMethod;
 import org.testng.annotations.BeforeMethod;
@@ -33,6 +34,7 @@ import org.testng.annotations.Test;
 import java.util.ArrayList;
 import java.util.List;
 import java.util.concurrent.ExecutionException;
+import java.util.concurrent.TimeUnit;
 
 import static org.testng.Assert.assertEquals;
 
@@ -58,10 +60,12 @@ public class TransactionMetadataStoreServiceTest extends BrokerTestBase {
         Assert.assertNotNull(transactionMetadataStoreService);
 
         transactionMetadataStoreService.addTransactionMetadataStore(TransactionCoordinatorID.get(0));
-        Assert.assertEquals(transactionMetadataStoreService.getStores().size(), 1);
+        Awaitility.await().atMost(1000,  TimeUnit.MILLISECONDS).until(() ->
+                transactionMetadataStoreService.getStores().size() == 1);
 
         transactionMetadataStoreService.removeTransactionMetadataStore(TransactionCoordinatorID.get(0));
-        Assert.assertEquals(transactionMetadataStoreService.getStores().size(), 0);
+        Awaitility.await().atMost(1000,  TimeUnit.MILLISECONDS).until(() ->
+                transactionMetadataStoreService.getStores().size() == 0);
     }
 
     @Test
@@ -70,7 +74,8 @@ public class TransactionMetadataStoreServiceTest extends BrokerTestBase {
         transactionMetadataStoreService.addTransactionMetadataStore(TransactionCoordinatorID.get(0));
         transactionMetadataStoreService.addTransactionMetadataStore(TransactionCoordinatorID.get(1));
         transactionMetadataStoreService.addTransactionMetadataStore(TransactionCoordinatorID.get(2));
-        Assert.assertEquals(transactionMetadataStoreService.getStores().size(), 3);
+        Awaitility.await().atMost(1000,  TimeUnit.MILLISECONDS).until(() ->
+                transactionMetadataStoreService.getStores().size() == 3);
         TxnID txnID0 = transactionMetadataStoreService.newTransaction(TransactionCoordinatorID.get(0), 0).get();
         TxnID txnID1 = transactionMetadataStoreService.newTransaction(TransactionCoordinatorID.get(1), 0).get();
         TxnID txnID2 = transactionMetadataStoreService.newTransaction(TransactionCoordinatorID.get(2), 0).get();
@@ -87,7 +92,8 @@ public class TransactionMetadataStoreServiceTest extends BrokerTestBase {
     public void testAddProducedPartitionToTxn() throws ExecutionException, InterruptedException {
         TransactionMetadataStoreService transactionMetadataStoreService = pulsar.getTransactionMetadataStoreService();
         transactionMetadataStoreService.addTransactionMetadataStore(TransactionCoordinatorID.get(0));
-        Assert.assertEquals(transactionMetadataStoreService.getStores().size(), 1);
+        Awaitility.await().atMost(1000,  TimeUnit.MILLISECONDS).until(() ->
+                transactionMetadataStoreService.getStores().size() == 1);
         TxnID txnID = transactionMetadataStoreService.newTransaction(TransactionCoordinatorID.get(0), 0).get();
         List<String> partitions = new ArrayList<>();
         partitions.add("ptn-0");
@@ -104,7 +110,8 @@ public class TransactionMetadataStoreServiceTest extends BrokerTestBase {
     public void testAddAckedPartitionToTxn() throws ExecutionException, InterruptedException {
         TransactionMetadataStoreService transactionMetadataStoreService = pulsar.getTransactionMetadataStoreService();
         transactionMetadataStoreService.addTransactionMetadataStore(TransactionCoordinatorID.get(0));
-        Assert.assertEquals(transactionMetadataStoreService.getStores().size(), 1);
+        Awaitility.await().atMost(1000,  TimeUnit.MILLISECONDS).until(() ->
+                transactionMetadataStoreService.getStores().size() == 1);
         TxnID txnID = transactionMetadataStoreService.newTransaction(TransactionCoordinatorID.get(0), 0).get();
         List<TransactionSubscription> partitions = new ArrayList<>();
         partitions.add(TransactionSubscription.builder().topic("ptn-1").subscription("sub-1").build());
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/transaction/coordinator/TransactionCoordinatorClientTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/transaction/coordinator/TransactionCoordinatorClientTest.java
index c63db84..d37a5dc 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/transaction/coordinator/TransactionCoordinatorClientTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/transaction/coordinator/TransactionCoordinatorClientTest.java
@@ -94,7 +94,7 @@ public class TransactionCoordinatorClientTest extends TransactionMetaStoreTestBa
         try {
             transactionCoordinatorClient.abort(txnID, Collections.emptyList());
             Assert.fail("Should be fail, because the txn is in committing state, can't abort now.");
-        } catch (TransactionCoordinatorClientException.InvalidTxnStatusException ignore) {
+        } catch (TransactionCoordinatorClientException ignore) {
            // Ok here
         }
     }
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/TransactionEndToEndTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/TransactionEndToEndTest.java
index fc8532d..db3d9fe 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/TransactionEndToEndTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/TransactionEndToEndTest.java
@@ -54,7 +54,7 @@ import org.apache.pulsar.client.api.SubscriptionInitialPosition;
 import org.apache.pulsar.client.api.SubscriptionType;
 import org.apache.pulsar.client.api.transaction.Transaction;
 import org.apache.pulsar.client.api.transaction.TransactionCoordinatorClient;
-import org.apache.pulsar.client.api.transaction.TransactionCoordinatorClientException;
+import org.apache.pulsar.client.api.transaction.TransactionCoordinatorClientException.TransactionNotFoundException;
 import org.apache.pulsar.client.api.transaction.TxnID;
 import org.apache.pulsar.client.impl.transaction.TransactionImpl;
 import org.apache.pulsar.client.internal.DefaultImplementation;
@@ -328,8 +328,7 @@ public class TransactionEndToEndTest extends TransactionTestBase {
                 // recommit one transaction should be failed
                 log.info("expected exception for recommit one transaction.");
                 Assert.assertNotNull(reCommitError);
-                Assert.assertTrue(reCommitError.getCause() instanceof
-                        TransactionCoordinatorClientException.InvalidTxnStatusException);
+                Assert.assertTrue(reCommitError.getCause() instanceof TransactionNotFoundException);
             }
         }
     }
@@ -533,8 +532,7 @@ public class TransactionEndToEndTest extends TransactionTestBase {
                 // recommit one transaction should be failed
                 log.info("expected exception for recommit one transaction.");
                 Assert.assertNotNull(reCommitError);
-                Assert.assertTrue(reCommitError.getCause() instanceof
-                        TransactionCoordinatorClientException.InvalidTxnStatusException);
+                Assert.assertTrue(reCommitError.getCause() instanceof TransactionNotFoundException);
             }
 
             message = consumer.receive(1, TimeUnit.SECONDS);
diff --git a/pulsar-client-api/src/main/java/org/apache/pulsar/client/api/transaction/TransactionCoordinatorClientException.java b/pulsar-client-api/src/main/java/org/apache/pulsar/client/api/transaction/TransactionCoordinatorClientException.java
index d1ad443..0e1f6c7 100644
--- a/pulsar-client-api/src/main/java/org/apache/pulsar/client/api/transaction/TransactionCoordinatorClientException.java
+++ b/pulsar-client-api/src/main/java/org/apache/pulsar/client/api/transaction/TransactionCoordinatorClientException.java
@@ -71,6 +71,15 @@ public class TransactionCoordinatorClientException extends IOException {
     }
 
     /**
+     * Thrown when transaction not found in transaction coordinator.
+     */
+    public static class TransactionNotFoundException extends TransactionCoordinatorClientException {
+        public TransactionNotFoundException(String message) {
+            super(message);
+        }
+    }
+
+    /**
      * Thrown when transaction meta store handler not exists.
      */
     public static class MetaStoreHandlerNotExistsException extends TransactionCoordinatorClientException {
diff --git a/pulsar-client-cpp/include/pulsar/Result.h b/pulsar-client-cpp/include/pulsar/Result.h
index 01a2474..1106aae 100644
--- a/pulsar-client-cpp/include/pulsar/Result.h
+++ b/pulsar-client-cpp/include/pulsar/Result.h
@@ -84,6 +84,7 @@ enum Result
     ResultInvalidTxnStatusError,                     /// Invalid txn status error
     ResultNotAllowedError,                           /// Not allowed
     ResultTransactionConflict,                       /// Transaction ack conflict
+    ResultTransactionNotFound,                       /// Transaction not found
 };
 
 // Return string representation of result code
diff --git a/pulsar-client-cpp/lib/ClientConnection.cc b/pulsar-client-cpp/lib/ClientConnection.cc
index 824d54a..d17a9b6 100644
--- a/pulsar-client-cpp/lib/ClientConnection.cc
+++ b/pulsar-client-cpp/lib/ClientConnection.cc
@@ -123,6 +123,9 @@ static Result getResult(ServerError serverError) {
 
         case TransactionConflict:
             return ResultTransactionConflict;
+
+        case TransactionNotFound:
+            return ResultTransactionNotFound;
     }
     // NOTE : Do not add default case in the switch above. In future if we get new cases for
     // ServerError and miss them in the switch above we would like to get notified. Adding
diff --git a/pulsar-client-cpp/lib/Result.cc b/pulsar-client-cpp/lib/Result.cc
index 5f074a4..89dfe8e 100644
--- a/pulsar-client-cpp/lib/Result.cc
+++ b/pulsar-client-cpp/lib/Result.cc
@@ -150,6 +150,9 @@ const char* strResult(Result result) {
 
         case ResultTransactionConflict:
             return "ResultTransactionConflict";
+
+        case ResultTransactionNotFound:
+            return "ResultTransactionNotFound";
     };
     // NOTE : Do not add default case in the switch above. In future if we get new cases for
     // ServerError and miss them in the switch above we would like to get notified. Adding
diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/TransactionMetaStoreHandler.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/TransactionMetaStoreHandler.java
index 2b4fa48..64688f6 100644
--- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/TransactionMetaStoreHandler.java
+++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/TransactionMetaStoreHandler.java
@@ -379,6 +379,8 @@ public class TransactionMetaStoreHandler extends HandlerState implements Connect
                 return new TransactionCoordinatorClientException.CoordinatorNotFoundException(msg);
             case InvalidTxnStatus:
                 return new TransactionCoordinatorClientException.InvalidTxnStatusException(msg);
+            case TransactionNotFound:
+                return new TransactionCoordinatorClientException.TransactionNotFoundException(msg);
             default:
                 return new TransactionCoordinatorClientException(msg);
         }
diff --git a/pulsar-common/src/main/java/org/apache/pulsar/common/api/proto/PulsarApi.java b/pulsar-common/src/main/java/org/apache/pulsar/common/api/proto/PulsarApi.java
index b30c405..9e1e9d3 100644
--- a/pulsar-common/src/main/java/org/apache/pulsar/common/api/proto/PulsarApi.java
+++ b/pulsar-common/src/main/java/org/apache/pulsar/common/api/proto/PulsarApi.java
@@ -84,6 +84,7 @@ public final class PulsarApi {
     InvalidTxnStatus(21, 21),
     NotAllowedError(22, 22),
     TransactionConflict(23, 23),
+    TransactionNotFound(24, 24),
     ;
     
     public static final int UnknownError_VALUE = 0;
@@ -110,6 +111,7 @@ public final class PulsarApi {
     public static final int InvalidTxnStatus_VALUE = 21;
     public static final int NotAllowedError_VALUE = 22;
     public static final int TransactionConflict_VALUE = 23;
+    public static final int TransactionNotFound_VALUE = 24;
     
     
     public final int getNumber() { return value; }
@@ -140,6 +142,7 @@ public final class PulsarApi {
         case 21: return InvalidTxnStatus;
         case 22: return NotAllowedError;
         case 23: return TransactionConflict;
+        case 24: return TransactionNotFound;
         default: return null;
       }
     }
diff --git a/pulsar-common/src/main/proto/PulsarApi.proto b/pulsar-common/src/main/proto/PulsarApi.proto
index e722708..5c12ca3 100644
--- a/pulsar-common/src/main/proto/PulsarApi.proto
+++ b/pulsar-common/src/main/proto/PulsarApi.proto
@@ -205,6 +205,7 @@ enum ServerError {
     NotAllowedError = 22; // Not allowed error
 
     TransactionConflict = 23; // Ack with transaction conflict
+    TransactionNotFound = 24; // Transaction not found
 }
 
 enum AuthMethod {
diff --git a/pulsar-transaction/coordinator/src/main/java/org/apache/pulsar/transaction/coordinator/TransactionMetadataStoreProvider.java b/pulsar-transaction/coordinator/src/main/java/org/apache/pulsar/transaction/coordinator/TransactionMetadataStoreProvider.java
index 8a3b6d5..a18912f 100644
--- a/pulsar-transaction/coordinator/src/main/java/org/apache/pulsar/transaction/coordinator/TransactionMetadataStoreProvider.java
+++ b/pulsar-transaction/coordinator/src/main/java/org/apache/pulsar/transaction/coordinator/TransactionMetadataStoreProvider.java
@@ -22,6 +22,7 @@ import static com.google.common.base.Preconditions.checkArgument;
 import com.google.common.annotations.Beta;
 import java.io.IOException;
 import java.util.concurrent.CompletableFuture;
+import org.apache.bookkeeper.mledger.ManagedLedgerConfig;
 import org.apache.bookkeeper.mledger.ManagedLedgerFactory;
 
 /**
@@ -55,10 +56,15 @@ public interface TransactionMetadataStoreProvider {
      * Open the transaction metadata store for transaction coordinator
      * identified by <tt>transactionCoordinatorId</tt>.
      *
+     * @param transactionCoordinatorId {@link TransactionCoordinatorID} the coordinator id.
+     * @param managedLedgerFactory {@link ManagedLedgerFactory} the managedLedgerFactory to create managedLedger.
+     * @param managedLedgerConfig {@link ManagedLedgerConfig} the managedLedgerConfig to create managedLedger.
+     *
      * @return a future represents the result of the operation.
      *         an instance of {@link TransactionMetadataStore} is returned
      *         if the operation succeeds.
      */
     CompletableFuture<TransactionMetadataStore> openStore(
-        TransactionCoordinatorID transactionCoordinatorId, ManagedLedgerFactory managedLedgerFactory);
+            TransactionCoordinatorID transactionCoordinatorId, ManagedLedgerFactory managedLedgerFactory,
+            ManagedLedgerConfig managedLedgerConfig);
 }
diff --git a/pulsar-transaction/coordinator/src/main/java/org/apache/pulsar/transaction/coordinator/impl/InMemTransactionMetadataStoreProvider.java b/pulsar-transaction/coordinator/src/main/java/org/apache/pulsar/transaction/coordinator/impl/InMemTransactionMetadataStoreProvider.java
index f237d6b..8174b3a 100644
--- a/pulsar-transaction/coordinator/src/main/java/org/apache/pulsar/transaction/coordinator/impl/InMemTransactionMetadataStoreProvider.java
+++ b/pulsar-transaction/coordinator/src/main/java/org/apache/pulsar/transaction/coordinator/impl/InMemTransactionMetadataStoreProvider.java
@@ -19,6 +19,7 @@
 package org.apache.pulsar.transaction.coordinator.impl;
 
 import java.util.concurrent.CompletableFuture;
+import org.apache.bookkeeper.mledger.ManagedLedgerConfig;
 import org.apache.bookkeeper.mledger.ManagedLedgerFactory;
 import org.apache.pulsar.transaction.coordinator.TransactionCoordinatorID;
 import org.apache.pulsar.transaction.coordinator.TransactionMetadataStore;
@@ -31,7 +32,8 @@ public class InMemTransactionMetadataStoreProvider implements TransactionMetadat
 
     @Override
     public CompletableFuture<TransactionMetadataStore> openStore(TransactionCoordinatorID transactionCoordinatorId,
-         ManagedLedgerFactory managedLedgerFactory) {
+                                                                 ManagedLedgerFactory managedLedgerFactory,
+                                                                 ManagedLedgerConfig managedLedgerConfig) {
         return CompletableFuture.completedFuture(
             new InMemTransactionMetadataStore(transactionCoordinatorId));
     }
diff --git a/pulsar-transaction/coordinator/src/main/java/org/apache/pulsar/transaction/coordinator/impl/MLTransactionLogImpl.java b/pulsar-transaction/coordinator/src/main/java/org/apache/pulsar/transaction/coordinator/impl/MLTransactionLogImpl.java
index 3036f63..e60563a 100644
--- a/pulsar-transaction/coordinator/src/main/java/org/apache/pulsar/transaction/coordinator/impl/MLTransactionLogImpl.java
+++ b/pulsar-transaction/coordinator/src/main/java/org/apache/pulsar/transaction/coordinator/impl/MLTransactionLogImpl.java
@@ -27,6 +27,7 @@ import org.apache.bookkeeper.mledger.AsyncCallbacks;
 import org.apache.bookkeeper.mledger.Entry;
 import org.apache.bookkeeper.mledger.ManagedCursor;
 import org.apache.bookkeeper.mledger.ManagedLedger;
+import org.apache.bookkeeper.mledger.ManagedLedgerConfig;
 import org.apache.bookkeeper.mledger.ManagedLedgerException;
 import org.apache.bookkeeper.mledger.ManagedLedgerFactory;
 import org.apache.bookkeeper.mledger.Position;
@@ -51,7 +52,7 @@ public class MLTransactionLogImpl implements TransactionLog {
 
     private final ManagedLedger managedLedger;
 
-    private final static String TRANSACTION_LOG_PREFIX = NamespaceName.SYSTEM_NAMESPACE + "/transaction-log-";
+    public final static String TRANSACTION_LOG_PREFIX = NamespaceName.SYSTEM_NAMESPACE + "/transaction-log-";
 
     private final ManagedCursor cursor;
 
@@ -62,17 +63,21 @@ public class MLTransactionLogImpl implements TransactionLog {
     //this is for replay
     private final PositionImpl lastConfirmedEntry;
 
+    private PositionImpl currentLoadPosition;
+
     private final long tcId;
 
     private final String topicName;
 
     public MLTransactionLogImpl(TransactionCoordinatorID tcID,
-                                ManagedLedgerFactory managedLedgerFactory) throws Exception {
+                                ManagedLedgerFactory managedLedgerFactory,
+                                ManagedLedgerConfig managedLedgerConfig) throws Exception {
         this.topicName = TRANSACTION_LOG_PREFIX + tcID;
         this.tcId = tcID.getId();
-        this.managedLedger = managedLedgerFactory.open(topicName);
+        this.managedLedger = managedLedgerFactory.open(topicName, managedLedgerConfig);
         this.cursor =  managedLedger.openCursor(TRANSACTION_SUBSCRIPTION_NAME,
                 CommandSubscribe.InitialPosition.Earliest);
+        this.currentLoadPosition = (PositionImpl) this.cursor.getMarkDeletedPosition();
         this.entryQueue = new SpscArrayQueue<>(2000);
         this.lastConfirmedEntry = (PositionImpl) managedLedger.getLastConfirmedEntry();
     }
@@ -163,9 +168,8 @@ public class MLTransactionLogImpl implements TransactionLog {
 
     class TransactionLogReplayer {
 
-        private FillEntryQueueCallback fillEntryQueueCallback;
-        private long currentLoadEntryId;
-        private TransactionLogReplayCallback transactionLogReplayCallback;
+        private final FillEntryQueueCallback fillEntryQueueCallback;
+        private final TransactionLogReplayCallback transactionLogReplayCallback;
 
         TransactionLogReplayer(TransactionLogReplayCallback transactionLogReplayCallback) {
             this.fillEntryQueueCallback = new FillEntryQueueCallback();
@@ -173,16 +177,13 @@ public class MLTransactionLogImpl implements TransactionLog {
         }
 
         public void start() {
-            if (((PositionImpl) cursor.getMarkDeletedPosition()).compareTo(lastConfirmedEntry) == 0) {
-                this.transactionLogReplayCallback.replayComplete();
-                return;
-            }
-            while (currentLoadEntryId < lastConfirmedEntry.getEntryId()) {
+
+            while (lastConfirmedEntry.compareTo(currentLoadPosition) > 0) {
                 fillEntryQueueCallback.fillQueue();
                 Entry entry = entryQueue.poll();
                 if (entry != null) {
                     ByteBuf buffer = entry.getDataBuffer();
-                    currentLoadEntryId = entry.getEntryId();
+                    currentLoadPosition = PositionImpl.get(entry.getLedgerId(), entry.getEntryId());
                     ByteBufCodedInputStream stream = ByteBufCodedInputStream.get(buffer);
                     TransactionMetadataEntry.Builder transactionMetadataEntryBuilder =
                             TransactionMetadataEntry.newBuilder();
@@ -213,7 +214,7 @@ public class MLTransactionLogImpl implements TransactionLog {
 
     class FillEntryQueueCallback implements AsyncCallbacks.ReadEntriesCallback {
 
-        private AtomicLong outstandingReadsRequests = new AtomicLong(0);
+        private final AtomicLong outstandingReadsRequests = new AtomicLong(0);
 
         void fillQueue() {
             if (entryQueue.size() < entryQueue.capacity() && outstandingReadsRequests.get() == 0) {
diff --git a/pulsar-transaction/coordinator/src/main/java/org/apache/pulsar/transaction/coordinator/impl/MLTransactionMetadataStoreProvider.java b/pulsar-transaction/coordinator/src/main/java/org/apache/pulsar/transaction/coordinator/impl/MLTransactionMetadataStoreProvider.java
index 0f6c1c8..ef63089 100644
--- a/pulsar-transaction/coordinator/src/main/java/org/apache/pulsar/transaction/coordinator/impl/MLTransactionMetadataStoreProvider.java
+++ b/pulsar-transaction/coordinator/src/main/java/org/apache/pulsar/transaction/coordinator/impl/MLTransactionMetadataStoreProvider.java
@@ -19,6 +19,7 @@
 package org.apache.pulsar.transaction.coordinator.impl;
 
 import java.util.concurrent.CompletableFuture;
+import org.apache.bookkeeper.mledger.ManagedLedgerConfig;
 import org.apache.bookkeeper.mledger.ManagedLedgerFactory;
 import org.apache.pulsar.common.util.FutureUtil;
 import org.apache.pulsar.transaction.coordinator.TransactionCoordinatorID;
@@ -35,13 +36,15 @@ public class MLTransactionMetadataStoreProvider implements TransactionMetadataSt
     private static final Logger log = LoggerFactory.getLogger(MLTransactionMetadataStoreProvider.class);
 
     @Override
-    public CompletableFuture<TransactionMetadataStore>
-    openStore(TransactionCoordinatorID transactionCoordinatorId, ManagedLedgerFactory managedLedgerFactory) {
+    public CompletableFuture<TransactionMetadataStore> openStore(TransactionCoordinatorID transactionCoordinatorId,
+                                                                 ManagedLedgerFactory managedLedgerFactory,
+                                                                 ManagedLedgerConfig managedLedgerConfig) {
         TransactionMetadataStore transactionMetadataStore;
         try {
             transactionMetadataStore =
                     new MLTransactionMetadataStore(transactionCoordinatorId,
-                            new MLTransactionLogImpl(transactionCoordinatorId, managedLedgerFactory));
+                            new MLTransactionLogImpl(transactionCoordinatorId,
+                                    managedLedgerFactory, managedLedgerConfig));
         } catch (Exception e) {
             log.error("MLTransactionMetadataStore init fail", e);
             return FutureUtil.failedFuture(e);
diff --git a/pulsar-transaction/coordinator/src/test/java/org/apache/pulsar/transaction/coordinator/MLTransactionMetadataStoreTest.java b/pulsar-transaction/coordinator/src/test/java/org/apache/pulsar/transaction/coordinator/MLTransactionMetadataStoreTest.java
index 84e68c7..aff0d5a 100644
--- a/pulsar-transaction/coordinator/src/test/java/org/apache/pulsar/transaction/coordinator/MLTransactionMetadataStoreTest.java
+++ b/pulsar-transaction/coordinator/src/test/java/org/apache/pulsar/transaction/coordinator/MLTransactionMetadataStoreTest.java
@@ -19,6 +19,7 @@
 package org.apache.pulsar.transaction.coordinator;
 
 import org.apache.bookkeeper.mledger.ManagedCursor;
+import org.apache.bookkeeper.mledger.ManagedLedgerConfig;
 import org.apache.bookkeeper.mledger.ManagedLedgerFactory;
 import org.apache.bookkeeper.mledger.ManagedLedgerFactoryConfig;
 import org.apache.bookkeeper.mledger.impl.ManagedLedgerFactoryImpl;
@@ -48,7 +49,8 @@ public class MLTransactionMetadataStoreTest extends MockedBookKeeperTestCase {
         factoryConf.setMaxCacheSize(0);
         ManagedLedgerFactory factory = new ManagedLedgerFactoryImpl(bkc, zkc, factoryConf);
         TransactionCoordinatorID transactionCoordinatorID = new TransactionCoordinatorID(1);
-        MLTransactionLogImpl mlTransactionLog = new MLTransactionLogImpl(transactionCoordinatorID, factory);
+        MLTransactionLogImpl mlTransactionLog = new MLTransactionLogImpl(transactionCoordinatorID, factory,
+                new ManagedLedgerConfig());
         MLTransactionMetadataStore transactionMetadataStore =
                 new MLTransactionMetadataStore(transactionCoordinatorID, mlTransactionLog);
         int checkReplayRetryCount = 0;
@@ -108,7 +110,10 @@ public class MLTransactionMetadataStoreTest extends MockedBookKeeperTestCase {
         factoryConf.setMaxCacheSize(0);
         ManagedLedgerFactory factory = new ManagedLedgerFactoryImpl(bkc, zkc, factoryConf);
         TransactionCoordinatorID transactionCoordinatorID = new TransactionCoordinatorID(1);
-        MLTransactionLogImpl mlTransactionLog = new MLTransactionLogImpl(transactionCoordinatorID, factory);
+        ManagedLedgerConfig managedLedgerConfig = new ManagedLedgerConfig();
+        managedLedgerConfig.setMaxEntriesPerLedger(2);
+        MLTransactionLogImpl mlTransactionLog = new MLTransactionLogImpl(transactionCoordinatorID, factory,
+                managedLedgerConfig);
         MLTransactionMetadataStore transactionMetadataStore =
                 new MLTransactionMetadataStore(transactionCoordinatorID, mlTransactionLog);
         int checkReplayRetryCount = 0;
@@ -150,7 +155,7 @@ public class MLTransactionMetadataStoreTest extends MockedBookKeeperTestCase {
                 MLTransactionMetadataStore transactionMetadataStoreTest =
                         new MLTransactionMetadataStore(transactionCoordinatorID,
 
-                                new MLTransactionLogImpl(transactionCoordinatorID, factory));
+                                new MLTransactionLogImpl(transactionCoordinatorID, factory, new ManagedLedgerConfig()));
 
                 while (true) {
                     if (checkReplayRetryCount > 6) {
@@ -208,7 +213,8 @@ public class MLTransactionMetadataStoreTest extends MockedBookKeeperTestCase {
         factoryConf.setMaxCacheSize(0);
         ManagedLedgerFactory factory = new ManagedLedgerFactoryImpl(bkc, zkc, factoryConf);
         TransactionCoordinatorID transactionCoordinatorID = new TransactionCoordinatorID(1);
-        MLTransactionLogImpl mlTransactionLog = new MLTransactionLogImpl(transactionCoordinatorID, factory);
+        MLTransactionLogImpl mlTransactionLog = new MLTransactionLogImpl(transactionCoordinatorID, factory,
+                new ManagedLedgerConfig());
         MLTransactionMetadataStore transactionMetadataStore =
                 new MLTransactionMetadataStore(transactionCoordinatorID, mlTransactionLog);
         int checkReplayRetryCount = 0;
diff --git a/pulsar-transaction/coordinator/src/test/java/org/apache/pulsar/transaction/coordinator/TransactionMetadataStoreProviderTest.java b/pulsar-transaction/coordinator/src/test/java/org/apache/pulsar/transaction/coordinator/TransactionMetadataStoreProviderTest.java
index 9a79b37..84f9a47 100644
--- a/pulsar-transaction/coordinator/src/test/java/org/apache/pulsar/transaction/coordinator/TransactionMetadataStoreProviderTest.java
+++ b/pulsar-transaction/coordinator/src/test/java/org/apache/pulsar/transaction/coordinator/TransactionMetadataStoreProviderTest.java
@@ -62,7 +62,7 @@ public class TransactionMetadataStoreProviderTest {
     @BeforeMethod
     public void setup() throws Exception {
         this.tcId = new TransactionCoordinatorID(1L);
-        this.store = this.provider.openStore(tcId, null).get();
+        this.store = this.provider.openStore(tcId, null, null).get();
     }
 
     @Test