Posted to commits@pulsar.apache.org by bo...@apache.org on 2022/06/27 03:10:08 UTC

[pulsar] branch master updated: [feature][transaction] Add a configuration to control max active transaction of coordinator (#15157)

This is an automated email from the ASF dual-hosted git repository.

bogong pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git


The following commit(s) were added to refs/heads/master by this push:
     new 7bf77364293 [feature][transaction] Add a configuration to control max active transaction of coordinator (#15157)
7bf77364293 is described below

commit 7bf77364293701a97fee69665a2c6d04d7c6b4bc
Author: congbo <39...@users.noreply.github.com>
AuthorDate: Mon Jun 27 11:09:59 2022 +0800

    [feature][transaction] Add a configuration to control max active transaction of coordinator (#15157)
    
    detail in  https://github.com/apache/pulsar/issues/15133
    
    - **Status**: Discussion
    - **Author**: Bo Cong
    - **Pull Request**:
    - **Mailing List discussion**:
    - **Release**: 2.11
    
    # Motivation
    
    Currently, the transaction coordinator does not limit the number of active transactions, which may cause the following problems:
    
    - A large number of active transactions puts heavy pressure on memory.
    - The number of transactions a single transaction coordinator (TC) can handle is limited, so the number of active transactions cannot grow without bound.
    - Ending a transaction must wait until the transaction pending ack (TP) or transaction buffer (TB) has recovered successfully, so many end requests can be pending in the TP or TB while the TC does not know the state of the TB or TP. This wastes machine resources, and a large number of pending TB or TP requests can cause an OOM.
    
    ## Implementation
    
    ### Add config
    
    Add maxActiveTransactionsPerCoordinator to broker.conf:
    
    ```makefile
    # The max active transactions in one transaction coordinator
    maxActiveTransactionsPerCoordinator=10000
    ```
    
    
    
    ### How should the coordinator behave when the number of active transactions reaches maxActiveTransactionsPerCoordinator?
    
    
    
    One option is to return an exception to the client when maxActiveTransactionsPerCoordinator is reached. This has several disadvantages:
    
    1. The broker would have to add a ReachMaxActiveTxnException and return it when the limit is reached. Every client would then need to catch this exception and handle it before doing any further operation.
    2. A client that receives this exception will not stop opening transactions, because it does not know when the TC will recover, so it retries immediately. If the TC cannot recover, the client keeps retrying, which serves no purpose.
    
    ### Design
    
    When a new-transaction request arrives after maxActiveTransactionsPerCoordinator has been reached, the coordinator does not return any response; it ignores the request. This way, the broker does not need to return any new exception to the client for this config.
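    
    A minimal Java sketch of this behavior, under stated assumptions: CoordinatorLimitSketch, LimitReachedException, and handleNewTxn are hypothetical stand-ins, not Pulsar classes, and an in-memory map replaces the transaction log. It shows the admission check (0 disables the limit) and a handler that leaves a request unanswered, modeled here by returning null, when the limit is reached.
    
    ```java
    import java.util.concurrent.CompletableFuture;
    import java.util.concurrent.CompletionException;
    import java.util.concurrent.ConcurrentHashMap;
    import java.util.concurrent.atomic.AtomicLong;

    // Hypothetical, simplified stand-in for a transaction coordinator (not a Pulsar class).
    class CoordinatorLimitSketch {

        /** Signals that the coordinator is at its active-transaction limit. */
        static class LimitReachedException extends Exception {
            LimitReachedException(String message) {
                super(message);
            }
        }

        private final long maxActive; // 0 means "no limit"
        private final ConcurrentHashMap<Long, Long> active = new ConcurrentHashMap<>();
        private final AtomicLong nextId = new AtomicLong();

        CoordinatorLimitSketch(long maxActive) {
            this.maxActive = maxActive;
        }

        /** Admits a new transaction only while the limit is disabled or not yet reached. */
        CompletableFuture<Long> newTransaction() {
            if (maxActive == 0 || maxActive > active.size()) {
                long id = nextId.incrementAndGet();
                active.put(id, System.currentTimeMillis());
                return CompletableFuture.completedFuture(id);
            }
            CompletableFuture<Long> failed = new CompletableFuture<>();
            failed.completeExceptionally(new LimitReachedException("max active transactions reached"));
            return failed;
        }

        /** Ends (commits or aborts) a transaction, freeing one slot. */
        void endTransaction(long id) {
            active.remove(id);
        }

        /**
         * Connection-side handling: returns the reply to send, or null when the
         * request is deliberately left unanswered so that the client times out.
         */
        String handleNewTxn() {
            try {
                return "NEW_TXN_RESPONSE txn=" + newTransaction().join();
            } catch (CompletionException e) {
                if (e.getCause() instanceof LimitReachedException) {
                    return null; // log and drop: no response is written
                }
                return "ERROR " + e.getCause().getMessage();
            }
        }

        public static void main(String[] args) {
            CoordinatorLimitSketch tc = new CoordinatorLimitSketch(2);
            System.out.println(tc.handleNewTxn()); // admitted
            System.out.println(tc.handleNewTxn()); // admitted
            System.out.println(tc.handleNewTxn()); // limit reached, no reply
            tc.endTransaction(1L);                 // frees one slot
            System.out.println(tc.handleNewTxn()); // admitted again
        }
    }
    ```
    
    Ending a transaction frees a slot, so later requests are admitted again without any change on the client side.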
    
    
    
    #### How does this approach affect the client?
    
    If the broker does not return a response to this request, the open-transaction operation times out. The coordinator client has a semaphore that controls transaction operations (open, add produce topic, add ack topic, end txn). Within the timeout period, the coordinator client can only open as many transactions as the semaphore has permits; any other request is blocked. So this design solves two problems:
    
    1. There is no need to add an exception.
    2. The client will not retry indefinitely.
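    
    The client-side effect can be sketched as follows. This is an illustration, not the Pulsar coordinator client: ClientTimeoutSketch and its methods are hypothetical, a never-completed CompletableFuture stands in for a request the broker ignores, and tryAcquire is used instead of a blocking acquire so the example does not block. It requires Java 9 or later for CompletableFuture.orTimeout.
    
    ```java
    import java.util.concurrent.CompletableFuture;
    import java.util.concurrent.CompletionException;
    import java.util.concurrent.Semaphore;
    import java.util.concurrent.TimeUnit;
    import java.util.concurrent.TimeoutException;

    // Hypothetical model of a client that bounds in-flight transaction operations.
    class ClientTimeoutSketch {

        private final Semaphore permits;
        private final long timeoutMs;

        ClientTimeoutSketch(int maxPendingOps, long timeoutMs) {
            this.permits = new Semaphore(maxPendingOps);
            this.timeoutMs = timeoutMs;
        }

        /**
         * Sends one request. brokerReply is the future the broker would complete;
         * a future that is never completed models an ignored request.
         */
        CompletableFuture<String> send(CompletableFuture<String> brokerReply) {
            if (!permits.tryAcquire()) {
                CompletableFuture<String> rejected = new CompletableFuture<>();
                rejected.completeExceptionally(new IllegalStateException("too many pending ops"));
                return rejected;
            }
            // The permit is held until the reply arrives or the operation times out.
            return brokerReply
                    .orTimeout(timeoutMs, TimeUnit.MILLISECONDS)
                    .whenComplete((reply, error) -> permits.release());
        }

        int availablePermits() {
            return permits.availablePermits();
        }

        /** Waits for the future and reports whether it failed with a TimeoutException. */
        static boolean timedOut(CompletableFuture<String> future) {
            try {
                future.join();
                return false;
            } catch (CompletionException e) {
                return e.getCause() instanceof TimeoutException;
            }
        }

        public static void main(String[] args) {
            ClientTimeoutSketch client = new ClientTimeoutSketch(1, 200);
            CompletableFuture<String> ignored = client.send(new CompletableFuture<>());
            CompletableFuture<String> extra = client.send(new CompletableFuture<>());
            System.out.println("extra rejected at once: " + extra.isCompletedExceptionally());
            System.out.println("ignored request timed out: " + timedOut(ignored));
            System.out.println("permits after timeout: " + client.availablePermits());
        }
    }
    ```
    
    Because each unanswered request holds its permit until the timeout fires, the number of requests a client can have outstanding against a saturated coordinator is bounded by the semaphore size per timeout period.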
    
    #### Worries
    
    You may worry that this design hurts the client-side experience, because open-transaction requests will always time out and other transaction operations will be blocked. I think this worry is unnecessary: when that happens, you should consider increasing the capacity of the cluster or finding and fixing the problematic client.
    
    
    
    ### Flow chart
    
    ![image](https://user-images.githubusercontent.com/39078850/162964277-6342ae82-1691-48b5-af84-18bb7a422ff1.png)
    
    
    
    ### Compatibility, Deprecation, and Migration Plan
    
    maxActiveTransactionsPerCoordinator defaults to 0. With the default value there is no limit, so opening transactions is never blocked.
    
    ### Test Plan
    
    When maxActiveTransactionsPerCoordinator is reached, a client that opens a new transaction times out.
    
    ### Rejected Alternatives
    
    Return an exception to the client when maxActiveTransactionsPerCoordinator is reached. This has several disadvantages:
    
    1. The broker would have to add a ReachMaxActiveTxnException and return it when the limit is reached. Every client would then need to catch this exception and handle it before doing any further operation.
    2. A client that receives this exception will not stop opening transactions, because it does not know when the TC will recover, so it retries immediately. If the TC cannot recover, the client keeps retrying, which serves no purpose.
---
 conf/broker.conf                                   |  3 +
 .../apache/pulsar/broker/ServiceConfiguration.java |  6 ++
 .../broker/TransactionMetadataStoreService.java    |  3 +-
 .../apache/pulsar/broker/service/ServerCnx.java    | 17 +++-
 .../pulsar/broker/transaction/TransactionTest.java |  7 +-
 .../TransactionCoordinatorConfigTest.java          | 98 ++++++++++++++++++++++
 .../TransactionMetadataStoreProvider.java          |  2 +-
 .../exceptions/CoordinatorException.java           | 12 +++
 .../InMemTransactionMetadataStoreProvider.java     |  3 +-
 .../impl/MLTransactionMetadataStore.java           | 85 ++++++++++---------
 .../impl/MLTransactionMetadataStoreProvider.java   |  5 +-
 .../MLTransactionMetadataStoreTest.java            | 18 ++--
 .../TransactionMetadataStoreProviderTest.java      |  2 +-
 site2/docs/reference-configuration.md              |  1 +
 14 files changed, 202 insertions(+), 60 deletions(-)

diff --git a/conf/broker.conf b/conf/broker.conf
index eb81ade3c79..91e20059296 100644
--- a/conf/broker.conf
+++ b/conf/broker.conf
@@ -1432,6 +1432,9 @@ transactionBufferSnapshotMinTimeInMillis=5000
 # The max concurrent requests for transaction buffer client, default is 1000
 transactionBufferClientMaxConcurrentRequests=1000
 
+# The max active transactions per transaction coordinator, default value 0 indicates no limit.
+maxActiveTransactionsPerCoordinator=0
+
 # MLPendingAckStore maintains a ConcurrentSkipListMap pendingAckLogIndex,
 # It stores the position in pendingAckStore as its value and saves a position used to determine
 # whether the previous data can be cleaned up as a key.
diff --git a/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/ServiceConfiguration.java b/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/ServiceConfiguration.java
index c8c3a9aa023..47f7bddfab1 100644
--- a/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/ServiceConfiguration.java
+++ b/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/ServiceConfiguration.java
@@ -2600,6 +2600,12 @@ public class ServiceConfiguration implements PulsarConfiguration {
     )
     private long transactionBufferClientOperationTimeoutInMills = 3000L;
 
+    @FieldContext(
+            category = CATEGORY_TRANSACTION,
+            doc = "The max active transactions per transaction coordinator, default value 0 indicates no limit."
+    )
+    private long maxActiveTransactionsPerCoordinator = 0L;
+
     @FieldContext(
             category = CATEGORY_TRANSACTION,
             doc = "MLPendingAckStore maintain a ConcurrentSkipListMap pendingAckLogIndex`,"
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/TransactionMetadataStoreService.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/TransactionMetadataStoreService.java
index 8aff792efec..2af680364f6 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/TransactionMetadataStoreService.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/TransactionMetadataStoreService.java
@@ -208,7 +208,8 @@ public class TransactionMetadataStoreService {
                                     timeoutTracker, tcId.getId());
                             return transactionMetadataStoreProvider
                                     .openStore(tcId, pulsarService.getManagedLedgerFactory(), v,
-                                            timeoutTracker, recoverTracker);
+                                            timeoutTracker, recoverTracker,
+                                            pulsarService.getConfig().getMaxActiveTransactionsPerCoordinator());
                 });
     }
 
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/ServerCnx.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/ServerCnx.java
index 5fc899dc5e9..39b10af6b47 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/ServerCnx.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/ServerCnx.java
@@ -2221,11 +2221,20 @@ public class ServerCnx extends PulsarHandler implements TransportCnx {
                     }
                     commandSender.sendNewTxnResponse(requestId, txnID, command.getTcId());
                 } else {
-                    ex = handleTxnException(ex, BaseCommand.Type.NEW_TXN.name(), requestId);
+                    if (ex instanceof CoordinatorException.ReachMaxActiveTxnException) {
+                        // If a new txn throws ReachMaxActiveTxnException, don't return any response to the client;
+                        // otherwise the client will retry, which wastes a lot of resources.
+                        // link https://github.com/apache/pulsar/issues/15133
+                        log.warn("New txn op reach max active transactions! tcId : {}, requestId : {}",
+                                tcId.getId(), requestId, ex);
+                        // do-nothing
+                    } else {
+                        ex = handleTxnException(ex, BaseCommand.Type.NEW_TXN.name(), requestId);
 
-                    commandSender.sendNewTxnErrorResponse(requestId, tcId.getId(),
-                            BrokerServiceException.getClientErrorCode(ex), ex.getMessage());
-                    transactionMetadataStoreService.handleOpFail(ex, tcId);
+                        commandSender.sendNewTxnErrorResponse(requestId, tcId.getId(),
+                                BrokerServiceException.getClientErrorCode(ex), ex.getMessage());
+                        transactionMetadataStoreService.handleOpFail(ex, tcId);
+                    }
                 }
             }));
     }
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/transaction/TransactionTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/transaction/TransactionTest.java
index 8193f03ba19..edc16b57910 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/transaction/TransactionTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/transaction/TransactionTest.java
@@ -695,7 +695,7 @@ public class TransactionTest extends TransactionTestBase {
         doNothing().when(timeoutTracker).start();
         MLTransactionMetadataStore metadataStore1 =
                 new MLTransactionMetadataStore(new TransactionCoordinatorID(1),
-                        mlTransactionLog, timeoutTracker, mlTransactionSequenceIdGenerator);
+                        mlTransactionLog, timeoutTracker, mlTransactionSequenceIdGenerator, 0L);
         metadataStore1.init(transactionRecoverTracker).get();
         Awaitility.await().untilAsserted(() ->
                 assertEquals(metadataStore1.getCoordinatorStats().state, "Ready"));
@@ -708,7 +708,8 @@ public class TransactionTest extends TransactionTestBase {
 
         MLTransactionMetadataStore metadataStore2 =
                 new MLTransactionMetadataStore(new TransactionCoordinatorID(1),
-                        mlTransactionLog, timeoutTracker, mlTransactionSequenceIdGenerator);
+
+                        mlTransactionLog, timeoutTracker, mlTransactionSequenceIdGenerator, 0L);
         metadataStore2.init(transactionRecoverTracker).get();
         Awaitility.await().untilAsserted(() ->
                 assertEquals(metadataStore2.getCoordinatorStats().state, "Ready"));
@@ -721,7 +722,7 @@ public class TransactionTest extends TransactionTestBase {
 
         MLTransactionMetadataStore metadataStore3 =
                 new MLTransactionMetadataStore(new TransactionCoordinatorID(1),
-                        mlTransactionLog, timeoutTracker, mlTransactionSequenceIdGenerator);
+                        mlTransactionLog, timeoutTracker, mlTransactionSequenceIdGenerator, 0L);
         metadataStore3.init(transactionRecoverTracker).get();
         Awaitility.await().untilAsserted(() ->
                 assertEquals(metadataStore3.getCoordinatorStats().state, "Ready"));
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/transaction/coordinator/TransactionCoordinatorConfigTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/transaction/coordinator/TransactionCoordinatorConfigTest.java
new file mode 100644
index 00000000000..b72b28e52ae
--- /dev/null
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/transaction/coordinator/TransactionCoordinatorConfigTest.java
@@ -0,0 +1,98 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.broker.transaction.coordinator;
+
+import static org.testng.AssertJUnit.assertTrue;
+import static org.testng.AssertJUnit.fail;
+import com.google.common.collect.Sets;
+import java.util.concurrent.TimeUnit;
+import org.apache.pulsar.broker.ServiceConfiguration;
+import org.apache.pulsar.broker.service.BrokerTestBase;
+import org.apache.pulsar.client.api.PulsarClient;
+import org.apache.pulsar.client.api.PulsarClientException;
+import org.apache.pulsar.client.api.transaction.Transaction;
+import org.apache.pulsar.common.naming.NamespaceName;
+import org.apache.pulsar.common.naming.SystemTopicNames;
+import org.apache.pulsar.common.partition.PartitionedTopicMetadata;
+import org.apache.pulsar.common.policies.data.TenantInfoImpl;
+import org.testng.annotations.AfterMethod;
+import org.testng.annotations.BeforeMethod;
+import org.testng.annotations.Test;
+
+@Test(groups = "broker")
+public class TransactionCoordinatorConfigTest extends BrokerTestBase {
+
+    @BeforeMethod
+    @Override
+    protected void setup() throws Exception {
+        ServiceConfiguration configuration = getDefaultConf();
+        configuration.setTransactionCoordinatorEnabled(true);
+        configuration.setMaxActiveTransactionsPerCoordinator(2);
+        super.baseSetup(configuration);
+        admin.tenants().createTenant("pulsar", new TenantInfoImpl(Sets.newHashSet("appid1"), Sets.newHashSet("test")));
+        admin.namespaces().createNamespace(NamespaceName.SYSTEM_NAMESPACE.toString());
+        pulsar.getPulsarResources()
+                .getNamespaceResources()
+                .getPartitionedTopicResources()
+                .createPartitionedTopic(SystemTopicNames.TRANSACTION_COORDINATOR_ASSIGN,
+                        new PartitionedTopicMetadata(1));
+    }
+
+    @AfterMethod(alwaysRun = true)
+    @Override
+    protected void cleanup() throws Exception {
+        super.internalCleanup();
+    }
+
+    @Test
+    public void testMaxActiveTxn() throws Exception {
+        pulsarClient = PulsarClient.builder().serviceUrl(lookupUrl.toString())
+                .enableTransaction(true).operationTimeout(3, TimeUnit.SECONDS).build();
+
+        // new two txn will not reach max active txns
+        Transaction commitTxn =
+                pulsarClient.newTransaction().withTransactionTimeout(1, TimeUnit.MINUTES).build().get();
+        Transaction abortTxn =
+                pulsarClient.newTransaction().withTransactionTimeout(1, TimeUnit.MINUTES).build().get();
+        try {
+            // opening a third txn will time out, because the broker will not return any response
+            pulsarClient.newTransaction().withTransactionTimeout(1, TimeUnit.MINUTES).build().get();
+            fail();
+        } catch (Exception e) {
+            assertTrue(e.getCause() instanceof PulsarClientException.TimeoutException);
+        }
+
+        // release active txn
+        commitTxn.commit().get();
+        abortTxn.abort().get();
+
+        // two txn end, can continue new txn
+        pulsarClient.newTransaction().withTransactionTimeout(1, TimeUnit.MINUTES).build().get();
+        pulsarClient.newTransaction().withTransactionTimeout(1, TimeUnit.MINUTES).build().get();
+
+        // reach max active txns again
+        try {
+            // opening a third txn will time out, because the broker will not return any response
+            pulsarClient.newTransaction().withTransactionTimeout(1, TimeUnit.MINUTES).build().get();
+            fail();
+        } catch (Exception e) {
+            assertTrue(e.getCause() instanceof PulsarClientException.TimeoutException);
+        }
+    }
+}
diff --git a/pulsar-transaction/coordinator/src/main/java/org/apache/pulsar/transaction/coordinator/TransactionMetadataStoreProvider.java b/pulsar-transaction/coordinator/src/main/java/org/apache/pulsar/transaction/coordinator/TransactionMetadataStoreProvider.java
index b2fd10ea9ba..edcc42ded84 100644
--- a/pulsar-transaction/coordinator/src/main/java/org/apache/pulsar/transaction/coordinator/TransactionMetadataStoreProvider.java
+++ b/pulsar-transaction/coordinator/src/main/java/org/apache/pulsar/transaction/coordinator/TransactionMetadataStoreProvider.java
@@ -68,5 +68,5 @@ public interface TransactionMetadataStoreProvider {
     CompletableFuture<TransactionMetadataStore> openStore(
             TransactionCoordinatorID transactionCoordinatorId, ManagedLedgerFactory managedLedgerFactory,
             ManagedLedgerConfig managedLedgerConfig, TransactionTimeoutTracker timeoutTracker,
-            TransactionRecoverTracker recoverTracker);
+            TransactionRecoverTracker recoverTracker, long maxActiveTransactionsPerCoordinator);
 }
diff --git a/pulsar-transaction/coordinator/src/main/java/org/apache/pulsar/transaction/coordinator/exceptions/CoordinatorException.java b/pulsar-transaction/coordinator/src/main/java/org/apache/pulsar/transaction/coordinator/exceptions/CoordinatorException.java
index c8e7aea6516..954d891a188 100644
--- a/pulsar-transaction/coordinator/src/main/java/org/apache/pulsar/transaction/coordinator/exceptions/CoordinatorException.java
+++ b/pulsar-transaction/coordinator/src/main/java/org/apache/pulsar/transaction/coordinator/exceptions/CoordinatorException.java
@@ -119,4 +119,16 @@ public abstract class CoordinatorException extends Exception {
 
         }
     }
+
+    /**
+     * Exception thrown when a new-transaction operation reaches the maximum number of active transactions.
+     */
+    public static class ReachMaxActiveTxnException extends CoordinatorException {
+
+        private static final long serialVersionUID = 0L;
+
+        public ReachMaxActiveTxnException(String message) {
+            super(message);
+        }
+    }
 }
diff --git a/pulsar-transaction/coordinator/src/main/java/org/apache/pulsar/transaction/coordinator/impl/InMemTransactionMetadataStoreProvider.java b/pulsar-transaction/coordinator/src/main/java/org/apache/pulsar/transaction/coordinator/impl/InMemTransactionMetadataStoreProvider.java
index 4c4c04d1f94..8247aef4a88 100644
--- a/pulsar-transaction/coordinator/src/main/java/org/apache/pulsar/transaction/coordinator/impl/InMemTransactionMetadataStoreProvider.java
+++ b/pulsar-transaction/coordinator/src/main/java/org/apache/pulsar/transaction/coordinator/impl/InMemTransactionMetadataStoreProvider.java
@@ -37,7 +37,8 @@ public class InMemTransactionMetadataStoreProvider implements TransactionMetadat
                                                                  ManagedLedgerFactory managedLedgerFactory,
                                                                  ManagedLedgerConfig managedLedgerConfig,
                                                                  TransactionTimeoutTracker timeoutTracker,
-                                                                 TransactionRecoverTracker recoverTracker) {
+                                                                 TransactionRecoverTracker recoverTracker,
+                                                                 long maxActiveTransactionsPerCoordinator) {
         return CompletableFuture.completedFuture(
             new InMemTransactionMetadataStore(transactionCoordinatorId));
     }
diff --git a/pulsar-transaction/coordinator/src/main/java/org/apache/pulsar/transaction/coordinator/impl/MLTransactionMetadataStore.java b/pulsar-transaction/coordinator/src/main/java/org/apache/pulsar/transaction/coordinator/impl/MLTransactionMetadataStore.java
index e0f7fc70bba..fa0a6b1102e 100644
--- a/pulsar-transaction/coordinator/src/main/java/org/apache/pulsar/transaction/coordinator/impl/MLTransactionMetadataStore.java
+++ b/pulsar-transaction/coordinator/src/main/java/org/apache/pulsar/transaction/coordinator/impl/MLTransactionMetadataStore.java
@@ -76,11 +76,13 @@ public class MLTransactionMetadataStore
     private final LongAdder appendLogCount;
     private final MLTransactionSequenceIdGenerator sequenceIdGenerator;
     private final ExecutorService internalPinnedExecutor;
+    private final long maxActiveTransactionsPerCoordinator;
 
     public MLTransactionMetadataStore(TransactionCoordinatorID tcID,
                                       MLTransactionLogImpl mlTransactionLog,
                                       TransactionTimeoutTracker timeoutTracker,
-                                      MLTransactionSequenceIdGenerator sequenceIdGenerator) {
+                                      MLTransactionSequenceIdGenerator sequenceIdGenerator,
+                                      long maxActiveTransactionsPerCoordinator) {
         super(State.None);
         this.sequenceIdGenerator = sequenceIdGenerator;
         this.tcID = tcID;
@@ -88,6 +90,7 @@ public class MLTransactionMetadataStore
         this.timeoutTracker = timeoutTracker;
         this.transactionMetadataStoreStats = new TransactionMetadataStoreStats();
 
+        this.maxActiveTransactionsPerCoordinator = maxActiveTransactionsPerCoordinator;
         this.createdTransactionCount = new LongAdder();
         this.committedTransactionCount = new LongAdder();
         this.abortedTransactionCount = new LongAdder();
@@ -219,44 +222,50 @@ public class MLTransactionMetadataStore
 
     @Override
     public CompletableFuture<TxnID> newTransaction(long timeOut) {
-        CompletableFuture<TxnID> completableFuture = new CompletableFuture<>();
-        internalPinnedExecutor.execute(() -> {
-            if (!checkIfReady()) {
-                completableFuture.completeExceptionally(new CoordinatorException
-                        .TransactionMetadataStoreStateException(tcID, State.Ready, getState(), "new Transaction"));
-                return;
-            }
+        if (this.maxActiveTransactionsPerCoordinator == 0
+                || this.maxActiveTransactionsPerCoordinator > txnMetaMap.size()) {
+            CompletableFuture<TxnID> completableFuture = new CompletableFuture<>();
+            internalPinnedExecutor.execute(() -> {
+                if (!checkIfReady()) {
+                    completableFuture.completeExceptionally(new CoordinatorException
+                            .TransactionMetadataStoreStateException(tcID, State.Ready, getState(), "new Transaction"));
+                    return;
+                }
 
-            long mostSigBits = tcID.getId();
-            long leastSigBits = sequenceIdGenerator.generateSequenceId();
-            TxnID txnID = new TxnID(mostSigBits, leastSigBits);
-            long currentTimeMillis = System.currentTimeMillis();
-            TransactionMetadataEntry transactionMetadataEntry = new TransactionMetadataEntry()
-                    .setTxnidMostBits(mostSigBits)
-                    .setTxnidLeastBits(leastSigBits)
-                    .setStartTime(currentTimeMillis)
-                    .setTimeoutMs(timeOut)
-                    .setMetadataOp(TransactionMetadataEntry.TransactionMetadataOp.NEW)
-                    .setLastModificationTime(currentTimeMillis)
-                    .setMaxLocalTxnId(sequenceIdGenerator.getCurrentSequenceId());
-            transactionLog.append(transactionMetadataEntry)
-                    .whenComplete((position, throwable) -> {
-                        if (throwable != null) {
-                            completableFuture.completeExceptionally(throwable);
-                        } else {
-                            appendLogCount.increment();
-                            TxnMeta txn = new TxnMetaImpl(txnID, currentTimeMillis, timeOut);
-                            List<Position> positions = new ArrayList<>();
-                            positions.add(position);
-                            Pair<TxnMeta, List<Position>> pair = MutablePair.of(txn, positions);
-                            txnMetaMap.put(leastSigBits, pair);
-                            this.timeoutTracker.addTransaction(leastSigBits, timeOut);
-                            createdTransactionCount.increment();
-                            completableFuture.complete(txnID);
-                        }
-                    });
-        });
-        return completableFuture;
+                long mostSigBits = tcID.getId();
+                long leastSigBits = sequenceIdGenerator.generateSequenceId();
+                TxnID txnID = new TxnID(mostSigBits, leastSigBits);
+                long currentTimeMillis = System.currentTimeMillis();
+                TransactionMetadataEntry transactionMetadataEntry = new TransactionMetadataEntry()
+                        .setTxnidMostBits(mostSigBits)
+                        .setTxnidLeastBits(leastSigBits)
+                        .setStartTime(currentTimeMillis)
+                        .setTimeoutMs(timeOut)
+                        .setMetadataOp(TransactionMetadataEntry.TransactionMetadataOp.NEW)
+                        .setLastModificationTime(currentTimeMillis)
+                        .setMaxLocalTxnId(sequenceIdGenerator.getCurrentSequenceId());
+                transactionLog.append(transactionMetadataEntry)
+                        .whenComplete((position, throwable) -> {
+                            if (throwable != null) {
+                                completableFuture.completeExceptionally(throwable);
+                            } else {
+                                appendLogCount.increment();
+                                TxnMeta txn = new TxnMetaImpl(txnID, currentTimeMillis, timeOut);
+                                List<Position> positions = new ArrayList<>();
+                                positions.add(position);
+                                Pair<TxnMeta, List<Position>> pair = MutablePair.of(txn, positions);
+                                txnMetaMap.put(leastSigBits, pair);
+                                this.timeoutTracker.addTransaction(leastSigBits, timeOut);
+                                createdTransactionCount.increment();
+                                completableFuture.complete(txnID);
+                            }
+                        });
+            });
+            return completableFuture;
+        } else {
+            return FutureUtil.failedFuture(new CoordinatorException.ReachMaxActiveTxnException("New txn op "
+                    + "reach max active txn! tcId : " + getTransactionCoordinatorID().getId()));
+        }
     }
 
     @Override
diff --git a/pulsar-transaction/coordinator/src/main/java/org/apache/pulsar/transaction/coordinator/impl/MLTransactionMetadataStoreProvider.java b/pulsar-transaction/coordinator/src/main/java/org/apache/pulsar/transaction/coordinator/impl/MLTransactionMetadataStoreProvider.java
index 20df6439827..fe887aacf9e 100644
--- a/pulsar-transaction/coordinator/src/main/java/org/apache/pulsar/transaction/coordinator/impl/MLTransactionMetadataStoreProvider.java
+++ b/pulsar-transaction/coordinator/src/main/java/org/apache/pulsar/transaction/coordinator/impl/MLTransactionMetadataStoreProvider.java
@@ -41,7 +41,8 @@ public class MLTransactionMetadataStoreProvider implements TransactionMetadataSt
                                                                  ManagedLedgerFactory managedLedgerFactory,
                                                                  ManagedLedgerConfig managedLedgerConfig,
                                                                  TransactionTimeoutTracker timeoutTracker,
-                                                                 TransactionRecoverTracker recoverTracker) {
+                                                                 TransactionRecoverTracker recoverTracker,
+                                                                 long maxActiveTransactionsPerCoordinator) {
         MLTransactionSequenceIdGenerator mlTransactionSequenceIdGenerator = new MLTransactionSequenceIdGenerator();
         managedLedgerConfig.setManagedLedgerInterceptor(mlTransactionSequenceIdGenerator);
         MLTransactionLogImpl txnLog = new MLTransactionLogImpl(transactionCoordinatorId,
@@ -50,6 +51,6 @@ public class MLTransactionMetadataStoreProvider implements TransactionMetadataSt
         // MLTransactionLogInterceptor will init sequenceId and update the sequenceId to managedLedger properties.
         return txnLog.initialize().thenCompose(__ ->
                 new MLTransactionMetadataStore(transactionCoordinatorId, txnLog, timeoutTracker,
-                        mlTransactionSequenceIdGenerator).init(recoverTracker));
+                        mlTransactionSequenceIdGenerator, maxActiveTransactionsPerCoordinator).init(recoverTracker));
     }
 }
\ No newline at end of file
diff --git a/pulsar-transaction/coordinator/src/test/java/org/apache/pulsar/transaction/coordinator/MLTransactionMetadataStoreTest.java b/pulsar-transaction/coordinator/src/test/java/org/apache/pulsar/transaction/coordinator/MLTransactionMetadataStoreTest.java
index bad29053850..70365b28b37 100644
--- a/pulsar-transaction/coordinator/src/test/java/org/apache/pulsar/transaction/coordinator/MLTransactionMetadataStoreTest.java
+++ b/pulsar-transaction/coordinator/src/test/java/org/apache/pulsar/transaction/coordinator/MLTransactionMetadataStoreTest.java
@@ -76,7 +76,7 @@ public class MLTransactionMetadataStoreTest extends MockedBookKeeperTestCase {
         MLTransactionMetadataStore transactionMetadataStore =
                 new MLTransactionMetadataStore(transactionCoordinatorID, mlTransactionLog,
                         new TransactionTimeoutTrackerImpl(),
-                        mlTransactionSequenceIdGenerator);
+                        mlTransactionSequenceIdGenerator, 0L);
         transactionMetadataStore.init(new TransactionRecoverTrackerImpl()).get();
         int checkReplayRetryCount = 0;
         while (true) {
@@ -151,7 +151,7 @@ public class MLTransactionMetadataStoreTest extends MockedBookKeeperTestCase {
         mlTransactionLog.initialize().join();
         MLTransactionMetadataStore transactionMetadataStore =
                 new MLTransactionMetadataStore(transactionCoordinatorID, mlTransactionLog,
-                        new TransactionTimeoutTrackerImpl(), mlTransactionSequenceIdGenerator);
+                        new TransactionTimeoutTrackerImpl(), mlTransactionSequenceIdGenerator, 0L);
         transactionMetadataStore.init(new TransactionRecoverTrackerImpl()).get();
 
         Awaitility.await().until(transactionMetadataStore::checkIfReady);
@@ -181,7 +181,7 @@ public class MLTransactionMetadataStoreTest extends MockedBookKeeperTestCase {
         mlTransactionLog.initialize().join();
         transactionMetadataStore =
                 new MLTransactionMetadataStore(transactionCoordinatorID, mlTransactionLog,
-                        new TransactionTimeoutTrackerImpl(), mlTransactionSequenceIdGenerator);
+                        new TransactionTimeoutTrackerImpl(), mlTransactionSequenceIdGenerator, 0L);
         transactionMetadataStore.init(new TransactionRecoverTrackerImpl()).get();
 
         Awaitility.await().until(transactionMetadataStore::checkIfReady);
@@ -207,7 +207,7 @@ public class MLTransactionMetadataStoreTest extends MockedBookKeeperTestCase {
 
         MLTransactionMetadataStore transactionMetadataStore =
                 new MLTransactionMetadataStore(transactionCoordinatorID, mlTransactionLog,
-                        new TransactionTimeoutTrackerImpl(), mlTransactionSequenceIdGenerator);
+                        new TransactionTimeoutTrackerImpl(), mlTransactionSequenceIdGenerator, 0L);
         transactionMetadataStore.init(new TransactionRecoverTrackerImpl()).get();
         int checkReplayRetryCount = 0;
         while (true) {
@@ -251,7 +251,7 @@ public class MLTransactionMetadataStoreTest extends MockedBookKeeperTestCase {
 
                 MLTransactionMetadataStore transactionMetadataStoreTest =
                         new MLTransactionMetadataStore(transactionCoordinatorID,
-                                txnLog2, new TransactionTimeoutTrackerImpl(), mlTransactionSequenceIdGenerator);
+                                txnLog2, new TransactionTimeoutTrackerImpl(), mlTransactionSequenceIdGenerator, 0L);
                 transactionMetadataStoreTest.init(new TransactionRecoverTrackerImpl()).get();
 
                 while (true) {
@@ -320,7 +320,7 @@ public class MLTransactionMetadataStoreTest extends MockedBookKeeperTestCase {
         mlTransactionLog.initialize().join();
         MLTransactionMetadataStore transactionMetadataStore =
                 new MLTransactionMetadataStore(transactionCoordinatorID, mlTransactionLog,
-                        new TransactionTimeoutTrackerImpl(), mlTransactionSequenceIdGenerator);
+                        new TransactionTimeoutTrackerImpl(), mlTransactionSequenceIdGenerator, 0L);
         transactionMetadataStore.init(new TransactionRecoverTrackerImpl()).get();
         int checkReplayRetryCount = 0;
         while (true) {
@@ -387,7 +387,7 @@ public class MLTransactionMetadataStoreTest extends MockedBookKeeperTestCase {
         mlTransactionLog.initialize().join();
         MLTransactionMetadataStore transactionMetadataStore =
                 new MLTransactionMetadataStore(transactionCoordinatorID, mlTransactionLog,
-                        new TransactionTimeoutTrackerImpl(), mlTransactionSequenceIdGenerator);
+                        new TransactionTimeoutTrackerImpl(), mlTransactionSequenceIdGenerator, 0L);
         transactionMetadataStore.init(new TransactionRecoverTrackerImpl()).get();
 
         Awaitility.await().until(transactionMetadataStore::checkIfReady);
@@ -405,7 +405,7 @@ public class MLTransactionMetadataStoreTest extends MockedBookKeeperTestCase {
         mlTransactionLog.initialize().join();
         transactionMetadataStore =
                 new MLTransactionMetadataStore(transactionCoordinatorID, mlTransactionLog,
-                        new TransactionTimeoutTrackerImpl(), mlTransactionSequenceIdGenerator);
+                        new TransactionTimeoutTrackerImpl(), mlTransactionSequenceIdGenerator, 0L);
         transactionMetadataStore.init(new TransactionRecoverTrackerImpl()).get();
 
         Awaitility.await().until(transactionMetadataStore::checkIfReady);
@@ -427,7 +427,7 @@ public class MLTransactionMetadataStoreTest extends MockedBookKeeperTestCase {
         mlTransactionLog.initialize().join();
         MLTransactionMetadataStore transactionMetadataStore =
                 new MLTransactionMetadataStore(transactionCoordinatorID, mlTransactionLog,
-                        new TransactionTimeoutTrackerImpl(), mlTransactionSequenceIdGenerator);
+                        new TransactionTimeoutTrackerImpl(), mlTransactionSequenceIdGenerator, 0L);
         transactionMetadataStore.init(new TransactionRecoverTrackerImpl()).get();
 
         Awaitility.await().until(transactionMetadataStore::checkIfReady);
diff --git a/pulsar-transaction/coordinator/src/test/java/org/apache/pulsar/transaction/coordinator/TransactionMetadataStoreProviderTest.java b/pulsar-transaction/coordinator/src/test/java/org/apache/pulsar/transaction/coordinator/TransactionMetadataStoreProviderTest.java
index a54caaf16c1..3f80d850b1a 100644
--- a/pulsar-transaction/coordinator/src/test/java/org/apache/pulsar/transaction/coordinator/TransactionMetadataStoreProviderTest.java
+++ b/pulsar-transaction/coordinator/src/test/java/org/apache/pulsar/transaction/coordinator/TransactionMetadataStoreProviderTest.java
@@ -63,7 +63,7 @@ public class TransactionMetadataStoreProviderTest {
     public void setup() throws Exception {
         this.tcId = new TransactionCoordinatorID(1L);
         this.store = this.provider.openStore(tcId, null, null,
-                null, new MLTransactionMetadataStoreTest.TransactionRecoverTrackerImpl()).get();
+                null, new MLTransactionMetadataStoreTest.TransactionRecoverTrackerImpl(), 0L).get();
     }
 
     @Test
diff --git a/site2/docs/reference-configuration.md b/site2/docs/reference-configuration.md
index c5c636fd043..d8e3bf07aa8 100644
--- a/site2/docs/reference-configuration.md
+++ b/site2/docs/reference-configuration.md
@@ -334,6 +334,7 @@ brokerServiceCompactionThresholdInBytes|If the estimated backlog size is greater
 |replicatorPrefix|  Replicator prefix used for replicator producer name and cursor name pulsar.repl||
 |transactionBufferClientOperationTimeoutInMills|The transaction buffer client's operation timeout in milliseconds.|3000|
 |transactionCoordinatorEnabled|Whether to enable transaction coordinator in broker.|true|
+|maxActiveTransactionsPerCoordinator| Max number of active transactions per transaction coordinator.|0|
 |transactionMetadataStoreProviderClassName|The class name of transactionMetadataStoreProvider.|org.apache.pulsar.transaction.coordinator.impl.MLTransactionMetadataStoreProvider|
 |transactionBufferSnapshotMaxTransactionCount|Transaction buffer takes a snapshot after the number of transaction operations reaches this value.|1000|
 |transactionBufferSnapshotMinTimeInMillis| The interval between two snapshots that the transaction buffer takes (in milliseconds).|5000|