You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by xi...@apache.org on 2022/10/24 07:06:58 UTC

[pulsar] branch master updated: [feat][broker] Segmented transaction buffer snapshot segment and index system topic (#16931)

This is an automated email from the ASF dual-hosted git repository.

xiangying pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git


The following commit(s) were added to refs/heads/master by this push:
     new d21176609f9 [feat][broker] Segmented transaction buffer snapshot segment and index system topic (#16931)
d21176609f9 is described below

commit d21176609f9cfe3e51ea5cff69296667a902fd3c
Author: Xiangying Meng <55...@users.noreply.github.com>
AuthorDate: Mon Oct 24 15:06:51 2022 +0800

    [feat][broker] Segmented transaction buffer snapshot segment and index system topic (#16931)
    
    Master Issue: https://github.com/apache/pulsar/issues/16913
    ### Motivation
    Implement system topic clients for the snapshot segment topic and the snapshot index topic, which are used to send segment snapshots and indexes.
    The configuration `transactionBufferSegmentedSnapshotEnabled` is used in the Transaction Buffer (TB) to determine which `AbortedTxnProcessor` that TB adopts.
    ### Modification
    
    The original Transaction Buffer snapshot implementation used a single system topic; the new implementation has to handle three system topics, each with a different schema. To support this, we have added generics to the TransactionBufferSnapshotBaseSystemTopicClient class and the SystemTopicTxnBufferSnapshotService<T> class.
    PulsarService maintains a factory class, TransactionBufferSnapshotServiceFactory, which is used to obtain the SystemTopicTxnBufferSnapshotService for each kind of snapshot.
    This way, the required system topic client can be obtained through PulsarService to read and send snapshots.
    <img width="1336" alt="image" src="https://user-images.githubusercontent.com/55571188/197467173-9028e58a-79cc-4fe4-81e2-c299c568caee.png">
---
 .../apache/bookkeeper/mledger/AsyncCallbacks.java  |   7 +
 .../bookkeeper/mledger/ManagedLedgerFactory.java   |  11 ++
 .../mledger/impl/ManagedLedgerFactoryImpl.java     |  52 +++--
 .../mledger/impl/ReadOnlyManagedLedgerImpl.java    |  14 +-
 .../org/apache/pulsar/broker/PulsarService.java    |  15 +-
 .../SystemTopicBasedTopicPoliciesService.java      |  20 +-
 ...va => SystemTopicTxnBufferSnapshotService.java} |  63 +++---
 .../service/TransactionBufferSnapshotService.java  |  62 ------
 .../TransactionBufferSnapshotServiceFactory.java   |  74 +++++++
 .../NamespaceEventsSystemTopicFactory.java         |  30 +--
 .../pulsar/broker/systopic/SystemTopicClient.java  |  13 +-
 .../systopic/TopicPoliciesSystemTopicClient.java   |  24 +--
 ...sactionBufferSnapshotBaseSystemTopicClient.java | 214 +++++++++++++++++++++
 .../TransactionBufferSystemTopicClient.java        | 208 --------------------
 .../buffer/impl/TopicTransactionBuffer.java        |  16 +-
 .../TopicTransactionBufferRecoverCallBack.java     |   2 +-
 .../{matadata => metadata}/AbortTxnMetadata.java   |   2 +-
 .../TransactionBufferSnapshot.java                 |   2 +-
 .../{matadata => metadata}/package-info.java       |   2 +-
 .../v2/TransactionBufferSnapshotIndex.java}        |  24 ++-
 .../v2/TransactionBufferSnapshotIndexes.java}      |  17 +-
 .../v2/TransactionBufferSnapshotSegment.java}      |  16 +-
 .../v2/TxnIDData.java}                             |  54 ++++--
 .../{matadata => metadata/v2}/package-info.java    |   5 +-
 .../NamespaceEventsSystemTopicServiceTest.java     |   3 +-
 .../TopicTransactionBufferRecoverTest.java         | 204 +++++++++++++++++---
 .../pulsar/broker/transaction/TransactionTest.java |  20 +-
 .../org/apache/pulsar/common/events/EventType.java |  12 +-
 .../pulsar/common/naming/SystemTopicNames.java     |  12 +-
 29 files changed, 732 insertions(+), 466 deletions(-)

diff --git a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/AsyncCallbacks.java b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/AsyncCallbacks.java
index 395da52b2af..78cca061c78 100644
--- a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/AsyncCallbacks.java
+++ b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/AsyncCallbacks.java
@@ -24,6 +24,7 @@ import java.util.Map;
 import java.util.Optional;
 import org.apache.bookkeeper.common.annotation.InterfaceAudience;
 import org.apache.bookkeeper.common.annotation.InterfaceStability;
