You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by si...@apache.org on 2019/08/02 12:55:28 UTC

[pulsar] branch master updated: [Transaction][buffer] Add basic operation of transaction (#4738)

This is an automated email from the ASF dual-hosted git repository.

sijie pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git


The following commit(s) were added to refs/heads/master by this push:
     new e7195eb  [Transaction][buffer] Add basic operation of transaction (#4738)
e7195eb is described below

commit e7195ebf5c0bcc7a3dda1446e78f2284e27d74bf
Author: Yong Zhang <zh...@gmail.com>
AuthorDate: Fri Aug 2 20:55:22 2019 +0800

    [Transaction][buffer] Add basic operation of transaction (#4738)
    
    
    
    *Modifications*
    
    Add primary operation of transaction. Keep all actions persistently.
    
    Describe the modifications you've done.
    
    - add commit operation
    - add abort operation
    - add openreader operation
---
 .../bookkeeper/mledger/impl/ManagedLedgerImpl.java |   2 +-
 .../mledger/impl/ReadOnlyManagedLedgerImpl.java    |   2 +-
 .../broker/service/persistent/PersistentTopic.java |   2 +-
 pulsar-transaction/buffer/pom.xml                  |  44 ++
 .../transaction/buffer/TransactionBuffer.java      |  14 +-
 .../transaction/buffer/TransactionCursor.java      |  73 ++
 .../pulsar/transaction/buffer/TransactionMeta.java |  58 ++
 .../NoTxnsCommittedAtLedgerException.java}         |  35 +-
 .../buffer/impl/InMemTransactionBuffer.java        |  50 +-
 .../buffer/impl/PersistentTransactionBuffer.java   | 268 ++++++++
 .../impl/PersistentTransactionBufferReader.java    | 134 ++++
 .../buffer/impl/TransactionCursorImpl.java         | 126 ++++
 .../buffer/impl/TransactionMetaImpl.java           | 163 +++++
 .../impl/PersistentTransactionBufferTest.java      | 732 +++++++++++++++++++++
 .../buffer/impl/TransactionBufferTest.java         |   2 +-
 15 files changed, 1663 insertions(+), 42 deletions(-)

diff --git a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerImpl.java b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerImpl.java
index 78d1816..0555936 100644
--- a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerImpl.java
+++ b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerImpl.java
@@ -1465,7 +1465,7 @@ public class ManagedLedgerImpl implements ManagedLedger, CreateCallback {
         }
     }
 
-    void asyncReadEntry(PositionImpl position, ReadEntryCallback callback, Object ctx) {
+    public void asyncReadEntry(PositionImpl position, ReadEntryCallback callback, Object ctx) {
         LedgerHandle currentLedger = this.currentLedger;
         if (log.isDebugEnabled()) {
             log.debug("[{}] Reading entry ledger {}: {}", name, position.getLedgerId(), position.getEntryId());
diff --git a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ReadOnlyManagedLedgerImpl.java b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ReadOnlyManagedLedgerImpl.java
index 36376b8..9721b15 100644
--- a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ReadOnlyManagedLedgerImpl.java
+++ b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ReadOnlyManagedLedgerImpl.java
@@ -142,7 +142,7 @@ public class ReadOnlyManagedLedgerImpl extends ManagedLedgerImpl {
     }
 
     @Override
-    void asyncReadEntry(PositionImpl position, AsyncCallbacks.ReadEntryCallback callback, Object ctx) {
+    public void asyncReadEntry(PositionImpl position, AsyncCallbacks.ReadEntryCallback callback, Object ctx) {
             this.getLedgerHandle(position.getLedgerId()).thenAccept((ledger) -> {
                 asyncReadEntry(ledger, position, callback, ctx);
             }).exceptionally((ex) -> {
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java
index 2a51378..a14e296 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java
@@ -125,7 +125,7 @@ import static org.apache.pulsar.broker.cache.ConfigurationCacheService.POLICIES;
 public class PersistentTopic extends AbstractTopic implements Topic, AddEntryCallback {
 
     // Managed ledger associated with the topic
-    private final ManagedLedger ledger;
+    protected final ManagedLedger ledger;
 
     // Subscriptions to this topic
     private final ConcurrentOpenHashMap<String, PersistentSubscription> subscriptions;
diff --git a/pulsar-transaction/buffer/pom.xml b/pulsar-transaction/buffer/pom.xml
index b13cf43..9605ec1 100644
--- a/pulsar-transaction/buffer/pom.xml
+++ b/pulsar-transaction/buffer/pom.xml
@@ -52,6 +52,50 @@
             <artifactId>pulsar-common</artifactId>
             <version>${project.version}</version>
         </dependency>
+        <dependency>
+            <groupId>org.apache.pulsar</groupId>
+            <artifactId>pulsar-broker</artifactId>
+            <version>${project.version}</version>
+            <scope>compile</scope>
+        </dependency>
+
+        <dependency>
+            <groupId>${project.groupId}</groupId>
+            <artifactId>managed-ledger-original</artifactId>
+            <version>${project.parent.version}</version>
+            <scope>test</scope>
+        </dependency>
+
+        <dependency>
+            <groupId>${project.groupId}</groupId>
+            <artifactId>managed-ledger-original</artifactId>
+            <version>${project.parent.version}</version>
+            <type>test-jar</type>
+            <scope>test</scope>
+        </dependency>
 
     </dependencies>
+
+    <build>
+        <plugins>
+            <plugin>
+                <groupId>org.xolstice.maven.plugins</groupId>
+                <artifactId>protobuf-maven-plugin</artifactId>
+                <version>${protobuf-maven-plugin.version}</version>
+                <configuration>
+                    <protocArtifact>com.google.protobuf:protoc:${protoc3.version}:exe:${os.detected.classifier}</protocArtifact>
+                    <checkStaleness>true</checkStaleness>
+                </configuration>
+                <executions>
+                    <execution>
+                        <phase>generate-sources</phase>
+                        <goals>
+                            <goal>compile</goal>
+                            <goal>test-compile</goal>
+                        </goals>
+                    </execution>
+                </executions>
+            </plugin>
+        </plugins>
+    </build>
 </project>
\ No newline at end of file
diff --git a/pulsar-transaction/buffer/src/main/java/org/apache/pulsar/transaction/buffer/TransactionBuffer.java b/pulsar-transaction/buffer/src/main/java/org/apache/pulsar/transaction/buffer/TransactionBuffer.java
index 7d0bcc4..f106f1c 100644
--- a/pulsar-transaction/buffer/src/main/java/org/apache/pulsar/transaction/buffer/TransactionBuffer.java
+++ b/pulsar-transaction/buffer/src/main/java/org/apache/pulsar/transaction/buffer/TransactionBuffer.java
@@ -45,7 +45,7 @@ import org.apache.pulsar.transaction.impl.common.TxnID;
  * commits the buffer again.
  */
 @Beta
-public interface TransactionBuffer extends AutoCloseable {
+public interface TransactionBuffer {
 
     /**
      * Return the metadata of a transaction in the buffer.
@@ -95,9 +95,7 @@ public interface TransactionBuffer extends AutoCloseable {
      * @throws org.apache.pulsar.transaction.buffer.exceptions.TransactionNotFoundException if the transaction
      *         is not in the buffer.
      */
-    CompletableFuture<Void> commitTxn(TxnID txnID,
-                                      long committedAtLedgerId,
-                                      long committedAtEntryId);
+    CompletableFuture<Void> commitTxn(TxnID txnID, long committedAtLedgerId, long committedAtEntryId);
 
     /**
      * Abort the transaction and all the entries of this transaction will
@@ -123,9 +121,9 @@ public interface TransactionBuffer extends AutoCloseable {
     CompletableFuture<Void> purgeTxns(List<Long> dataLedgers);
 
     /**
-     * {@inheritDoc}
+     * Close the buffer asynchronously.
+     *
+     * @return
      */
-    @Override
-    void close();
-
+    CompletableFuture<Void> closeAsync();
 }
diff --git a/pulsar-transaction/buffer/src/main/java/org/apache/pulsar/transaction/buffer/TransactionCursor.java b/pulsar-transaction/buffer/src/main/java/org/apache/pulsar/transaction/buffer/TransactionCursor.java
new file mode 100644
index 0000000..a7dc070
--- /dev/null
+++ b/pulsar-transaction/buffer/src/main/java/org/apache/pulsar/transaction/buffer/TransactionCursor.java
@@ -0,0 +1,73 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.transaction.buffer;
+
+import java.util.Set;
+import java.util.concurrent.CompletableFuture;
+import org.apache.bookkeeper.mledger.Position;
+import org.apache.pulsar.transaction.impl.common.TxnID;
+
+/**
+ * The transaction Cursor maintains the index of all transactions.
+ */
+public interface TransactionCursor {
+    /**
+     * Get the specified transaction meta.
+     *
+     * @param txnID
+     * @param createIfNotExist
+     * @return
+     */
+    CompletableFuture<TransactionMeta> getTxnMeta(TxnID txnID, boolean createIfNotExist);
+
+    /**
+     * Commit transaction.
+     *
+     * @param committedLedgerId the ledger which  txn committed at.
+     * @param committedEntryId  the entry which txn committed at.
+     * @param txnID             the id which txn committed.
+     * @param position          the commit position at transaction log.
+     * @return
+     */
+    CompletableFuture<Void> commitTxn(long committedLedgerId, long committedEntryId, TxnID txnID, Position position);
+
+    /**
+     * Abort transaction.
+     *
+     * @param txnID aborted transaction id.
+     * @return
+     */
+    CompletableFuture<Void> abortTxn(TxnID txnID);
+
+    /**
+     * Get all the transaction id on the specified ledger.
+     *
+     * @param ledgerId the transaction committed ledger id
+     * @return
+     */
+    CompletableFuture<Set<TxnID>> getAllTxnsCommittedAtLedger(long ledgerId);
+
+    /**
+     * Remove the transactions on the specified ledger.
+     *
+     * @param ledgerId the remove transaction id
+     * @return
+     */
+    CompletableFuture<Void> removeTxnsCommittedAtLedger(long ledgerId);
+}
diff --git a/pulsar-transaction/buffer/src/main/java/org/apache/pulsar/transaction/buffer/TransactionMeta.java b/pulsar-transaction/buffer/src/main/java/org/apache/pulsar/transaction/buffer/TransactionMeta.java
index 8771c2b..2b929d2 100644
--- a/pulsar-transaction/buffer/src/main/java/org/apache/pulsar/transaction/buffer/TransactionMeta.java
+++ b/pulsar-transaction/buffer/src/main/java/org/apache/pulsar/transaction/buffer/TransactionMeta.java
@@ -19,6 +19,9 @@
 package org.apache.pulsar.transaction.buffer;
 
 import com.google.common.annotations.Beta;
+import java.util.SortedMap;
+import java.util.concurrent.CompletableFuture;
+import org.apache.bookkeeper.mledger.Position;
 import org.apache.pulsar.transaction.impl.common.TxnID;
 import org.apache.pulsar.transaction.impl.common.TxnStatus;
 
@@ -49,4 +52,59 @@ public interface TransactionMeta {
      */
     int numEntries();
 
+    /**
+     * Return the committed ledger id at data ledger.
+     *
+     * @return the committed ledger id
+     */
+    long committedAtLedgerId();
+
+    /**
+     * Return the committed entry id at data ledger.
+     *
+     * @return the committed entry id
+     */
+    long committedAtEntryId();
+
+    /**
+     * Return the last sequence id.
+     *
+     * @return the last sequence id
+     */
+    long lastSequenceId();
+
+    /**
+     * Read the entries from start sequence id.
+     *
+     * @param num the entries number need to read
+     * @param startSequenceId the start position of the entries
+     * @return
+     */
+    CompletableFuture<SortedMap<Long, Position>> readEntries(int num, long startSequenceId);
+
+    /**
+     * Add transaction entry into the transaction.
+     *
+     * @param sequenceId the message sequence id
+     * @param position the position of transaction log
+     * @return
+     */
+    CompletableFuture<Void> appendEntry(long sequenceId, Position position);
+
+    /**
+     * Mark the transaction is committed.
+     *
+     * @param committedAtLedgerId
+     * @param committedAtEntryId
+     * @return
+     */
+    CompletableFuture<TransactionMeta> commitTxn(long committedAtLedgerId, long committedAtEntryId);
+
+    /**
+     * Mark the transaction is aborted.
+     *
+     * @return
+     */
+    CompletableFuture<TransactionMeta> abortTxn();
+
 }
diff --git a/pulsar-transaction/buffer/src/main/java/org/apache/pulsar/transaction/buffer/TransactionMeta.java b/pulsar-transaction/buffer/src/main/java/org/apache/pulsar/transaction/buffer/exceptions/NoTxnsCommittedAtLedgerException.java
similarity index 53%
copy from pulsar-transaction/buffer/src/main/java/org/apache/pulsar/transaction/buffer/TransactionMeta.java
copy to pulsar-transaction/buffer/src/main/java/org/apache/pulsar/transaction/buffer/exceptions/NoTxnsCommittedAtLedgerException.java
index 8771c2b..74f950a 100644
--- a/pulsar-transaction/buffer/src/main/java/org/apache/pulsar/transaction/buffer/TransactionMeta.java
+++ b/pulsar-transaction/buffer/src/main/java/org/apache/pulsar/transaction/buffer/exceptions/NoTxnsCommittedAtLedgerException.java
@@ -16,37 +16,16 @@
  * specific language governing permissions and limitations
  * under the License.
  */
-package org.apache.pulsar.transaction.buffer;
-
-import com.google.common.annotations.Beta;
-import org.apache.pulsar.transaction.impl.common.TxnID;
-import org.apache.pulsar.transaction.impl.common.TxnStatus;
+package org.apache.pulsar.transaction.buffer.exceptions;
 
 /**
- * The metadata for the transaction in the transaction buffer.
+ * Exception is thrown when no transactions found committed at a given ledger.
  */
-@Beta
-public interface TransactionMeta {
-
-    /**
-     * Returns the transaction id.
-     *
-     * @return the transaction id
-     */
-    TxnID id();
-
-    /**
-     * Return the status of the transaction.
-     *
-     * @return the status of the transaction
-     */
-    TxnStatus status();
+public class NoTxnsCommittedAtLedgerException extends TransactionBufferException {
 
-    /**
-     * Return the number of entries appended to the transaction.
-     *
-     * @return the number of entries
-     */
-    int numEntries();
+    private static final long serialVersionUID = 0L;
 
+    public NoTxnsCommittedAtLedgerException(String message) {
+        super(message);
+    }
 }
diff --git a/pulsar-transaction/buffer/src/main/java/org/apache/pulsar/transaction/buffer/impl/InMemTransactionBuffer.java b/pulsar-transaction/buffer/src/main/java/org/apache/pulsar/transaction/buffer/impl/InMemTransactionBuffer.java
index 93a3e2e..1ce8234 100644
--- a/pulsar-transaction/buffer/src/main/java/org/apache/pulsar/transaction/buffer/impl/InMemTransactionBuffer.java
+++ b/pulsar-transaction/buffer/src/main/java/org/apache/pulsar/transaction/buffer/impl/InMemTransactionBuffer.java
@@ -30,6 +30,8 @@ import java.util.TreeMap;
 import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.ConcurrentMap;
+import org.apache.bookkeeper.mledger.Position;
+import org.apache.pulsar.common.util.FutureUtil;
 import org.apache.pulsar.transaction.buffer.TransactionBuffer;
 import org.apache.pulsar.transaction.buffer.TransactionBufferReader;
 import org.apache.pulsar.transaction.buffer.TransactionMeta;
@@ -79,6 +81,49 @@ class InMemTransactionBuffer implements TransactionBuffer {
             }
         }
 
+        @Override
+        public long committedAtLedgerId() {
+            return committedAtLedgerId;
+        }
+
+        @Override
+        public long committedAtEntryId() {
+            return committedAtEntryId;
+        }
+
+        @Override
+        public long lastSequenceId() {
+            return entries.lastKey();
+        }
+
+        @Override
+        public CompletableFuture<SortedMap<Long, Position>> readEntries(int num, long startSequenceId) {
+            return FutureUtil.failedFuture(new UnsupportedOperationException());
+        }
+
+        @Override
+        public CompletableFuture<Void> appendEntry(long sequenceId, Position position) {
+            return FutureUtil.failedFuture(new UnsupportedOperationException());
+        }
+
+        @Override
+        public CompletableFuture<TransactionMeta> commitTxn(long committedAtLedgerId, long committedAtEntryId) {
+            try {
+                return CompletableFuture.completedFuture(commitAt(committedAtLedgerId, committedAtEntryId));
+            } catch (UnexpectedTxnStatusException e) {
+                return FutureUtil.failedFuture(e);
+            }
+        }
+
+        @Override
+        public CompletableFuture<TransactionMeta> abortTxn() {
+            try {
+                return CompletableFuture.completedFuture(abort());
+            } catch (UnexpectedTxnStatusException e) {
+                return FutureUtil.failedFuture(e);
+            }
+        }
+
         synchronized TxnBuffer abort() throws UnexpectedTxnStatusException {
             if (TxnStatus.OPEN != status) {
                 throw new UnexpectedTxnStatusException(txnid, TxnStatus.OPEN, status);
@@ -150,7 +195,6 @@ class InMemTransactionBuffer implements TransactionBuffer {
 
     final ConcurrentMap<TxnID, TxnBuffer> buffers;
     final Map<Long, Set<TxnID>> txnIndex;
-
     public InMemTransactionBuffer() {
         this.buffers = new ConcurrentHashMap<>();
         this.txnIndex = new HashMap<>();
@@ -285,7 +329,9 @@ class InMemTransactionBuffer implements TransactionBuffer {
     }
 
     @Override
-    public void close() {
+    public CompletableFuture<Void> closeAsync() {
         buffers.values().forEach(TxnBuffer::close);
+        return CompletableFuture.completedFuture(null);
     }
+
 }
diff --git a/pulsar-transaction/buffer/src/main/java/org/apache/pulsar/transaction/buffer/impl/PersistentTransactionBuffer.java b/pulsar-transaction/buffer/src/main/java/org/apache/pulsar/transaction/buffer/impl/PersistentTransactionBuffer.java
new file mode 100644
index 0000000..f0513cd
--- /dev/null
+++ b/pulsar-transaction/buffer/src/main/java/org/apache/pulsar/transaction/buffer/impl/PersistentTransactionBuffer.java
@@ -0,0 +1,268 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.transaction.buffer.impl;
+
+import io.netty.buffer.ByteBuf;
+import java.util.List;
+import java.util.Set;
+import java.util.SortedMap;
+import java.util.concurrent.CompletableFuture;
+import java.util.stream.Collectors;
+import lombok.Builder;
+import lombok.extern.slf4j.Slf4j;
+import org.apache.bookkeeper.mledger.AsyncCallbacks;
+import org.apache.bookkeeper.mledger.ManagedCursor;
+import org.apache.bookkeeper.mledger.ManagedLedger;
+import org.apache.bookkeeper.mledger.ManagedLedgerException;
+import org.apache.bookkeeper.mledger.Position;
+import org.apache.bookkeeper.mledger.impl.PositionImpl;
+import org.apache.pulsar.broker.service.BrokerService;
+import org.apache.pulsar.broker.service.BrokerServiceException;
+import org.apache.pulsar.broker.service.persistent.PersistentTopic;
+import org.apache.pulsar.common.api.proto.PulsarMarkers.MessageIdData;
+import org.apache.pulsar.common.protocol.Markers;
+import org.apache.pulsar.common.util.FutureUtil;
+import org.apache.pulsar.transaction.buffer.TransactionBuffer;
+import org.apache.pulsar.transaction.buffer.TransactionBufferReader;
+import org.apache.pulsar.transaction.buffer.TransactionCursor;
+import org.apache.pulsar.transaction.buffer.TransactionMeta;
+import org.apache.pulsar.transaction.buffer.exceptions.TransactionNotSealedException;
+import org.apache.pulsar.transaction.impl.common.TxnID;
+
+/**
+ * A persistent transaction buffer implementation.
+ */
+@Slf4j
+public class PersistentTransactionBuffer extends PersistentTopic implements TransactionBuffer {
+
+    private TransactionCursor txnCursor;
+    private ManagedCursor retentionCursor;
+
+
+    abstract static class TxnCtx implements PublishContext {
+        private final long sequenceId;
+        private final CompletableFuture<Position> completableFuture;
+        private final String producerName;
+
+        TxnCtx(String producerName, long sequenceId, CompletableFuture<Position> future) {
+            this.sequenceId = sequenceId;
+            this.completableFuture = future;
+            this.producerName = producerName;
+        }
+
+
+
+        @Override
+        public String getProducerName() {
+            return this.producerName;
+        }
+
+        @Override
+        public long getSequenceId() {
+            return this.sequenceId;
+        }
+    }
+
+    public PersistentTransactionBuffer(String topic, ManagedLedger ledger, BrokerService brokerService)
+        throws BrokerServiceException.NamingException, ManagedLedgerException {
+        super(topic, ledger, brokerService);
+        this.txnCursor = new TransactionCursorImpl();
+        this.retentionCursor = ledger.newNonDurableCursor(PositionImpl.earliest);
+    }
+
+    @Override
+    public CompletableFuture<TransactionMeta> getTransactionMeta(TxnID txnID) {
+        return txnCursor.getTxnMeta(txnID, false);
+    }
+
+    @Override
+    public CompletableFuture<Void> appendBufferToTxn(TxnID txnId, long sequenceId, ByteBuf buffer) {
+        return publishMessage(txnId, buffer, sequenceId).thenCompose(position -> appendBuffer(txnId, position,
+                                                                                             sequenceId));
+    }
+
+    private CompletableFuture<Void> appendBuffer(TxnID txnID, Position position, long sequenceId) {
+        return txnCursor.getTxnMeta(txnID, true).thenCompose(meta -> meta.appendEntry(sequenceId, position));
+    }
+
+    @Override
+    public CompletableFuture<TransactionBufferReader> openTransactionBufferReader(TxnID txnID, long startSequenceId) {
+        return txnCursor.getTxnMeta(txnID, false).thenCompose(this::createNewReader);
+    }
+
+    private CompletableFuture<TransactionBufferReader> createNewReader(TransactionMeta meta) {
+        CompletableFuture<TransactionBufferReader> createReaderFuture = new CompletableFuture<>();
+
+        try {
+            PersistentTransactionBufferReader reader = new PersistentTransactionBufferReader(meta, ledger);
+            createReaderFuture.complete(reader);
+        } catch (TransactionNotSealedException e) {
+            createReaderFuture.completeExceptionally(e);
+        }
+
+        return createReaderFuture;
+    }
+
+    @Builder
+    final static class Marker {
+        long sequenceId;
+        ByteBuf marker;
+    }
+
+    @Override
+    public CompletableFuture<Void> commitTxn(TxnID txnID, long committedAtLedgerId, long committedAtEntryId) {
+        return txnCursor.getTxnMeta(txnID, false)
+                        .thenApply(meta -> createCommitMarker(meta, committedAtLedgerId, committedAtEntryId))
+                        .thenCompose(marker -> publishMessage(txnID, marker.marker, marker.sequenceId))
+                        .thenCompose(position -> txnCursor.commitTxn(committedAtLedgerId, committedAtEntryId, txnID,
+                                                                     position));
+    }
+
+    private Marker createCommitMarker(TransactionMeta meta, long committedAtLedgerId, long committedAtEntryId) {
+        if (log.isDebugEnabled()) {
+            log.debug("Transaction {} create a commit marker", meta.id());
+        }
+        long sequenceId = meta.lastSequenceId() + 1;
+        MessageIdData messageIdData = MessageIdData.newBuilder()
+                                                   .setLedgerId(committedAtLedgerId)
+                                                   .setEntryId(committedAtEntryId)
+                                                   .build();
+        ByteBuf commitMarker = Markers.newTxnCommitMarker(sequenceId, meta.id().getMostSigBits(),
+                                                          meta.id().getLeastSigBits(), messageIdData);
+        Marker marker = Marker.builder().sequenceId(sequenceId).marker(commitMarker).build();
+        return marker;
+    }
+
+    @Override
+    public CompletableFuture<Void> abortTxn(TxnID txnID) {
+        return txnCursor.getTxnMeta(txnID, false)
+                        .thenApply(meta -> createAbortMarker(meta))
+                        .thenCompose(marker -> publishMessage(txnID, marker.marker, marker.sequenceId))
+                        .thenCompose(position -> txnCursor.abortTxn(txnID));
+    }
+
+    private Marker createAbortMarker(TransactionMeta meta) {
+        if (log.isDebugEnabled()) {
+            log.debug("Transaction {} create a abort marker", meta.id());
+        }
+        long sequenceId = meta.lastSequenceId() + 1;
+        ByteBuf abortMarker = Markers.newTxnAbortMarker(sequenceId, meta.id().getMostSigBits(),
+                                                        meta.id().getLeastSigBits());
+        Marker marker = Marker.builder().sequenceId(sequenceId).marker(abortMarker).build();
+        return marker;
+    }
+
+
+    private CompletableFuture<Position> publishMessage(TxnID txnID, ByteBuf msg, long sequenceId) {
+        CompletableFuture<Position> publishFuture = new CompletableFuture<>();
+        publishMessage(msg, new TxnCtx(txnID.toString(), sequenceId, publishFuture) {
+            @Override
+            public void completed(Exception e, long ledgerId, long entryId) {
+                if (e != null) {
+                    publishFuture.completeExceptionally(e);
+                } else {
+                    publishFuture.complete(PositionImpl.get(ledgerId, entryId));
+                }
+            }
+        });
+        return publishFuture;
+    }
+
+    @Override
+    public CompletableFuture<Void> purgeTxns(List<Long> dataLedgers) {
+        if (log.isDebugEnabled()) {
+            log.debug("Begin to purge the ledgers {}", dataLedgers);
+        }
+
+        List<CompletableFuture<Void>> futures = dataLedgers.stream().map(dataLedger -> cleanTxnsOnLedger(dataLedger))
+                                                           .collect(Collectors.toList());
+        return FutureUtil.waitForAll(futures).thenCompose(v -> removeCommittedLedgerFromIndex(dataLedgers));
+    }
+
+    private CompletableFuture<Void> removeCommittedLedgerFromIndex(List<Long> dataLedgers) {
+        List<CompletableFuture<Void>> removeFutures = dataLedgers.stream().map(
+            dataLedger -> txnCursor.removeTxnsCommittedAtLedger(dataLedger)).collect(Collectors.toList());
+        return FutureUtil.waitForAll(removeFutures);
+    }
+
+    private CompletableFuture<Void> cleanTxnsOnLedger(long dataledger) {
+        if (log.isDebugEnabled()) {
+            log.debug("Start to clean ledger {}", dataledger);
+        }
+        return txnCursor.getAllTxnsCommittedAtLedger(dataledger).thenCompose(txnIDS -> deleteTxns(txnIDS));
+    }
+
+    private CompletableFuture<Void> deleteTxns(Set<TxnID> txnIDS) {
+        if (log.isDebugEnabled()) {
+            log.debug("Start delete txns {} under ledger", txnIDS);
+        }
+        List<CompletableFuture<Void>> futures = txnIDS.stream().map(txnID -> deleteTxn(txnID))
+                                                      .collect(Collectors.toList());
+        return FutureUtil.waitForAll(futures);
+    }
+
+    private CompletableFuture<Void> deleteTxn(TxnID txnID) {
+        if (log.isDebugEnabled()) {
+            log.debug("Start to delete txn {} entries", txnID);
+        }
+        return txnCursor.getTxnMeta(txnID, false)
+                 .thenCompose(meta -> meta.readEntries(meta.numEntries(), -1L))
+                 .thenCompose(longPositionSortedMap -> deleteEntries(longPositionSortedMap, txnID));
+    }
+
+    private CompletableFuture<Void> deleteEntries(SortedMap<Long, Position> entriesMap, TxnID txnID) {
+        if (log.isDebugEnabled()) {
+            log.debug("Delete entries {}", entriesMap);
+        }
+        List<CompletableFuture<Void>> deleteFutures = entriesMap.values().stream()
+            .map(position -> asyncDeletePosition(position, txnID))
+            .collect(Collectors.toList());
+
+        return FutureUtil.waitForAll(deleteFutures);
+    }
+
+    private CompletableFuture<Void> asyncDeletePosition(Position position, TxnID txnID) {
+        if (log.isDebugEnabled()) {
+            log.debug("Ready to delete position {} for txn {}", position, txnID);
+        }
+        CompletableFuture<Void> deleteFuture = new CompletableFuture<>();
+        retentionCursor.asyncMarkDelete(position, new AsyncCallbacks.MarkDeleteCallback() {
+            @Override
+            public void markDeleteComplete(Object ctx) {
+                if (log.isDebugEnabled()) {
+                    log.debug("Success delete transaction `{}` entry on position {}", txnID, position);
+                }
+                deleteFuture.complete(null);
+            }
+
+            @Override
+            public void markDeleteFailed(ManagedLedgerException exception, Object ctx) {
+                log.error("Failed delete transaction `{}` entry on position {}", txnID, position, exception);
+                deleteFuture.completeExceptionally(exception);
+            }
+        }, null);
+
+        return deleteFuture;
+    }
+
+    @Override
+    public CompletableFuture<Void> closeAsync() {
+        return FutureUtil.failedFuture(new UnsupportedOperationException());
+    }
+}
diff --git a/pulsar-transaction/buffer/src/main/java/org/apache/pulsar/transaction/buffer/impl/PersistentTransactionBufferReader.java b/pulsar-transaction/buffer/src/main/java/org/apache/pulsar/transaction/buffer/impl/PersistentTransactionBufferReader.java
new file mode 100644
index 0000000..4806c9e
--- /dev/null
+++ b/pulsar-transaction/buffer/src/main/java/org/apache/pulsar/transaction/buffer/impl/PersistentTransactionBufferReader.java
@@ -0,0 +1,134 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.transaction.buffer.impl;
+
+import java.util.ArrayList;
+import java.util.Comparator;
+import java.util.List;
+import java.util.Map;
+import java.util.SortedMap;
+import java.util.concurrent.CompletableFuture;
+import java.util.stream.Collectors;
+import lombok.extern.slf4j.Slf4j;
+import org.apache.bookkeeper.mledger.AsyncCallbacks;
+import org.apache.bookkeeper.mledger.Entry;
+import org.apache.bookkeeper.mledger.ManagedLedger;
+import org.apache.bookkeeper.mledger.ManagedLedgerException;
+import org.apache.bookkeeper.mledger.Position;
+import org.apache.bookkeeper.mledger.impl.ManagedLedgerImpl;
+import org.apache.bookkeeper.mledger.impl.PositionImpl;
+import org.apache.pulsar.common.util.FutureUtil;
+import org.apache.pulsar.transaction.buffer.TransactionBufferReader;
+import org.apache.pulsar.transaction.buffer.TransactionEntry;
+import org.apache.pulsar.transaction.buffer.TransactionMeta;
+import org.apache.pulsar.transaction.buffer.exceptions.TransactionNotSealedException;
+import org.apache.pulsar.transaction.impl.common.TxnStatus;
+
+/**
+ * A persistent transaction buffer reader implementation.
+ */
+@Slf4j
+public class PersistentTransactionBufferReader implements TransactionBufferReader {
+
+    static final long DEFAULT_START_SEQUENCE_ID = -1L;
+
+    private final ManagedLedger ledger;
+    private final TransactionMeta meta;
+    private volatile long currentSequenceId = DEFAULT_START_SEQUENCE_ID;
+
+
+    PersistentTransactionBufferReader(TransactionMeta meta, ManagedLedger ledger)
+        throws TransactionNotSealedException {
+        if (TxnStatus.OPEN == meta.status()) {
+            throw new TransactionNotSealedException("Transaction `" + meta.id() + "` is not sealed yet");
+        }
+        this.meta = meta;
+        this.ledger = ledger;
+    }
+
+    @Override
+    public CompletableFuture<List<TransactionEntry>> readNext(int numEntries) {
+        return meta.readEntries(numEntries, currentSequenceId)
+                   .thenCompose(this::readEntry)
+                   .thenApply(entries -> entries.stream()
+                                                .sorted(Comparator.comparingLong(entry -> entry.sequenceId()))
+                                                .collect(Collectors.toList()));
+    }
+
+    private CompletableFuture<List<TransactionEntry>> readEntry(SortedMap<Long, Position> entries) {
+        CompletableFuture<List<TransactionEntry>> readFuture = new CompletableFuture<>();
+        List<TransactionEntry> txnEntries = new ArrayList<>(entries.size());
+        List<CompletableFuture<Void>> futures = new ArrayList<>();
+
+        for (Map.Entry<Long, Position> longPositionEntry : entries.entrySet()) {
+            CompletableFuture<Void> tmpFuture = new CompletableFuture<>();
+            readEntry(longPositionEntry.getValue()).whenComplete((entry, throwable) -> {
+                if (null != throwable) {
+                    tmpFuture.completeExceptionally(throwable);
+                } else {
+                    TransactionEntry txnEntry = new TransactionEntryImpl(meta.id(), longPositionEntry.getKey(),
+                                                                         entry.getDataBuffer(),
+                                                                         meta.committedAtLedgerId(),
+                                                                         meta.committedAtEntryId());
+                    synchronized (txnEntries) {
+                        txnEntries.add(txnEntry);
+                    }
+                    tmpFuture.complete(null);
+                }
+            });
+            futures.add(tmpFuture);
+        }
+
+        FutureUtil.waitForAll(futures).whenComplete((ignore, error) -> {
+            if (error != null) {
+                readFuture.completeExceptionally(error);
+            } else {
+                currentSequenceId = entries.lastKey();
+                readFuture.complete(txnEntries);
+            }
+        });
+
+        return readFuture;
+    }
+
+    private CompletableFuture<Entry> readEntry(Position position) {
+        CompletableFuture<Entry> readFuture = new CompletableFuture<>();
+
+        ManagedLedgerImpl readLedger = (ManagedLedgerImpl) ledger;
+
+        readLedger.asyncReadEntry((PositionImpl) position, new AsyncCallbacks.ReadEntryCallback() {
+            @Override
+            public void readEntryComplete(Entry entry, Object ctx) {
+                readFuture.complete(entry);
+            }
+
+            @Override
+            public void readEntryFailed(ManagedLedgerException exception, Object ctx) {
+                readFuture.completeExceptionally(exception);
+            }
+        }, null);
+
+        return readFuture;
+    }
+
+    @Override
+    public void close() {
+        log.info("Txn {} reader closed.", meta.id());
+    }
+}
diff --git a/pulsar-transaction/buffer/src/main/java/org/apache/pulsar/transaction/buffer/impl/TransactionCursorImpl.java b/pulsar-transaction/buffer/src/main/java/org/apache/pulsar/transaction/buffer/impl/TransactionCursorImpl.java
new file mode 100644
index 0000000..65431fa
--- /dev/null
+++ b/pulsar-transaction/buffer/src/main/java/org/apache/pulsar/transaction/buffer/impl/TransactionCursorImpl.java
@@ -0,0 +1,126 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.transaction.buffer.impl;
+
+import java.util.HashSet;
+import java.util.Map;
+import java.util.Set;
+import java.util.TreeMap;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ConcurrentMap;
+import org.apache.bookkeeper.mledger.Position;
+import org.apache.pulsar.transaction.buffer.TransactionCursor;
+import org.apache.pulsar.transaction.buffer.TransactionMeta;
+import org.apache.pulsar.transaction.buffer.exceptions.NoTxnsCommittedAtLedgerException;
+import org.apache.pulsar.transaction.buffer.exceptions.TransactionNotFoundException;
+import org.apache.pulsar.transaction.impl.common.TxnID;
+
+public class TransactionCursorImpl implements TransactionCursor {
+
+
+    private final ConcurrentMap<TxnID, TransactionMetaImpl> txnIndex;
+    private final Map<Long, Set<TxnID>> committedLedgerTxnIndex;
+
+    TransactionCursorImpl() {
+        this.txnIndex = new ConcurrentHashMap<>();
+        this.committedLedgerTxnIndex = new TreeMap<>();
+    }
+
+    @Override
+    public CompletableFuture<TransactionMeta> getTxnMeta(TxnID txnID, boolean createIfNotExist) {
+        CompletableFuture<TransactionMeta> getFuture = new CompletableFuture<>();
+        TransactionMeta meta = txnIndex.get(txnID);
+        if (null == meta) {
+            if (!createIfNotExist) {
+                getFuture.completeExceptionally(
+                    new TransactionNotFoundException("Transaction `" + txnID + "` doesn't" + " exist"));
+                return getFuture;
+            }
+
+            TransactionMetaImpl newMeta = new TransactionMetaImpl(txnID);
+            TransactionMeta oldMeta = txnIndex.putIfAbsent(txnID, newMeta);
+            if (null != oldMeta) {
+                meta = oldMeta;
+            } else {
+                meta = newMeta;
+            }
+        }
+        getFuture.complete(meta);
+
+        return getFuture;
+    }
+
+    @Override
+    public CompletableFuture<Void> commitTxn(long committedLedgerId, long committedEntryId, TxnID txnID,
+                                             Position position) {
+        return getTxnMeta(txnID, false)
+            .thenCompose(meta -> meta.commitTxn(committedLedgerId, committedEntryId))
+            .thenAccept(meta -> addTxnToCommittedIndex(txnID, committedLedgerId));
+    }
+
+    private void addTxnToCommittedIndex(TxnID txnID, long committedAtLedgerId) {
+        synchronized (committedLedgerTxnIndex) {
+            committedLedgerTxnIndex.computeIfAbsent(committedAtLedgerId, ledgerId -> new HashSet<>()).add(txnID);
+        }
+    }
+
+    @Override
+    public CompletableFuture<Void> abortTxn(TxnID txnID) {
+        return getTxnMeta(txnID, false)
+            .thenCompose(meta -> meta.abortTxn())
+            .thenApply(meta -> null);
+    }
+
+    public CompletableFuture<Set<TxnID>> getAllTxnsCommittedAtLedger(long ledgerId) {
+        CompletableFuture<Set<TxnID>> removeFuture = new CompletableFuture<>();
+
+        Set<TxnID> txnIDS = committedLedgerTxnIndex.get(ledgerId);
+
+        if (null == txnIDS) {
+            removeFuture.completeExceptionally(new NoTxnsCommittedAtLedgerException(
+                "Transaction committed ledger id `" + ledgerId + "` doesn't exist") {
+            });
+            return removeFuture;
+        }
+
+        removeFuture.complete(txnIDS);
+        return removeFuture;
+    }
+
+    @Override
+    public CompletableFuture<Void> removeTxnsCommittedAtLedger(long ledgerId) {
+        CompletableFuture<Void> removeFuture = new CompletableFuture<>();
+
+        synchronized (committedLedgerTxnIndex) {
+            Set<TxnID> txnIDS = committedLedgerTxnIndex.remove(ledgerId);
+            if (null == txnIDS) {
+                removeFuture.completeExceptionally(new NoTxnsCommittedAtLedgerException(
+                    "Transaction committed ledger id `" + ledgerId + "` doesn't exist"));
+            } else {
+                txnIDS.forEach(txnID -> {
+                    txnIndex.remove(txnID);
+                });
+                removeFuture.complete(null);
+            }
+        }
+
+        return removeFuture;
+    }
+}
diff --git a/pulsar-transaction/buffer/src/main/java/org/apache/pulsar/transaction/buffer/impl/TransactionMetaImpl.java b/pulsar-transaction/buffer/src/main/java/org/apache/pulsar/transaction/buffer/impl/TransactionMetaImpl.java
new file mode 100644
index 0000000..9325d04
--- /dev/null
+++ b/pulsar-transaction/buffer/src/main/java/org/apache/pulsar/transaction/buffer/impl/TransactionMetaImpl.java
@@ -0,0 +1,163 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.transaction.buffer.impl;
+
+import com.google.common.annotations.VisibleForTesting;
+import java.util.Map;
+import java.util.SortedMap;
+import java.util.TreeMap;
+import java.util.concurrent.CompletableFuture;
+import org.apache.bookkeeper.mledger.Position;
+import org.apache.pulsar.transaction.buffer.TransactionMeta;
+import org.apache.pulsar.transaction.buffer.exceptions.EndOfTransactionException;
+import org.apache.pulsar.transaction.buffer.exceptions.TransactionSealedException;
+import org.apache.pulsar.transaction.buffer.exceptions.UnexpectedTxnStatusException;
+import org.apache.pulsar.transaction.impl.common.TxnID;
+import org.apache.pulsar.transaction.impl.common.TxnStatus;
+
+public class TransactionMetaImpl implements TransactionMeta {
+
+    private final TxnID txnID;
+    private SortedMap<Long, Position> entries;
+    private TxnStatus txnStatus;
+    private long committedAtLedgerId = -1L;
+    private long committedAtEntryId = -1L;
+
+    TransactionMetaImpl(TxnID txnID) {
+        this.txnID = txnID;
+        this.entries = new TreeMap<>();
+        this.txnStatus = TxnStatus.OPEN;
+    }
+
+    @Override
+    public TxnID id() {
+        return this.txnID;
+    }
+
+    @Override
+    public synchronized TxnStatus status() {
+        return this.txnStatus;
+    }
+
+    @Override
+    public int numEntries() {
+        synchronized (entries) {
+            return entries.size();
+        }
+    }
+
+    @VisibleForTesting
+    public SortedMap<Long, Position> getEntries() {
+        return entries;
+    }
+
+    @Override
+    public long committedAtLedgerId() {
+        return committedAtLedgerId;
+    }
+
+    @Override
+    public long committedAtEntryId() {
+        return committedAtEntryId;
+    }
+
+    @Override
+    public long lastSequenceId() {
+        return entries.lastKey();
+    }
+
+    @Override
+    public CompletableFuture<SortedMap<Long, Position>> readEntries(int num, long startSequenceId) {
+        CompletableFuture<SortedMap<Long, Position>> readFuture = new CompletableFuture<>();
+
+        SortedMap<Long, Position> result = new TreeMap<>();
+
+        SortedMap<Long, Position> readEntries = entries;
+        if (startSequenceId != PersistentTransactionBufferReader.DEFAULT_START_SEQUENCE_ID) {
+            readEntries = entries.tailMap(startSequenceId);
+        }
+
+        if (readEntries.isEmpty()) {
+            readFuture.completeExceptionally(
+                new EndOfTransactionException("No more entries found in transaction `" + txnID + "`"));
+            return readFuture;
+        }
+
+        for (Map.Entry<Long, Position> longPositionEntry : readEntries.entrySet()) {
+            result.put(longPositionEntry.getKey(), longPositionEntry.getValue());
+        }
+
+        readFuture.complete(result);
+
+        return readFuture;
+    }
+
+    @Override
+    public CompletableFuture<Void> appendEntry(long sequenceId, Position position) {
+        CompletableFuture<Void> appendFuture = new CompletableFuture<>();
+        synchronized (this) {
+            if (TxnStatus.OPEN != txnStatus) {
+                appendFuture.completeExceptionally(
+                    new TransactionSealedException("Transaction `" + txnID + "` is " + "already sealed"));
+                return appendFuture;
+            }
+        }
+        synchronized (this.entries) {
+            this.entries.put(sequenceId, position);
+        }
+        return CompletableFuture.completedFuture(null);
+    }
+
+    @Override
+    public synchronized CompletableFuture<TransactionMeta> commitTxn(long committedAtLedgerId,
+                                                                     long committedAtEntryId) {
+        CompletableFuture<TransactionMeta> commitFuture = new CompletableFuture<>();
+        if (!checkOpened(txnID, commitFuture)) {
+            return commitFuture;
+        }
+
+        this.committedAtLedgerId = committedAtLedgerId;
+        this.committedAtEntryId = committedAtEntryId;
+        this.txnStatus = TxnStatus.COMMITTED;
+        TransactionMeta meta = this;
+        commitFuture.complete(meta);
+        return commitFuture;
+    }
+
+    @Override
+    public synchronized CompletableFuture<TransactionMeta> abortTxn() {
+        CompletableFuture<TransactionMeta> abortFuture = new CompletableFuture<>();
+        if (!checkOpened(txnID, abortFuture)) {
+            return abortFuture;
+        }
+
+        this.txnStatus = TxnStatus.ABORTED;
+        abortFuture.complete(this);
+
+        return abortFuture;
+    }
+
+    private boolean checkOpened(TxnID txnID, CompletableFuture<TransactionMeta> future) {
+        if (TxnStatus.OPEN != txnStatus) {
+            future.completeExceptionally(new UnexpectedTxnStatusException(txnID, TxnStatus.OPEN, txnStatus));
+            return false;
+        }
+        return true;
+    }
+}
diff --git a/pulsar-transaction/buffer/src/test/java/org/apache/pulsar/transaction/buffer/impl/PersistentTransactionBufferTest.java b/pulsar-transaction/buffer/src/test/java/org/apache/pulsar/transaction/buffer/impl/PersistentTransactionBufferTest.java
new file mode 100644
index 0000000..f77f50c
--- /dev/null
+++ b/pulsar-transaction/buffer/src/test/java/org/apache/pulsar/transaction/buffer/impl/PersistentTransactionBufferTest.java
@@ -0,0 +1,732 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.transaction.buffer.impl;
+
+import static java.nio.charset.StandardCharsets.UTF_8;
+import static org.mockito.ArgumentMatchers.endsWith;
+import static org.testng.Assert.assertEquals;
+import static org.testng.Assert.assertNotNull;
+import static org.testng.Assert.assertTrue;
+import static org.testng.Assert.fail;
+
+import static org.mockito.Matchers.any;
+import static org.mockito.Matchers.anyObject;
+import static org.mockito.Matchers.anyString;
+import static org.mockito.Matchers.matches;
+import static org.mockito.Mockito.doAnswer;
+import static org.mockito.Mockito.doReturn;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.spy;
+
+import com.google.common.collect.Lists;
+import com.google.common.util.concurrent.MoreExecutors;
+import io.netty.buffer.ByteBuf;
+import io.netty.buffer.ByteBufUtil;
+import io.netty.buffer.Unpooled;
+import java.net.InetSocketAddress;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
+import java.util.Optional;
+import java.util.SortedMap;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.ExecutorService;
+import java.util.stream.Collectors;
+import lombok.Cleanup;
+import org.apache.bookkeeper.client.PulsarMockBookKeeper;
+import org.apache.bookkeeper.mledger.AsyncCallbacks.AddEntryCallback;
+import org.apache.bookkeeper.mledger.AsyncCallbacks.CloseCallback;
+import org.apache.bookkeeper.mledger.AsyncCallbacks.DeleteCursorCallback;
+import org.apache.bookkeeper.mledger.AsyncCallbacks.DeleteLedgerCallback;
+import org.apache.bookkeeper.mledger.AsyncCallbacks.MarkDeleteCallback;
+import org.apache.bookkeeper.mledger.AsyncCallbacks.OpenCursorCallback;
+import org.apache.bookkeeper.mledger.AsyncCallbacks.OpenLedgerCallback;
+import org.apache.bookkeeper.mledger.Entry;
+import org.apache.bookkeeper.mledger.ManagedCursor;
+import org.apache.bookkeeper.mledger.ManagedLedger;
+import org.apache.bookkeeper.mledger.ManagedLedgerConfig;
+import org.apache.bookkeeper.mledger.ManagedLedgerException;
+import org.apache.bookkeeper.mledger.ManagedLedgerFactory;
+import org.apache.bookkeeper.mledger.Position;
+import org.apache.bookkeeper.mledger.impl.PositionImpl;
+import org.apache.bookkeeper.test.MockedBookKeeperTestCase;
+import org.apache.bookkeeper.util.ZkUtils;
+import org.apache.pulsar.broker.PulsarService;
+import org.apache.pulsar.broker.ServiceConfiguration;
+import org.apache.pulsar.broker.cache.ConfigurationCacheService;
+import org.apache.pulsar.broker.cache.LocalZooKeeperCacheService;
+import org.apache.pulsar.broker.namespace.NamespaceService;
+import org.apache.pulsar.broker.service.BrokerService;
+import org.apache.pulsar.broker.service.BrokerServiceException;
+import org.apache.pulsar.broker.service.ServerCnx;
+import org.apache.pulsar.common.api.proto.PulsarApi.CommandSubscribe.InitialPosition;
+import org.apache.pulsar.common.naming.NamespaceBundle;
+import org.apache.pulsar.common.naming.TopicName;
+import org.apache.pulsar.common.policies.data.Policies;
+import org.apache.pulsar.common.protocol.Commands;
+import org.apache.pulsar.common.protocol.Markers;
+import org.apache.pulsar.compaction.Compactor;
+import org.apache.pulsar.transaction.buffer.TransactionBufferReader;
+import org.apache.pulsar.transaction.buffer.TransactionEntry;
+import org.apache.pulsar.transaction.buffer.TransactionMeta;
+import org.apache.pulsar.transaction.buffer.exceptions.EndOfTransactionException;
+import org.apache.pulsar.transaction.buffer.exceptions.NoTxnsCommittedAtLedgerException;
+import org.apache.pulsar.transaction.buffer.exceptions.TransactionBufferException;
+import org.apache.pulsar.transaction.buffer.exceptions.TransactionNotFoundException;
+import org.apache.pulsar.transaction.buffer.exceptions.TransactionNotSealedException;
+import org.apache.pulsar.transaction.buffer.exceptions.UnexpectedTxnStatusException;
+import org.apache.pulsar.transaction.impl.common.TxnID;
+import org.apache.pulsar.transaction.impl.common.TxnStatus;
+import org.apache.pulsar.zookeeper.ZooKeeperCache;
+import org.apache.pulsar.zookeeper.ZooKeeperDataCache;
+import org.apache.pulsar.zookeeper.ZookeeperClientFactoryImpl;
+import org.apache.zookeeper.CreateMode;
+import org.apache.zookeeper.MockZooKeeper;
+import org.apache.zookeeper.ZooKeeper;
+import org.apache.zookeeper.data.ACL;
+import org.apache.zookeeper.txn.Txn;
+import org.mockito.invocation.InvocationOnMock;
+import org.mockito.stubbing.Answer;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.testng.annotations.AfterMethod;
+import org.testng.annotations.BeforeMethod;
+import org.testng.annotations.Test;
+
+public class PersistentTransactionBufferTest extends MockedBookKeeperTestCase {
+    private PulsarService pulsar;
+    private BrokerService brokerService;
+    private ManagedLedgerFactory mlFactoryMock;
+    private ServerCnx serverCnx;
+    private ManagedLedger ledgerMock;
+    private ManagedCursor cursorMock;
+    private ConfigurationCacheService configCacheService;
+
+    final String successTopicName = "persistent://prop/use/ns-abc/successTopic_txn";
+    private static final Logger log = LoggerFactory.getLogger(PersistentTransactionBufferTest.class);
+
+    @BeforeMethod
+    public void setup() throws Exception {
+        ServiceConfiguration svcConfig = spy(new ServiceConfiguration());
+        pulsar = spy(new PulsarService(svcConfig));
+        doReturn(svcConfig).when(pulsar).getConfiguration();
+        doReturn(mock(Compactor.class)).when(pulsar).getCompactor();
+
+        mlFactoryMock = mock(ManagedLedgerFactory.class);
+        doReturn(mlFactoryMock).when(pulsar).getManagedLedgerFactory();
+
+        ZooKeeper mockZk = createMockZooKeeper();
+        doReturn(mockZk).when(pulsar).getZkClient();
+        doReturn(createMockBookKeeper(mockZk, pulsar.getOrderedExecutor().chooseThread(0)))
+            .when(pulsar).getBookKeeperClient();
+
+        ZooKeeperCache cache = mock(ZooKeeperCache.class);
+        doReturn(30).when(cache).getZkOperationTimeoutSeconds();
+        doReturn(cache).when(pulsar).getLocalZkCache();
+
+        configCacheService = mock(ConfigurationCacheService.class);
+        @SuppressWarnings("unchecked")
+        ZooKeeperDataCache<Policies> zkDataCache = mock(ZooKeeperDataCache.class);
+        doReturn(zkDataCache).when(configCacheService).policiesCache();
+        doReturn(configCacheService).when(pulsar).getConfigurationCache();
+        doReturn(Optional.empty()).when(zkDataCache).get(anyString());
+
+        LocalZooKeeperCacheService zkCache = mock(LocalZooKeeperCacheService.class);
+        doReturn(CompletableFuture.completedFuture(Optional.empty())).when(zkDataCache).getAsync(any());
+        doReturn(zkDataCache).when(zkCache).policiesCache();
+        doReturn(configCacheService).when(pulsar).getConfigurationCache();
+        doReturn(zkCache).when(pulsar).getLocalZkCacheService();
+
+        brokerService = spy(new BrokerService(pulsar));
+        doReturn(brokerService).when(pulsar).getBrokerService();
+
+        serverCnx = spy(new ServerCnx(pulsar));
+        doReturn(true).when(serverCnx).isActive();
+        doReturn(true).when(serverCnx).isWritable();
+        doReturn(new InetSocketAddress("localhost", 1234)).when(serverCnx).clientAddress();
+
+        NamespaceService nsSvc = mock(NamespaceService.class);
+        doReturn(nsSvc).when(pulsar).getNamespaceService();
+        doReturn(true).when(nsSvc).isServiceUnitOwned(any(NamespaceBundle.class));
+        doReturn(true).when(nsSvc).isServiceUnitActive(any(TopicName.class));
+
+        setupMLAsyncCallbackMocks();
+    }
+
+    public static MockZooKeeper createMockZooKeeper() throws Exception {
+        MockZooKeeper zk = MockZooKeeper.newInstance(MoreExecutors.newDirectExecutorService());
+        List<ACL> dummyAclList = new ArrayList<>(0);
+
+        ZkUtils.createFullPathOptimistic(zk, "/ledgers/available/192.168.1.1:" + 5000,
+                                         "".getBytes(ZookeeperClientFactoryImpl.ENCODING_SCHEME), dummyAclList, CreateMode.PERSISTENT);
+
+        zk.create("/ledgers/LAYOUT", "1\nflat:1".getBytes(ZookeeperClientFactoryImpl.ENCODING_SCHEME), dummyAclList,
+                  CreateMode.PERSISTENT);
+        return zk;
+    }
+
+    public static NonClosableMockBookKeeper createMockBookKeeper(ZooKeeper zookeeper,
+                                                                 ExecutorService executor) throws Exception {
+        return spy(new NonClosableMockBookKeeper(zookeeper, executor));
+    }
+
+    public static class NonClosableMockBookKeeper extends PulsarMockBookKeeper {
+
+        public NonClosableMockBookKeeper(ZooKeeper zk, ExecutorService executor) throws Exception {
+            super(zk, executor);
+        }
+
+        @Override
+        public void close() {
+            // no-op
+        }
+
+        @Override
+        public void shutdown() {
+            // no-op
+        }
+
+        public void reallyShutdown() {
+            super.shutdown();
+        }
+    }
+
+    @SuppressWarnings("unchecked")
+    void setupMLAsyncCallbackMocks()
+        throws BrokerServiceException.NamingException, ManagedLedgerException, InterruptedException {
+        ledgerMock = mock(ManagedLedger.class);
+        cursorMock = mock(ManagedCursor.class);
+        final CompletableFuture<Void> closeFuture = new CompletableFuture<>();
+
+        doReturn(new ArrayList<Object>()).when(ledgerMock).getCursors();
+        doReturn("mockCursor").when(cursorMock).getName();
+        doAnswer(new Answer<Object>() {
+            @Override
+            public Object answer(InvocationOnMock invocationOnMock) throws Throwable {
+                return closeFuture.complete(null);
+            }
+        })
+
+            .when(cursorMock).asyncClose(new CloseCallback() {
+
+            @Override
+            public void closeComplete(Object ctx) {
+                log.info("[{}] Successfully closed cursor ledger", "mockCursor");
+                closeFuture.complete(null);
+            }
+
+            @Override
+            public void closeFailed(ManagedLedgerException exception, Object ctx) {
+                // isFenced.set(false);
+
+                log.error("Error closing cursor for subscription", exception);
+                closeFuture.completeExceptionally(new BrokerServiceException.PersistenceException(exception));
+            }
+        }, null);
+
+        // call openLedgerComplete with ledgerMock on ML factory asyncOpen
+        doAnswer(new Answer<Object>() {
+            @Override
+            public Object answer(InvocationOnMock invocationOnMock) throws Throwable {
+                ((OpenLedgerCallback) invocationOnMock.getArguments()[2]).openLedgerComplete(ledgerMock, null);
+                return null;
+            }
+        }).when(mlFactoryMock)
+          .asyncOpen(matches(".*success.*"), any(ManagedLedgerConfig.class), any(OpenLedgerCallback.class), anyObject());
+
+        // call openLedgerFailed on ML factory asyncOpen
+        doAnswer(new Answer<Object>() {
+            @Override
+            public Object answer(InvocationOnMock invocationOnMock) throws Throwable {
+                ((OpenLedgerCallback) invocationOnMock.getArguments()[2])
+                    .openLedgerFailed(new ManagedLedgerException("Managed ledger failure"), null);
+                return null;
+            }
+        }).when(mlFactoryMock)
+          .asyncOpen(matches(".*fail.*"), any(ManagedLedgerConfig.class), any(OpenLedgerCallback.class), anyObject());
+
+        // call addComplete on ledger asyncAddEntry
+        doAnswer(new Answer<Object>() {
+            @Override
+            public Object answer(InvocationOnMock invocationOnMock) throws Throwable {
+                ((AddEntryCallback) invocationOnMock.getArguments()[1])
+                    .addComplete(new PositionImpl(1, 1), invocationOnMock.getArguments()[2]);
+                return null;
+            }
+        }).when(ledgerMock).asyncAddEntry(any(ByteBuf.class), any(AddEntryCallback.class), anyObject());
+
+        // call openCursorComplete on cursor asyncOpen
+        doAnswer(new Answer<Object>() {
+            @Override
+            public Object answer(InvocationOnMock invocationOnMock) throws Throwable {
+                ((OpenCursorCallback) invocationOnMock.getArguments()[2]).openCursorComplete(cursorMock, null);
+                return null;
+            }
+        }).when(ledgerMock)
+          .asyncOpenCursor(matches(".*success.*"), any(InitialPosition.class), any(OpenCursorCallback.class), anyObject());
+
+        doAnswer(new Answer<Object>() {
+            @Override
+            public Object answer(InvocationOnMock invocationOnMock) throws Throwable {
+                ((OpenCursorCallback) invocationOnMock.getArguments()[3]).openCursorComplete(cursorMock, null);
+                return null;
+            }
+        }).when(ledgerMock).asyncOpenCursor(matches(".*success.*"), any(InitialPosition.class), any(Map.class),
+                                            any(OpenCursorCallback.class), anyObject());
+
+        // call deleteLedgerComplete on ledger asyncDelete
+        doAnswer(new Answer<Object>() {
+            @Override
+            public Object answer(InvocationOnMock invocationOnMock) throws Throwable {
+                ((DeleteLedgerCallback) invocationOnMock.getArguments()[0]).deleteLedgerComplete(null);
+                return null;
+            }
+        }).when(ledgerMock).asyncDelete(any(DeleteLedgerCallback.class), anyObject());
+
+        doAnswer(new Answer<Object>() {
+            @Override
+            public Object answer(InvocationOnMock invocationOnMock) throws Throwable {
+                ((DeleteCursorCallback) invocationOnMock.getArguments()[1]).deleteCursorComplete(null);
+                return null;
+            }
+        }).when(ledgerMock).asyncDeleteCursor(matches(".*success.*"), any(DeleteCursorCallback.class), anyObject());
+
+        doAnswer((invokactionOnMock) -> {
+            ((MarkDeleteCallback) invokactionOnMock.getArguments()[2])
+                .markDeleteComplete(invokactionOnMock.getArguments()[3]);
+            return null;
+        }).when(cursorMock).asyncMarkDelete(anyObject(), anyObject(), any(MarkDeleteCallback.class), anyObject());
+
+        this.buffer = new PersistentTransactionBuffer(successTopicName, factory.open("hello"), brokerService);
+    }
+
+    @AfterMethod
+    public void teardown() throws Exception {
+        brokerService.getTopics().clear();
+        brokerService.close(); //to clear pulsarStats
+        try {
+            pulsar.close();
+        } catch (Exception e) {
+            log.warn("Failed to close pulsar service", e);
+            throw e;
+        }
+    }
+
+    private final TxnID txnID = new TxnID(1234L, 5678L);
+    private PersistentTransactionBuffer buffer;
+
+    @Test
+    public void testGetANonExistTxn() throws InterruptedException {
+        try {
+            buffer.getTransactionMeta(txnID).get();
+        } catch (ExecutionException e) {
+            assertTrue(e.getCause() instanceof TransactionNotFoundException);
+        }
+    }
+
+    @Test
+    public void testOpenReaderOnNonExistentTxn() throws InterruptedException{
+        try {
+            buffer.openTransactionBufferReader(txnID, 0L).get();
+        } catch (ExecutionException e) {
+            assertTrue(e.getCause() instanceof TransactionNotFoundException);
+        }
+    }
+
+    @Test
+    public void testOpenReadOnAnOpenTxn() throws InterruptedException {
+        final int numEntries = 10;
+        appendEntries(buffer, txnID, numEntries, 0L);
+        TransactionMeta meta = null;
+        try {
+            meta = buffer.getTransactionMeta(txnID).get();
+        } catch (ExecutionException e) {
+            fail("Should not failed at here");
+        }
+        assertEquals(txnID, meta.id());
+        assertEquals(TxnStatus.OPEN, meta.status());
+
+        try {
+            buffer.openTransactionBufferReader(txnID, 0L).get();
+            fail("Should failed");
+        } catch (ExecutionException e) {
+            assertTrue(e.getCause() instanceof TransactionNotSealedException);
+        }
+    }
+
+    @Test
+    public void testOpenReaderOnCommittedTxn() throws ExecutionException, InterruptedException {
+        final int numEntries = 10;
+        appendEntries(buffer, txnID, numEntries, 0L);
+        TransactionMeta meta = buffer.getTransactionMeta(txnID).get();
+        assertEquals(txnID, meta.id());
+        assertEquals(TxnStatus.OPEN, meta.status());
+
+        buffer.commitTxn(txnID, 22L, 33L).get();
+
+        meta = buffer.getTransactionMeta(txnID).get();
+        assertEquals(txnID, meta.id());
+        assertEquals(TxnStatus.COMMITTED, meta.status());
+
+        try (TransactionBufferReader reader = buffer.openTransactionBufferReader(txnID, 0L).get()) {
+            List<TransactionEntry> entries = reader.readNext(numEntries).get();
+            verifyAndReleaseEntries(entries, txnID, 0L, numEntries);
+
+            reader.readNext(1).get();
+
+        } catch (ExecutionException ee) {
+            assertTrue(ee.getCause() instanceof EndOfTransactionException);
+        }
+
+    }
+
+    @Test
+    public void testCommitNonExistentTxn() throws ExecutionException, InterruptedException {
+        try {
+            buffer.commitTxn(txnID, 22L, 33L).get();
+        } catch (ExecutionException ee) {
+            assertTrue(ee.getCause() instanceof TransactionNotFoundException);
+        }
+    }
+
+    @Test
+    public void testCommitTxn() throws Exception {
+        final int numEntries = 10;
+        appendEntries(buffer, txnID, numEntries, 0L);
+        TransactionMeta meta = buffer.getTransactionMeta(txnID).get();
+
+        assertEquals(txnID, meta.id());
+        assertEquals(meta.status(), TxnStatus.OPEN);
+
+        buffer.commitTxn(txnID, 22L, 33L).get();
+        meta = buffer.getTransactionMeta(txnID).get();
+
+        assertEquals(txnID, meta.id());
+        assertEquals(meta.status(), TxnStatus.COMMITTED);
+    }
+
+    @Test
+    public void testCommitTxnMultiTimes() throws ExecutionException, InterruptedException {
+        final int numEntries = 10;
+        appendEntries(buffer, txnID, numEntries, 0L);
+        TransactionMeta meta = buffer.getTransactionMeta(txnID).get();
+
+        assertEquals(txnID, meta.id());
+        assertEquals(meta.status(), TxnStatus.OPEN);
+
+        buffer.commitTxn(txnID, 22L, 33L).get();
+        try {
+            buffer.commitTxn(txnID, 23L, 34L).get();
+            buffer.commitTxn(txnID, 24L, 34L).get();
+        } catch (ExecutionException e) {
+            assertTrue(e.getCause() instanceof UnexpectedTxnStatusException);
+        }
+        meta = buffer.getTransactionMeta(txnID).get();
+
+        assertEquals(txnID, meta.id());
+        assertEquals(meta.status(), TxnStatus.COMMITTED);
+        assertEquals(meta.committedAtLedgerId(), 22L);
+        assertEquals(meta.committedAtEntryId(), 33L);
+        assertEquals(meta.numEntries(), numEntries);
+    }
+
+    @Test
+    public void testAbortNonExistentTxn() throws Exception {
+        try {
+            buffer.abortTxn(txnID).get();
+            fail("Should fail to abort a transaction if it doesn't exist");
+        } catch (ExecutionException ee) {
+            assertTrue(ee.getCause() instanceof TransactionNotFoundException);
+        }
+    }
+
+    @Test
+    public void testAbortCommittedTxn() throws Exception {
+        final int numEntries = 10;
+        appendEntries(buffer, txnID, numEntries, 0L);
+        TransactionMeta meta = buffer.getTransactionMeta(txnID).get();
+        assertEquals(txnID, meta.id());
+        assertEquals(TxnStatus.OPEN, meta.status());
+
+        buffer.commitTxn(txnID, 22L, 33L).get();
+        meta = buffer.getTransactionMeta(txnID).get();
+        assertEquals(txnID, meta.id());
+        assertEquals(TxnStatus.COMMITTED, meta.status());
+
+        try {
+            buffer.abortTxn(txnID).get();
+            fail("Should fail to abort a committed transaction");
+        } catch (ExecutionException ee) {
+            assertTrue(ee.getCause() instanceof UnexpectedTxnStatusException);
+        }
+
+        meta = buffer.getTransactionMeta(txnID).get();
+        assertEquals(txnID, meta.id());
+        assertEquals(TxnStatus.COMMITTED, meta.status());
+    }
+
+    @Test
+    public void testAbortTxn() throws Exception {
+        final int numEntries = 10;
+        appendEntries(buffer, txnID, numEntries, 0L);
+        TransactionMeta meta = buffer.getTransactionMeta(txnID).get();
+        assertEquals(txnID, meta.id());
+        assertEquals(TxnStatus.OPEN, meta.status());
+
+        buffer.abortTxn(txnID).get();
+        verifyTxnNotExist(txnID);
+    }
+
+    @Test
+    public void testPurgeTxns() throws Exception {
+        final int numEntries = 10;
+        TxnID txnId1 = new TxnID(1234L, 2345L);
+        appendEntries(buffer, txnId1, numEntries, 0L);
+        TransactionMeta meta1 = buffer.getTransactionMeta(txnId1).get();
+        assertEquals(txnId1, meta1.id());
+        assertEquals(TxnStatus.OPEN, meta1.status());
+
+        TxnID txnId2 = new TxnID(1234L, 3456L);
+        appendEntries(buffer, txnId2, numEntries, 0L);
+        buffer.commitTxn(txnId2, 22L, 0L).get();
+        TransactionMeta meta2 = buffer.getTransactionMeta(txnId2).get();
+        assertEquals(txnId2, meta2.id());
+        assertEquals(TxnStatus.COMMITTED, meta2.status());
+
+        TxnID txnId3 = new TxnID(1234L, 4567L);
+        appendEntries(buffer, txnId3, numEntries, 0L);
+        buffer.commitTxn(txnId3, 23L, 0L).get();
+        TransactionMeta meta3 = buffer.getTransactionMeta(txnId3).get();
+        assertEquals(txnId3, meta3.id());
+        assertEquals(TxnStatus.COMMITTED, meta3.status());
+
+        buffer.purgeTxns(Lists.newArrayList(Long.valueOf(22L))).get();
+
+        verifyTxnNotExist(txnId2);
+
+        meta1 = buffer.getTransactionMeta(txnId1).get();
+        assertEquals(txnId1, meta1.id());
+        assertEquals(TxnStatus.OPEN, meta1.status());
+
+        meta3 = buffer.getTransactionMeta(txnId3).get();
+        assertEquals(txnId3, meta3.id());
+        assertEquals(TxnStatus.COMMITTED, meta3.status());
+
+        // purge a non exist ledger.
+        try {
+            buffer.purgeTxns(Lists.newArrayList(Long.valueOf(1L))).get();
+        } catch (ExecutionException e) {
+            assertTrue(e.getCause() instanceof NoTxnsCommittedAtLedgerException);
+        }
+
+        verifyTxnNotExist(txnId2);
+
+        meta1 = buffer.getTransactionMeta(txnId1).get();
+        assertEquals(txnId1, meta1.id());
+        assertEquals(TxnStatus.OPEN, meta1.status());
+
+        meta3 = buffer.getTransactionMeta(txnId3).get();
+        assertEquals(txnId3, meta3.id());
+        assertEquals(TxnStatus.COMMITTED, meta3.status());
+    }
+
+    @Test
+    public void testAppendEntry() throws ExecutionException, InterruptedException, ManagedLedgerException,
+                                         BrokerServiceException.NamingException {
+        ManagedLedger ledger = factory.open("test_ledger");
+        PersistentTransactionBuffer newBuffer = new PersistentTransactionBuffer(successTopicName, ledger,
+                                                                                brokerService);
+        final int numEntries = 10;
+        TxnID txnID = new TxnID(1111L, 2222L);
+        List<ByteBuf> appendEntries =  appendEntries(newBuffer, txnID, numEntries, 0L);
+        List<ByteBuf> copy = new ArrayList<>(appendEntries);
+        TransactionMetaImpl meta = (TransactionMetaImpl) newBuffer.getTransactionMeta(txnID).get();
+        assertEquals(meta.id(), txnID);
+        assertEquals(numEntries, meta.numEntries());
+        assertEquals(meta.status(), TxnStatus.OPEN);
+
+        verifyEntries(ledger, appendEntries, meta.getEntries());
+
+        newBuffer.commitTxn(txnID, 22L, 33L).get();
+        meta = (TransactionMetaImpl) newBuffer.getTransactionMeta(txnID).get();
+
+        assertEquals(meta.id(), txnID);
+        assertEquals(meta.numEntries(), numEntries);
+        assertEquals(meta.status(), TxnStatus.COMMITTED);
+        verifyEntries(ledger, copy, meta.getEntries());
+    }
+
+    @Test
+    public void testCommitMarker() throws Exception {
+        ManagedLedger ledger = factory.open("test_commit_ledger");
+        PersistentTransactionBuffer commitBuffer = new PersistentTransactionBuffer(successTopicName, ledger,
+                                                                                   brokerService);
+        final int numEntries = 10;
+        List<ByteBuf> appendEntires = appendEntries(commitBuffer, txnID, numEntries, 0L);
+
+        TransactionMetaImpl meta = (TransactionMetaImpl) commitBuffer.getTransactionMeta(txnID).get();
+        assertEquals(meta.id(), txnID);
+        assertEquals(meta.numEntries(), numEntries);
+        assertEquals(meta.status(), TxnStatus.OPEN);
+
+        verifyEntries(ledger, appendEntires, meta.getEntries());
+
+        commitBuffer.commitTxn(txnID, 22L, 33L).get();
+        assertEquals(meta.id(), txnID);
+        assertEquals(meta.numEntries(), numEntries);
+        assertEquals(meta.status(), TxnStatus.COMMITTED);
+
+        ManagedCursor cursor = ledger.newNonDurableCursor(PositionImpl.earliest);
+        Entry entry = getEntry(cursor, ledger.getLastConfirmedEntry());
+
+        boolean commitMarker = Markers.isTxnCommitMarker(Commands.parseMessageMetadata(entry.getDataBuffer()));
+        assertTrue(commitMarker);
+
+    }
+
+    @Test
+    public void testAbortMarker() throws Exception {
+        ManagedLedger ledger = factory.open("test_abort_ledger");
+        PersistentTransactionBuffer abortBuffer = new PersistentTransactionBuffer(successTopicName, ledger,
+                                                                                   brokerService);
+        final int numEntries = 10;
+        List<ByteBuf> appendEntires = appendEntries(abortBuffer, txnID, numEntries, 0L);
+
+        TransactionMetaImpl meta = (TransactionMetaImpl) abortBuffer.getTransactionMeta(txnID).get();
+        assertEquals(meta.id(), txnID);
+        assertEquals(meta.numEntries(), numEntries);
+        assertEquals(meta.status(), TxnStatus.OPEN);
+
+        verifyEntries(ledger, appendEntires, meta.getEntries());
+
+        abortBuffer.abortTxn(txnID).get();
+        assertEquals(meta.id(), txnID);
+        assertEquals(meta.numEntries(), numEntries);
+        assertEquals(meta.status(), TxnStatus.ABORTED);
+
+        ManagedCursor cursor = ledger.newNonDurableCursor(PositionImpl.earliest);
+        Entry entry = getEntry(cursor, ledger.getLastConfirmedEntry());
+
+        boolean abortMarker = Markers.isTxnAbortMarker(Commands.parseMessageMetadata(entry.getDataBuffer()));
+        assertTrue(abortMarker);
+    }
+
+    private void verifyEntries(ManagedLedger ledger, List<ByteBuf> appendEntries,
+                               SortedMap<Long, Position> addedEntries)
+        throws ManagedLedgerException, InterruptedException {
+        ManagedCursor cursor = ledger.newNonDurableCursor(PositionImpl.earliest);
+        assertNotNull(cursor);
+        for (Map.Entry<Long, Position> longPositionEntry : addedEntries.entrySet()) {
+            Entry entry = getEntry(cursor, longPositionEntry.getValue());
+            assertTrue(appendEntries.remove(entry.getDataBuffer()));
+        }
+    }
+
+    private Entry getEntry(ManagedCursor cursor, Position position)
+        throws ManagedLedgerException, InterruptedException {
+        assertNotNull(cursor);
+        cursor.seek(position);
+        List<Entry> readEntry = cursor.readEntries(1);
+        assertEquals(readEntry.size(), 1);
+        return readEntry.get(0);
+    }
+
+    @Test
+    public void testNoDeduplicateMessage()
+        throws ManagedLedgerException, InterruptedException, BrokerServiceException.NamingException,
+               ExecutionException {
+        ManagedLedger ledger = factory.open("test_deduplicate");
+        PersistentTransactionBuffer newBuffer = new PersistentTransactionBuffer(successTopicName, ledger,
+                                                                                brokerService);
+        final int numEntries = 10;
+
+        TxnID txnID = new TxnID(1234L, 5678L);
+        List<ByteBuf> appendEntries = appendEntries(newBuffer, txnID, numEntries, 0L);
+        TransactionMetaImpl meta = (TransactionMetaImpl) newBuffer.getTransactionMeta(txnID).get();
+
+        assertEquals(meta.id(), txnID);
+        assertEquals(meta.status(), TxnStatus.OPEN);
+        assertEquals(meta.numEntries(), appendEntries.size());
+
+        verifyEntries(ledger, appendEntries, meta.getEntries());
+
+        // append new message with same sequenceId
+        List<ByteBuf> deduplicateData = new ArrayList<>();
+        for (int i = 0; i < numEntries; i++) {
+            long sequenceId = i;
+            ByteBuf data = Unpooled.copiedBuffer("message-deduplicate-" + sequenceId, UTF_8);
+            newBuffer.appendBufferToTxn(txnID, sequenceId, data);
+            deduplicateData.add(data);
+        }
+
+        TransactionMetaImpl meta1 = (TransactionMetaImpl) newBuffer.getTransactionMeta(txnID).get();
+
+        assertEquals(meta1.id(), txnID);
+        assertEquals(meta1.numEntries(), numEntries);
+        assertEquals(meta.status(), TxnStatus.OPEN);
+
+        // read all entries in new buffer
+        ManagedCursor read = ledger.newNonDurableCursor(PositionImpl.earliest);
+        List<Entry> allEntries = read.readEntries(100);
+        List<ByteBuf> allMsg = allEntries.stream().map(entry -> entry.getDataBuffer()).collect(Collectors.toList());
+
+        assertEquals(allEntries.size(), numEntries);
+        verifyEntries(ledger, allMsg, meta1.getEntries());
+    }
+
+    private void verifyTxnNotExist(TxnID txnID) throws Exception {
+        try {
+            buffer.getTransactionMeta(txnID).get();
+        } catch (ExecutionException ee) {
+            assertTrue(ee.getCause() instanceof TransactionNotFoundException);
+        }
+    }
+
+    private List<ByteBuf> appendEntries(PersistentTransactionBuffer writeBuffer, TxnID id, int numEntries,
+                                        long startSequenceId) {
+        List<ByteBuf> entries = new ArrayList<>();
+        for (int i = 0; i < numEntries; i++) {
+            long sequenceId = startSequenceId + i;
+            writeBuffer.appendBufferToTxn(id, sequenceId, Unpooled.copiedBuffer("message-" + sequenceId, UTF_8)).join();
+            entries.add(Unpooled.copiedBuffer("message-" + sequenceId, UTF_8));
+        }
+        return entries;
+    }
+
+    private void verifyAndReleaseEntries(List<TransactionEntry> txnEntries,
+                                         TxnID txnID,
+                                         long startSequenceId,
+                                         int numEntriesToRead) {
+        assertEquals(txnEntries.size(), numEntriesToRead);
+        for (int i = 0; i < numEntriesToRead; i++) {
+            try (TransactionEntry txnEntry = txnEntries.get(i)) {
+                assertEquals(txnEntry.committedAtLedgerId(), 22L);
+                assertEquals(txnEntry.committedAtEntryId(), 33L);
+                assertEquals(txnEntry.txnId(), txnID);
+                assertEquals(txnEntry.sequenceId(), startSequenceId + i);
+                assertEquals(new String(
+                    ByteBufUtil.getBytes(txnEntry.getEntryBuffer()),
+                    UTF_8
+                ), "message-" + i);
+            }
+        }
+    }
+
+}
diff --git a/pulsar-transaction/buffer/src/test/java/org/apache/pulsar/transaction/buffer/impl/TransactionBufferTest.java b/pulsar-transaction/buffer/src/test/java/org/apache/pulsar/transaction/buffer/impl/TransactionBufferTest.java
index b5bda24..4902e2f 100644
--- a/pulsar-transaction/buffer/src/test/java/org/apache/pulsar/transaction/buffer/impl/TransactionBufferTest.java
+++ b/pulsar-transaction/buffer/src/test/java/org/apache/pulsar/transaction/buffer/impl/TransactionBufferTest.java
@@ -74,7 +74,7 @@ public class TransactionBufferTest {
 
     @AfterMethod
     public void teardown() throws Exception {
-        this.buffer.close();
+        this.buffer.closeAsync();
     }
 
     @Test