You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by si...@apache.org on 2019/06/17 19:40:38 UTC

[pulsar] branch master updated: [transaction][acknowledge] Introduce in-memory PENDING_ACK state in acknowledgement path (#4265)

This is an automated email from the ASF dual-hosted git repository.

sijie pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git


The following commit(s) were added to refs/heads/master by this push:
     new e66ba27  [transaction][acknowledge] Introduce in-memory PENDING_ACK state in acknowledgement path (#4265)
e66ba27 is described below

commit e66ba275df7a608a7cd61c91248dd543b1e1edbb
Author: Marvin Cai <ca...@gmail.com>
AuthorDate: Mon Jun 17 12:40:33 2019 -0700

    [transaction][acknowledge] Introduce in-memory PENDING_ACK state in acknowledgement path (#4265)
    
    Master Issue: #2664
    
    Motivation:
    Add acknowledgeMessage, commit, abort for transaction in PersistentSubscription.
    
    Changes:
    Will put message in Pending_ACK status when acknowledgeMessage is called with TxnID.
    No real status class introduced, only added collection to hold messages in Pending_ACK status.
    The current PR only keeps the Pending_ACK state in memory; a subsequent PR will also persist these pending acks so we can recover from broker failure.
    
    Add commitTxn to put message to Deleted status.
    Add abortTxn to put message to Pending status.
    
    For normal acknowledgeMessage and redeliverUnacknowledgedMessages, will check to see if
    the message is in Pending_ACK first. If true, will **ignore** those acks/redeliveries.
    
    Add unit test.
---
 .../bookkeeper/mledger/impl/ManagedCursorImpl.java |   6 +
 pulsar-broker/pom.xml                              |   6 +
 .../service/persistent/PersistentSubscription.java | 338 ++++++++++++++++++++-
 .../broker/service/PersistentTopicE2ETest.java     |  27 +-
 .../persistent/PersistentSubscriptionTest.java     | 284 +++++++++++++++++
 pulsar-transaction/common/pom.xml                  |   2 +-
 .../exception/TransactionConflictException.java}   |  32 +-
 .../exception/package-info.java}                   |  30 +-
 .../pulsar/transaction/impl/common/TxnID.java      |   4 +
 pulsar-transaction/pom.xml                         |   2 +-
 10 files changed, 662 insertions(+), 69 deletions(-)

diff --git a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedCursorImpl.java b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedCursorImpl.java
index 124f82a..a936954 100644
--- a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedCursorImpl.java
+++ b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedCursorImpl.java
@@ -2518,6 +2518,12 @@ public class ManagedCursorImpl implements ManagedCursor {
         return individualDeletedMessages;
     }
 
+    public boolean isMessageDeleted(Position position) {
+        checkArgument(position instanceof PositionImpl);
+        PositionImpl pos = (PositionImpl) position; // deleted: individually deleted, or at/before mark-delete
+        return individualDeletedMessages.contains(pos.getLedgerId(), pos.getEntryId())
+                || pos.compareTo(markDeletePosition) <= 0;
+
     /**
      * Checks given position is part of deleted-range and returns next position of upper-end as all the messages are
      * deleted up to that point.
diff --git a/pulsar-broker/pom.xml b/pulsar-broker/pom.xml
index 85b88ac..c4f9ae1 100644
--- a/pulsar-broker/pom.xml
+++ b/pulsar-broker/pom.xml
@@ -110,6 +110,12 @@
 
     <dependency>
       <groupId>${project.groupId}</groupId>
+      <artifactId>pulsar-transaction-common</artifactId>
+      <version>${project.version}</version>
+    </dependency>
+
+    <dependency>
+      <groupId>${project.groupId}</groupId>
       <artifactId>managed-ledger-original</artifactId>
       <version>${project.version}</version>
       <type>test-jar</type>
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentSubscription.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentSubscription.java
index 70be6ba..af2dde9 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentSubscription.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentSubscription.java
@@ -20,12 +20,20 @@ package org.apache.pulsar.broker.service.persistent;
 
 import com.google.common.base.MoreObjects;
 
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.HashSet;
 import java.util.Collections;
 import java.util.List;
 import java.util.Map;
 import java.util.TreeMap;
 import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.atomic.AtomicIntegerFieldUpdater;
+import java.util.concurrent.atomic.AtomicReferenceFieldUpdater;
+import java.util.stream.Collectors;
+import java.util.stream.IntStream;
+import java.util.stream.LongStream;
 
 import org.apache.bookkeeper.mledger.AsyncCallbacks;
 import org.apache.bookkeeper.mledger.AsyncCallbacks.ClearBacklogCallback;
@@ -39,7 +47,9 @@ import org.apache.bookkeeper.mledger.ManagedLedgerException;
 import org.apache.bookkeeper.mledger.ManagedLedgerException.ConcurrentFindCursorPositionException;
 import org.apache.bookkeeper.mledger.ManagedLedgerException.InvalidCursorPositionException;
 import org.apache.bookkeeper.mledger.Position;
+import org.apache.bookkeeper.mledger.impl.ManagedCursorImpl;
 import org.apache.bookkeeper.mledger.impl.PositionImpl;
+import org.apache.bookkeeper.util.collections.ConcurrentLongLongPairHashMap;
 import org.apache.pulsar.broker.service.BrokerServiceException;
 import org.apache.pulsar.broker.service.BrokerServiceException.ServerMetadataException;
 import org.apache.pulsar.broker.service.BrokerServiceException.SubscriptionBusyException;
@@ -56,10 +66,16 @@ import org.apache.pulsar.common.naming.TopicName;
 import org.apache.pulsar.common.policies.data.ConsumerStats;
 import org.apache.pulsar.common.policies.data.SubscriptionStats;
 import org.apache.pulsar.common.util.FutureUtil;
+import org.apache.pulsar.common.util.collections.ConcurrentOpenHashMap;
+import org.apache.pulsar.common.util.collections.ConcurrentOpenHashSet;
+import org.apache.pulsar.transaction.common.exception.TransactionConflictException;
+import org.apache.pulsar.transaction.impl.common.TxnID;
 import org.apache.pulsar.utils.CopyOnWriteArrayList;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import static com.google.common.base.Preconditions.checkArgument;
+
 public class PersistentSubscription implements Subscription {
     protected final PersistentTopic topic;
     protected final ManagedCursor cursor;
@@ -77,6 +93,29 @@ public class PersistentSubscription implements Subscription {
     // for connected subscriptions, message expiry will be checked if the backlog is greater than this threshold
     private static final int MINIMUM_BACKLOG_FOR_EXPIRY_CHECK = 1000;
 
+    // Map to keep track of message ack by each txn.
+    private ConcurrentOpenHashMap<TxnID, ConcurrentOpenHashSet<Position>> pendingAckMessagesMap;
+
+    // Messages acked by ongoing transaction, pending transaction commit to materialize the acks. For faster look up.
+    // Using hashset as a message should only be acked once by one transaction.
+    private ConcurrentOpenHashSet<Position> pendingAckMessages;
+
+    // Message cumulative acked by ongoing transaction, pending transaction commit to materialize the ack.
+    // Only one transaction can cumulative ack.
+    // This field only keeps the largest Position that was cumulatively acked, as any smaller Position is also covered.
+    private volatile Position pendingCumulativeAckMessage;
+
+    private static final AtomicReferenceFieldUpdater<PersistentSubscription, Position> POSITION_UPDATER =
+            AtomicReferenceFieldUpdater.newUpdater(PersistentSubscription.class, Position.class,
+                    "pendingCumulativeAckMessage");
+
+    // ID of transaction currently using cumulative ack.
+    private volatile TxnID pendingCumulativeAckTxnId;
+
+    private static final AtomicReferenceFieldUpdater<PersistentSubscription, TxnID> PENDING_CUMULATIVE_ACK_TXNID_UPDATER =
+            AtomicReferenceFieldUpdater.newUpdater(PersistentSubscription.class, TxnID.class,
+                    "pendingCumulativeAckTxnId");
+
     private static final String REPLICATED_SUBSCRIPTION_PROPERTY = "pulsar.replicated.subscription";
 
     // Map of properties that is used to mark this subscription as "replicated".
@@ -225,8 +264,15 @@ public class PersistentSubscription implements Subscription {
         Position previousMarkDeletePosition = cursor.getMarkDeletedPosition();
 
         if (ackType == AckType.Cumulative) {
+            if (this.pendingCumulativeAckTxnId != null) {
+                log.warn("[{}][{}] An ongoing transaction:{} is doing cumulative ack, " +
+                         "new cumulative ack is not allowed till the transaction is committed.",
+                          topicName, subName, this.pendingCumulativeAckTxnId.toString());
+                return;
+            }
+
             if (positions.size() != 1) {
-                log.warn("[{}][{}] Invalid cumulative ack received with multiple message ids", topicName, subName);
+                log.warn("[{}][{}] Invalid cumulative ack received with multiple message ids.", topicName, subName);
                 return;
             }
 
@@ -239,7 +285,35 @@ public class PersistentSubscription implements Subscription {
             if (log.isDebugEnabled()) {
                 log.debug("[{}][{}] Individual acks on {}", topicName, subName, positions);
             }
-            cursor.asyncDelete(positions, deleteCallback, positions);
+            // Check if message is acknowledged by ongoing transaction.
+            if ((pendingAckMessages != null && pendingAckMessages.size() != 0) || pendingCumulativeAckMessage != null) {
+                List<Position> positionsSafeToAck;
+                synchronized (PersistentSubscription.this) {
+                    positionsSafeToAck = positions.stream().filter(position -> {
+                        checkArgument(position instanceof PositionImpl);
+                        // If single ack try to ack message in pending_ack status, skip this ack.
+                        if (pendingAckMessages != null && this.pendingAckMessages.contains(position)) {
+                            log.warn("[{}][{}] Invalid acks position conflict with an ongoing transaction:{}.",
+                                    topicName, subName, this.pendingCumulativeAckTxnId.toString());
+                            return false;
+                        }
+
+                        // If single ack is within range of cumulative ack of an ongoing transaction, skip this ack.
+                        if (null != this.pendingCumulativeAckMessage &&
+                                ((PositionImpl) position).compareTo((PositionImpl) this.pendingCumulativeAckMessage) <= 0) {
+                            log.warn("[{}][{}] Invalid acks position within cumulative ack position of an ongoing " +
+                                    "transaction:{}.", topicName, subName, this.pendingCumulativeAckTxnId.toString());
+                            return false;
+                        }
+
+                        return true;
+                    }).collect(Collectors.toList());
+                }
+                cursor.asyncDelete(positionsSafeToAck, deleteCallback, positionsSafeToAck);
+            } else {
+                cursor.asyncDelete(positions, deleteCallback, positions);
+            }
+
             dispatcher.getRedeliveryTracker().removeBatch(positions);
         }
 
@@ -262,6 +336,110 @@ public class PersistentSubscription implements Subscription {
         }
     }
 
+    /**
+     * Acknowledge message(s) for an ongoing transaction.
+     * <p>
+     * The ack can be {@link AckType#Individual} or {@link AckType#Cumulative}. Messages acked by an ongoing
+     * transaction are put in pending_ack state and are only marked as deleted once the transaction is committed.
+     * <p>
+     * Only one transaction is allowed to do cumulative ack on a subscription at a given time.
+     * If a transaction does multiple cumulative acks, only the largest position as determined by
+     * {@link PositionImpl#compareTo(PositionImpl)} is kept, as it covers all positions smaller than it.
+     * <p>
+     * If an ongoing transaction cumulatively acked a message and then tries to individually ack a message
+     * smaller than that position, the individual ack succeeds.
+     * <p>
+     * If the transaction is aborted, all messages acked by it are put back to pending state.
+     *
+     * @param txnId                  TransactionID of an ongoing transaction trying to ack message(s).
+     * @param positions              {@link Position}(s) it tries to ack.
+     * @param ackType                {@link AckType}.
+     * @throws TransactionConflictException if another ongoing transaction already holds the cumulative ack, or the
+     *  position is pending ack for an ongoing transaction, or the position was already acked (deleted) before.
+     * @throws IllegalArgumentException if txnId is null, or a cumulative ack is passed multiple positions.
+     */
+    public synchronized void acknowledgeMessage(TxnID txnId, List<Position> positions, AckType ackType) throws TransactionConflictException {
+        checkArgument(txnId != null, "TransactionID can not be null.");
+        if (AckType.Cumulative == ackType) {
+            // Another transaction (compared by value, not by reference) may already hold the cumulative ack.
+            if (this.pendingCumulativeAckTxnId != null && !this.pendingCumulativeAckTxnId.equals(txnId)) {
+                String errorMsg = "[" + topicName + "][" + subName + "] Transaction:" + txnId +
+                                  " try to cumulative ack message while transaction:" + this.pendingCumulativeAckTxnId +
+                                  " already cumulative acked messages.";
+                log.error(errorMsg);
+                throw new TransactionConflictException(errorMsg);
+            }
+
+            if (positions.size() != 1) {
+                String errorMsg = "[" + topicName + "][" + subName + "] Transaction:" + txnId +
+                                  " invalid cumulative ack received with multiple message ids.";
+                log.error(errorMsg);
+                throw new IllegalArgumentException(errorMsg);
+            }
+
+            Position position = positions.get(0);
+            checkArgument(position instanceof PositionImpl);
+
+            if (((PositionImpl) position).compareTo((PositionImpl) cursor.getMarkDeletedPosition()) <= 0) {
+                String errorMsg = "[" + topicName + "][" + subName + "] Transaction:" + txnId +
+                        " try to cumulative ack position: " + position + " within range of cursor's " +
+                        "markDeletePosition: " + cursor.getMarkDeletedPosition();
+                log.error(errorMsg);
+                throw new TransactionConflictException(errorMsg);
+            }
+
+            if (log.isDebugEnabled()) {
+                log.debug("[{}][{}] TxnID:[{}] Cumulative ack on {}.", topicName, subName, txnId.toString(), position);
+            }
+
+            if (this.pendingCumulativeAckTxnId == null) {
+                // Only set pendingCumulativeAckTxnId if no transaction is doing cumulative ack.
+                PENDING_CUMULATIVE_ACK_TXNID_UPDATER.set(this, txnId);
+                POSITION_UPDATER.set(this, position);
+            } else if (((PositionImpl)position).compareTo((PositionImpl)this.pendingCumulativeAckMessage) > 0) {
+                // If new cumulative ack position is greater than current one, update it.
+                POSITION_UPDATER.set(this, position);
+            }
+        } else {
+            if (log.isDebugEnabled()) {
+                log.debug("[{}][{}] TxnID:[{}] Individual acks on {}", topicName, subName, txnId.toString(), positions);
+            }
+
+            if (pendingAckMessagesMap == null) {
+                pendingAckMessagesMap = new ConcurrentOpenHashMap<>();
+            }
+
+            if (pendingAckMessages == null) {
+                pendingAckMessages = new ConcurrentOpenHashSet<>();
+            }
+
+            ConcurrentOpenHashSet<Position> pendingAckMessageForCurrentTxn =
+                    pendingAckMessagesMap.computeIfAbsent(txnId, txn -> new ConcurrentOpenHashSet<>());
+
+            for (Position position : positions) {
+                // If try to ack message already acked by some ongoing transaction(can be itself), throw exception.
+                // Acking single message within range of cumulative ack(if exist) is considered valid operation.
+                if (this.pendingAckMessages.contains(position)) {
+                    String errorMsg = "[" + topicName + "][" + subName + "] Transaction:" + txnId +
+                                      " try to ack message:" + position + " in pending ack status.";
+                    log.error(errorMsg);
+                    throw new TransactionConflictException(errorMsg);
+                }
+
+                // If try to ack message already acked by committed transaction or normal acknowledge, throw exception.
+                if (((ManagedCursorImpl) cursor).isMessageDeleted(position)) {
+                    String errorMsg = "[" + topicName + "][" + subName + "] Transaction:" + txnId +
+                            " try to ack message:" + position + " already acked before.";
+                    log.error(errorMsg);
+                    throw new TransactionConflictException(errorMsg);
+                }
+
+                pendingAckMessageForCurrentTxn.add(position);
+                this.pendingAckMessages.add(position);
+            }
+        }
+    }
+
     private final MarkDeleteCallback markDeleteCallback = new MarkDeleteCallback() {
         @Override
         public void markDeleteComplete(Object ctx) {
@@ -706,12 +884,51 @@ public class PersistentSubscription implements Subscription {
 
     @Override
     public synchronized void redeliverUnacknowledgedMessages(Consumer consumer) {
-        dispatcher.redeliverUnacknowledgedMessages(consumer);
+        ConcurrentLongLongPairHashMap consumerPendingAcks = consumer.getPendingAcks();
+        boolean hasOngoingTxnAcks = (pendingAckMessages != null && pendingAckMessages.size() != 0)
+                || pendingCumulativeAckMessage != null;
+        // Without an ongoing transaction (or without a pending-ack map on the consumer) nothing needs
+        // filtering, so hand the whole request to the dispatcher.
+        if (null == consumerPendingAcks || !hasOngoingTxnAcks) {
+            dispatcher.redeliverUnacknowledgedMessages(consumer);
+            return;
+        }
+        // Redeliver only the positions that are not in pending_ack status: neither individually acked by an
+        // ongoing transaction nor covered by an ongoing transaction's cumulative ack.
+        PositionImpl txnCumulativeAck = (PositionImpl) this.pendingCumulativeAckMessage;
+        List<PositionImpl> redeliverable = new ArrayList<>();
+        consumerPendingAcks.asMap().entrySet().forEach(entry -> {
+            PositionImpl candidate = new PositionImpl(entry.getKey().first, entry.getKey().second);
+            boolean singleAcked = pendingAckMessages != null && this.pendingAckMessages.contains(candidate);
+            boolean cumulativeAcked = null != txnCumulativeAck && candidate.compareTo(txnCumulativeAck) <= 0;
+            if (!singleAcked && !cumulativeAcked) {
+                redeliverable.add(candidate);
+            }
+        });
+        dispatcher.redeliverUnacknowledgedMessages(consumer, redeliverable);
     }
 
     @Override
     public synchronized void redeliverUnacknowledgedMessages(Consumer consumer, List<PositionImpl> positions) {
-        dispatcher.redeliverUnacknowledgedMessages(consumer, positions);
+        boolean hasOngoingTxnAcks = (pendingAckMessages != null && pendingAckMessages.size() != 0)
+                || pendingCumulativeAckMessage != null;
+        // No ongoing transaction holds any ack, so every requested position can be redelivered as is.
+        if (!hasOngoingTxnAcks) {
+            dispatcher.redeliverUnacknowledgedMessages(consumer, positions);
+            return;
+        }
+        // Drop the positions that are in pending_ack status: individually acked by an ongoing transaction
+        // or covered by an ongoing transaction's cumulative ack.
+        PositionImpl txnCumulativeAck = (PositionImpl) this.pendingCumulativeAckMessage;
+        List<PositionImpl> redeliverable = new ArrayList<>();
+        for (PositionImpl candidate : positions) {
+            boolean singleAcked = pendingAckMessages != null && this.pendingAckMessages.contains(candidate);
+            boolean cumulativeAcked = null != txnCumulativeAck && candidate.compareTo(txnCumulativeAck) <= 0;
+            if (!singleAcked && !cumulativeAcked) {
+                redeliverable.add(candidate);
+            }
+        }
+        dispatcher.redeliverUnacknowledgedMessages(consumer, redeliverable);
     }
 
     @Override
@@ -744,6 +961,119 @@ public class PersistentSubscription implements Subscription {
     }
 
     /**
+     * Commit a transaction: delete its individual pending acks from the cursor and, when it holds the pending
+     * cumulative ack, mark-delete that position and reset the cumulative ack state.
+     *
+     * @param txnId         {@link TxnID} to identify the transaction.
+     * @param properties    Additional user-defined properties that can be associated with a particular cursor position.
+     * @return future completed when the cursor operations succeed, or failed with the exception they report.
+     * @throws IllegalArgumentException if given {@link TxnID} is not found in this subscription.
+     */
+    public synchronized CompletableFuture<Void> commitTxn(TxnID txnId, Map<String,Long> properties) {
+        boolean ownsCumulativeAck = txnId != null && txnId.equals(this.pendingCumulativeAckTxnId);
+
+        if (pendingAckMessagesMap != null && !ownsCumulativeAck && !this.pendingAckMessagesMap.containsKey(txnId)) {
+            String errorMsg = "[" + topicName + "][" + subName + "] Transaction with id:" + txnId + " not found.";
+            log.error(errorMsg);
+            throw new IllegalArgumentException(errorMsg);
+        }
+
+        CompletableFuture<Void> commitFuture = new CompletableFuture<>();
+        CompletableFuture<Void> deleteFuture = new CompletableFuture<>();
+        CompletableFuture<Void> markDeleteFuture = new CompletableFuture<>();
+
+        MarkDeleteCallback commitMarkDeleteCallback = new MarkDeleteCallback() {
+            @Override
+            public void markDeleteComplete(Object ctx) {
+                log.debug("[{}][{}] Mark deleted messages until position {}", topicName, subName, ctx);
+                markDeleteFuture.complete(null);
+            }
+
+            @Override
+            public void markDeleteFailed(ManagedLedgerException exception, Object ctx) {
+                log.debug("[{}][{}] Failed to mark delete for position {}", topicName, subName, ctx, exception);
+                markDeleteFuture.completeExceptionally(exception);
+            }
+        };
+
+        DeleteCallback commitDeleteCallback = new DeleteCallback() {
+            @Override
+            public void deleteComplete(Object position) {
+                log.debug("[{}][{}] Deleted message at {}", topicName, subName, position);
+                deleteFuture.complete(null);
+            }
+
+            @Override
+            public void deleteFailed(ManagedLedgerException exception, Object ctx) {
+                // Always reported: a warning must not depend on debug logging being enabled.
+                log.warn("[{}][{}] Failed to delete message at {}", topicName, subName, ctx, exception);
+                deleteFuture.completeExceptionally(exception);
+            }
+        };
+
+        // A transaction may commit with no individual acks, so the map or its entry for txnId can be missing.
+        ConcurrentOpenHashSet<Position> txnPendingAcks =
+                pendingAckMessagesMap != null ? this.pendingAckMessagesMap.remove(txnId) : null;
+        List<Position> positions = txnPendingAcks != null ? txnPendingAcks.values() : Collections.emptyList();
+        // Materialize all single acks.
+        cursor.asyncDelete(positions, commitDeleteCallback, positions);
+        if (pendingAckMessages != null) {
+            positions.forEach(position -> this.pendingAckMessages.remove(position));
+        }
+
+        if (ownsCumulativeAck) {
+            // Materialize cumulative ack, then reset txnID and position for cumulative ack.
+            Position cumulativeAckPosition = this.pendingCumulativeAckMessage;
+            cursor.asyncMarkDelete(cumulativeAckPosition, (null == properties) ?
+                    Collections.emptyMap() : properties, commitMarkDeleteCallback, cumulativeAckPosition);
+            PENDING_CUMULATIVE_ACK_TXNID_UPDATER.set(this, null);
+            POSITION_UPDATER.set(this, null);
+        } else {
+            // This transaction holds no cumulative ack, so there is no position to hand to the cursor.
+            markDeleteFuture.complete(null);
+        }
+
+        deleteFuture.runAfterBoth(markDeleteFuture, () -> commitFuture.complete(null))
+                    .exceptionally((exception) -> {
+                        commitFuture.completeExceptionally(exception);
+                        return null;
+                    });
+
+        return commitFuture;
+    }
+
+    /**
+     * Abort a transaction: release its individual pending acks for redelivery and, when it holds the pending
+     * cumulative ack, reset the cumulative ack state.
+     *
+     * @param txnId  {@link TxnID} to identify the transaction.
+     * @param consumer {@link Consumer} which is aborting the transaction.
+     * @return an already completed future.
+     * @throws IllegalArgumentException if given {@link TxnID} is not found in this subscription.
+     */
+    public synchronized CompletableFuture<Void> abortTxn(TxnID txnId, Consumer consumer) {
+        boolean ownsCumulativeAck = txnId != null && txnId.equals(this.pendingCumulativeAckTxnId);
+        if (pendingAckMessagesMap != null && !ownsCumulativeAck && !this.pendingAckMessagesMap.containsKey(txnId)) {
+            String errorMsg = "[" + topicName + "][" + subName + "] Transaction with id:" + txnId + " not found.";
+            throw new IllegalArgumentException(errorMsg);
+        }
+
+        // A transaction may abort with no individual acks, so the map or its entry for txnId can be missing.
+        ConcurrentOpenHashSet<Position> txnPendingAcks =
+                pendingAckMessagesMap != null ? this.pendingAckMessagesMap.remove(txnId) : null;
+        List<Position> positions = txnPendingAcks != null ? txnPendingAcks.values() : new ArrayList<>();
+        if (pendingAckMessages != null) {
+            positions.forEach(position -> this.pendingAckMessages.remove(position));
+        }
+        if (ownsCumulativeAck) {
+            PENDING_CUMULATIVE_ACK_TXNID_UPDATER.set(this, null);
+            POSITION_UPDATER.set(this, null);
+        }
+        dispatcher.redeliverUnacknowledgedMessages(consumer, (List<PositionImpl>) (List<?>) positions);
+        return CompletableFuture.completedFuture(null);
+    }
+
+    /**
      * Return a merged map that contains the cursor properties specified by used
      * (eg. when using compaction subscription) and the subscription properties.
      */
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/PersistentTopicE2ETest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/PersistentTopicE2ETest.java
index 790505d..e06b249 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/PersistentTopicE2ETest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/PersistentTopicE2ETest.java
@@ -28,6 +28,7 @@ import static org.testng.Assert.fail;
 import com.google.common.collect.Sets;
 
 import java.lang.reflect.Field;
+import java.util.ArrayList;
 import java.util.List;
 import java.util.Map;
 import java.util.Optional;
@@ -1307,7 +1308,8 @@ public class PersistentTopicE2ETest extends BrokerTestBase {
         final String subName = "sub2";
 
         Message<String> msg;
-        int totalMessages = 10;
+        List<Message<String>> unackedMessages = new ArrayList<>();
+        int totalMessages = 20;
 
         Consumer<String> consumer = pulsarClient.newConsumer(Schema.STRING)
                 .topic(topicName)
@@ -1326,12 +1328,11 @@ public class PersistentTopicE2ETest extends BrokerTestBase {
             producer.send("my-message-" + i);
         }
 
-        // (2) Consume and ack messages except first message
-        Message<String> unAckedMsg = null;
+        // (2) Consume all messages and only ack the first 10
         for (int i = 0; i < totalMessages; i++) {
             msg = consumer.receive();
-            if (i == 0) {
-                unAckedMsg = msg;
+            if (i >= 10) {
+                unackedMessages.add(msg);
             } else {
                 consumer.acknowledge(msg);
             }
@@ -1339,13 +1340,17 @@ public class PersistentTopicE2ETest extends BrokerTestBase {
 
         consumer.redeliverUnacknowledgedMessages();
 
-        // Verify: msg [L:0] must be redelivered
-        try {
-            msg = consumer.receive(1, TimeUnit.SECONDS);
-            assertEquals(msg.getValue(), unAckedMsg.getValue());
-        } catch (Exception e) {
-            fail("msg should be redelivered ", e);
+        for (int i = 0; i < 10; i++) {
+            // Verify: each of the 10 unacked messages must be redelivered
+            try {
+                final Message<String> redeliveredMsg = consumer.receive(1, TimeUnit.SECONDS);
+                unackedMessages.removeIf(unackedMessage -> unackedMessage.getValue().equals(redeliveredMsg.getValue()));
+            } catch (Exception e) {
+                fail("msg should be redelivered ", e);
+            }
         }
+        // Make sure that the last 10 messages, which we didn't acknowledge, all got redelivered.
+        assertTrue(unackedMessages.size() == 0);
 
         // Verify no other messages are redelivered
         msg = consumer.receive(100, TimeUnit.MILLISECONDS);
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/persistent/PersistentSubscriptionTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/persistent/PersistentSubscriptionTest.java
new file mode 100644
index 0000000..9e65326
--- /dev/null
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/persistent/PersistentSubscriptionTest.java
@@ -0,0 +1,284 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.broker.service.persistent;
+
+import org.apache.bookkeeper.mledger.AsyncCallbacks;
+import org.apache.bookkeeper.mledger.ManagedCursor;
+import org.apache.bookkeeper.mledger.ManagedLedger;
+import org.apache.bookkeeper.mledger.ManagedLedgerFactory;
+import org.apache.bookkeeper.mledger.Position;
+import org.apache.bookkeeper.mledger.impl.ManagedCursorImpl;
+import org.apache.bookkeeper.mledger.impl.PositionImpl;
+import org.apache.pulsar.broker.PulsarService;
+import org.apache.pulsar.broker.ServiceConfiguration;
+import org.apache.pulsar.broker.cache.ConfigurationCacheService;
+import org.apache.pulsar.broker.cache.LocalZooKeeperCacheService;
+import org.apache.pulsar.broker.service.BrokerService;
+import org.apache.pulsar.broker.service.BrokerServiceException;
+import org.apache.pulsar.broker.service.Consumer;
+import org.apache.pulsar.broker.service.PersistentTopicTest;
+import org.apache.pulsar.client.api.SubscriptionType;
+import org.apache.pulsar.common.api.proto.PulsarApi;
+import org.apache.pulsar.common.policies.data.Policies;
+import org.apache.pulsar.compaction.Compactor;
+import org.apache.pulsar.transaction.common.exception.TransactionConflictException;
+import org.apache.pulsar.transaction.impl.common.TxnID;
+import org.apache.pulsar.zookeeper.ZooKeeperCache;
+import org.apache.pulsar.zookeeper.ZooKeeperDataCache;
+import org.apache.zookeeper.ZooKeeper;
+import org.powermock.core.classloader.annotations.PrepareForTest;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.testng.annotations.AfterMethod;
+import org.testng.annotations.BeforeMethod;
+import org.testng.annotations.Test;
+
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.List;
+import java.util.Optional;
+import java.util.concurrent.CompletableFuture;
+
+import static org.apache.pulsar.broker.auth.MockedPulsarServiceBaseTest.createMockBookKeeper;
+import static org.apache.pulsar.broker.auth.MockedPulsarServiceBaseTest.createMockZooKeeper;
+import static org.apache.pulsar.common.api.proto.PulsarApi.CommandAck.AckType;
+import static org.mockito.Matchers.any;
+import static org.mockito.Matchers.anyList;
+import static org.mockito.Matchers.anyMap;
+import static org.mockito.Matchers.anyObject;
+import static org.mockito.Matchers.anyString;
+import static org.mockito.Mockito.doAnswer;
+import static org.mockito.Mockito.doNothing;
+import static org.mockito.Mockito.doReturn;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.spy;
+import static org.mockito.Mockito.times;
+import static org.mockito.Mockito.verify;
+import static org.testng.Assert.assertEquals;
+import static org.testng.Assert.assertTrue;
+import static org.testng.Assert.fail;
+
+@PrepareForTest({ ZooKeeperDataCache.class, BrokerService.class })
+public class PersistentSubscriptionTest {
+
+    private PulsarService pulsarMock;
+    private BrokerService brokerMock;
+    private ManagedLedgerFactory mlFactoryMock;
+    private ManagedLedger ledgerMock;
+    private ManagedCursorImpl cursorMock;
+    private ConfigurationCacheService configCacheServiceMock;
+    private PersistentTopic topic;
+    private PersistentSubscription persistentSubscription;
+    private Consumer consumerMock;
+
+    final String successTopicName = "persistent://prop/use/ns-abc/successTopic";
+    final String subName = "subscriptionName";
+
+    final TxnID txnID1 = new TxnID(1,1);
+    final TxnID txnID2 = new TxnID(1,2);
+
+    private static final Logger log = LoggerFactory.getLogger(PersistentSubscriptionTest.class);
+
+    @BeforeMethod
+    public void setup() throws Exception {
+        ServiceConfiguration svcConfig = spy(new ServiceConfiguration());
+        pulsarMock = spy(new PulsarService(svcConfig));
+        doReturn(svcConfig).when(pulsarMock).getConfiguration();
+        doReturn(mock(Compactor.class)).when(pulsarMock).getCompactor();
+
+        mlFactoryMock = mock(ManagedLedgerFactory.class);
+        doReturn(mlFactoryMock).when(pulsarMock).getManagedLedgerFactory();
+
+        ZooKeeper zkMock = createMockZooKeeper();
+        doReturn(zkMock).when(pulsarMock).getZkClient();
+        doReturn(createMockBookKeeper(zkMock, pulsarMock.getOrderedExecutor().chooseThread(0)))
+                .when(pulsarMock).getBookKeeperClient();
+
+        ZooKeeperCache cache = mock(ZooKeeperCache.class);
+        doReturn(30).when(cache).getZkOperationTimeoutSeconds();
+        CompletableFuture getDataFuture = new CompletableFuture();
+        getDataFuture.complete(Optional.empty());
+        doReturn(getDataFuture).when(cache).getDataAsync(anyString(), any(), any());
+        doReturn(cache).when(pulsarMock).getLocalZkCache();
+
+        configCacheServiceMock = mock(ConfigurationCacheService.class);
+        @SuppressWarnings("unchecked")
+        ZooKeeperDataCache<Policies> zkPoliciesDataCacheMock = mock(ZooKeeperDataCache.class);
+        doReturn(zkPoliciesDataCacheMock).when(configCacheServiceMock).policiesCache();
+        doReturn(configCacheServiceMock).when(pulsarMock).getConfigurationCache();
+        doReturn(Optional.empty()).when(zkPoliciesDataCacheMock).get(anyString());
+
+        LocalZooKeeperCacheService zkCacheMock = mock(LocalZooKeeperCacheService.class);
+        doReturn(CompletableFuture.completedFuture(Optional.empty())).when(zkPoliciesDataCacheMock).getAsync(any());
+        doReturn(zkPoliciesDataCacheMock).when(zkCacheMock).policiesCache();
+        doReturn(zkCacheMock).when(pulsarMock).getLocalZkCacheService();
+
+        brokerMock = spy(new BrokerService(pulsarMock));
+        doNothing().when(brokerMock).unloadNamespaceBundlesGracefully();
+        doReturn(brokerMock).when(pulsarMock).getBrokerService();
+
+        ledgerMock = mock(ManagedLedger.class);
+        cursorMock = mock(ManagedCursorImpl.class);
+        doReturn(new ArrayList<Object>()).when(ledgerMock).getCursors();
+        doReturn("mockCursor").when(cursorMock).getName();
+        doReturn(new PositionImpl(1, 50)).when(cursorMock).getMarkDeletedPosition();
+
+        topic = new PersistentTopic(successTopicName, ledgerMock, brokerMock);
+
+        consumerMock = mock(Consumer.class);
+
+        persistentSubscription = new PersistentSubscription(topic, subName, cursorMock, false);
+    }
+
+    @AfterMethod
+    public void teardown() throws Exception {
+        brokerMock.close(); //to clear pulsarStats
+        try {
+            pulsarMock.close();
+        } catch (Exception e) {
+            log.warn("Failed to close pulsar service", e);
+            throw e;
+        }
+    }
+
+    @Test
+    public void testCanAcknowledgeAndCommitForTransaction() throws TransactionConflictException {
+        List<Position> expectedSinglePositions = new ArrayList<>();
+        expectedSinglePositions.add(new PositionImpl(1, 1));
+        expectedSinglePositions.add(new PositionImpl(1, 3));
+        expectedSinglePositions.add(new PositionImpl(1, 5));
+
+        doAnswer((invocationOnMock) -> {
+            assertTrue(((List)invocationOnMock.getArguments()[0]).containsAll(expectedSinglePositions));
+            ((AsyncCallbacks.DeleteCallback) invocationOnMock.getArguments()[1])
+                    .deleteComplete(invocationOnMock.getArguments()[2]);
+            return null;
+        }).when(cursorMock).asyncDelete(anyList(), any(AsyncCallbacks.DeleteCallback.class), anyObject());
+
+        doAnswer((invocationOnMock) -> {
+            assertTrue(((PositionImpl)invocationOnMock.getArguments()[0]).compareTo(new PositionImpl(3, 100)) == 0);
+            ((AsyncCallbacks.MarkDeleteCallback) invocationOnMock.getArguments()[2])
+                    .markDeleteComplete(invocationOnMock.getArguments()[3]);
+            return null;
+        }).when(cursorMock).asyncMarkDelete(anyObject(), anyObject(), any(AsyncCallbacks.MarkDeleteCallback.class), anyObject());
+
+        List<Position> positions = new ArrayList<>();
+        positions.add(new PositionImpl(1, 1));
+        positions.add(new PositionImpl(1, 3));
+        positions.add(new PositionImpl(1, 5));
+
+        // Single ack for txn
+        persistentSubscription.acknowledgeMessage(txnID1, positions, AckType.Individual);
+
+        positions.clear();
+        positions.add(new PositionImpl(3, 100));
+
+        // Cumulative ack for txn
+        persistentSubscription.acknowledgeMessage(txnID1, positions, AckType.Cumulative);
+
+        // Commit txn
+        persistentSubscription.commitTxn(txnID1, Collections.emptyMap());
+
+        // Verify the cursor's delete and mark-delete methods were each called exactly once.
+        verify(cursorMock, times(1)).asyncDelete(anyList(), any(), any());
+        verify(cursorMock, times(1)).asyncMarkDelete(any(), anyMap(), anyObject(), any());
+    }
+
+    @Test
+    public void testCanAcknowledgeAndAbortForTransaction() throws TransactionConflictException, BrokerServiceException {
+        List<Position> positions = new ArrayList<>();
+        positions.add(new PositionImpl(2, 1));
+        positions.add(new PositionImpl(2, 3));
+        positions.add(new PositionImpl(2, 5));
+
+        Position[] expectedSinglePositions = {new PositionImpl(3, 1),
+                                        new PositionImpl(3, 3), new PositionImpl(3, 5)};
+
+        doAnswer((invocationOnMock) -> {
+            assertTrue(Arrays.deepEquals(((List)invocationOnMock.getArguments()[0]).toArray(), expectedSinglePositions));
+            ((AsyncCallbacks.DeleteCallback) invocationOnMock.getArguments()[1])
+                    .deleteComplete(invocationOnMock.getArguments()[2]);
+            return null;
+        }).when(cursorMock).asyncDelete(anyList(), any(AsyncCallbacks.DeleteCallback.class), anyObject());
+
+        doReturn(PulsarApi.CommandSubscribe.SubType.Exclusive).when(consumerMock).subType();
+
+        persistentSubscription.addConsumer(consumerMock);
+
+        // Single ack for txn1
+        persistentSubscription.acknowledgeMessage(txnID1, positions, AckType.Individual);
+
+        positions.clear();
+        positions.add(new PositionImpl(1, 100));
+
+        // Cumulative ack for txn1
+        persistentSubscription.acknowledgeMessage(txnID1, positions, AckType.Cumulative);
+
+        positions.clear();
+        positions.add(new PositionImpl(2, 1));
+
+        // Cannot individually ack a message that is in pending-ack state for another transaction.
+        try {
+            persistentSubscription.acknowledgeMessage(txnID2, positions, AckType.Individual);
+            fail("Single acknowledge for transaction2 should fail. ");
+        } catch (TransactionConflictException e) {
+            assertEquals(e.getMessage(),"[persistent://prop/use/ns-abc/successTopic][subscriptionName] " +
+                    "Transaction:(1,2) try to ack message:2:1 in pending ack status.");
+        }
+
+        positions.clear();
+        positions.add(new PositionImpl(2, 50));
+
+        // Cannot cumulatively ack while another transaction already holds a cumulative ack.
+        try {
+            persistentSubscription.acknowledgeMessage(txnID2, positions, AckType.Cumulative);
+            fail("Cumulative acknowledge for transaction2 should fail. ");
+        } catch (TransactionConflictException e) {
+            System.out.println(e.getMessage());
+            assertEquals(e.getMessage(),"[persistent://prop/use/ns-abc/successTopic][subscriptionName] " +
+                "Transaction:(1,2) try to cumulative ack message while transaction:(1,1) already cumulative acked messages.");
+        }
+
+        positions.clear();
+        positions.add(new PositionImpl(1, 1));
+        positions.add(new PositionImpl(1, 3));
+        positions.add(new PositionImpl(1, 5));
+        positions.add(new PositionImpl(3, 1));
+        positions.add(new PositionImpl(3, 3));
+        positions.add(new PositionImpl(3, 5));
+
+        // Acknowledge from normal consumer will succeed ignoring message acked by ongoing transaction.
+        persistentSubscription.acknowledgeMessage(positions, AckType.Individual, Collections.emptyMap());
+
+        //Abort txn.
+        persistentSubscription.abortTxn(txnID1, consumerMock);
+
+        positions.clear();
+        positions.add(new PositionImpl(2, 50));
+
+        // Retrying the above ack succeeds, as the abort has cleared pending_ack for those messages.
+        persistentSubscription.acknowledgeMessage(txnID2, positions, AckType.Cumulative);
+
+        positions.clear();
+        positions.add(new PositionImpl(2, 1));
+
+        persistentSubscription.acknowledgeMessage(txnID2, positions, AckType.Individual);
+    }
+}
diff --git a/pulsar-transaction/common/pom.xml b/pulsar-transaction/common/pom.xml
index d09bf2b..5f0d5ef 100644
--- a/pulsar-transaction/common/pom.xml
+++ b/pulsar-transaction/common/pom.xml
@@ -69,4 +69,4 @@
             </plugin>
         </plugins>
     </build>
-</project>
+</project>
\ No newline at end of file
diff --git a/pulsar-transaction/common/src/main/java/org/apache/pulsar/transaction/impl/common/TxnID.java b/pulsar-transaction/common/src/main/java/org/apache/pulsar/transaction/common/exception/TransactionConflictException.java
similarity index 63%
copy from pulsar-transaction/common/src/main/java/org/apache/pulsar/transaction/impl/common/TxnID.java
copy to pulsar-transaction/common/src/main/java/org/apache/pulsar/transaction/common/exception/TransactionConflictException.java
index 16ce4e4..be1350a 100644
--- a/pulsar-transaction/common/src/main/java/org/apache/pulsar/transaction/impl/common/TxnID.java
+++ b/pulsar-transaction/common/src/main/java/org/apache/pulsar/transaction/common/exception/TransactionConflictException.java
@@ -16,33 +16,17 @@
  * specific language governing permissions and limitations
  * under the License.
  */
-package org.apache.pulsar.transaction.impl.common;
-
-import com.google.common.annotations.Beta;
-import java.io.Serializable;
-import lombok.Data;
+package org.apache.pulsar.transaction.common.exception;
 
 /**
- * An identifier for representing a transaction.
+ * Exception thrown when a transaction tries to acknowledge a message when it shouldn't.
+ *
  */
-@Beta
-@Data
-public class TxnID implements Serializable {
+public class TransactionConflictException extends Exception {
 
     private static final long serialVersionUID = 0L;
 
-    /*
-     * The most significant 64 bits of this TxnID.
-     *
-     * @serial
-     */
-    private final long mostSigBits;
-
-    /*
-     * The least significant 64 bits of this TxnID.
-     *
-     * @serial
-     */
-    private final long leastSigBits;
-
-}
+    public TransactionConflictException(String message) {
+        super(message);
+    }
+}
\ No newline at end of file
diff --git a/pulsar-transaction/common/src/main/java/org/apache/pulsar/transaction/impl/common/TxnID.java b/pulsar-transaction/common/src/main/java/org/apache/pulsar/transaction/common/exception/package-info.java
similarity index 59%
copy from pulsar-transaction/common/src/main/java/org/apache/pulsar/transaction/impl/common/TxnID.java
copy to pulsar-transaction/common/src/main/java/org/apache/pulsar/transaction/common/exception/package-info.java
index 16ce4e4..70fff2c 100644
--- a/pulsar-transaction/common/src/main/java/org/apache/pulsar/transaction/impl/common/TxnID.java
+++ b/pulsar-transaction/common/src/main/java/org/apache/pulsar/transaction/common/exception/package-info.java
@@ -16,33 +16,7 @@
  * specific language governing permissions and limitations
  * under the License.
  */
-package org.apache.pulsar.transaction.impl.common;
-
-import com.google.common.annotations.Beta;
-import java.io.Serializable;
-import lombok.Data;
-
 /**
- * An identifier for representing a transaction.
+ * Common exception used by pulsar transaction related modules.
  */
-@Beta
-@Data
-public class TxnID implements Serializable {
-
-    private static final long serialVersionUID = 0L;
-
-    /*
-     * The most significant 64 bits of this TxnID.
-     *
-     * @serial
-     */
-    private final long mostSigBits;
-
-    /*
-     * The least significant 64 bits of this TxnID.
-     *
-     * @serial
-     */
-    private final long leastSigBits;
-
-}
+package org.apache.pulsar.transaction.common.exception;
\ No newline at end of file
diff --git a/pulsar-transaction/common/src/main/java/org/apache/pulsar/transaction/impl/common/TxnID.java b/pulsar-transaction/common/src/main/java/org/apache/pulsar/transaction/impl/common/TxnID.java
index 16ce4e4..6321ac6 100644
--- a/pulsar-transaction/common/src/main/java/org/apache/pulsar/transaction/impl/common/TxnID.java
+++ b/pulsar-transaction/common/src/main/java/org/apache/pulsar/transaction/impl/common/TxnID.java
@@ -45,4 +45,8 @@ public class TxnID implements Serializable {
      */
     private final long leastSigBits;
 
+    @Override
+    public String toString() {
+        return "(" + mostSigBits + "," + leastSigBits + ")";
+    }
 }
diff --git a/pulsar-transaction/pom.xml b/pulsar-transaction/pom.xml
index 9d28137..4189805 100644
--- a/pulsar-transaction/pom.xml
+++ b/pulsar-transaction/pom.xml
@@ -60,4 +60,4 @@
     </plugins>
   </build>
 
-</project>
+</project>
\ No newline at end of file