+import org.apache.bookkeeper.mledger.impl.ReadOnlyManagedLedgerImpl;
 
 /**
  * Definition of all the callbacks used for the ManagedLedger asynchronous API.
@@ -46,6 +47,12 @@ public interface AsyncCallbacks {
         void openReadOnlyCursorFailed(ManagedLedgerException exception, Object ctx);
     }
 
+    interface OpenReadOnlyManagedLedgerCallback {
+        void openReadOnlyManagedLedgerComplete(ReadOnlyManagedLedgerImpl managedLedger, Object ctx);
+
+        void openReadOnlyManagedLedgerFailed(ManagedLedgerException exception, Object ctx);
+    }
+
     interface DeleteLedgerCallback {
         void deleteLedgerComplete(Object ctx);
 
diff --git a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/ManagedLedgerFactory.java b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/ManagedLedgerFactory.java
index f786d646fb4..e640f65e716 100644
--- a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/ManagedLedgerFactory.java
+++ b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/ManagedLedgerFactory.java
@@ -117,6 +117,17 @@ public interface ManagedLedgerFactory {
     void asyncOpenReadOnlyCursor(String managedLedgerName, Position startPosition, ManagedLedgerConfig config,
             OpenReadOnlyCursorCallback callback, Object ctx);
 
+    /**
+     * Asynchronous open a Read-only managedLedger.
+     * @param managedLedgerName the unique name that identifies the managed ledger
+     * @param callback
+     * @param config the managed ledger configuration.
+     * @param ctx opaque context
+     */
+    void asyncOpenReadOnlyManagedLedger(String managedLedgerName,
+                                AsyncCallbacks.OpenReadOnlyManagedLedgerCallback callback,
+                                ManagedLedgerConfig config, Object ctx);
+
     /**
      * Get the current metadata info for a managed ledger.
      *
diff --git a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerFactoryImpl.java b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerFactoryImpl.java
index f0b10ac8dbf..632b56f2a86 100644
--- a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerFactoryImpl.java
+++ b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerFactoryImpl.java
@@ -423,7 +423,29 @@ public class ManagedLedgerFactoryImpl implements ManagedLedgerFactory {
         });
     }
 
+    @Override
+    public void asyncOpenReadOnlyManagedLedger(String managedLedgerName,
+                              AsyncCallbacks.OpenReadOnlyManagedLedgerCallback callback,
+                              ManagedLedgerConfig config, Object ctx) {
+        if (closed) {
+            callback.openReadOnlyManagedLedgerFailed(
+                    new ManagedLedgerException.ManagedLedgerFactoryClosedException(), ctx);
+        }
+        ReadOnlyManagedLedgerImpl roManagedLedger = new ReadOnlyManagedLedgerImpl(this,
+                bookkeeperFactory
+                        .get(new EnsemblePlacementPolicyConfig(config.getBookKeeperEnsemblePlacementPolicyClassName(),
+                                config.getBookKeeperEnsemblePlacementPolicyProperties())),
+                store, config, scheduledExecutor, managedLedgerName);
+        roManagedLedger.initialize().thenRun(() -> {
+            log.info("[{}] Successfully initialize Read-only managed ledger", managedLedgerName);
+            callback.openReadOnlyManagedLedgerComplete(roManagedLedger, ctx);
 
+        }).exceptionally(e -> {
+            log.error("[{}] Failed to initialize Read-only managed ledger", managedLedgerName, e);
+            callback.openReadOnlyManagedLedgerFailed((ManagedLedgerException) e.getCause(), ctx);
+            return null;
+        });
+    }
 
     @Override
     public ReadOnlyCursor openReadOnlyCursor(String managedLedgerName, Position startPosition,
@@ -465,28 +487,20 @@ public class ManagedLedgerFactoryImpl implements ManagedLedgerFactory {
             return;
         }
         checkArgument(startPosition instanceof PositionImpl);
-        ReadOnlyManagedLedgerImpl roManagedLedger = new ReadOnlyManagedLedgerImpl(this,
-                bookkeeperFactory
-                        .get(new EnsemblePlacementPolicyConfig(config.getBookKeeperEnsemblePlacementPolicyClassName(),
-                                config.getBookKeeperEnsemblePlacementPolicyProperties())),
-                store, config, scheduledExecutor, managedLedgerName);
-
-        roManagedLedger.initializeAndCreateCursor((PositionImpl) startPosition)
-                .thenAccept(roCursor -> callback.openReadOnlyCursorComplete(roCursor, ctx))
-                .exceptionally(ex -> {
-            Throwable t = ex;
-            if (t instanceof CompletionException) {
-                t = ex.getCause();
+        AsyncCallbacks.OpenReadOnlyManagedLedgerCallback openReadOnlyManagedLedgerCallback =
+                new AsyncCallbacks.OpenReadOnlyManagedLedgerCallback() {
+            @Override
+            public void openReadOnlyManagedLedgerComplete(ReadOnlyManagedLedgerImpl readOnlyManagedLedger, Object ctx) {
+                callback.openReadOnlyCursorComplete(readOnlyManagedLedger.
+                        createReadOnlyCursor((PositionImpl) startPosition), ctx);
             }
 
-            if (t instanceof ManagedLedgerException) {
-                callback.openReadOnlyCursorFailed((ManagedLedgerException) t, ctx);
-            } else {
-                callback.openReadOnlyCursorFailed(new ManagedLedgerException(t), ctx);
+            @Override
+            public void openReadOnlyManagedLedgerFailed(ManagedLedgerException exception, Object ctx) {
+                callback.openReadOnlyCursorFailed(exception, ctx);
             }
-
-            return null;
-        });
+        };
+        asyncOpenReadOnlyManagedLedger(managedLedgerName, openReadOnlyManagedLedgerCallback, config, null);
     }
 
     void close(ManagedLedger ledger) {
diff --git a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ReadOnlyManagedLedgerImpl.java b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ReadOnlyManagedLedgerImpl.java
index 214c3afe1bc..8878b2aece0 100644
--- a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ReadOnlyManagedLedgerImpl.java
+++ b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ReadOnlyManagedLedgerImpl.java
@@ -45,8 +45,8 @@ public class ReadOnlyManagedLedgerImpl extends ManagedLedgerImpl {
         super(factory, bookKeeper, store, config, scheduledExecutor, name);
     }
 
-    CompletableFuture<ReadOnlyCursor> initializeAndCreateCursor(PositionImpl startPosition) {
-        CompletableFuture<ReadOnlyCursor> future = new CompletableFuture<>();
+    CompletableFuture<Void> initialize() {
+        CompletableFuture<Void> future = new CompletableFuture<>();
 
         // Fetch the list of existing ledgers in the managed ledger
         store.getManagedLedgerInfo(name, false, new MetaStoreCallback<ManagedLedgerInfo>() {
@@ -72,7 +72,7 @@ public class ReadOnlyManagedLedgerImpl extends ManagedLedgerImpl {
                                             .setTimestamp(clock.millis()).build();
                                     ledgers.put(lastLedgerId, info);
 
-                                    future.complete(createReadOnlyCursor(startPosition));
+                                    future.complete(null);
                                 }).exceptionally(ex -> {
                                     if (ex instanceof CompletionException
                                             && ex.getCause() instanceof IllegalArgumentException) {
@@ -80,7 +80,7 @@ public class ReadOnlyManagedLedgerImpl extends ManagedLedgerImpl {
                                         LedgerInfo info = LedgerInfo.newBuilder().setLedgerId(lastLedgerId)
                                                 .setEntries(0).setSize(0).setTimestamp(clock.millis()).build();
                                         ledgers.put(lastLedgerId, info);
-                                        future.complete(createReadOnlyCursor(startPosition));
+                                        future.complete(null);
                                     } else {
                                         future.completeExceptionally(new ManagedLedgerException(ex));
                                     }
@@ -93,7 +93,7 @@ public class ReadOnlyManagedLedgerImpl extends ManagedLedgerImpl {
                                     LedgerInfo info = LedgerInfo.newBuilder().setLedgerId(lastLedgerId).setEntries(0)
                                             .setSize(0).setTimestamp(clock.millis()).build();
                                     ledgers.put(lastLedgerId, info);
-                                    future.complete(createReadOnlyCursor(startPosition));
+                                    future.complete(null);
                                 } else {
                                     future.completeExceptionally(new ManagedLedgerException(ex));
                                 }
@@ -101,7 +101,7 @@ public class ReadOnlyManagedLedgerImpl extends ManagedLedgerImpl {
                             });
                 } else {
                     // The read-only managed ledger is ready to use
-                    future.complete(createReadOnlyCursor(startPosition));
+                    future.complete(null);
                 }
             }
 
@@ -118,7 +118,7 @@ public class ReadOnlyManagedLedgerImpl extends ManagedLedgerImpl {
         return future;
     }
 
-    private ReadOnlyCursor createReadOnlyCursor(PositionImpl startPosition) {
+    ReadOnlyCursor createReadOnlyCursor(PositionImpl startPosition) {
         if (ledgers.isEmpty()) {
             lastConfirmedEntry = PositionImpl.EARLIEST;
         } else if (ledgers.lastEntry().getValue().getEntries() > 0) {
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/PulsarService.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/PulsarService.java
index f6aec263f13..02a4bdcd31d 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/PulsarService.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/PulsarService.java
@@ -98,11 +98,10 @@ import org.apache.pulsar.broker.resources.PulsarResources;
 import org.apache.pulsar.broker.rest.Topics;
 import org.apache.pulsar.broker.service.BrokerService;
 import org.apache.pulsar.broker.service.PulsarMetadataEventSynchronizer;
-import org.apache.pulsar.broker.service.SystemTopicBaseTxnBufferSnapshotService;
 import org.apache.pulsar.broker.service.SystemTopicBasedTopicPoliciesService;
 import org.apache.pulsar.broker.service.Topic;
 import org.apache.pulsar.broker.service.TopicPoliciesService;
-import org.apache.pulsar.broker.service.TransactionBufferSnapshotService;
+import org.apache.pulsar.broker.service.TransactionBufferSnapshotServiceFactory;
 import org.apache.pulsar.broker.service.schema.SchemaRegistryService;
 import org.apache.pulsar.broker.service.schema.SchemaStorageFactory;
 import org.apache.pulsar.broker.stats.MetricsGenerator;
@@ -260,8 +259,7 @@ public class PulsarService implements AutoCloseable, ShutdownService {
     private MetadataStoreExtended localMetadataStore;
     private PulsarMetadataEventSynchronizer localMetadataSynchronizer;
     private CoordinationService coordinationService;
-    private TransactionBufferSnapshotService transactionBufferSnapshotService;
-
+    private TransactionBufferSnapshotServiceFactory transactionBufferSnapshotServiceFactory;
     private MetadataStore configurationMetadataStore;
     private PulsarMetadataEventSynchronizer configMetadataSynchronizer;
     private boolean shouldShutdownConfigurationMetadataStore;
@@ -510,9 +508,9 @@ public class PulsarService implements AutoCloseable, ShutdownService {
                 adminClient = null;
             }
 
-            if (transactionBufferSnapshotService != null) {
-                transactionBufferSnapshotService.close();
-                transactionBufferSnapshotService = null;
+            if (transactionBufferSnapshotServiceFactory != null) {
+                transactionBufferSnapshotServiceFactory.close();
+                transactionBufferSnapshotServiceFactory = null;
             }
 
             if (transactionBufferClient != null) {
@@ -837,7 +835,8 @@ public class PulsarService implements AutoCloseable, ShutdownService {
                 MLTransactionMetadataStoreProvider.initBufferedWriterMetrics(getAdvertisedAddress());
                 MLPendingAckStoreProvider.initBufferedWriterMetrics(getAdvertisedAddress());
 
-                this.transactionBufferSnapshotService = new SystemTopicBaseTxnBufferSnapshotService(getClient());
+                this.transactionBufferSnapshotServiceFactory = new TransactionBufferSnapshotServiceFactory(getClient());
+
                 this.transactionTimer =
                         new HashedWheelTimer(new DefaultThreadFactory("pulsar-transaction-timer"));
                 transactionBufferClient = TransactionBufferClientImpl.create(this, transactionTimer,
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/SystemTopicBasedTopicPoliciesService.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/SystemTopicBasedTopicPoliciesService.java
index 4bd3a0952f0..90c58f5910f 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/SystemTopicBasedTopicPoliciesService.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/SystemTopicBasedTopicPoliciesService.java
@@ -123,7 +123,8 @@ public class SystemTopicBasedTopicPoliciesService implements TopicPoliciesServic
             } else {
                 PulsarEvent event = getPulsarEvent(topicName, actionType, policies);
                 CompletableFuture<MessageId> actionFuture =
-                        ActionType.DELETE.equals(actionType) ? writer.deleteAsync(event) : writer.writeAsync(event);
+                        ActionType.DELETE.equals(actionType) ? writer.deleteAsync(getEventKey(event), event)
+                                : writer.writeAsync(getEventKey(event), event);
                 actionFuture.whenComplete(((messageId, e) -> {
                             if (e != null) {
                                 result.completeExceptionally(e);
@@ -455,7 +456,8 @@ public class SystemTopicBasedTopicPoliciesService implements TopicPoliciesServic
                     SystemTopicClient<PulsarEvent> systemTopicClient = namespaceEventsSystemTopicFactory
                             .createTopicPoliciesSystemTopicClient(topicName.getNamespaceObject());
                     systemTopicClient.newWriterAsync().thenAccept(writer
-                            -> writer.deleteAsync(getPulsarEvent(topicName, ActionType.DELETE, null))
+                            -> writer.deleteAsync(getEventKey(topicName),
+                                    getPulsarEvent(topicName, ActionType.DELETE, null))
                             .whenComplete((result, e) -> writer.closeAsync().whenComplete((res, ex) -> {
                                 if (ex != null) {
                                     log.error("close writer failed ", ex);
@@ -539,6 +541,20 @@ public class SystemTopicBasedTopicPoliciesService implements TopicPoliciesServic
         });
     }
 
+    public static String getEventKey(PulsarEvent event) {
+        return TopicName.get(event.getTopicPoliciesEvent().getDomain(),
+                event.getTopicPoliciesEvent().getTenant(),
+                event.getTopicPoliciesEvent().getNamespace(),
+                event.getTopicPoliciesEvent().getTopic()).toString();
+    }
+
+    public static String getEventKey(TopicName topicName) {
+        return TopicName.get(topicName.getDomain().toString(),
+                topicName.getTenant(),
+                topicName.getNamespace(),
+                TopicName.get(topicName.getPartitionedTopicName()).getLocalName()).toString();
+    }
+
     @VisibleForTesting
     long getPoliciesCacheSize() {
         return policiesCache.size();
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/SystemTopicBaseTxnBufferSnapshotService.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/SystemTopicTxnBufferSnapshotService.java
similarity index 61%
rename from pulsar-broker/src/main/java/org/apache/pulsar/broker/service/SystemTopicBaseTxnBufferSnapshotService.java
rename to pulsar-broker/src/main/java/org/apache/pulsar/broker/service/SystemTopicTxnBufferSnapshotService.java
index 719d492a524..9dc1fa1b8a3 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/SystemTopicBaseTxnBufferSnapshotService.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/SystemTopicTxnBufferSnapshotService.java
@@ -23,64 +23,63 @@ import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.ConcurrentHashMap;
 import org.apache.pulsar.broker.systopic.NamespaceEventsSystemTopicFactory;
 import org.apache.pulsar.broker.systopic.SystemTopicClient;
-import org.apache.pulsar.broker.systopic.SystemTopicClient.Reader;
-import org.apache.pulsar.broker.systopic.SystemTopicClient.Writer;
-import org.apache.pulsar.broker.systopic.TransactionBufferSystemTopicClient;
-import org.apache.pulsar.broker.transaction.buffer.matadata.TransactionBufferSnapshot;
+import org.apache.pulsar.broker.systopic.SystemTopicClientBase;
 import org.apache.pulsar.client.api.PulsarClient;
-import org.apache.pulsar.client.api.PulsarClientException.InvalidTopicNameException;
+import org.apache.pulsar.client.api.PulsarClientException;
 import org.apache.pulsar.common.events.EventType;
 import org.apache.pulsar.common.naming.TopicName;
 import org.apache.pulsar.common.util.FutureUtil;
 
-public class SystemTopicBaseTxnBufferSnapshotService implements TransactionBufferSnapshotService {
+public class SystemTopicTxnBufferSnapshotService<T> {
 
-    private final Map<TopicName, SystemTopicClient<TransactionBufferSnapshot>> clients;
+    protected final Map<TopicName, SystemTopicClient<T>> clients;
+    protected final NamespaceEventsSystemTopicFactory namespaceEventsSystemTopicFactory;
 
-    private final NamespaceEventsSystemTopicFactory namespaceEventsSystemTopicFactory;
+    protected final Class<T> schemaType;
+    protected final EventType systemTopicType;
 
-    public SystemTopicBaseTxnBufferSnapshotService(PulsarClient client) {
+    public SystemTopicTxnBufferSnapshotService(PulsarClient client, EventType systemTopicType,
+                                               Class<T> schemaType) {
         this.namespaceEventsSystemTopicFactory = new NamespaceEventsSystemTopicFactory(client);
+        this.systemTopicType = systemTopicType;
+        this.schemaType = schemaType;
         this.clients = new ConcurrentHashMap<>();
     }
 
-    @Override
-    public CompletableFuture<Writer<TransactionBufferSnapshot>> createWriter(TopicName topicName) {
+    public CompletableFuture<SystemTopicClient.Writer<T>> createWriter(TopicName topicName) {
         return getTransactionBufferSystemTopicClient(topicName).thenCompose(SystemTopicClient::newWriterAsync);
     }
 
-    private CompletableFuture<SystemTopicClient<TransactionBufferSnapshot>> getTransactionBufferSystemTopicClient(
-            TopicName topicName) {
-        TopicName systemTopicName = NamespaceEventsSystemTopicFactory
-                .getSystemTopicName(topicName.getNamespaceObject(), EventType.TRANSACTION_BUFFER_SNAPSHOT);
-        if (systemTopicName == null) {
-            return FutureUtil.failedFuture(
-                    new InvalidTopicNameException("Can't create SystemTopicBaseTxnBufferSnapshotService, "
-                            + "because the topicName is null!"));
-        }
-        return CompletableFuture.completedFuture(clients.computeIfAbsent(systemTopicName,
-                (v) -> namespaceEventsSystemTopicFactory
-                        .createTransactionBufferSystemTopicClient(topicName.getNamespaceObject(), this)));
-    }
-
-    @Override
-    public CompletableFuture<Reader<TransactionBufferSnapshot>> createReader(TopicName topicName) {
+    public CompletableFuture<SystemTopicClient.Reader<T>> createReader(TopicName topicName) {
         return getTransactionBufferSystemTopicClient(topicName).thenCompose(SystemTopicClient::newReaderAsync);
     }
 
-    @Override
-    public void removeClient(TopicName topicName,
-                                          TransactionBufferSystemTopicClient transactionBufferSystemTopicClient) {
+    public void removeClient(TopicName topicName, SystemTopicClientBase<T> transactionBufferSystemTopicClient) {
         if (transactionBufferSystemTopicClient.getReaders().size() == 0
                 && transactionBufferSystemTopicClient.getWriters().size() == 0) {
             clients.remove(topicName);
         }
     }
 
-    @Override
+    protected CompletableFuture<SystemTopicClient<T>> getTransactionBufferSystemTopicClient(TopicName topicName) {
+        TopicName systemTopicName = NamespaceEventsSystemTopicFactory
+                .getSystemTopicName(topicName.getNamespaceObject(), systemTopicType);
+        if (systemTopicName == null) {
+            return FutureUtil.failedFuture(
+                    new PulsarClientException
+                            .InvalidTopicNameException("Can't create SystemTopicBaseTxnBufferSnapshotIndexService, "
+                            + "because the topicName is null!"));
+        }
+        return CompletableFuture.completedFuture(clients.computeIfAbsent(systemTopicName,
+                (v) -> namespaceEventsSystemTopicFactory
+                        .createTransactionBufferSystemTopicClient(topicName.getNamespaceObject(),
+                                this, schemaType)));
+    }
+
     public void close() throws Exception {
-        for (Map.Entry<TopicName, SystemTopicClient<TransactionBufferSnapshot>> entry : clients.entrySet()) {
+        for (Map.Entry<TopicName, SystemTopicClient<T>> entry : clients.entrySet()) {
             entry.getValue().close();
         }
     }
+
 }
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/TransactionBufferSnapshotService.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/TransactionBufferSnapshotService.java
deleted file mode 100644
index b090e8fe46a..00000000000
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/TransactionBufferSnapshotService.java
+++ /dev/null
@@ -1,62 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.pulsar.broker.service;
-
-import java.util.concurrent.CompletableFuture;
-import org.apache.pulsar.broker.systopic.SystemTopicClient.Reader;
-import org.apache.pulsar.broker.systopic.SystemTopicClient.Writer;
-import org.apache.pulsar.broker.systopic.TransactionBufferSystemTopicClient;
-import org.apache.pulsar.broker.transaction.buffer.matadata.TransactionBufferSnapshot;
-import org.apache.pulsar.common.naming.TopicName;
-
-public interface TransactionBufferSnapshotService {
-
-    /**
-     * Create a transaction buffer snapshot writer.
-     *
-     * @param topicName {@link TopicName} the topic name
-     *
-     * @return {@link CompletableFuture<Writer>} return the future of writer
-     */
-    CompletableFuture<Writer<TransactionBufferSnapshot>> createWriter(TopicName topicName);
-
-    /**
-     * Create a transaction buffer snapshot reader.
-     *
-     * @param topicName {@link TopicName} the topic name
-     *
-     * @return {@link CompletableFuture<Writer>} return the future of reader
-     */
-    CompletableFuture<Reader<TransactionBufferSnapshot>> createReader(TopicName topicName);
-
-    /**
-     * Remove a topic client from cache.
-     *
-     * @param topicName {@link TopicName} the topic name
-     * @param transactionBufferSystemTopicClient {@link TransactionBufferSystemTopicClient} the topic client
-     *
-     */
-    void removeClient(TopicName topicName, TransactionBufferSystemTopicClient transactionBufferSystemTopicClient);
-
-    /**
-     * Close transaction buffer snapshot service.
-     */
-    void close() throws Exception;
-
-}
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/TransactionBufferSnapshotServiceFactory.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/TransactionBufferSnapshotServiceFactory.java
new file mode 100644
index 00000000000..2220c203237
--- /dev/null
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/TransactionBufferSnapshotServiceFactory.java
@@ -0,0 +1,74 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.pulsar.broker.service;
+
+import org.apache.pulsar.broker.transaction.buffer.metadata.TransactionBufferSnapshot;
+import org.apache.pulsar.broker.transaction.buffer.metadata.v2.TransactionBufferSnapshotIndexes;
+import org.apache.pulsar.broker.transaction.buffer.metadata.v2.TransactionBufferSnapshotSegment;
+import org.apache.pulsar.client.api.PulsarClient;
+import org.apache.pulsar.common.events.EventType;
+
+public class TransactionBufferSnapshotServiceFactory {
+
+    private SystemTopicTxnBufferSnapshotService<TransactionBufferSnapshot> txnBufferSnapshotService;
+
+    private SystemTopicTxnBufferSnapshotService<TransactionBufferSnapshotSegment>
+            txnBufferSnapshotSegmentService;
+
+    private SystemTopicTxnBufferSnapshotService<TransactionBufferSnapshotIndexes> txnBufferSnapshotIndexService;
+
+    public TransactionBufferSnapshotServiceFactory(PulsarClient pulsarClient) {
+        this.txnBufferSnapshotSegmentService = new SystemTopicTxnBufferSnapshotService<>(pulsarClient,
+                EventType.TRANSACTION_BUFFER_SNAPSHOT_SEGMENTS,
+                TransactionBufferSnapshotSegment.class);
+        this.txnBufferSnapshotIndexService = new SystemTopicTxnBufferSnapshotService<>(pulsarClient,
+                EventType.TRANSACTION_BUFFER_SNAPSHOT_INDEXES, TransactionBufferSnapshotIndexes.class);
+        this.txnBufferSnapshotService = new SystemTopicTxnBufferSnapshotService<>(pulsarClient,
+                EventType.TRANSACTION_BUFFER_SNAPSHOT, TransactionBufferSnapshot.class);
+    }
+
+    public SystemTopicTxnBufferSnapshotService<TransactionBufferSnapshotIndexes> getTxnBufferSnapshotIndexService() {
+        return this.txnBufferSnapshotIndexService;
+    }
+
+    public SystemTopicTxnBufferSnapshotService<TransactionBufferSnapshotSegment>
+    getTxnBufferSnapshotSegmentService() {
+        return this.txnBufferSnapshotSegmentService;
+    }
+
+    public SystemTopicTxnBufferSnapshotService<TransactionBufferSnapshot> getTxnBufferSnapshotService() {
+        return this.txnBufferSnapshotService;
+    }
+
+    public void close() throws Exception {
+        if (this.txnBufferSnapshotIndexService != null) {
+            this.txnBufferSnapshotIndexService.close();
+            this.txnBufferSnapshotIndexService = null;
+        }
+        if (this.txnBufferSnapshotSegmentService != null) {
+            this.txnBufferSnapshotSegmentService.close();
+            this.txnBufferSnapshotSegmentService = null;
+        }
+        if (this.txnBufferSnapshotService != null) {
+            this.txnBufferSnapshotService.close();
+            this.txnBufferSnapshotService = null;
+        }
+    }
+}
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/systopic/NamespaceEventsSystemTopicFactory.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/systopic/NamespaceEventsSystemTopicFactory.java
index 9e162f741b2..c86ad70f6fc 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/systopic/NamespaceEventsSystemTopicFactory.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/systopic/NamespaceEventsSystemTopicFactory.java
@@ -18,7 +18,7 @@
  */
 package org.apache.pulsar.broker.systopic;
 
-import org.apache.pulsar.broker.service.TransactionBufferSnapshotService;
+import org.apache.pulsar.broker.service.SystemTopicTxnBufferSnapshotService;
 import org.apache.pulsar.client.api.PulsarClient;
 import org.apache.pulsar.common.events.EventType;
 import org.apache.pulsar.common.naming.NamespaceName;
@@ -43,25 +43,27 @@ public class NamespaceEventsSystemTopicFactory {
         return new TopicPoliciesSystemTopicClient(client, topicName);
     }
 
-    public TransactionBufferSystemTopicClient createTransactionBufferSystemTopicClient(NamespaceName namespaceName,
-                                                   TransactionBufferSnapshotService transactionBufferSnapshotService) {
+    /**
+     * Creates a transaction buffer snapshot system topic client for the given namespace.
+     *
+     * @param namespaceName namespace that owns the system topic
+     * @param systemTopicTxnBufferSnapshotService service handed to the new client
+     * @param schemaType payload class handed to the new client
+     * @param <T> payload type of the system topic
+     * @return a new client bound to the namespace's system topic
+     */
+    public <T> TransactionBufferSnapshotBaseSystemTopicClient<T> createTransactionBufferSystemTopicClient(
+            NamespaceName namespaceName, SystemTopicTxnBufferSnapshotService<T>
+            systemTopicTxnBufferSnapshotService, Class<T> schemaType) {
+        // NOTE(review): the topic is always TRANSACTION_BUFFER_SNAPSHOT, whatever schemaType is;
+        // confirm whether segment/index clients are meant to target their own system topics.
         TopicName topicName = TopicName.get(TopicDomain.persistent.value(), namespaceName,
                 SystemTopicNames.TRANSACTION_BUFFER_SNAPSHOT);
         log.info("Create transaction buffer snapshot client, topicName : {}", topicName.toString());
-        return new TransactionBufferSystemTopicClient(client, topicName, transactionBufferSnapshotService);
+        // Diamond instead of a raw type so the returned client is checked against <T>.
+        return new TransactionBufferSnapshotBaseSystemTopicClient<>(client, topicName,
+                systemTopicTxnBufferSnapshotService, schemaType);
     }
 
     public static TopicName getSystemTopicName(NamespaceName namespaceName, EventType eventType) {
-        switch (eventType) {
-            case TOPIC_POLICY:
-                return TopicName.get(TopicDomain.persistent.value(), namespaceName,
-                        SystemTopicNames.NAMESPACE_EVENTS_LOCAL_NAME);
-            case TRANSACTION_BUFFER_SNAPSHOT:
-                return TopicName.get(TopicDomain.persistent.value(), namespaceName,
-                        SystemTopicNames.TRANSACTION_BUFFER_SNAPSHOT);
-            default:
-                return null;
-        }
+        // Only the local topic name varies per event type; domain and namespace are shared.
+        // The switch expression has no default branch, so each handled constant is listed explicitly.
+        final String localName = switch (eventType) {
+            case TOPIC_POLICY -> SystemTopicNames.NAMESPACE_EVENTS_LOCAL_NAME;
+            case TRANSACTION_BUFFER_SNAPSHOT -> SystemTopicNames.TRANSACTION_BUFFER_SNAPSHOT;
+            case TRANSACTION_BUFFER_SNAPSHOT_SEGMENTS -> SystemTopicNames.TRANSACTION_BUFFER_SNAPSHOT_SEGMENTS;
+            case TRANSACTION_BUFFER_SNAPSHOT_INDEXES -> SystemTopicNames.TRANSACTION_BUFFER_SNAPSHOT_INDEXES;
+        };
+        return TopicName.get(TopicDomain.persistent.value(), namespaceName, localName);
     }
 
     private static final Logger log = LoggerFactory.getLogger(NamespaceEventsSystemTopicFactory.class);
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/systopic/SystemTopicClient.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/systopic/SystemTopicClient.java
index 2bc740a41d4..11dfeff2008 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/systopic/SystemTopicClient.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/systopic/SystemTopicClient.java
@@ -87,37 +87,42 @@ public interface SystemTopicClient<T> {
      * Writer for system topic.
      */
     interface Writer<T> {
+
         /**
          * Write event to the system topic.
+         * @param key the message key the event is published under
          * @param t pulsar event
          * @return message id
          * @throws PulsarClientException exception while write event cause
          */
-        MessageId write(T t) throws PulsarClientException;
+        MessageId write(String key, T t) throws PulsarClientException;
 
         /**
          * Async write event to the system topic.
+         * @param key the message key the event is published under
          * @param t pulsar event
          * @return message id future
          */
-        CompletableFuture<MessageId> writeAsync(T t);
+        CompletableFuture<MessageId> writeAsync(String key, T t);
 
         /**
          * Delete event in the system topic.
+         * @param key the message key of the event to delete
          * @param t pulsar event
          * @return message id
          * @throws PulsarClientException exception while write event cause
+         * @throws UnsupportedOperationException always, in this default implementation
          */
-        default MessageId delete(T t) throws PulsarClientException {
+        default MessageId delete(String key, T t) throws PulsarClientException {
             throw new UnsupportedOperationException("Unsupported operation");
         }
 
         /**
          * Async delete event in the system topic.
+         * @param key the message key of the event to delete
          * @param t pulsar event
          * @return message id future
+         * @throws UnsupportedOperationException always, in this default implementation; it is thrown
+         *         directly from the call rather than reported through the returned future
          */
-        default CompletableFuture<MessageId> deleteAsync(T t) {
+        default CompletableFuture<MessageId> deleteAsync(String key, T t) {
             throw new UnsupportedOperationException("Unsupported operation");
         }
 
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/systopic/TopicPoliciesSystemTopicClient.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/systopic/TopicPoliciesSystemTopicClient.java
index 4ad2137d80b..1efa47ff81c 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/systopic/TopicPoliciesSystemTopicClient.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/systopic/TopicPoliciesSystemTopicClient.java
@@ -85,41 +85,35 @@ public class TopicPoliciesSystemTopicClient extends SystemTopicClientBase<Pulsar
         }
 
         @Override
-        public MessageId write(PulsarEvent event) throws PulsarClientException {
-            TypedMessageBuilder<PulsarEvent> builder = producer.newMessage().key(getEventKey(event)).value(event);
+        public MessageId write(String key, PulsarEvent event) throws PulsarClientException {
+            // The message key is supplied by the caller instead of being derived from the event.
+            TypedMessageBuilder<PulsarEvent> builder = producer.newMessage().key(key).value(event);
             setReplicateCluster(event, builder);
             return builder.send();
         }
 
         @Override
-        public CompletableFuture<MessageId> writeAsync(PulsarEvent event) {
-            TypedMessageBuilder<PulsarEvent> builder = producer.newMessage().key(getEventKey(event)).value(event);
+        public CompletableFuture<MessageId> writeAsync(String key, PulsarEvent event) {
+            // The message key is supplied by the caller instead of being derived from the event.
+            TypedMessageBuilder<PulsarEvent> builder = producer.newMessage().key(key).value(event);
             setReplicateCluster(event, builder);
             return builder.sendAsync();
         }
 
         @Override
-        public MessageId delete(PulsarEvent event) throws PulsarClientException {
+        public MessageId delete(String key, PulsarEvent event) throws PulsarClientException {
+            // Keep the action-type validation the previous delete performed; deleteAsync performs it too.
             validateActionType(event);
-            TypedMessageBuilder<PulsarEvent> builder = producer.newMessage().key(getEventKey(event)).value(null);
+            // The message is sent with the caller-supplied key and a null value.
+            TypedMessageBuilder<PulsarEvent> builder = producer.newMessage().key(key).value(null);
             setReplicateCluster(event, builder);
             return builder.send();
         }
 
         @Override
-        public CompletableFuture<MessageId> deleteAsync(PulsarEvent event) {
+        public CompletableFuture<MessageId> deleteAsync(String key, PulsarEvent event) {
             validateActionType(event);
-            TypedMessageBuilder<PulsarEvent> builder = producer.newMessage().key(getEventKey(event)).value(null);
+            // The message is sent with the caller-supplied key and a null value.
+            TypedMessageBuilder<PulsarEvent> builder = producer.newMessage().key(key).value(null);
             setReplicateCluster(event, builder);
             return builder.sendAsync();
         }
 
-        private String getEventKey(PulsarEvent event) {
-            return TopicName.get(event.getTopicPoliciesEvent().getDomain(),
-                event.getTopicPoliciesEvent().getTenant(),
-                event.getTopicPoliciesEvent().getNamespace(),
-                event.getTopicPoliciesEvent().getTopic()).toString();
-        }
+
 
         @Override
         public void close() throws IOException {
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/systopic/TransactionBufferSnapshotBaseSystemTopicClient.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/systopic/TransactionBufferSnapshotBaseSystemTopicClient.java
new file mode 100644
index 00000000000..11908512d4f
--- /dev/null
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/systopic/TransactionBufferSnapshotBaseSystemTopicClient.java
@@ -0,0 +1,214 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.broker.systopic;
+
+import java.io.IOException;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.CompletionException;
+import lombok.extern.slf4j.Slf4j;
+import org.apache.pulsar.broker.service.SystemTopicTxnBufferSnapshotService;
+import org.apache.pulsar.client.api.Message;
+import org.apache.pulsar.client.api.MessageId;
+import org.apache.pulsar.client.api.Producer;
+import org.apache.pulsar.client.api.PulsarClient;
+import org.apache.pulsar.client.api.PulsarClientException;
+import org.apache.pulsar.client.api.Schema;
+import org.apache.pulsar.common.naming.TopicName;
+
+/**
+ * System topic client for transaction buffer snapshot topics, generic over the snapshot payload type.
+ *
+ * <p>Producers and readers created by this client use an AVRO schema built from {@code schemaType}.
+ * Closing a writer or reader removes it from this client and calls {@code removeClient} on the
+ * {@link SystemTopicTxnBufferSnapshotService} this client was created with.
+ *
+ * @param <T> type of the snapshot payload carried by the system topic
+ */
+@Slf4j
+public class TransactionBufferSnapshotBaseSystemTopicClient<T> extends SystemTopicClientBase<T> {
+
+    protected final SystemTopicTxnBufferSnapshotService<T> systemTopicTxnBufferSnapshotService;
+    protected final Class<T> schemaType;
+
+    /**
+     * @param client Pulsar client used to create producers and readers
+     * @param topicName system topic this client is bound to
+     * @param systemTopicTxnBufferSnapshotService service notified when a writer or reader is removed
+     * @param schemaType payload class used to build the AVRO schema
+     */
+    public TransactionBufferSnapshotBaseSystemTopicClient(PulsarClient client,
+                                                          TopicName topicName,
+                                                          SystemTopicTxnBufferSnapshotService<T>
+                                                                  systemTopicTxnBufferSnapshotService,
+                                                          Class<T> schemaType) {
+        super(client, topicName);
+        this.systemTopicTxnBufferSnapshotService = systemTopicTxnBufferSnapshotService;
+        this.schemaType = schemaType;
+    }
+
+    /** Drops the writer from this client and calls {@code removeClient} on the snapshot service. */
+    protected void removeWriter(Writer<T> writer) {
+        writers.remove(writer);
+        this.systemTopicTxnBufferSnapshotService.removeClient(topicName, this);
+    }
+
+    /** Drops the reader from this client and calls {@code removeClient} on the snapshot service. */
+    protected void removeReader(Reader<T> reader) {
+        readers.remove(reader);
+        this.systemTopicTxnBufferSnapshotService.removeClient(topicName, this);
+    }
+
+    /**
+     * Waits for an asynchronous close and reports its failure as the {@link IOException} that the
+     * synchronous {@code close()} methods declare, instead of letting the unchecked
+     * {@link CompletionException} raised by {@code join()} escape.
+     *
+     * @param closeFuture future returned by {@code closeAsync()}
+     * @throws IOException the failure cause if it is an {@code IOException}, otherwise a new
+     *                     {@code IOException} wrapping the cause
+     */
+    private static void awaitClose(CompletableFuture<Void> closeFuture) throws IOException {
+        try {
+            closeFuture.join();
+        } catch (CompletionException e) {
+            Throwable cause = e.getCause() != null ? e.getCause() : e;
+            if (cause instanceof IOException) {
+                throw (IOException) cause;
+            }
+            throw new IOException(cause);
+        }
+    }
+
+    /**
+     * Writer that publishes snapshot payloads through a {@link Producer}, keyed by the caller-supplied key.
+     */
+    protected static class TransactionBufferSnapshotWriter<T> implements Writer<T> {
+
+        protected final Producer<T> producer;
+        protected final TransactionBufferSnapshotBaseSystemTopicClient<T>
+                transactionBufferSnapshotBaseSystemTopicClient;
+
+        protected TransactionBufferSnapshotWriter(Producer<T> producer,
+                                                  TransactionBufferSnapshotBaseSystemTopicClient<T>
+                                                    transactionBufferSnapshotBaseSystemTopicClient) {
+            this.producer = producer;
+            this.transactionBufferSnapshotBaseSystemTopicClient = transactionBufferSnapshotBaseSystemTopicClient;
+        }
+
+        @Override
+        public MessageId write(String key, T t)
+                throws PulsarClientException {
+            return producer.newMessage()
+                    .key(key)
+                    .value(t)
+                    .send();
+        }
+
+        @Override
+        public CompletableFuture<MessageId> writeAsync(String key, T t) {
+            return producer.newMessage()
+                    .key(key)
+                    .value(t)
+                    .sendAsync();
+        }
+
+        /** Sends a message with the given key and a null value; {@code t} is not used. */
+        @Override
+        public MessageId delete(String key, T t)
+                throws PulsarClientException {
+            return producer.newMessage()
+                    .key(key)
+                    .value(null)
+                    .send();
+        }
+
+        /** Asynchronously sends a message with the given key and a null value; {@code t} is not used. */
+        @Override
+        public CompletableFuture<MessageId> deleteAsync(String key, T t) {
+            return producer.newMessage()
+                    .key(key)
+                    .value(null)
+                    .sendAsync();
+        }
+
+        /**
+         * Closes the producer and waits for completion.
+         *
+         * @throws IOException if closing the producer failed
+         */
+        @Override
+        public void close() throws IOException {
+            awaitClose(this.closeAsync());
+        }
+
+        @Override
+        public CompletableFuture<Void> closeAsync() {
+            CompletableFuture<Void> completableFuture = new CompletableFuture<>();
+            producer.closeAsync().whenComplete((v, e) -> {
+                // if close fail, also need remove the producer
+                transactionBufferSnapshotBaseSystemTopicClient.removeWriter(this);
+                if (e != null) {
+                    completableFuture.completeExceptionally(e);
+                    return;
+                }
+                completableFuture.complete(null);
+            });
+            return completableFuture;
+        }
+
+        @Override
+        public SystemTopicClient<T> getSystemTopicClient() {
+            return transactionBufferSnapshotBaseSystemTopicClient;
+        }
+    }
+
+    /**
+     * Reader that delegates to an underlying Pulsar client reader.
+     */
+    protected static class TransactionBufferSnapshotReader<T> implements Reader<T> {
+
+        private final org.apache.pulsar.client.api.Reader<T> reader;
+        private final TransactionBufferSnapshotBaseSystemTopicClient<T> transactionBufferSnapshotBaseSystemTopicClient;
+
+        protected TransactionBufferSnapshotReader(
+                org.apache.pulsar.client.api.Reader<T> reader,
+                TransactionBufferSnapshotBaseSystemTopicClient<T> transactionBufferSnapshotBaseSystemTopicClient) {
+            this.reader = reader;
+            this.transactionBufferSnapshotBaseSystemTopicClient = transactionBufferSnapshotBaseSystemTopicClient;
+        }
+
+        @Override
+        public Message<T> readNext() throws PulsarClientException {
+            return reader.readNext();
+        }
+
+        @Override
+        public CompletableFuture<Message<T>> readNextAsync() {
+            return reader.readNextAsync();
+        }
+
+        @Override
+        public boolean hasMoreEvents() throws PulsarClientException {
+            return reader.hasMessageAvailable();
+        }
+
+        @Override
+        public CompletableFuture<Boolean> hasMoreEventsAsync() {
+            return reader.hasMessageAvailableAsync();
+        }
+
+        /**
+         * Closes the underlying reader and waits for completion.
+         *
+         * @throws IOException if closing the reader failed
+         */
+        @Override
+        public void close() throws IOException {
+            awaitClose(this.closeAsync());
+        }
+
+        @Override
+        public CompletableFuture<Void> closeAsync() {
+            CompletableFuture<Void> completableFuture = new CompletableFuture<>();
+            reader.closeAsync().whenComplete((v, e) -> {
+                // if close fail, also need remove the reader
+                transactionBufferSnapshotBaseSystemTopicClient.removeReader(this);
+                if (e != null) {
+                    completableFuture.completeExceptionally(e);
+                    return;
+                }
+                completableFuture.complete(null);
+            });
+            return completableFuture;
+        }
+
+        @Override
+        public SystemTopicClient<T> getSystemTopic() {
+            return transactionBufferSnapshotBaseSystemTopicClient;
+        }
+    }
+
+    /** Creates a producer on the system topic with an AVRO schema for {@code schemaType} and wraps it. */
+    @Override
+    protected CompletableFuture<Writer<T>> newWriterAsyncInternal() {
+        return client.newProducer(Schema.AVRO(schemaType))
+                .topic(topicName.toString())
+                .createAsync().thenApply(producer -> {
+                    if (log.isDebugEnabled()) {
+                        log.debug("[{}] A new {} writer is created", topicName, schemaType.getName());
+                    }
+                    return new TransactionBufferSnapshotWriter<>(producer, this);
+                });
+    }
+
+    /**
+     * Creates a reader on the system topic that starts at the earliest message with
+     * {@code readCompacted(true)}, using an AVRO schema for {@code schemaType}, and wraps it.
+     */
+    @Override
+    protected CompletableFuture<Reader<T>> newReaderAsyncInternal() {
+        return client.newReader(Schema.AVRO(schemaType))
+                .topic(topicName.toString())
+                .startMessageId(MessageId.earliest)
+                .readCompacted(true)
+                .createAsync()
+                .thenApply(reader -> {
+                    if (log.isDebugEnabled()) {
+                        log.debug("[{}] A new {} reader is created", topicName, schemaType.getName());
+                    }
+                    return new TransactionBufferSnapshotReader<>(reader, this);
+                });
+    }
+
+}
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/systopic/TransactionBufferSystemTopicClient.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/systopic/TransactionBufferSystemTopicClient.java
deleted file mode 100644
index aaab858ab1e..00000000000
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/systopic/TransactionBufferSystemTopicClient.java
+++ /dev/null
@@ -1,208 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package org.apache.pulsar.broker.systopic;
-
-import java.io.IOException;
-import java.util.concurrent.CompletableFuture;
-import lombok.extern.slf4j.Slf4j;
-import org.apache.pulsar.broker.service.TransactionBufferSnapshotService;
-import org.apache.pulsar.broker.transaction.buffer.matadata.TransactionBufferSnapshot;
-import org.apache.pulsar.client.api.Message;
-import org.apache.pulsar.client.api.MessageId;
-import org.apache.pulsar.client.api.Producer;
-import org.apache.pulsar.client.api.PulsarClient;
-import org.apache.pulsar.client.api.PulsarClientException;
-import org.apache.pulsar.client.api.Schema;
-import org.apache.pulsar.common.naming.TopicName;
-
-@Slf4j
-public class TransactionBufferSystemTopicClient extends SystemTopicClientBase<TransactionBufferSnapshot> {
-    private TransactionBufferSnapshotService transactionBufferSnapshotService;
-
-    public TransactionBufferSystemTopicClient(PulsarClient client, TopicName topicName,
-                                              TransactionBufferSnapshotService transactionBufferSnapshotService) {
-        super(client, topicName);
-        this.transactionBufferSnapshotService = transactionBufferSnapshotService;
-    }
-
-    @Override
-    protected CompletableFuture<Writer<TransactionBufferSnapshot>> newWriterAsyncInternal() {
-        return client.newProducer(Schema.AVRO(TransactionBufferSnapshot.class))
-                .topic(topicName.toString())
-                .createAsync().thenCompose(producer -> {
-                    if (log.isDebugEnabled()) {
-                        log.debug("[{}] A new transactionBufferSnapshot writer is created", topicName);
-                    }
-                    return CompletableFuture.completedFuture(
-                            new TransactionBufferSnapshotWriter(producer, this));
-                });
-    }
-
-    @Override
-    protected CompletableFuture<Reader<TransactionBufferSnapshot>> newReaderAsyncInternal() {
-        return client.newReader(Schema.AVRO(TransactionBufferSnapshot.class))
-                .topic(topicName.toString())
-                .startMessageId(MessageId.earliest)
-                .readCompacted(true)
-                .createAsync()
-                .thenCompose(reader -> {
-                    if (log.isDebugEnabled()) {
-                        log.debug("[{}] A new transactionBufferSnapshot buffer reader is created", topicName);
-                    }
-                    return CompletableFuture.completedFuture(
-                            new TransactionBufferSnapshotReader(reader, this));
-                });
-    }
-
-    protected void removeWriter(TransactionBufferSnapshotWriter writer) {
-        writers.remove(writer);
-        this.transactionBufferSnapshotService.removeClient(topicName, this);
-    }
-
-    protected void removeReader(TransactionBufferSnapshotReader reader) {
-        readers.remove(reader);
-        this.transactionBufferSnapshotService.removeClient(topicName, this);
-    }
-
-    private static class TransactionBufferSnapshotWriter implements Writer<TransactionBufferSnapshot> {
-
-        private final Producer<TransactionBufferSnapshot> producer;
-        private final TransactionBufferSystemTopicClient transactionBufferSystemTopicClient;
-
-        private TransactionBufferSnapshotWriter(Producer<TransactionBufferSnapshot> producer,
-                                                TransactionBufferSystemTopicClient transactionBufferSystemTopicClient) {
-            this.producer = producer;
-            this.transactionBufferSystemTopicClient = transactionBufferSystemTopicClient;
-        }
-
-        @Override
-        public MessageId write(TransactionBufferSnapshot transactionBufferSnapshot) throws PulsarClientException {
-            return producer.newMessage().key(transactionBufferSnapshot.getTopicName())
-                    .value(transactionBufferSnapshot).send();
-        }
-
-        @Override
-        public CompletableFuture<MessageId> writeAsync(TransactionBufferSnapshot transactionBufferSnapshot) {
-            return producer.newMessage().key(transactionBufferSnapshot.getTopicName())
-                    .value(transactionBufferSnapshot).sendAsync();
-        }
-
-        @Override
-        public MessageId delete(TransactionBufferSnapshot transactionBufferSnapshot) throws PulsarClientException {
-            return producer.newMessage()
-                    .key(transactionBufferSnapshot.getTopicName())
-                    .value(null)
-                    .send();
-        }
-
-        @Override
-        public CompletableFuture<MessageId> deleteAsync(TransactionBufferSnapshot transactionBufferSnapshot) {
-            return producer.newMessage()
-                    .key(transactionBufferSnapshot.getTopicName())
-                    .value(null)
-                    .sendAsync();
-        }
-
-        @Override
-        public void close() throws IOException {
-            this.producer.close();
-            transactionBufferSystemTopicClient.removeWriter(this);
-        }
-
-        @Override
-        public CompletableFuture<Void> closeAsync() {
-            CompletableFuture<Void> completableFuture = new CompletableFuture<>();
-            producer.closeAsync().whenComplete((v, e) -> {
-                // if close fail, also need remove the producer
-                transactionBufferSystemTopicClient.removeWriter(this);
-                if (e != null) {
-                    completableFuture.completeExceptionally(e);
-                    return;
-                }
-                completableFuture.complete(null);
-            });
-            return completableFuture;
-        }
-
-        @Override
-        public SystemTopicClient<TransactionBufferSnapshot> getSystemTopicClient() {
-            return transactionBufferSystemTopicClient;
-        }
-    }
-
-    private static class TransactionBufferSnapshotReader implements Reader<TransactionBufferSnapshot> {
-
-        private final org.apache.pulsar.client.api.Reader<TransactionBufferSnapshot> reader;
-        private final TransactionBufferSystemTopicClient transactionBufferSystemTopicClient;
-
-        private TransactionBufferSnapshotReader(org.apache.pulsar.client.api.Reader<TransactionBufferSnapshot> reader,
-                                                TransactionBufferSystemTopicClient transactionBufferSystemTopicClient) {
-            this.reader = reader;
-            this.transactionBufferSystemTopicClient = transactionBufferSystemTopicClient;
-        }
-
-        @Override
-        public Message<TransactionBufferSnapshot> readNext() throws PulsarClientException {
-            return reader.readNext();
-        }
-
-        @Override
-        public CompletableFuture<Message<TransactionBufferSnapshot>> readNextAsync() {
-            return reader.readNextAsync();
-        }
-
-        @Override
-        public boolean hasMoreEvents() throws PulsarClientException {
-            return reader.hasMessageAvailable();
-        }
-
-        @Override
-        public CompletableFuture<Boolean> hasMoreEventsAsync() {
-            return reader.hasMessageAvailableAsync();
-        }
-
-        @Override
-        public void close() throws IOException {
-            this.reader.close();
-            transactionBufferSystemTopicClient.removeReader(this);
-        }
-
-        @Override
-        public CompletableFuture<Void> closeAsync() {
-            CompletableFuture<Void> completableFuture = new CompletableFuture<>();
-            reader.closeAsync().whenComplete((v, e) -> {
-                // if close fail, also need remove the reader
-                transactionBufferSystemTopicClient.removeReader(this);
-                if (e != null) {
-                    completableFuture.completeExceptionally(e);
-                    return;
-                }
-                completableFuture.complete(null);
-            });
-            return completableFuture;
-        }
-
-        @Override
-        public SystemTopicClient<TransactionBufferSnapshot> getSystemTopic() {
-            return transactionBufferSystemTopicClient;
-        }
-    }
-}
-
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/buffer/impl/TopicTransactionBuffer.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/buffer/impl/TopicTransactionBuffer.java
index ad778137001..1245c7d8129 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/buffer/impl/TopicTransactionBuffer.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/buffer/impl/TopicTransactionBuffer.java
@@ -48,8 +48,8 @@ import org.apache.pulsar.broker.systopic.SystemTopicClient;
 import org.apache.pulsar.broker.transaction.buffer.TransactionBuffer;
 import org.apache.pulsar.broker.transaction.buffer.TransactionBufferReader;
 import org.apache.pulsar.broker.transaction.buffer.TransactionMeta;
-import org.apache.pulsar.broker.transaction.buffer.matadata.AbortTxnMetadata;
-import org.apache.pulsar.broker.transaction.buffer.matadata.TransactionBufferSnapshot;
+import org.apache.pulsar.broker.transaction.buffer.metadata.AbortTxnMetadata;
+import org.apache.pulsar.broker.transaction.buffer.metadata.TransactionBufferSnapshot;
 import org.apache.pulsar.client.api.Message;
 import org.apache.pulsar.client.api.PulsarClientException;
 import org.apache.pulsar.client.api.transaction.TxnID;
@@ -117,7 +117,8 @@ public class TopicTransactionBuffer extends TopicTransactionBufferState implemen
         super(State.None);
         this.topic = topic;
         this.takeSnapshotWriter = this.topic.getBrokerService().getPulsar()
-                .getTransactionBufferSnapshotService().createWriter(TopicName.get(topic.getName()));
+                .getTransactionBufferSnapshotServiceFactory()
+                .getTxnBufferSnapshotService().createWriter(TopicName.get(topic.getName()));
         this.timer = topic.getBrokerService().getPulsar().getTransactionTimer();
         this.takeSnapshotIntervalNumber = topic.getBrokerService().getPulsar()
                 .getConfiguration().getTransactionBufferSnapshotMaxTransactionCount();
@@ -484,7 +485,7 @@ public class TopicTransactionBuffer extends TopicTransactionBufferState implemen
                 });
                 snapshot.setAborts(list);
             }
-            return writer.writeAsync(snapshot).thenAccept(messageId-> {
+            return writer.writeAsync(snapshot.getTopicName(), snapshot).thenAccept(messageId-> {
                 this.lastSnapshotTimestamps = System.currentTimeMillis();
                 if (log.isDebugEnabled()) {
                     log.debug("[{}]Transaction buffer take snapshot success! "
@@ -532,7 +533,7 @@ public class TopicTransactionBuffer extends TopicTransactionBufferState implemen
         return this.takeSnapshotWriter.thenCompose(writer -> {
             TransactionBufferSnapshot snapshot = new TransactionBufferSnapshot();
             snapshot.setTopicName(topic.getName());
-            return writer.deleteAsync(snapshot);
+            return writer.deleteAsync(snapshot.getTopicName(), snapshot);
         }).thenCompose(__ -> CompletableFuture.completedFuture(null));
     }
 
@@ -645,8 +646,9 @@ public class TopicTransactionBuffer extends TopicTransactionBufferState implemen
                             this, topic.getName());
                     return;
                 }
-                topic.getBrokerService().getPulsar().getTransactionBufferSnapshotService()
-                        .createReader(TopicName.get(topic.getName())).thenAcceptAsync(reader -> {
+                topic.getBrokerService().getPulsar().getTransactionBufferSnapshotServiceFactory()
+                        .getTxnBufferSnapshotService().createReader(TopicName.get(topic.getName()))
+                        .thenAcceptAsync(reader -> {
                             try {
                                 boolean hasSnapshot = false;
                                 while (reader.hasMoreEvents()) {
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/buffer/impl/TopicTransactionBufferRecoverCallBack.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/buffer/impl/TopicTransactionBufferRecoverCallBack.java
index 87b8e930a27..d229fbb8f5d 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/buffer/impl/TopicTransactionBufferRecoverCallBack.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/buffer/impl/TopicTransactionBufferRecoverCallBack.java
@@ -19,7 +19,7 @@
 package org.apache.pulsar.broker.transaction.buffer.impl;
 
 import org.apache.bookkeeper.mledger.Entry;
-import org.apache.pulsar.broker.transaction.buffer.matadata.TransactionBufferSnapshot;
+import org.apache.pulsar.broker.transaction.buffer.metadata.TransactionBufferSnapshot;
 
 public interface TopicTransactionBufferRecoverCallBack {
 
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/buffer/matadata/AbortTxnMetadata.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/buffer/metadata/AbortTxnMetadata.java
similarity index 94%
copy from pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/buffer/matadata/AbortTxnMetadata.java
copy to pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/buffer/metadata/AbortTxnMetadata.java
index 5e532d6bff7..7e997111a04 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/buffer/matadata/AbortTxnMetadata.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/buffer/metadata/AbortTxnMetadata.java
@@ -16,7 +16,7 @@
  * specific language governing permissions and limitations
  * under the License.
  */
-package org.apache.pulsar.broker.transaction.buffer.matadata;
+package org.apache.pulsar.broker.transaction.buffer.metadata;
 
 import lombok.AllArgsConstructor;
 import lombok.Getter;
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/buffer/matadata/TransactionBufferSnapshot.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/buffer/metadata/TransactionBufferSnapshot.java
similarity index 95%
copy from pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/buffer/matadata/TransactionBufferSnapshot.java
copy to pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/buffer/metadata/TransactionBufferSnapshot.java
index 59b851e7397..dbe15e81854 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/buffer/matadata/TransactionBufferSnapshot.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/buffer/metadata/TransactionBufferSnapshot.java
@@ -16,7 +16,7 @@
  * specific language governing permissions and limitations
  * under the License.
  */
-package org.apache.pulsar.broker.transaction.buffer.matadata;
+package org.apache.pulsar.broker.transaction.buffer.metadata;
 
 import java.util.List;
 import lombok.AllArgsConstructor;
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/buffer/matadata/package-info.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/buffer/metadata/package-info.java
similarity index 93%
copy from pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/buffer/matadata/package-info.java
copy to pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/buffer/metadata/package-info.java
index f32853a7d86..74688c857c9 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/buffer/matadata/package-info.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/buffer/metadata/package-info.java
@@ -19,4 +19,4 @@
 /**
  * The transaction buffer snapshot metadata.
  */
-package org.apache.pulsar.broker.transaction.buffer.matadata;
+package org.apache.pulsar.broker.transaction.buffer.metadata;
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/buffer/matadata/AbortTxnMetadata.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/buffer/metadata/v2/TransactionBufferSnapshotIndex.java
similarity index 71%
rename from pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/buffer/matadata/AbortTxnMetadata.java
rename to pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/buffer/metadata/v2/TransactionBufferSnapshotIndex.java
index 5e532d6bff7..bc4101e1c92 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/buffer/matadata/AbortTxnMetadata.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/buffer/metadata/v2/TransactionBufferSnapshotIndex.java
@@ -16,23 +16,21 @@
  * specific language governing permissions and limitations
  * under the License.
  */
-package org.apache.pulsar.broker.transaction.buffer.matadata;
+package org.apache.pulsar.broker.transaction.buffer.metadata.v2;
 
 import lombok.AllArgsConstructor;
-import lombok.Getter;
+import lombok.Builder;
+import lombok.Data;
 import lombok.NoArgsConstructor;
-import lombok.Setter;
 
-/**
- * Abort txn metadata.
- */
+@Builder
+@Data
 @AllArgsConstructor
 @NoArgsConstructor
-@Getter
-@Setter
-public class AbortTxnMetadata {
-    long txnIdMostBits;
-    long txnIdLeastBits;
-    long ledgerId;
-    long entryId;
+public class TransactionBufferSnapshotIndex {
+    public long sequenceID;
+    public long maxReadPositionLedgerID;
+    public long maxReadPositionEntryID;
+    public long persistentPositionLedgerID;
+    public long persistentPositionEntryID;
 }
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/buffer/matadata/TransactionBufferSnapshot.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/buffer/metadata/v2/TransactionBufferSnapshotIndexes.java
similarity index 78%
copy from pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/buffer/matadata/TransactionBufferSnapshot.java
copy to pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/buffer/metadata/v2/TransactionBufferSnapshotIndexes.java
index 59b851e7397..28b2b05a496 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/buffer/matadata/TransactionBufferSnapshot.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/buffer/metadata/v2/TransactionBufferSnapshotIndexes.java
@@ -16,24 +16,25 @@
  * specific language governing permissions and limitations
  * under the License.
  */
-package org.apache.pulsar.broker.transaction.buffer.matadata;
+package org.apache.pulsar.broker.transaction.buffer.metadata.v2;
 
 import java.util.List;
 import lombok.AllArgsConstructor;
+import lombok.Builder;
 import lombok.Getter;
 import lombok.NoArgsConstructor;
 import lombok.Setter;
 
-/**
- * Transaction buffer snapshot metadata.
- */
 @AllArgsConstructor
 @NoArgsConstructor
 @Getter
 @Setter
-public class TransactionBufferSnapshot {
+@Builder
+public class TransactionBufferSnapshotIndexes {
     private String topicName;
-    private long maxReadPositionLedgerId;
-    private long maxReadPositionEntryId;
-    private List<AbortTxnMetadata> aborts;
+
+    private List<TransactionBufferSnapshotIndex> indexList;
+
+    private TransactionBufferSnapshotSegment snapshot;
+
 }
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/buffer/matadata/TransactionBufferSnapshot.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/buffer/metadata/v2/TransactionBufferSnapshotSegment.java
similarity index 80%
copy from pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/buffer/matadata/TransactionBufferSnapshot.java
copy to pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/buffer/metadata/v2/TransactionBufferSnapshotSegment.java
index 59b851e7397..478ec53ba29 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/buffer/matadata/TransactionBufferSnapshot.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/buffer/metadata/v2/TransactionBufferSnapshotSegment.java
@@ -16,24 +16,20 @@
  * specific language governing permissions and limitations
  * under the License.
  */
-package org.apache.pulsar.broker.transaction.buffer.matadata;
+package org.apache.pulsar.broker.transaction.buffer.metadata.v2;
 
 import java.util.List;
 import lombok.AllArgsConstructor;
-import lombok.Getter;
+import lombok.Data;
 import lombok.NoArgsConstructor;
-import lombok.Setter;
 
-/**
- * Transaction buffer snapshot metadata.
- */
+@Data
 @AllArgsConstructor
 @NoArgsConstructor
-@Getter
-@Setter
-public class TransactionBufferSnapshot {
+public class TransactionBufferSnapshotSegment {
     private String topicName;
+    private long sequenceId;
     private long maxReadPositionLedgerId;
     private long maxReadPositionEntryId;
-    private List<AbortTxnMetadata> aborts;
+    private List<TxnIDData> aborts;
 }
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/buffer/matadata/TransactionBufferSnapshot.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/buffer/metadata/v2/TxnIDData.java
similarity index 51%
rename from pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/buffer/matadata/TransactionBufferSnapshot.java
rename to pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/buffer/metadata/v2/TxnIDData.java
index 59b851e7397..8e565c017ea 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/buffer/matadata/TransactionBufferSnapshot.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/buffer/metadata/v2/TxnIDData.java
@@ -16,24 +16,48 @@
  * specific language governing permissions and limitations
  * under the License.
  */
-package org.apache.pulsar.broker.transaction.buffer.matadata;
+package org.apache.pulsar.broker.transaction.buffer.metadata.v2;
 
-import java.util.List;
+import java.util.Objects;
 import lombok.AllArgsConstructor;
-import lombok.Getter;
+import lombok.Data;
 import lombok.NoArgsConstructor;
-import lombok.Setter;
 
-/**
- * Transaction buffer snapshot metadata.
- */
-@AllArgsConstructor
+@Data
 @NoArgsConstructor
-@Getter
-@Setter
-public class TransactionBufferSnapshot {
-    private String topicName;
-    private long maxReadPositionLedgerId;
-    private long maxReadPositionEntryId;
-    private List<AbortTxnMetadata> aborts;
+@AllArgsConstructor
+public class TxnIDData {
+    /**
+     * The most significant 64 bits of this TxnID.
+     *
+     * @serial
+     */
+    private long mostSigBits;
+
+    /**
+     * The least significant 64 bits of this TxnID.
+     *
+     * @serial
+     */
+    private long leastSigBits;
+
+    @Override
+    public String toString() {
+        return "(" + mostSigBits + "," + leastSigBits + ")";
+    }
+
+    @Override
+    public int hashCode() {
+        return Objects.hash(mostSigBits, leastSigBits);
+    }
+
+    @Override
+    public boolean equals(Object obj) {
+        if (obj instanceof TxnIDData other) {
+            return mostSigBits == other.mostSigBits
+                    && leastSigBits == other.leastSigBits;
+        }
+        // Null or an object of another type is never equal to a TxnIDData.
+        return false;
+    }
+}
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/buffer/matadata/package-info.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/buffer/metadata/v2/package-info.java
similarity index 87%
rename from pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/buffer/matadata/package-info.java
rename to pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/buffer/metadata/v2/package-info.java
index f32853a7d86..02d6e7a716d 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/buffer/matadata/package-info.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/buffer/metadata/v2/package-info.java
@@ -16,7 +16,4 @@
  * specific language governing permissions and limitations
  * under the License.
  */
-/**
- * The transaction buffer snapshot metadata.
- */
-package org.apache.pulsar.broker.transaction.buffer.matadata;
+package org.apache.pulsar.broker.transaction.buffer.metadata.v2;
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/systopic/NamespaceEventsSystemTopicServiceTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/systopic/NamespaceEventsSystemTopicServiceTest.java
index 91db232d2aa..a0d7801f2fd 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/systopic/NamespaceEventsSystemTopicServiceTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/systopic/NamespaceEventsSystemTopicServiceTest.java
@@ -18,6 +18,7 @@
  */
 package org.apache.pulsar.broker.systopic;
 
+import static org.apache.pulsar.broker.service.SystemTopicBasedTopicPoliciesService.getEventKey;
 import static org.mockito.Mockito.mock;
 import com.google.common.collect.Sets;
 import java.util.HashSet;
@@ -121,7 +122,7 @@ public class NamespaceEventsSystemTopicServiceTest extends MockedPulsarServiceBa
                 .policies(policies)
                 .build())
             .build();
-        systemTopicClientForNamespace1.newWriter().write(event);
+        systemTopicClientForNamespace1.newWriter().write(getEventKey(event), event);
         SystemTopicClient.Reader reader = systemTopicClientForNamespace1.newReader();
         Message<PulsarEvent> received = reader.readNext();
         log.info("Receive pulsar event from system topic : {}", received.getValue());
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/transaction/TopicTransactionBufferRecoverTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/transaction/TopicTransactionBufferRecoverTest.java
index c0afdbee487..c1f6ff16e77 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/transaction/TopicTransactionBufferRecoverTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/transaction/TopicTransactionBufferRecoverTest.java
@@ -28,9 +28,14 @@ import static org.testng.Assert.assertFalse;
 import static org.testng.Assert.assertNotNull;
 import static org.testng.Assert.assertNull;
 import static org.testng.Assert.assertTrue;
+import io.netty.buffer.ByteBuf;
+import io.netty.buffer.Unpooled;
 import java.io.IOException;
 import java.lang.reflect.Field;
 import java.lang.reflect.Method;
+import java.util.Collections;
+import java.util.LinkedList;
+import java.util.List;
 import java.util.NavigableMap;
 import java.util.Optional;
 import java.util.concurrent.CompletableFuture;
@@ -38,21 +43,30 @@ import java.util.concurrent.ExecutionException;
 import java.util.concurrent.TimeUnit;
 import lombok.Cleanup;
 import lombok.extern.slf4j.Slf4j;
+import org.apache.bookkeeper.mledger.AsyncCallbacks;
+import org.apache.bookkeeper.mledger.Entry;
+import org.apache.bookkeeper.mledger.ManagedLedgerException;
 import org.apache.bookkeeper.mledger.impl.ManagedLedgerImpl;
 import org.apache.bookkeeper.mledger.impl.PositionImpl;
+import org.apache.bookkeeper.mledger.impl.ReadOnlyManagedLedgerImpl;
 import org.apache.bookkeeper.mledger.proto.MLDataFormats;
 import org.apache.commons.collections4.map.LinkedMap;
 import org.apache.commons.lang3.RandomUtils;
 import org.apache.pulsar.broker.PulsarService;
 import org.apache.pulsar.broker.service.AbstractTopic;
 import org.apache.pulsar.broker.service.BrokerService;
+import org.apache.pulsar.broker.service.SystemTopicTxnBufferSnapshotService;
 import org.apache.pulsar.broker.service.Topic;
-import org.apache.pulsar.broker.service.TransactionBufferSnapshotService;
+import org.apache.pulsar.broker.service.TransactionBufferSnapshotServiceFactory;
 import org.apache.pulsar.broker.service.persistent.PersistentTopic;
 import org.apache.pulsar.broker.systopic.NamespaceEventsSystemTopicFactory;
 import org.apache.pulsar.broker.systopic.SystemTopicClient;
 import org.apache.pulsar.broker.transaction.buffer.impl.TopicTransactionBuffer;
-import org.apache.pulsar.broker.transaction.buffer.matadata.TransactionBufferSnapshot;
+import org.apache.pulsar.broker.transaction.buffer.metadata.TransactionBufferSnapshot;
+import org.apache.pulsar.broker.transaction.buffer.metadata.v2.TransactionBufferSnapshotIndex;
+import org.apache.pulsar.broker.transaction.buffer.metadata.v2.TransactionBufferSnapshotIndexes;
+import org.apache.pulsar.broker.transaction.buffer.metadata.v2.TransactionBufferSnapshotSegment;
+import org.apache.pulsar.broker.transaction.buffer.metadata.v2.TxnIDData;
 import org.apache.pulsar.client.api.Consumer;
 import org.apache.pulsar.client.api.Message;
 import org.apache.pulsar.client.api.MessageId;
@@ -66,9 +80,11 @@ import org.apache.pulsar.client.api.transaction.Transaction;
 import org.apache.pulsar.client.api.transaction.TxnID;
 import org.apache.pulsar.client.impl.MessageIdImpl;
 import org.apache.pulsar.client.impl.transaction.TransactionImpl;
+import org.apache.pulsar.common.api.proto.MessageMetadata;
 import org.apache.pulsar.common.events.EventType;
 import org.apache.pulsar.common.naming.TopicName;
 import org.apache.pulsar.common.policies.data.TopicStats;
+import org.apache.pulsar.common.protocol.Commands;
 import org.apache.pulsar.common.util.FutureUtil;
 import org.apache.pulsar.common.util.collections.ConcurrentOpenHashMap;
 import org.awaitility.Awaitility;
@@ -82,6 +98,8 @@ public class TopicTransactionBufferRecoverTest extends TransactionTestBase {
 
     private static final String RECOVER_COMMIT = NAMESPACE1 + "/recover-commit";
     private static final String RECOVER_ABORT = NAMESPACE1 + "/recover-abort";
+    private static final String SNAPSHOT_INDEX = NAMESPACE1 + "/snapshot-index";
+    private static final String SNAPSHOT_SEGMENT = NAMESPACE1 + "/snapshot-segment";
     private static final String SUBSCRIPTION_NAME = "test-recover";
     private static final String TAKE_SNAPSHOT = NAMESPACE1 + "/take-snapshot";
     private static final String ABORT_DELETE = NAMESPACE1 + "/abort-delete";
@@ -472,60 +490,66 @@ public class TopicTransactionBufferRecoverTest extends TransactionTestBase {
 
         PersistentTopic originalTopic = (PersistentTopic) getPulsarServiceList().get(0)
                 .getBrokerService().getTopic(TopicName.get(topic).toString(), false).get().get();
-        TransactionBufferSnapshotService transactionBufferSnapshotService =
-                mock(TransactionBufferSnapshotService.class);
+        SystemTopicTxnBufferSnapshotService<TransactionBufferSnapshot> systemTopicTxnBufferSnapshotService =
+                mock(SystemTopicTxnBufferSnapshotService.class);
         SystemTopicClient.Reader<TransactionBufferSnapshot> reader = mock(SystemTopicClient.Reader.class);
         SystemTopicClient.Writer<TransactionBufferSnapshot> writer = mock(SystemTopicClient.Writer.class);
 
-        doReturn(CompletableFuture.completedFuture(reader)).when(transactionBufferSnapshotService).createReader(any());
-        doReturn(CompletableFuture.completedFuture(writer)).when(transactionBufferSnapshotService).createWriter(any());
+        doReturn(CompletableFuture.completedFuture(reader))
+                .when(systemTopicTxnBufferSnapshotService).createReader(any());
+        doReturn(CompletableFuture.completedFuture(writer))
+                .when(systemTopicTxnBufferSnapshotService).createWriter(any());
+        TransactionBufferSnapshotServiceFactory transactionBufferSnapshotServiceFactory =
+                mock(TransactionBufferSnapshotServiceFactory.class);
+        doReturn(systemTopicTxnBufferSnapshotService)
+                .when(transactionBufferSnapshotServiceFactory).getTxnBufferSnapshotService();
         doReturn(CompletableFuture.completedFuture(null)).when(reader).closeAsync();
         doReturn(CompletableFuture.completedFuture(null)).when(writer).closeAsync();
-        Field field = PulsarService.class.getDeclaredField("transactionBufferSnapshotService");
+        Field field = PulsarService.class.getDeclaredField("transactionBufferSnapshotServiceFactory");
         field.setAccessible(true);
-        TransactionBufferSnapshotService transactionBufferSnapshotServiceOriginal =
-                (TransactionBufferSnapshotService) field.get(getPulsarServiceList().get(0));
+        TransactionBufferSnapshotServiceFactory transactionBufferSnapshotServiceFactoryOriginal =
+                ((TransactionBufferSnapshotServiceFactory)field.get(getPulsarServiceList().get(0)));
         // mock reader can't read snapshot fail throw RuntimeException
         doThrow(new RuntimeException("test")).when(reader).hasMoreEvents();
         // check reader close topic
-        checkCloseTopic(pulsarClient, transactionBufferSnapshotServiceOriginal,
-                transactionBufferSnapshotService, originalTopic, field, producer);
+        checkCloseTopic(pulsarClient, transactionBufferSnapshotServiceFactoryOriginal,
+                transactionBufferSnapshotServiceFactory, originalTopic, field, producer);
         doReturn(true).when(reader).hasMoreEvents();
 
         // mock reader can't read snapshot fail throw PulsarClientException
         doThrow(new PulsarClientException("test")).when(reader).hasMoreEvents();
         // check reader close topic
-        checkCloseTopic(pulsarClient, transactionBufferSnapshotServiceOriginal,
-                transactionBufferSnapshotService, originalTopic, field, producer);
+        checkCloseTopic(pulsarClient, transactionBufferSnapshotServiceFactoryOriginal,
+                transactionBufferSnapshotServiceFactory, originalTopic, field, producer);
         doReturn(true).when(reader).hasMoreEvents();
 
         // mock create reader fail
         doReturn(FutureUtil.failedFuture(new PulsarClientException("test")))
-                .when(transactionBufferSnapshotService).createReader(any());
+                .when(systemTopicTxnBufferSnapshotService).createReader(any());
         // check create reader fail close topic
         originalTopic = (PersistentTopic) getPulsarServiceList().get(0)
                 .getBrokerService().getTopic(TopicName.get(topic).toString(), false).get().get();
-        checkCloseTopic(pulsarClient, transactionBufferSnapshotServiceOriginal,
-                transactionBufferSnapshotService, originalTopic, field, producer);
-        doReturn(CompletableFuture.completedFuture(reader)).when(transactionBufferSnapshotService).createReader(any());
+        checkCloseTopic(pulsarClient, transactionBufferSnapshotServiceFactoryOriginal,
+                transactionBufferSnapshotServiceFactory, originalTopic, field, producer);
+        doReturn(CompletableFuture.completedFuture(reader)).when(systemTopicTxnBufferSnapshotService).createReader(any());
 
         // check create writer fail close topic
         originalTopic = (PersistentTopic) getPulsarServiceList().get(0)
                 .getBrokerService().getTopic(TopicName.get(topic).toString(), false).get().get();
         // mock create writer fail
         doReturn(FutureUtil.failedFuture(new PulsarClientException("test")))
-                .when(transactionBufferSnapshotService).createWriter(any());
-        checkCloseTopic(pulsarClient, transactionBufferSnapshotServiceOriginal,
-                transactionBufferSnapshotService, originalTopic, field, producer);
+                .when(systemTopicTxnBufferSnapshotService).createWriter(any());
+        checkCloseTopic(pulsarClient, transactionBufferSnapshotServiceFactoryOriginal,
+                transactionBufferSnapshotServiceFactory, originalTopic, field, producer);
     }
 
     private void checkCloseTopic(PulsarClient pulsarClient,
-                                 TransactionBufferSnapshotService transactionBufferSnapshotServiceOriginal,
-                                 TransactionBufferSnapshotService transactionBufferSnapshotService,
+                                 TransactionBufferSnapshotServiceFactory transactionBufferSnapshotServiceFactoryOriginal,
+                                 TransactionBufferSnapshotServiceFactory transactionBufferSnapshotServiceFactory,
                                  PersistentTopic originalTopic,
                                  Field field,
                                  Producer<byte[]> producer) throws Exception {
-        field.set(getPulsarServiceList().get(0), transactionBufferSnapshotService);
+        field.set(getPulsarServiceList().get(0), transactionBufferSnapshotServiceFactory);
 
         // recover again will throw then close topic
         new TopicTransactionBuffer(originalTopic);
@@ -536,7 +560,7 @@ public class TopicTransactionBufferRecoverTest extends TransactionTestBase {
             assertTrue((boolean) close.get(originalTopic));
         });
 
-        field.set(getPulsarServiceList().get(0), transactionBufferSnapshotServiceOriginal);
+        field.set(getPulsarServiceList().get(0), transactionBufferSnapshotServiceFactoryOriginal);
 
         Transaction txn = pulsarClient.newTransaction()
                 .withTransactionTimeout(5, TimeUnit.SECONDS)
@@ -565,4 +589,136 @@ public class TopicTransactionBufferRecoverTest extends TransactionTestBase {
         assertTrue(stats.getSubscriptions().keySet().contains("__compaction"));
     }
 
+    @Test
+    public void testTransactionBufferIndexSystemTopic() throws Exception {
+        SystemTopicTxnBufferSnapshotService<TransactionBufferSnapshotIndexes> transactionBufferSnapshotIndexService =
+                new TransactionBufferSnapshotServiceFactory(pulsarClient).getTxnBufferSnapshotIndexService();
+        @Cleanup
+        SystemTopicClient.Writer<TransactionBufferSnapshotIndexes> indexesWriter =
+                transactionBufferSnapshotIndexService.createWriter(TopicName.get(SNAPSHOT_INDEX)).get();
+        @Cleanup
+        SystemTopicClient.Reader<TransactionBufferSnapshotIndexes> indexesReader =
+                transactionBufferSnapshotIndexService.createReader(TopicName.get(SNAPSHOT_INDEX)).get();
+
+        // five index entries; every field of entry i is set to i
+        List<TransactionBufferSnapshotIndex> indexList = new LinkedList<>();
+
+        for (long i = 0; i < 5; i++) {
+            indexList.add(new TransactionBufferSnapshotIndex(i, i, i, i, i));
+        }
+
+        TransactionBufferSnapshotIndexes transactionBufferTransactionBufferSnapshotIndexes =
+                new TransactionBufferSnapshotIndexes(SNAPSHOT_INDEX,
+                        indexList, null);
+
+        indexesWriter.write(SNAPSHOT_INDEX, transactionBufferTransactionBufferSnapshotIndexes);
+        // read the event back and check that every field survived the round trip
+        assertTrue(indexesReader.hasMoreEvents());
+        transactionBufferTransactionBufferSnapshotIndexes = indexesReader.readNext().getValue();
+        assertEquals(transactionBufferTransactionBufferSnapshotIndexes.getTopicName(), SNAPSHOT_INDEX);
+        assertEquals(transactionBufferTransactionBufferSnapshotIndexes.getIndexList().size(), 5);
+        assertNull(transactionBufferTransactionBufferSnapshotIndexes.getSnapshot());
+
+        TransactionBufferSnapshotIndex transactionBufferSnapshotIndex =
+                transactionBufferTransactionBufferSnapshotIndexes.getIndexList().get(1);
+        assertEquals(transactionBufferSnapshotIndex.getMaxReadPositionLedgerID(), 1L);
+        assertEquals(transactionBufferSnapshotIndex.getMaxReadPositionEntryID(), 1L);
+        assertEquals(transactionBufferSnapshotIndex.getPersistentPositionLedgerID(), 1L);
+        assertEquals(transactionBufferSnapshotIndex.getPersistentPositionEntryID(), 1L);
+        assertEquals(transactionBufferSnapshotIndex.getSequenceID(), 1L);
+    }
+
+    /** Builds the message key {@code multiple-<sequenceId>-<topicName>} for the given snapshot segment. */
+    public static String buildKey(TransactionBufferSnapshotSegment snapshot) {
+        return String.join("-", "multiple", String.valueOf(snapshot.getSequenceId()), snapshot.getTopicName());
+    }
+
+    @Test
+    public void testTransactionBufferSegmentSystemTopic() throws Exception {
+        // init topic and topicName
+        String snapshotTopic = NAMESPACE1 + "/" + EventType.TRANSACTION_BUFFER_SNAPSHOT_SEGMENTS;
+        TopicName snapshotSegmentTopicName = TopicName.getPartitionedTopicName(snapshotTopic);
+
+        // create a producer first so the managed ledger of the snapshot segment topic is created
+        @Cleanup
+        Producer<TransactionBufferSnapshotSegment> producer =
+                pulsarClient.newProducer(Schema.AVRO(TransactionBufferSnapshotSegment.class))
+                .topic(snapshotTopic)
+                .create();
+
+        // get brokerService and pulsarService
+        PulsarService pulsarService = getPulsarServiceList().get(0);
+        BrokerService brokerService = pulsarService.getBrokerService();
+
+        // create snapshot segment writer
+        SystemTopicTxnBufferSnapshotService<TransactionBufferSnapshotSegment>
+                transactionBufferSnapshotSegmentService =
+                new TransactionBufferSnapshotServiceFactory(pulsarClient).getTxnBufferSnapshotSegmentService();
+
+        @Cleanup
+        SystemTopicClient.Writer<TransactionBufferSnapshotSegment>
+                segmentWriter = transactionBufferSnapshotSegmentService.createWriter(snapshotSegmentTopicName).get();
+
+        // write two snapshots to snapshot segment topic
+        TransactionBufferSnapshotSegment snapshot = new TransactionBufferSnapshotSegment();
+
+        //build and send snapshot
+        snapshot.setTopicName(snapshotTopic);
+        snapshot.setSequenceId(1L);
+        snapshot.setMaxReadPositionLedgerId(2L);
+        snapshot.setMaxReadPositionEntryId(3L);
+        snapshot.setAborts(Collections.singletonList(new TxnIDData(1, 1)));
+
+        segmentWriter.write(buildKey(snapshot), snapshot);
+        snapshot.setSequenceId(2L);
+
+        MessageIdImpl messageId = (MessageIdImpl) segmentWriter.write(buildKey(snapshot), snapshot);
+
+        //Create read-only managed ledger
+        //and read back the entry of the second write; open/read failures are propagated through the future
+        CompletableFuture<Entry> entryCompletableFuture = new CompletableFuture<>();
+        AsyncCallbacks.OpenReadOnlyManagedLedgerCallback callback = new AsyncCallbacks
+                .OpenReadOnlyManagedLedgerCallback() {
+            @Override
+            public void openReadOnlyManagedLedgerComplete(ReadOnlyManagedLedgerImpl readOnlyManagedLedger, Object ctx) {
+                readOnlyManagedLedger.asyncReadEntry(
+                        new PositionImpl(messageId.getLedgerId(), messageId.getEntryId()),
+                        new AsyncCallbacks.ReadEntryCallback() {
+                            @Override
+                            public void readEntryComplete(Entry entry, Object ctx) {
+                                entryCompletableFuture.complete(entry);
+                            }
+
+                            @Override
+                            public void readEntryFailed(ManagedLedgerException exception, Object ctx) {
+                                entryCompletableFuture.completeExceptionally(exception);
+                            }
+                        }, null);
+            }
+
+            @Override
+            public void openReadOnlyManagedLedgerFailed(ManagedLedgerException exception, Object ctx) {
+                entryCompletableFuture.completeExceptionally(exception);
+            }
+        };
+        pulsarService.getManagedLedgerFactory()
+                .asyncOpenReadOnlyManagedLedger(snapshotSegmentTopicName.getPersistenceNamingEncoding(), callback,
+                        brokerService.getManagedLedgerConfig(snapshotSegmentTopicName).get(), null);
+
+        Entry entry = entryCompletableFuture.get();
+        //decode snapshot from entry
+        ByteBuf headersAndPayload = entry.getDataBuffer();
+        //skip metadata
+        MessageMetadata msgMetadata = Commands.parseMessageMetadata(headersAndPayload);
+        snapshot = Schema.AVRO(TransactionBufferSnapshotSegment.class).decode(Unpooled.wrappedBuffer(headersAndPayload).nioBuffer());
+        entry.release();
+
+        //verify snapshot
+        assertEquals(snapshot.getTopicName(), snapshotTopic);
+        assertEquals(snapshot.getSequenceId(), 2L);
+        assertEquals(snapshot.getMaxReadPositionLedgerId(), 2L);
+        assertEquals(snapshot.getMaxReadPositionEntryId(), 3L);
+        assertEquals(snapshot.getAborts().get(0), new TxnIDData(1, 1));
+    }
+
 }
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/transaction/TransactionTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/transaction/TransactionTest.java
index 952cd9b2c45..67e7fe268a6 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/transaction/TransactionTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/transaction/TransactionTest.java
@@ -84,8 +84,9 @@ import org.apache.pulsar.broker.intercept.CounterBrokerInterceptor;
 import org.apache.pulsar.broker.service.BacklogQuotaManager;
 import org.apache.pulsar.broker.service.BrokerService;
 import org.apache.pulsar.broker.service.BrokerServiceException;
+import org.apache.pulsar.broker.service.SystemTopicTxnBufferSnapshotService;
 import org.apache.pulsar.broker.service.Topic;
-import org.apache.pulsar.broker.service.TransactionBufferSnapshotService;
+import org.apache.pulsar.broker.service.TransactionBufferSnapshotServiceFactory;
 import org.apache.pulsar.broker.service.persistent.PersistentSubscription;
 import org.apache.pulsar.broker.service.persistent.PersistentTopic;
 import org.apache.pulsar.broker.systopic.NamespaceEventsSystemTopicFactory;
@@ -96,7 +97,7 @@ import org.apache.pulsar.broker.transaction.buffer.impl.TopicTransactionBufferPr
 import org.apache.pulsar.broker.transaction.buffer.impl.TopicTransactionBufferRecoverCallBack;
 import org.apache.pulsar.broker.transaction.buffer.impl.TopicTransactionBufferState;
 import org.apache.pulsar.broker.transaction.pendingack.PendingAckStore;
-import org.apache.pulsar.broker.transaction.buffer.matadata.TransactionBufferSnapshot;
+import org.apache.pulsar.broker.transaction.buffer.metadata.TransactionBufferSnapshot;
 import org.apache.pulsar.broker.transaction.pendingack.TransactionPendingAckStoreProvider;
 import org.apache.pulsar.broker.transaction.pendingack.impl.MLPendingAckReplyCallBack;
 import org.apache.pulsar.broker.transaction.pendingack.impl.MLPendingAckStore;
@@ -1514,18 +1515,23 @@ public class TransactionTest extends TransactionTestBase {
         when(pendingAckStoreProvider.newPendingAckStore(any()))
                 .thenReturn(CompletableFuture.completedFuture(pendingAckStore));
         // Mock TransactionBufferSnapshotService
-        TransactionBufferSnapshotService transactionBufferSnapshotService
-                = mock(TransactionBufferSnapshotService.class);
-        SystemTopicClient.Writer writer = mock(SystemTopicClient.Writer.class);
+        SystemTopicTxnBufferSnapshotService<TransactionBufferSnapshot> systemTopicTxnBufferSnapshotService
+                = mock(SystemTopicTxnBufferSnapshotService.class);
+        SystemTopicClient.Writer<TransactionBufferSnapshot> writer = mock(SystemTopicClient.Writer.class);
         when(writer.closeAsync()).thenReturn(CompletableFuture.completedFuture(null));
-        when(transactionBufferSnapshotService.createWriter(any()))
+        when(systemTopicTxnBufferSnapshotService.createWriter(any()))
                 .thenReturn(CompletableFuture.completedFuture(writer));
+        TransactionBufferSnapshotServiceFactory transactionBufferSnapshotServiceFactory =
+                mock(TransactionBufferSnapshotServiceFactory.class);
+        when(transactionBufferSnapshotServiceFactory.getTxnBufferSnapshotService())
+                .thenReturn(systemTopicTxnBufferSnapshotService);
+
         // Mock pulsar.
         PulsarService pulsar = mock(PulsarService.class);
         when(pulsar.getConfiguration()).thenReturn(serviceConfiguration);
         when(pulsar.getConfig()).thenReturn(serviceConfiguration);
         when(pulsar.getTransactionExecutorProvider()).thenReturn(executorProvider);
-        when(pulsar.getTransactionBufferSnapshotService()).thenReturn(transactionBufferSnapshotService);
+        when(pulsar.getTransactionBufferSnapshotServiceFactory()).thenReturn(transactionBufferSnapshotServiceFactory);
         TopicTransactionBufferProvider topicTransactionBufferProvider = new TopicTransactionBufferProvider();
         when(pulsar.getTransactionBufferProvider()).thenReturn(topicTransactionBufferProvider);
         // Mock BacklogQuotaManager
diff --git a/pulsar-common/src/main/java/org/apache/pulsar/common/events/EventType.java b/pulsar-common/src/main/java/org/apache/pulsar/common/events/EventType.java
index b60350e8e39..7093665ade3 100644
--- a/pulsar-common/src/main/java/org/apache/pulsar/common/events/EventType.java
+++ b/pulsar-common/src/main/java/org/apache/pulsar/common/events/EventType.java
@@ -31,5 +31,15 @@ public enum EventType {
     /**
      * Transaction buffer snapshot events.
      */
-    TRANSACTION_BUFFER_SNAPSHOT
+    TRANSACTION_BUFFER_SNAPSHOT,
+
+    /**
+     * Transaction buffer snapshot segment events.
+     */
+    TRANSACTION_BUFFER_SNAPSHOT_SEGMENTS,
+
+    /**
+     * Transaction buffer snapshot index events.
+     */
+    TRANSACTION_BUFFER_SNAPSHOT_INDEXES
 }
diff --git a/pulsar-common/src/main/java/org/apache/pulsar/common/naming/SystemTopicNames.java b/pulsar-common/src/main/java/org/apache/pulsar/common/naming/SystemTopicNames.java
index eaab8261460..e3b7b2cf05d 100644
--- a/pulsar-common/src/main/java/org/apache/pulsar/common/naming/SystemTopicNames.java
+++ b/pulsar-common/src/main/java/org/apache/pulsar/common/naming/SystemTopicNames.java
@@ -37,6 +37,15 @@ public class SystemTopicNames {
      */
     public static final String TRANSACTION_BUFFER_SNAPSHOT = "__transaction_buffer_snapshot";
 
+    /**
+     * Local topic name for the transaction buffer snapshot segments.
+     */
+    public static final String TRANSACTION_BUFFER_SNAPSHOT_SEGMENTS = "__transaction_buffer_snapshot_segments";
+
+    /**
+     * Local topic name for the transaction buffer snapshot indexes.
+     */
+    public static final String TRANSACTION_BUFFER_SNAPSHOT_INDEXES = "__transaction_buffer_snapshot_indexes";
 
     public static final String PENDING_ACK_STORE_SUFFIX = "__transaction_pending_ack";
 
@@ -46,7 +55,8 @@ public class SystemTopicNames {
      * The set of all local topic names declared above.
      */
     public static final Set<String> EVENTS_TOPIC_NAMES =
-            Collections.unmodifiableSet(Sets.newHashSet(NAMESPACE_EVENTS_LOCAL_NAME, TRANSACTION_BUFFER_SNAPSHOT));
+            Collections.unmodifiableSet(Sets.newHashSet(NAMESPACE_EVENTS_LOCAL_NAME, TRANSACTION_BUFFER_SNAPSHOT,
+                    TRANSACTION_BUFFER_SNAPSHOT_INDEXES, TRANSACTION_BUFFER_SNAPSHOT_SEGMENTS));
 
 
     public static final TopicName TRANSACTION_COORDINATOR_ASSIGN = TopicName.get(TopicDomain.persistent.value(),