You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by GitBox <gi...@apache.org> on 2018/09/04 07:16:14 UTC

[GitHub] sijie closed pull request #2400: PIP-22: Dead Letter Topic

sijie closed pull request #2400: PIP-22: Dead Letter Topic
URL: https://github.com/apache/incubator-pulsar/pull/2400
 
 
   

This is a PR merged from a forked repository.
As GitHub hides the original diff on merge, it is displayed below for
the sake of provenance:

As this is a foreign pull request (from a fork), the diff is supplied
below (as it won't show otherwise due to GitHub magic):

diff --git a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/ManagedCursor.java b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/ManagedCursor.java
index 584a376fac..1d171f217e 100644
--- a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/ManagedCursor.java
+++ b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/ManagedCursor.java
@@ -23,6 +23,8 @@
 import java.util.List;
 import java.util.Map;
 import java.util.Set;
+import java.util.concurrent.ExecutionException;
+
 import org.apache.bookkeeper.mledger.AsyncCallbacks.ClearBacklogCallback;
 import org.apache.bookkeeper.mledger.AsyncCallbacks.DeleteCallback;
 import org.apache.bookkeeper.mledger.AsyncCallbacks.FindEntryCallback;
@@ -30,6 +32,7 @@
 import org.apache.bookkeeper.mledger.AsyncCallbacks.ReadEntriesCallback;
 import org.apache.bookkeeper.mledger.AsyncCallbacks.ReadEntryCallback;
 import org.apache.bookkeeper.mledger.AsyncCallbacks.SkipEntriesCallback;
+import org.apache.bookkeeper.mledger.impl.PositionImpl;
 
 /**
  * A ManangedCursor is a persisted cursor inside a ManagedLedger.
@@ -75,6 +78,16 @@
      */
     Map<String, Long> getProperties();
 
+    /**
+     * Return entry at the position.
+     */
+    Entry readEntry(PositionImpl position) throws InterruptedException, ExecutionException;
+
+    /**
+     * Return entry at the position async.
+     */
+    void asyncReadEntry(PositionImpl position, ReadEntryCallback callback, Object ctx);
+
     /**
      * Read entries from the ManagedLedger, up to the specified number. The returned list can be smaller.
      *
diff --git a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedCursorImpl.java b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedCursorImpl.java
index bab354762a..7cc8aff172 100644
--- a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedCursorImpl.java
+++ b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedCursorImpl.java
@@ -44,7 +44,9 @@
 import java.util.List;
 import java.util.Map;
 import java.util.Set;
+import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.ExecutionException;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.concurrent.atomic.AtomicIntegerFieldUpdater;
@@ -385,6 +387,27 @@ public void operationFailed(ManagedLedgerException exception) {
         });
     }
 
+    @Override
+    public Entry readEntry(PositionImpl position) throws InterruptedException, ExecutionException {
+        final CompletableFuture<Entry> readFuture = new CompletableFuture<>();
+        ledger.asyncReadEntry(position, new ReadEntryCallback() {
+            @Override
+            public void readEntryComplete(Entry entry, Object ctx) {
+                readFuture.complete(entry);
+            }
+            @Override
+            public void readEntryFailed(ManagedLedgerException exception, Object ctx) {
+                readFuture.completeExceptionally(exception);
+            }
+        }, null);
+        return readFuture.get();
+    }
+
+    @Override
+    public void asyncReadEntry(PositionImpl position, ReadEntryCallback callback, Object ctx) {
+        ledger.asyncReadEntry(position, callback, ctx);
+    }
+
     @Override
     public List<Entry> readEntries(int numberOfEntriesToRead) throws InterruptedException, ManagedLedgerException {
         checkArgument(numberOfEntriesToRead > 0);
diff --git a/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/ManagedCursorContainerTest.java b/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/ManagedCursorContainerTest.java
index ecf6acfc50..694ffc317c 100644
--- a/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/ManagedCursorContainerTest.java
+++ b/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/ManagedCursorContainerTest.java
@@ -27,6 +27,8 @@
 import java.util.List;
 import java.util.Map;
 import java.util.Set;
+import java.util.concurrent.ExecutionException;
+
 import org.apache.bookkeeper.mledger.AsyncCallbacks;
 import org.apache.bookkeeper.mledger.AsyncCallbacks.ClearBacklogCallback;
 import org.apache.bookkeeper.mledger.AsyncCallbacks.DeleteCallback;
@@ -60,6 +62,17 @@ public MockManagedCursor(ManagedCursorContainer container, String name, Position
             return Collections.emptyMap();
         }
 
+        @Override
+        public Entry readEntry(PositionImpl position) throws InterruptedException, ExecutionException {
+            return null;
+        }
+
+
+        @Override
+        public void asyncReadEntry(PositionImpl position, ReadEntryCallback callback, Object ctx) {
+
+        }
+
         @Override
         public boolean isDurable() {
             return true;
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java
index f85c6175a7..e0c4f8c816 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java
@@ -867,9 +867,8 @@ protected void internalCreateSubscription(String subscriptionName, MessageIdImpl
                 if (topic.getSubscriptions().containsKey(subscriptionName)) {
                     throw new RestException(Status.CONFLICT, "Subscription already exists for topic");
                 }
-
                 PersistentSubscription subscription = (PersistentSubscription) topic
-                        .createSubscription(subscriptionName, InitialPosition.Latest).get();
+                        .createSubscription(subscriptionName, InitialPosition.Latest, 0, null).get();
                 subscription.resetCursor(PositionImpl.get(messageId.getLedgerId(), messageId.getEntryId())).get();
                 log.info("[{}][{}] Successfully created subscription {} at message id {}", clientAppId(), topicName,
                         subscriptionName, messageId);
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/RedeliveryTracker.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/RedeliveryTracker.java
new file mode 100644
index 0000000000..efbd89400c
--- /dev/null
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/RedeliveryTracker.java
@@ -0,0 +1,37 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.broker.service;
+
+import org.apache.bookkeeper.mledger.Position;
+
+import java.util.List;
+
+public interface RedeliveryTracker {
+
+    int incrementAndGetRedeliveryCount(Position position);
+
+    int getRedeliveryCount(Position position);
+
+    void remove(Position position);
+
+    void removeBatch(List<Position> positions);
+
+    void clear();
+
+}
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/ServerCnx.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/ServerCnx.java
index aacd7bcbf6..97765a84fa 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/ServerCnx.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/ServerCnx.java
@@ -523,6 +523,9 @@ protected void handleSubscribe(final CommandSubscribe subscribe) {
         final boolean readCompacted = subscribe.getReadCompacted();
         final Map<String, String> metadata = CommandUtils.metadataFromCommand(subscribe);
         final InitialPosition initialPosition = subscribe.getInitialPosition();
+        final int maxRedeliveryCount = subscribe.getMaxRedeliveryCount();
+        final String deadLetterTopic = subscribe.getDeadLetterTopic();
+        final int maxUnackedMessagesPerConsumer = subscribe.getMaxUnackedMessagePerConsumer();
         final SchemaData schema = subscribe.hasSchema() ? getSchema(subscribe.getSchema()) : null;
 
         CompletableFuture<Boolean> isProxyAuthorizedFuture;
@@ -591,7 +594,8 @@ protected void handleSubscribe(final CommandSubscribe subscribe) {
                                             if (isCompatible) {
                                                 return topic.subscribe(ServerCnx.this, subscriptionName, consumerId,
                                                     subType, priorityLevel, consumerName, isDurable,
-                                                    startMessageId, metadata, readCompacted, initialPosition);
+                                                    startMessageId, metadata, readCompacted, initialPosition,
+                                                    0, null, maxUnackedMessagesPerConsumer);
                                             } else {
                                                 return FutureUtil.failedFuture(new BrokerServiceException(
                                                     "Trying to subscribe with incompatible schema"
@@ -601,7 +605,8 @@ protected void handleSubscribe(final CommandSubscribe subscribe) {
                                     } else {
                                         return topic.subscribe(ServerCnx.this, subscriptionName, consumerId,
                                             subType, priorityLevel, consumerName, isDurable,
-                                            startMessageId, metadata, readCompacted, initialPosition);
+                                            startMessageId, metadata, readCompacted, initialPosition,
+                                            maxRedeliveryCount, deadLetterTopic, maxUnackedMessagesPerConsumer);
                                     }
                                 })
                                 .thenAccept(consumer -> {
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/Topic.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/Topic.java
index 7488204fca..334b942ae3 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/Topic.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/Topic.java
@@ -81,9 +81,11 @@ default long getOriginalSequenceId() {
 
     CompletableFuture<Consumer> subscribe(ServerCnx cnx, String subscriptionName, long consumerId, SubType subType,
             int priorityLevel, String consumerName, boolean isDurable, MessageId startMessageId,
-            Map<String, String> metadata, boolean readCompacted, InitialPosition initialPosition);
+            Map<String, String> metadata, boolean readCompacted, InitialPosition initialPosition,
+            int maxRedeliveryCount, String deadLetterTopic, int maxUnackedMessagesPerConsumer);
 
-    CompletableFuture<Subscription> createSubscription(String subscriptionName, InitialPosition initialPosition);
+    CompletableFuture<Subscription> createSubscription(String subscriptionName, InitialPosition initialPosition,
+           int maxRedeliveryCount, String deadLetterTopic);
 
     CompletableFuture<Void> unsubscribe(String subName);
 
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/nonpersistent/NonPersistentRedeliveryTracker.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/nonpersistent/NonPersistentRedeliveryTracker.java
new file mode 100644
index 0000000000..ac5030446d
--- /dev/null
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/nonpersistent/NonPersistentRedeliveryTracker.java
@@ -0,0 +1,60 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.broker.service.nonpersistent;
+
+import org.apache.bookkeeper.mledger.Position;
+import org.apache.pulsar.broker.service.RedeliveryTracker;
+
+import java.util.List;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.atomic.AtomicInteger;
+
+public class NonPersistentRedeliveryTracker implements RedeliveryTracker {
+
+    private ConcurrentHashMap<Position, AtomicInteger> trackerCache = new ConcurrentHashMap<>(16);
+
+    @Override
+    public int incrementAndGetRedeliveryCount(Position position) {
+        trackerCache.putIfAbsent(position, new AtomicInteger(0));
+        return trackerCache.get(position).incrementAndGet();
+    }
+
+    @Override
+    public int getRedeliveryCount(Position position) {
+        trackerCache.putIfAbsent(position, new AtomicInteger(0));
+        return trackerCache.get(position).get();
+    }
+
+    @Override
+    public void remove(Position position) {
+        trackerCache.remove(position);
+    }
+
+    @Override
+    public void removeBatch(List<Position> positions) {
+        if (positions != null) {
+            positions.forEach(this::remove);
+        }
+    }
+
+    @Override
+    public void clear() {
+        trackerCache.clear();
+    }
+}
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/nonpersistent/NonPersistentTopic.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/nonpersistent/NonPersistentTopic.java
index 281d8438db..66fb9eb389 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/nonpersistent/NonPersistentTopic.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/nonpersistent/NonPersistentTopic.java
@@ -314,7 +314,8 @@ public void removeProducer(Producer producer) {
     @Override
     public CompletableFuture<Consumer> subscribe(final ServerCnx cnx, String subscriptionName, long consumerId,
             SubType subType, int priorityLevel, String consumerName, boolean isDurable, MessageId startMessageId,
-            Map<String, String> metadata, boolean readCompacted, InitialPosition initialPosition) {
+            Map<String, String> metadata, boolean readCompacted, InitialPosition initialPosition, int maxRedeliveryCount,
+            String deadLetterTopic, int maxUnackedMessagesPerConsumer) {
 
         final CompletableFuture<Consumer> future = new CompletableFuture<>();
 
@@ -388,7 +389,8 @@ public void removeProducer(Producer producer) {
     }
 
     @Override
-    public CompletableFuture<Subscription> createSubscription(String subscriptionName, InitialPosition initialPosition) {
+    public CompletableFuture<Subscription> createSubscription(String subscriptionName, InitialPosition initialPosition,
+              int maxRedeliveryCount, String deadLetterTopic) {
         return CompletableFuture.completedFuture(new NonPersistentSubscription(this, subscriptionName));
     }
 
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/CompactorSubscription.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/CompactorSubscription.java
index 6fe74bfa93..47217d68b9 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/CompactorSubscription.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/CompactorSubscription.java
@@ -41,7 +41,7 @@
 
     public CompactorSubscription(PersistentTopic topic, CompactedTopic compactedTopic,
                                  String subscriptionName, ManagedCursor cursor) {
-        super(topic, subscriptionName, cursor);
+        super(topic, subscriptionName, cursor, 0, null);
         checkArgument(subscriptionName.equals(Compactor.COMPACTION_SUBSCRIPTION));
         this.compactedTopic = compactedTopic;
 
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentDispatcherMultipleConsumers.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentDispatcherMultipleConsumers.java
index 17c5db7a6a..b996e012e4 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentDispatcherMultipleConsumers.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentDispatcherMultipleConsumers.java
@@ -28,29 +28,39 @@
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicIntegerFieldUpdater;
 
-import org.apache.bookkeeper.mledger.AsyncCallbacks.ReadEntriesCallback;
-import org.apache.bookkeeper.mledger.Entry;
+import io.netty.buffer.ByteBuf;
 import org.apache.bookkeeper.mledger.ManagedCursor;
+import org.apache.bookkeeper.mledger.Position;
+import org.apache.bookkeeper.mledger.Entry;
 import org.apache.bookkeeper.mledger.ManagedLedgerException;
+import org.apache.bookkeeper.mledger.AsyncCallbacks;
+import org.apache.bookkeeper.mledger.AsyncCallbacks.ReadEntriesCallback;
 import org.apache.bookkeeper.mledger.ManagedLedgerException.NoMoreEntriesToReadException;
 import org.apache.bookkeeper.mledger.ManagedLedgerException.TooManyRequestsException;
-import org.apache.bookkeeper.mledger.Position;
 import org.apache.bookkeeper.mledger.impl.PositionImpl;
+import org.apache.commons.lang3.StringUtils;
 import org.apache.pulsar.broker.ServiceConfiguration;
 import org.apache.pulsar.broker.admin.AdminResource;
 import org.apache.pulsar.broker.service.AbstractDispatcherMultipleConsumers;
+import org.apache.pulsar.broker.service.Dispatcher;
+import org.apache.pulsar.broker.service.RedeliveryTracker;
+import org.apache.pulsar.broker.service.Consumer;
 import org.apache.pulsar.broker.service.BrokerServiceException;
 import org.apache.pulsar.broker.service.BrokerServiceException.ConsumerBusyException;
-import org.apache.pulsar.broker.service.Consumer;
 import org.apache.pulsar.broker.service.Consumer.SendMessageInfo;
-import org.apache.pulsar.broker.service.Dispatcher;
+import org.apache.pulsar.broker.service.nonpersistent.NonPersistentRedeliveryTracker;
+import org.apache.pulsar.client.api.MessageId;
+import org.apache.pulsar.client.api.Schema;
 import org.apache.pulsar.client.impl.Backoff;
+import org.apache.pulsar.client.impl.MessageImpl;
+import org.apache.pulsar.client.impl.ProducerImpl;
 import org.apache.pulsar.common.api.proto.PulsarApi.CommandSubscribe.SubType;
 import org.apache.pulsar.common.naming.TopicName;
 import org.apache.pulsar.common.policies.data.Policies;
 import org.apache.pulsar.common.util.Codec;
 import org.apache.pulsar.common.util.collections.ConcurrentLongPairSet;
 import org.apache.pulsar.utils.CopyOnWriteArrayList;
+import org.apache.pulsar.utils.Quorum;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -59,7 +69,7 @@
 
 /**
  */
-public class PersistentDispatcherMultipleConsumers  extends AbstractDispatcherMultipleConsumers implements Dispatcher, ReadEntriesCallback {
+public class PersistentDispatcherMultipleConsumers extends AbstractDispatcherMultipleConsumers implements Dispatcher, ReadEntriesCallback {
 
     private static final int MaxReadBatchSize = 100;
     private static final int MaxRoundRobinBatchSize = 20;
@@ -69,6 +79,7 @@
 
     private CompletableFuture<Void> closeFuture = null;
     private ConcurrentLongPairSet messagesToReplay;
+    private ConcurrentLongPairSet messagesToDeadLetter;
 
     private boolean havePendingRead = false;
     private boolean havePendingReplayRead = false;
@@ -88,20 +99,32 @@
     private final ServiceConfiguration serviceConfig;
     private DispatchRateLimiter dispatchRateLimiter;
 
+    protected volatile int maxRedeliveryCount;
+    protected volatile String deadLetterTopic;
+    protected RedeliveryTracker redeliveryTracker;
+    private volatile ProducerImpl<byte[]> deadLetterTopicProducer;
+
     enum ReadType {
         Normal, Replay
     }
 
-    public PersistentDispatcherMultipleConsumers(PersistentTopic topic, ManagedCursor cursor) {
+    public PersistentDispatcherMultipleConsumers(PersistentTopic topic, ManagedCursor cursor, int maxRedeliveryCount,
+                                                 String deadLetterTopic) {
         this.cursor = cursor;
         this.name = topic.getName() + " / " + Codec.decode(cursor.getName());
         this.topic = topic;
         this.messagesToReplay = new ConcurrentLongPairSet(512, 2);
+        this.messagesToDeadLetter = new ConcurrentLongPairSet(512, 2);
         this.readBatchSize = MaxReadBatchSize;
         this.maxUnackedMessages = topic.getBrokerService().pulsar().getConfiguration()
                 .getMaxUnackedMessagesPerSubscription();
         this.serviceConfig = topic.getBrokerService().pulsar().getConfiguration();
         this.dispatchRateLimiter = null;
+        this.maxRedeliveryCount = maxRedeliveryCount;
+        this.deadLetterTopic = deadLetterTopic;
+        if (maxRedeliveryCount > 0) {
+            redeliveryTracker = new NonPersistentRedeliveryTracker();
+        }
     }
 
     @Override
@@ -132,11 +155,44 @@ public synchronized void addConsumer(Consumer consumer) throws BrokerServiceExce
             throw new ConsumerBusyException("Subscription reached max consumers limit");
         }
 
+        deadLetterTopicProducer = newDeadLetterProducer();
+
         consumerList.add(consumer);
         consumerList.sort((c1, c2) -> c1.getPriorityLevel() - c2.getPriorityLevel());
         consumerSet.add(consumer);
     }
 
+    private ProducerImpl<byte[]> newDeadLetterProducer() throws BrokerServiceException {
+        if (maxRedeliveryCount > 0 && deadLetterTopicProducer == null) {
+            try {
+                if (maxRedeliveryCount > 0 && StringUtils.isBlank(deadLetterTopic)) {
+                    deadLetterTopic = String.format("%s-%s-DLQ", topic.getName(), Codec.decode(cursor.getName()));
+                }
+                return (ProducerImpl<byte[]>) topic.getBrokerService().pulsar().getClient().newProducer(Schema.BYTES)
+                        .topic(deadLetterTopic)
+                        .blockIfQueueFull(false)
+                        .create();
+            } catch (Throwable e) {
+                throw new BrokerServiceException(e);
+            }
+        }
+        return null;
+    }
+
+    void reloadDeadLetterProducer() throws BrokerServiceException {
+        try {
+            if (deadLetterTopicProducer != null) {
+                deadLetterTopicProducer.closeAsync();
+            }
+            deadLetterTopicProducer = newDeadLetterProducer();
+            if (deadLetterTopicProducer == null) {
+                redeliveryTracker.clear();
+            }
+        } catch (Throwable e) {
+            throw new BrokerServiceException(e);
+        }
+    }
+
     private boolean isConsumersExceededOnTopic() {
         Policies policies;
         try {
@@ -530,7 +586,7 @@ private boolean isAtleastOneConsumerAvailable() {
             // abort read if no consumers are connected or if disconnect is initiated
             return false;
         }
-        for(Consumer consumer : consumerList) {
+        for (Consumer consumer : consumerList) {
             if (isConsumerAvailable(consumer)) {
                 return true;
             }
@@ -556,11 +612,98 @@ public synchronized void redeliverUnacknowledgedMessages(Consumer consumer) {
 
     @Override
     public synchronized void redeliverUnacknowledgedMessages(Consumer consumer, List<PositionImpl> positions) {
-        positions.forEach(position -> messagesToReplay.add(position.getLedgerId(), position.getEntryId()));
         if (log.isDebugEnabled()) {
             log.debug("[{}-{}] Redelivering unacknowledged messages for consumer {}", name, consumer, positions);
         }
-        readMoreEntries();
+        if (maxRedeliveryCount > 0 && redeliveryTracker != null) {
+            for (PositionImpl position : positions) {
+                if (redeliveryTracker.incrementAndGetRedeliveryCount(position) <= maxRedeliveryCount) {
+                    messagesToReplay.add(position.getLedgerId(), position.getEntryId());
+                } else {
+                    messagesToDeadLetter.add(position.getLedgerId(), position.getEntryId());
+                }
+            }
+            if (messagesToDeadLetter.size() > 0) {
+
+                Quorum quorum = new Quorum(messagesToDeadLetter.size(), result -> {
+                    readMoreEntries();
+                });
+                messagesToDeadLetter.forEach((ledgerId, entryId) -> {
+                    PositionImpl position = PositionImpl.get(ledgerId, entryId);
+                    cursor.asyncReadEntry(position, new AsyncCallbacks.ReadEntryCallback() {
+                        @Override
+                        public void readEntryComplete(Entry entry, Object ctx) {
+                            if (entry == null) {
+                                log.error("[{}-{}] Read an null entry from cursor {}", name, consumer, position);
+                                quorum.succeed();
+                            } else {
+                                try {
+                                    ByteBuf headersAndPayload = entry.getDataBuffer();
+                                    MessageImpl<byte[]> msg = MessageImpl.deserialize(headersAndPayload);
+                                    headersAndPayload.retain();
+                                    msg.setReplicatedFrom("DLQ");
+                                    CompletableFuture<MessageId> future = deadLetterTopicProducer.sendAsync(msg);
+                                    future.whenCompleteAsync((messageId, error) -> {
+                                        if (error != null) {
+                                            log.error("[{}-{}] Fail to send message to dead letter topic {} {} {}",
+                                                    name, consumer, deadLetterTopic, error.getMessage(), error);
+                                            messagesToReplay.add(position.getLedgerId(), position.getEntryId());
+                                            quorum.succeed();
+                                        } else {
+                                            cursor.asyncDelete(position, new AsyncCallbacks.DeleteCallback() {
+                                                @Override
+                                                public void deleteComplete(Object ctx) {
+                                                    entry.release();
+                                                    redeliveryTracker.remove(position);
+                                                    quorum.succeed();
+                                                }
+
+                                                @Override
+                                                public void deleteFailed(ManagedLedgerException exception, Object ctx) {
+                                                    entry.release();
+                                                    messagesToReplay.add(position.getLedgerId(), position.getEntryId());
+                                                    quorum.succeed();
+                                                }
+                                            }, position);
+
+                                        }
+                                    });
+                                } catch (Throwable t) {
+                                    log.error("[{}-{}] Failed to deserialize message at {} {} {} {}", name, consumer,
+                                            entry.getPosition(), entry.getLedgerId(), t.getMessage(), t);
+                                    cursor.asyncDelete(position, new AsyncCallbacks.DeleteCallback() {
+                                        @Override
+                                        public void deleteComplete(Object ctx) {
+                                            entry.release();
+                                            redeliveryTracker.remove(position);
+                                            quorum.succeed();
+                                        }
+
+                                        @Override
+                                        public void deleteFailed(ManagedLedgerException exception, Object ctx) {
+                                            entry.release();
+                                            messagesToReplay.add(position.getLedgerId(), position.getEntryId());
+                                            quorum.succeed();
+                                        }
+                                    }, position);
+
+                                }
+                            }
+                        }
+                        @Override
+                        public void readEntryFailed(ManagedLedgerException exception, Object ctx) {
+                            log.error("[{}-{}] Read entries failed {} {}", name, consumer, exception.getMessage(), exception);
+                            messagesToReplay.add(position.getLedgerId(), position.getEntryId());
+                            quorum.succeed();
+                        }
+                    }, null);
+                });
+                messagesToDeadLetter.clear();
+            }
+        } else {
+            positions.forEach(position -> messagesToReplay.add(position.getLedgerId(), position.getEntryId()));
+            readMoreEntries();
+        }
     }
 
     @Override
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentSubscription.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentSubscription.java
index dac9f055d7..5bfa32ce76 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentSubscription.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentSubscription.java
@@ -27,7 +27,6 @@
 
 import org.apache.bookkeeper.mledger.AsyncCallbacks;
 import org.apache.bookkeeper.mledger.AsyncCallbacks.ClearBacklogCallback;
-import org.apache.bookkeeper.mledger.AsyncCallbacks.CloseCallback;
 import org.apache.bookkeeper.mledger.AsyncCallbacks.DeleteCallback;
 import org.apache.bookkeeper.mledger.AsyncCallbacks.MarkDeleteCallback;
 import org.apache.bookkeeper.mledger.AsyncCallbacks.ReadEntryCallback;
@@ -39,8 +38,8 @@
 import org.apache.bookkeeper.mledger.ManagedLedgerException.InvalidCursorPositionException;
 import org.apache.bookkeeper.mledger.Position;
 import org.apache.bookkeeper.mledger.impl.PositionImpl;
+import org.apache.commons.lang3.StringUtils;
 import org.apache.pulsar.broker.service.BrokerServiceException;
-import org.apache.pulsar.broker.service.BrokerServiceException.PersistenceException;
 import org.apache.pulsar.broker.service.BrokerServiceException.ServerMetadataException;
 import org.apache.pulsar.broker.service.BrokerServiceException.SubscriptionBusyException;
 import org.apache.pulsar.broker.service.BrokerServiceException.SubscriptionFencedException;
@@ -76,12 +75,18 @@
     // for connected subscriptions, message expiry will be checked if the backlog is greater than this threshold
     private static final int MINIMUM_BACKLOG_FOR_EXPIRY_CHECK = 1000;
 
-    public PersistentSubscription(PersistentTopic topic, String subscriptionName, ManagedCursor cursor) {
+    protected int maxRedeliveryCount = 0;
+    protected String deadLetterTopic = null;
+
+    public PersistentSubscription(PersistentTopic topic, String subscriptionName, ManagedCursor cursor,
+            int maxRedeliveryCount, String deadLetterTopic) {
         this.topic = topic;
         this.cursor = cursor;
         this.topicName = topic.getName();
         this.subName = subscriptionName;
         this.expiryMonitor = new PersistentMessageExpiryMonitor(topicName, subscriptionName, cursor);
+        this.maxRedeliveryCount = maxRedeliveryCount;
+        this.deadLetterTopic = deadLetterTopic;
         IS_FENCED_UPDATER.set(this, FALSE);
     }
 
@@ -112,7 +117,7 @@ public synchronized void addConsumer(Consumer consumer) throws BrokerServiceExce
                 break;
             case Shared:
                 if (dispatcher == null || dispatcher.getType() != SubType.Shared) {
-                    dispatcher = new PersistentDispatcherMultipleConsumers(topic, cursor);
+                    dispatcher = new PersistentDispatcherMultipleConsumers(topic, cursor, maxRedeliveryCount, deadLetterTopic);
                 }
                 break;
             case Failover:
@@ -153,7 +158,7 @@ public synchronized void removeConsumer(Consumer consumer) throws BrokerServiceE
                 close();
                 // when topic closes: it iterates through concurrent-subscription map to close each subscription. so,
                 // topic.remove again try to access same map which creates deadlock. so, execute it in different thread.
-                topic.getBrokerService().pulsar().getExecutor().submit(() ->{
+                topic.getBrokerService().pulsar().getExecutor().submit(() -> {
                     topic.removeSubscription(subName);
                 });
             }
@@ -178,7 +183,7 @@ public void consumerFlow(Consumer consumer, int additionalNumberOfMessages) {
     }
 
     @Override
-    public void acknowledgeMessage(List<Position> positions, AckType ackType, Map<String,Long> properties) {
+    public void acknowledgeMessage(List<Position> positions, AckType ackType, Map<String, Long> properties) {
         if (ackType == AckType.Cumulative) {
             if (positions.size() != 1) {
                 log.warn("[{}][{}] Invalid cumulative ack received with multiple message ids", topicName, subName);
@@ -195,6 +200,11 @@ public void acknowledgeMessage(List<Position> positions, AckType ackType, Map<St
                 log.debug("[{}][{}] Individual acks on {}", topicName, subName, positions);
             }
             cursor.asyncDelete(positions, deleteCallback, positions);
+            if (dispatcher instanceof PersistentDispatcherMultipleConsumers) {
+                if (((PersistentDispatcherMultipleConsumers) dispatcher).redeliveryTracker != null) {
+                    ((PersistentDispatcherMultipleConsumers) dispatcher).redeliveryTracker.removeBatch(positions);
+                }
+            }
         }
 
         if (topic.getManagedLedger().isTerminated() && cursor.getNumberOfEntriesInBacklog() == 0) {
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java
index 4d289c4260..78962ed8ea 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java
@@ -55,6 +55,7 @@
 import org.apache.bookkeeper.mledger.impl.ManagedCursorImpl;
 import org.apache.bookkeeper.mledger.impl.ManagedLedgerImpl;
 import org.apache.bookkeeper.mledger.impl.PositionImpl;
+import org.apache.commons.lang3.StringUtils;
 import org.apache.pulsar.broker.PulsarServerException;
 import org.apache.pulsar.broker.admin.AdminResource;
 import org.apache.pulsar.broker.service.BrokerService;
@@ -240,7 +241,8 @@ public PersistentTopic(String topic, ManagedLedger ledger, BrokerService brokerS
                 // to take care of it
             } else {
                 final String subscriptionName = Codec.decode(cursor.getName());
-                subscriptions.put(subscriptionName, createPersistentSubscription(subscriptionName, cursor));
+                //TODO Persist maxRedeliveryCount and deadLetterTopic to ManagedLedger
+                subscriptions.put(subscriptionName, createPersistentSubscription(subscriptionName, cursor, 0, null));
                 // subscription-cursor gets activated by default: deactivate as there is no active subscription right
                 // now
                 subscriptions.get(subscriptionName).deactivateCursor();
@@ -261,12 +263,13 @@ public PersistentTopic(String topic, ManagedLedger ledger, BrokerService brokerS
         }
     }
 
-    private PersistentSubscription createPersistentSubscription(String subscriptionName, ManagedCursor cursor) {
+    private PersistentSubscription createPersistentSubscription(String subscriptionName, ManagedCursor cursor,
+            int maxRedeliveryCount, String deadLetterTopic) {
         checkNotNull(compactedTopic);
         if (subscriptionName.equals(Compactor.COMPACTION_SUBSCRIPTION)) {
             return new CompactorSubscription(this, compactedTopic, subscriptionName, cursor);
         } else {
-            return new PersistentSubscription(this, subscriptionName, cursor);
+            return new PersistentSubscription(this, subscriptionName, cursor, maxRedeliveryCount, deadLetterTopic);
         }
     }
 
@@ -457,7 +460,8 @@ public void removeProducer(Producer producer) {
     @Override
     public CompletableFuture<Consumer> subscribe(final ServerCnx cnx, String subscriptionName, long consumerId,
             SubType subType, int priorityLevel, String consumerName, boolean isDurable, MessageId startMessageId,
-            Map<String, String> metadata, boolean readCompacted, InitialPosition initialPosition) {
+            Map<String, String> metadata, boolean readCompacted, InitialPosition initialPosition,
+            int maxRedeliveryCount, String deadLetterTopic, int maxUnackedMessagesPerConsumer) {
 
         final CompletableFuture<Consumer> future = new CompletableFuture<>();
 
@@ -505,10 +509,13 @@ public void removeProducer(Producer producer) {
         }
 
         CompletableFuture<? extends Subscription> subscriptionFuture = isDurable ? //
-                getDurableSubscription(subscriptionName, initialPosition) //
+                getDurableSubscription(subscriptionName, initialPosition, maxRedeliveryCount, deadLetterTopic) //
                 : getNonDurableSubscription(subscriptionName, startMessageId);
 
-        int maxUnackedMessages  = isDurable ? brokerService.pulsar().getConfiguration().getMaxUnackedMessagesPerConsumer() :0;
+        final int maxUackedMessaegesPerConsumerInBroker = brokerService.pulsar().getConfiguration().getMaxUnackedMessagesPerConsumer();
+        int maxUnackedMessages = isDurable ? (
+            maxUnackedMessagesPerConsumer > 0 && maxUnackedMessagesPerConsumer < maxUackedMessaegesPerConsumerInBroker
+            ? maxUnackedMessagesPerConsumer : maxUackedMessaegesPerConsumerInBroker) : 0;
 
         subscriptionFuture.thenAccept(subscription -> {
             try {
@@ -548,7 +555,8 @@ public void removeProducer(Producer producer) {
         return future;
     }
 
-    private CompletableFuture<Subscription> getDurableSubscription(String subscriptionName, InitialPosition initialPosition) {
+    private CompletableFuture<Subscription> getDurableSubscription(String subscriptionName, InitialPosition initialPosition,
+           int maxRedeliveryCount, String deadLetterTopic) {
         CompletableFuture<Subscription> subscriptionFuture = new CompletableFuture<>();
         ledger.asyncOpenCursor(Codec.encode(subscriptionName), initialPosition, new OpenCursorCallback() {
             @Override
@@ -556,9 +564,28 @@ public void openCursorComplete(ManagedCursor cursor, Object ctx) {
                 if (log.isDebugEnabled()) {
                     log.debug("[{}][{}] Opened cursor", topic, subscriptionName);
                 }
-
-                subscriptionFuture.complete(subscriptions.computeIfAbsent(subscriptionName,
-                        name -> createPersistentSubscription(subscriptionName, cursor)));
+                subscriptions.computeIfAbsent(subscriptionName,
+                        name -> createPersistentSubscription(subscriptionName, cursor, maxRedeliveryCount, deadLetterTopic));
+
+                PersistentSubscription subscription = subscriptions.get(subscriptionName);
+                if (subscription.maxRedeliveryCount != maxRedeliveryCount) {
+                    subscription.maxRedeliveryCount = maxRedeliveryCount;
+                    if (subscription.dispatcher instanceof PersistentDispatcherMultipleConsumers) {
+                        ((PersistentDispatcherMultipleConsumers) subscription.dispatcher).maxRedeliveryCount = maxRedeliveryCount;
+                    }
+                }
+                if (!StringUtils.equals(subscription.deadLetterTopic, deadLetterTopic)) {
+                    subscription.deadLetterTopic = deadLetterTopic;
+                    if (subscription.dispatcher instanceof PersistentDispatcherMultipleConsumers) {
+                        ((PersistentDispatcherMultipleConsumers) subscription.dispatcher).deadLetterTopic = deadLetterTopic;
+                        try {
+                            ((PersistentDispatcherMultipleConsumers) subscription.dispatcher).reloadDeadLetterProducer();
+                        } catch (Throwable e) {
+                            subscriptionFuture.completeExceptionally(e);
+                        }
+                    }
+                }
+                subscriptionFuture.complete(subscription);
             }
 
             @Override
@@ -602,7 +629,7 @@ public void openCursorFailed(ManagedLedgerException exception, Object ctx) {
                 subscriptionFuture.completeExceptionally(e);
             }
 
-            return new PersistentSubscription(this, subscriptionName, cursor);
+            return new PersistentSubscription(this, subscriptionName, cursor, 0, null);
         });
 
         if (!subscriptionFuture.isDone()) {
@@ -617,8 +644,9 @@ public void openCursorFailed(ManagedLedgerException exception, Object ctx) {
 
     @SuppressWarnings("unchecked")
     @Override
-    public CompletableFuture<Subscription> createSubscription(String subscriptionName, InitialPosition initialPosition) {
-        return getDurableSubscription(subscriptionName, initialPosition);
+    public CompletableFuture<Subscription> createSubscription(String subscriptionName, InitialPosition initialPosition,
+          int maxRedeliveryCount, String deadLetterTopic) {
+        return getDurableSubscription(subscriptionName, initialPosition, maxRedeliveryCount, deadLetterTopic);
     }
 
     /**
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/utils/Quorum.java b/pulsar-broker/src/main/java/org/apache/pulsar/utils/Quorum.java
new file mode 100644
index 0000000000..8473a873d0
--- /dev/null
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/utils/Quorum.java
@@ -0,0 +1,68 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.utils;
+
+import java.util.concurrent.atomic.AtomicLong;
+import java.util.function.Consumer;
+
+/**
+ * Quorum helper. Completes and invokes a callback when the number of {@link #succeed()} or
+ * {@link #fail()} calls equal the expected quorum count.
+ */
+public class Quorum {
+
+    private final long quorum;
+
+    private final Consumer<Boolean> callback;
+
+    private AtomicLong succeeded = new AtomicLong(0);
+
+    private AtomicLong failed = new AtomicLong(0);
+
+    public Quorum(long quorum, Consumer<Boolean> callback) {
+        this.quorum = quorum;
+        this.callback = callback;
+    }
+
+    private void complete(boolean result) {
+        if (callback != null) {
+            callback.accept(result);
+        }
+    }
+
+    /**
+     * Indicates that a call in the quorum succeeded.
+     */
+    public Quorum succeed() {
+        if (succeeded.incrementAndGet() >= quorum) {
+            complete(true);
+        }
+        return this;
+    }
+
+    /**
+     * Indicates that a call in the quorum failed.
+     */
+    public Quorum fail() {
+        if (failed.incrementAndGet() >= quorum) {
+            complete(false);
+        }
+        return this;
+    }
+}
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/PersistentDispatcherFailoverConsumerTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/PersistentDispatcherFailoverConsumerTest.java
index c1be8d1b2f..17110a849a 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/PersistentDispatcherFailoverConsumerTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/PersistentDispatcherFailoverConsumerTest.java
@@ -257,7 +257,7 @@ private void verifyActiveConsumerChange(CommandActiveConsumerChange change,
     @Test
     public void testConsumerGroupChangesWithOldNewConsumers() throws Exception {
         PersistentTopic topic = new PersistentTopic(successTopicName, ledgerMock, brokerService);
-        PersistentSubscription sub = new PersistentSubscription(topic, "sub-1", cursorMock);
+        PersistentSubscription sub = new PersistentSubscription(topic, "sub-1", cursorMock, 0, null);
 
         int partitionIndex = 0;
         PersistentDispatcherSingleActiveConsumer pdfc = new PersistentDispatcherSingleActiveConsumer(cursorMock,
@@ -296,7 +296,7 @@ public void testAddRemoveConsumer() throws Exception {
         log.info("--- Starting PersistentDispatcherFailoverConsumerTest::testAddConsumer ---");
 
         PersistentTopic topic = new PersistentTopic(successTopicName, ledgerMock, brokerService);
-        PersistentSubscription sub = new PersistentSubscription(topic, "sub-1", cursorMock);
+        PersistentSubscription sub = new PersistentSubscription(topic, "sub-1", cursorMock, 0, null);
 
         int partitionIndex = 0;
         PersistentDispatcherSingleActiveConsumer pdfc = new PersistentDispatcherSingleActiveConsumer(cursorMock,
@@ -415,7 +415,7 @@ public void testAddRemoveConsumer() throws Exception {
     public void testMultipleDispatcherGetNextConsumerWithDifferentPriorityLevel() throws Exception {
 
         PersistentTopic topic = new PersistentTopic(successTopicName, ledgerMock, brokerService);
-        PersistentDispatcherMultipleConsumers dispatcher = new PersistentDispatcherMultipleConsumers(topic, cursorMock);
+        PersistentDispatcherMultipleConsumers dispatcher = new PersistentDispatcherMultipleConsumers(topic, cursorMock, 0, null);
         Consumer consumer1 = createConsumer(0, 2, false, 1);
         Consumer consumer2 = createConsumer(0, 2, false, 2);
         Consumer consumer3 = createConsumer(0, 2, false, 3);
@@ -459,7 +459,7 @@ public void testMultipleDispatcherGetNextConsumerWithDifferentPriorityLevel() th
     @Test
     public void testFewBlockedConsumerSamePriority() throws Exception{
         PersistentTopic topic = new PersistentTopic(successTopicName, ledgerMock, brokerService);
-        PersistentDispatcherMultipleConsumers dispatcher = new PersistentDispatcherMultipleConsumers(topic, cursorMock);
+        PersistentDispatcherMultipleConsumers dispatcher = new PersistentDispatcherMultipleConsumers(topic, cursorMock, 0, null);
         Consumer consumer1 = createConsumer(0, 2, false, 1);
         Consumer consumer2 = createConsumer(0, 2, false, 2);
         Consumer consumer3 = createConsumer(0, 2, false, 3);
@@ -486,7 +486,7 @@ public void testFewBlockedConsumerSamePriority() throws Exception{
     @Test
     public void testFewBlockedConsumerDifferentPriority() throws Exception {
         PersistentTopic topic = new PersistentTopic(successTopicName, ledgerMock, brokerService);
-        PersistentDispatcherMultipleConsumers dispatcher = new PersistentDispatcherMultipleConsumers(topic, cursorMock);
+        PersistentDispatcherMultipleConsumers dispatcher = new PersistentDispatcherMultipleConsumers(topic, cursorMock, 0, null);
         Consumer consumer1 = createConsumer(0, 2, false, 1);
         Consumer consumer2 = createConsumer(0, 2, false, 2);
         Consumer consumer3 = createConsumer(0, 2, false, 3);
@@ -540,7 +540,7 @@ public void testFewBlockedConsumerDifferentPriority() throws Exception {
     @Test
     public void testFewBlockedConsumerDifferentPriority2() throws Exception {
         PersistentTopic topic = new PersistentTopic(successTopicName, ledgerMock, brokerService);
-        PersistentDispatcherMultipleConsumers dispatcher = new PersistentDispatcherMultipleConsumers(topic, cursorMock);
+        PersistentDispatcherMultipleConsumers dispatcher = new PersistentDispatcherMultipleConsumers(topic, cursorMock, 0, null);
         Consumer consumer1 = createConsumer(0, 2, true, 1);
         Consumer consumer2 = createConsumer(0, 2, true, 2);
         Consumer consumer3 = createConsumer(0, 2, true, 3);
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/PersistentTopicConcurrentTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/PersistentTopicConcurrentTest.java
index 7c5ca2077d..4ac755898f 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/PersistentTopicConcurrentTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/PersistentTopicConcurrentTest.java
@@ -123,7 +123,7 @@ public void testConcurrentTopicAndSubscriptionDelete() throws Exception {
                 .setSubType(PulsarApi.CommandSubscribe.SubType.Exclusive).build();
 
         Future<Consumer> f1 = topic.subscribe(serverCnx, cmd.getSubscription(), cmd.getConsumerId(), cmd.getSubType(),
-                0, cmd.getConsumerName(), cmd.getDurable(), null, Collections.emptyMap(), cmd.getReadCompacted(), InitialPosition.Latest);
+                0, cmd.getConsumerName(), cmd.getDurable(), null, Collections.emptyMap(), cmd.getReadCompacted(), InitialPosition.Latest, 0, null, 0);
         f1.get();
 
         final CyclicBarrier barrier = new CyclicBarrier(2);
@@ -181,7 +181,7 @@ public void testConcurrentTopicGCAndSubscriptionDelete() throws Exception {
                 .setSubType(PulsarApi.CommandSubscribe.SubType.Exclusive).build();
 
         Future<Consumer> f1 = topic.subscribe(serverCnx, cmd.getSubscription(), cmd.getConsumerId(), cmd.getSubType(),
-                0, cmd.getConsumerName(), cmd.getDurable(), null, Collections.emptyMap(), cmd.getReadCompacted(), InitialPosition.Latest);
+                0, cmd.getConsumerName(), cmd.getDurable(), null, Collections.emptyMap(), cmd.getReadCompacted(), InitialPosition.Latest, 0, null, 0);
         f1.get();
 
         final CyclicBarrier barrier = new CyclicBarrier(2);
@@ -243,7 +243,7 @@ public void testConcurrentTopicDeleteAndUnsubscribe() throws Exception {
                 .setSubType(PulsarApi.CommandSubscribe.SubType.Exclusive).build();
 
         Future<Consumer> f1 = topic.subscribe(serverCnx, cmd.getSubscription(), cmd.getConsumerId(), cmd.getSubType(),
-                0, cmd.getConsumerName(), cmd.getDurable(), null, Collections.emptyMap(), cmd.getReadCompacted(), InitialPosition.Latest);
+                0, cmd.getConsumerName(), cmd.getDurable(), null, Collections.emptyMap(), cmd.getReadCompacted(), InitialPosition.Latest, 0, null, 0);
         f1.get();
 
         final CyclicBarrier barrier = new CyclicBarrier(2);
@@ -301,7 +301,7 @@ public void testConcurrentTopicDeleteAndSubsUnsubscribe() throws Exception {
                 .setSubType(PulsarApi.CommandSubscribe.SubType.Exclusive).build();
 
         Future<Consumer> f1 = topic.subscribe(serverCnx, cmd.getSubscription(), cmd.getConsumerId(), cmd.getSubType(),
-                0, cmd.getConsumerName(), cmd.getDurable(), null, Collections.emptyMap(), cmd.getReadCompacted(), InitialPosition.Latest);
+                0, cmd.getConsumerName(), cmd.getDurable(), null, Collections.emptyMap(), cmd.getReadCompacted(), InitialPosition.Latest, 0, null, 0);
         f1.get();
 
         final CyclicBarrier barrier = new CyclicBarrier(2);
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/PersistentTopicTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/PersistentTopicTest.java
index f8ac40a945..5675a1a3fd 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/PersistentTopicTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/PersistentTopicTest.java
@@ -280,7 +280,7 @@ public void testDispatcherMultiConsumerReadFailed() throws Exception {
         PersistentTopic topic = spy(new PersistentTopic(successTopicName, ledgerMock, brokerService));
         ManagedCursor cursor = mock(ManagedCursor.class);
         when(cursor.getName()).thenReturn("cursor");
-        PersistentDispatcherMultipleConsumers dispatcher = new PersistentDispatcherMultipleConsumers(topic, cursor);
+        PersistentDispatcherMultipleConsumers dispatcher = new PersistentDispatcherMultipleConsumers(topic, cursor, 0, null);
         dispatcher.readEntriesFailed(new ManagedLedgerException.InvalidCursorPositionException("failed"), null);
         verify(topic, atLeast(1)).getBrokerService();
     }
@@ -428,7 +428,7 @@ public void testSubscribeFail() throws Exception {
                 .setSubscription("").setRequestId(1).setSubType(SubType.Exclusive).build();
 
         Future<Consumer> f1 = topic.subscribe(serverCnx, cmd.getSubscription(), cmd.getConsumerId(), cmd.getSubType(),
-                0, cmd.getConsumerName(), cmd.getDurable(), null, Collections.emptyMap(), cmd.getReadCompacted(), InitialPosition.Latest);
+                0, cmd.getConsumerName(), cmd.getDurable(), null, Collections.emptyMap(), cmd.getReadCompacted(), InitialPosition.Latest, 0, null, 0);
         try {
             f1.get();
             fail("should fail with exception");
@@ -447,12 +447,12 @@ public void testSubscribeUnsubscribe() throws Exception {
 
         // 1. simple subscribe
         Future<Consumer> f1 = topic.subscribe(serverCnx, cmd.getSubscription(), cmd.getConsumerId(), cmd.getSubType(),
-                0, cmd.getConsumerName(), cmd.getDurable(), null, Collections.emptyMap(), cmd.getReadCompacted(), InitialPosition.Latest);
+                0, cmd.getConsumerName(), cmd.getDurable(), null, Collections.emptyMap(), cmd.getReadCompacted(), InitialPosition.Latest, 0, null, 0);
         f1.get();
 
         // 2. duplicate subscribe
         Future<Consumer> f2 = topic.subscribe(serverCnx, cmd.getSubscription(), cmd.getConsumerId(), cmd.getSubType(),
-                0, cmd.getConsumerName(), cmd.getDurable(), null, Collections.emptyMap(), cmd.getReadCompacted(), InitialPosition.Latest);
+                0, cmd.getConsumerName(), cmd.getDurable(), null, Collections.emptyMap(), cmd.getReadCompacted(), InitialPosition.Latest, 0, null, 0);
 
         try {
             f2.get();
@@ -472,7 +472,7 @@ public void testSubscribeUnsubscribe() throws Exception {
     @Test
     public void testAddRemoveConsumer() throws Exception {
         PersistentTopic topic = new PersistentTopic(successTopicName, ledgerMock, brokerService);
-        PersistentSubscription sub = new PersistentSubscription(topic, "sub-1", cursorMock);
+        PersistentSubscription sub = new PersistentSubscription(topic, "sub-1", cursorMock, 0, null);
 
         // 1. simple add consumer
         Consumer consumer = new Consumer(sub, SubType.Exclusive, topic.getName(), 1 /* consumer id */, 0, "Cons1"/* consumer name */,
@@ -503,8 +503,8 @@ public void testAddRemoveConsumer() throws Exception {
 
     public void testMaxConsumersShared() throws Exception {
         PersistentTopic topic = new PersistentTopic(successTopicName, ledgerMock, brokerService);
-        PersistentSubscription sub = new PersistentSubscription(topic, "sub-1", cursorMock);
-        PersistentSubscription sub2 = new PersistentSubscription(topic, "sub-2", cursorMock);
+        PersistentSubscription sub = new PersistentSubscription(topic, "sub-1", cursorMock, 0, null);
+        PersistentSubscription sub2 = new PersistentSubscription(topic, "sub-2", cursorMock, 0, null);
 
         // for count consumers on topic
         ConcurrentOpenHashMap<String, PersistentSubscription> subscriptions = new ConcurrentOpenHashMap<>(16, 1);
@@ -594,8 +594,8 @@ public void testMaxConsumersSharedForNamespace() throws Exception {
     public void testMaxConsumersFailover() throws Exception {
 
         PersistentTopic topic = new PersistentTopic(successTopicName, ledgerMock, brokerService);
-        PersistentSubscription sub = new PersistentSubscription(topic, "sub-1", cursorMock);
-        PersistentSubscription sub2 = new PersistentSubscription(topic, "sub-2", cursorMock);
+        PersistentSubscription sub = new PersistentSubscription(topic, "sub-1", cursorMock, 0, null);
+        PersistentSubscription sub2 = new PersistentSubscription(topic, "sub-2", cursorMock, 0, null);
 
         // for count consumers on topic
         ConcurrentOpenHashMap<String, PersistentSubscription> subscriptions = new ConcurrentOpenHashMap<>(16, 1);
@@ -685,7 +685,7 @@ public void testMaxConsumersFailoverForNamespace() throws Exception {
     @Test
     public void testUbsubscribeRaceConditions() throws Exception {
         PersistentTopic topic = new PersistentTopic(successTopicName, ledgerMock, brokerService);
-        PersistentSubscription sub = new PersistentSubscription(topic, "sub-1", cursorMock);
+        PersistentSubscription sub = new PersistentSubscription(topic, "sub-1", cursorMock, 0, null);
         Consumer consumer1 = new Consumer(sub, SubType.Exclusive, topic.getName(), 1 /* consumer id */, 0, "Cons1"/* consumer name */,
                 50000, serverCnx, "myrole-1", Collections.emptyMap(), false /* read compacted */, InitialPosition.Latest);
         sub.addConsumer(consumer1);
@@ -739,7 +739,7 @@ public void testDeleteTopic() throws Exception {
                 .setSubscription(successSubName).setRequestId(1).setSubType(SubType.Exclusive).build();
 
         Future<Consumer> f1 = topic.subscribe(serverCnx, cmd.getSubscription(), cmd.getConsumerId(), cmd.getSubType(),
-                0, cmd.getConsumerName(), cmd.getDurable(), null, Collections.emptyMap(), false /* read compacted */, InitialPosition.Latest);
+                0, cmd.getConsumerName(), cmd.getDurable(), null, Collections.emptyMap(), false /* read compacted */, InitialPosition.Latest, 0, null, 0);
         f1.get();
 
         assertTrue(topic.delete().isCompletedExceptionally());
@@ -754,7 +754,7 @@ public void testDeleteAndUnsubscribeTopic() throws Exception {
                 .setSubscription(successSubName).setRequestId(1).setSubType(SubType.Exclusive).build();
 
         Future<Consumer> f1 = topic.subscribe(serverCnx, cmd.getSubscription(), cmd.getConsumerId(), cmd.getSubType(),
-                0, cmd.getConsumerName(), cmd.getDurable(), null, Collections.emptyMap(), cmd.getReadCompacted(), InitialPosition.Latest);
+                0, cmd.getConsumerName(), cmd.getDurable(), null, Collections.emptyMap(), cmd.getReadCompacted(), InitialPosition.Latest, 0, null, 0);
         f1.get();
 
         final CyclicBarrier barrier = new CyclicBarrier(2);
@@ -808,7 +808,7 @@ public void testConcurrentTopicAndSubscriptionDelete() throws Exception {
                 .setSubscription(successSubName).setRequestId(1).setSubType(SubType.Exclusive).build();
 
         Future<Consumer> f1 = topic.subscribe(serverCnx, cmd.getSubscription(), cmd.getConsumerId(), cmd.getSubType(),
-                0, cmd.getConsumerName(), cmd.getDurable(), null, Collections.emptyMap(), cmd.getReadCompacted(), InitialPosition.Latest);
+                0, cmd.getConsumerName(), cmd.getDurable(), null, Collections.emptyMap(), cmd.getReadCompacted(), InitialPosition.Latest, 0, null, 0);
         f1.get();
 
         final CyclicBarrier barrier = new CyclicBarrier(2);
@@ -895,7 +895,7 @@ public Object answer(InvocationOnMock invocationOnMock) throws Throwable {
                 .setSubscription(successSubName).setRequestId(1).setSubType(SubType.Exclusive).build();
 
         Future<Consumer> f = topic.subscribe(serverCnx, cmd.getSubscription(), cmd.getConsumerId(), cmd.getSubType(),
-                0, cmd.getConsumerName(), cmd.getDurable(), null, Collections.emptyMap(), cmd.getReadCompacted(), InitialPosition.Latest);
+                0, cmd.getConsumerName(), cmd.getDurable(), null, Collections.emptyMap(), cmd.getReadCompacted(), InitialPosition.Latest, 0, null, 0);
 
         try {
             f.get();
@@ -1013,7 +1013,7 @@ public void testFailoverSubscription() throws Exception {
         // 1. Subscribe with non partition topic
         Future<Consumer> f1 = topic1.subscribe(serverCnx, cmd1.getSubscription(), cmd1.getConsumerId(),
                 cmd1.getSubType(), 0, cmd1.getConsumerName(), cmd1.getDurable(), null, Collections.emptyMap(),
-                cmd1.getReadCompacted(), InitialPosition.Latest);
+                cmd1.getReadCompacted(), InitialPosition.Latest, 0, null, 0);
         f1.get();
 
         // 2. Subscribe with partition topic
@@ -1025,7 +1025,7 @@ public void testFailoverSubscription() throws Exception {
 
         Future<Consumer> f2 = topic2.subscribe(serverCnx, cmd2.getSubscription(), cmd2.getConsumerId(),
                 cmd2.getSubType(), 0, cmd2.getConsumerName(), cmd2.getDurable(), null, Collections.emptyMap(),
-                cmd2.getReadCompacted(), InitialPosition.Latest);
+                cmd2.getReadCompacted(), InitialPosition.Latest, 0, null, 0);
         f2.get();
 
         // 3. Subscribe and create second consumer
@@ -1035,7 +1035,7 @@ public void testFailoverSubscription() throws Exception {
 
         Future<Consumer> f3 = topic2.subscribe(serverCnx, cmd3.getSubscription(), cmd3.getConsumerId(),
                 cmd3.getSubType(), 0, cmd3.getConsumerName(), cmd3.getDurable(), null, Collections.emptyMap(),
-                cmd3.getReadCompacted(), InitialPosition.Latest);
+                cmd3.getReadCompacted(), InitialPosition.Latest, 0, null, 0);
         f3.get();
 
         assertEquals(
@@ -1056,7 +1056,7 @@ public void testFailoverSubscription() throws Exception {
 
         Future<Consumer> f4 = topic2.subscribe(serverCnx, cmd4.getSubscription(), cmd4.getConsumerId(),
                 cmd4.getSubType(), 0, cmd4.getConsumerName(), cmd4.getDurable(), null, Collections.emptyMap(),
-                cmd4.getReadCompacted(), InitialPosition.Latest);
+                cmd4.getReadCompacted(), InitialPosition.Latest, 0, null, 0);
         f4.get();
 
         assertEquals(
@@ -1082,7 +1082,7 @@ public void testFailoverSubscription() throws Exception {
 
         Future<Consumer> f5 = topic2.subscribe(serverCnx, cmd5.getSubscription(), cmd5.getConsumerId(),
                 cmd5.getSubType(), 0, cmd5.getConsumerName(), cmd5.getDurable(), null, Collections.emptyMap(),
-                cmd5.getReadCompacted(), InitialPosition.Latest);
+                cmd5.getReadCompacted(), InitialPosition.Latest, 0, null, 0);
 
         try {
             f5.get();
@@ -1099,7 +1099,7 @@ public void testFailoverSubscription() throws Exception {
 
         Future<Consumer> f6 = topic2.subscribe(serverCnx, cmd6.getSubscription(), cmd6.getConsumerId(),
                 cmd6.getSubType(), 0, cmd6.getConsumerName(), cmd6.getDurable(), null, Collections.emptyMap(),
-                cmd6.getReadCompacted(), InitialPosition.Latest);
+                cmd6.getReadCompacted(), InitialPosition.Latest, 0, null, 0);
         f6.get();
 
         // 7. unsubscribe exclusive sub
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/SimpleProducerConsumerTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/SimpleProducerConsumerTest.java
index 61bdad034a..365f8e2517 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/SimpleProducerConsumerTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/SimpleProducerConsumerTest.java
@@ -22,11 +22,7 @@
 import static org.mockito.Mockito.atLeastOnce;
 import static org.mockito.Mockito.spy;
 import static org.mockito.Mockito.verify;
-import static org.testng.Assert.assertEquals;
-import static org.testng.Assert.assertFalse;
-import static org.testng.Assert.assertNotEquals;
-import static org.testng.Assert.assertTrue;
-import static org.testng.Assert.fail;
+import static org.testng.Assert.*;
 
 import java.io.IOException;
 import java.lang.reflect.Field;
@@ -2662,4 +2658,142 @@ public void received(Consumer consumer, Message message)
         assertEquals(latch.getCount(), 1);
         consumer.close();
     }
+
+    @Test
+    public void testDeadLetterTopic() throws Exception {
+
+        final String topic = "persistent://my-property/my-ns/dead-letter-topic";
+
+        final int maxRedeliveryCount = 2;
+
+        final int sendMessages = 100;
+
+        Producer<byte[]> producer = pulsarClient.newProducer(Schema.BYTES)
+                .topic(topic)
+                .create();
+
+        for (int i = 0; i < sendMessages; i++) {
+            producer.send(String.format("Hello Pulsar [%d]", i).getBytes());
+        }
+
+        producer.close();
+
+        Consumer<byte[]> consumer = pulsarClient.newConsumer(Schema.BYTES)
+                .topic(topic)
+                .subscriptionName("my-subscription")
+                .subscriptionType(SubscriptionType.Shared)
+                .ackTimeout(3, TimeUnit.SECONDS)
+                .maxRedeliveryCount(maxRedeliveryCount)
+                .receiverQueueSize(100)
+                .maxUnackedMessagesPerConsumer(100)
+                .subscriptionInitialPosition(SubscriptionInitialPosition.Earliest)
+                .subscribe();
+
+        Consumer<byte[]> deadLetterConsumer = pulsarClient.newConsumer(Schema.BYTES)
+                .topic("persistent://my-property/my-ns/dead-letter-topic-my-subscription-DLQ")
+                .subscriptionName("my-subscription")
+                .subscribe();
+
+        int totalReceived = 0;
+        do {
+            Message<byte[]> message = consumer.receive();
+            log.info("consumer received message : {} {}", message.getMessageId(), new String(message.getData()));
+            totalReceived++;
+        } while (totalReceived < sendMessages * (maxRedeliveryCount + 1));
+
+        int totalInDeadLetter = 0;
+        do {
+            Message message = deadLetterConsumer.receive();
+            log.info("dead letter consumer received message : {} {}", message.getMessageId(), new String(message.getData()));
+            deadLetterConsumer.acknowledge(message);
+            totalInDeadLetter++;
+        } while (totalInDeadLetter < sendMessages);
+
+        deadLetterConsumer.close();
+        consumer.close();
+
+        Consumer<byte[]> checkConsumer = pulsarClient.newConsumer(Schema.BYTES)
+                .topic(topic)
+                .subscriptionName("my-subscription")
+                .subscriptionType(SubscriptionType.Shared)
+                .subscriptionInitialPosition(SubscriptionInitialPosition.Earliest)
+                .subscribe();
+
+        Message<byte[]> checkMessage = checkConsumer.receive(3, TimeUnit.SECONDS);
+        if (checkMessage != null) {
+            log.info("check consumer received message : {} {}", checkMessage.getMessageId(), new String(checkMessage.getData()));
+        }
+        assertNull(checkMessage);
+
+        checkConsumer.close();
+    }
+
+    @Test
+    public void testDeadLetterTopicByCustomTopicName() throws Exception {
+        final String topic = "persistent://my-property/my-ns/dead-letter-topic";
+
+        final int maxRedeliveryCount = 2;
+
+        final int sendMessages = 100;
+
+        Producer<byte[]> producer = pulsarClient.newProducer(Schema.BYTES)
+                .topic(topic)
+                .create();
+
+        for (int i = 0; i < sendMessages; i++) {
+            producer.send(String.format("Hello Pulsar [%d]", i).getBytes());
+        }
+
+        producer.close();
+
+        Consumer<byte[]> consumer = pulsarClient.newConsumer(Schema.BYTES)
+                .topic(topic)
+                .subscriptionName("my-subscription")
+                .subscriptionType(SubscriptionType.Shared)
+                .ackTimeout(3, TimeUnit.SECONDS)
+                .maxRedeliveryCount(maxRedeliveryCount)
+                .receiverQueueSize(100)
+                .maxUnackedMessagesPerConsumer(100)
+                .deadLetterTopic("persistent://my-property/my-ns/dead-letter-custom-topic-my-subscription-custom-DLQ")
+                .subscriptionInitialPosition(SubscriptionInitialPosition.Earliest)
+                .subscribe();
+
+        Consumer<byte[]> deadLetterConsumer = pulsarClient.newConsumer(Schema.BYTES)
+                .topic("persistent://my-property/my-ns/dead-letter-custom-topic-my-subscription-custom-DLQ")
+                .subscriptionName("my-subscription")
+                .subscribe();
+
+        int totalReceived = 0;
+        do {
+            Message<byte[]> message = consumer.receive();
+            log.info("consumer received message : {} {}", message.getMessageId(), new String(message.getData()));
+            totalReceived++;
+        } while (totalReceived < sendMessages * (maxRedeliveryCount + 1));
+
+        int totalInDeadLetter = 0;
+        do {
+            Message message = deadLetterConsumer.receive();
+            log.info("dead letter consumer received message : {} {}", message.getMessageId(), new String(message.getData()));
+            deadLetterConsumer.acknowledge(message);
+            totalInDeadLetter++;
+        } while (totalInDeadLetter < sendMessages);
+
+        deadLetterConsumer.close();
+        consumer.close();
+
+        Consumer<byte[]> checkConsumer = pulsarClient.newConsumer(Schema.BYTES)
+                .topic(topic)
+                .subscriptionName("my-subscription")
+                .subscriptionType(SubscriptionType.Shared)
+                .subscriptionInitialPosition(SubscriptionInitialPosition.Earliest)
+                .subscribe();
+
+        Message<byte[]> checkMessage = checkConsumer.receive(3, TimeUnit.SECONDS);
+        if (checkMessage != null) {
+            log.info("check consumer received message : {} {}", checkMessage.getMessageId(), new String(checkMessage.getData()));
+        }
+        assertNull(checkMessage);
+
+        checkConsumer.close();
+    }
 }
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/utils/QuorumTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/utils/QuorumTest.java
new file mode 100644
index 0000000000..ec3413c056
--- /dev/null
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/utils/QuorumTest.java
@@ -0,0 +1,42 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.utils;
+
+
+import org.junit.Assert;
+import org.testng.annotations.Test;
+
+public class QuorumTest {
+
+    @Test
+    public void testSucceed() {
+        Quorum quorum = new Quorum(3, Assert::assertTrue);
+        for (int i = 0; i < 3; i++) {
+            quorum.succeed();
+        }
+    }
+
+    @Test
+    public void testFail() {
+        Quorum quorum = new Quorum(3, Assert::assertFalse);
+        for (int i = 0; i < 3; i++) {
+            quorum.fail();
+        }
+    }
+}
diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/api/ConsumerBuilder.java b/pulsar-client/src/main/java/org/apache/pulsar/client/api/ConsumerBuilder.java
index 8657859103..5958bd5def 100644
--- a/pulsar-client/src/main/java/org/apache/pulsar/client/api/ConsumerBuilder.java
+++ b/pulsar-client/src/main/java/org/apache/pulsar/client/api/ConsumerBuilder.java
@@ -330,4 +330,24 @@
      * Set subscriptionInitialPosition for the consumer
     */
     ConsumerBuilder<T> subscriptionInitialPosition(SubscriptionInitialPosition subscriptionInitialPosition);
+
+    /**
+     * Set maximum number of redelivery.
+     * Message exceeding the maximum number of redelivery will send to Dead Letter Topic and acknowledged automatic.
+     * @param maxRedeliveryCount maximum number of redelivery
+     */
+    ConsumerBuilder<T> maxRedeliveryCount(int maxRedeliveryCount);
+
+    /**
+     * Set name of Dead Letter Topic.
+     * Before set name of Dead Letter Topic, ensure that maxRedeliveryCount > 0
+     * @param deadLetterTopic name of dead letter topic
+     */
+    ConsumerBuilder<T> deadLetterTopic(String deadLetterTopic);
+
+    /**
+     * Set max un-acked messages per consumer.
+     * This config should less than broker config, if not, config will not enable. 0 is not limit, default is 0
+     */
+    ConsumerBuilder<T> maxUnackedMessagesPerConsumer(int maxUnackedMessagesPerConsumer);
 }
diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerBuilderImpl.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerBuilderImpl.java
index f0067f7f10..085d0c3fbb 100644
--- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerBuilderImpl.java
+++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerBuilderImpl.java
@@ -242,7 +242,25 @@
 		return this;
 	}
 
-	public ConsumerConfigurationData<T> getConf() {
+    @Override
+    public ConsumerBuilder<T> maxRedeliveryCount(int maxRedeliveryCount) {
+        conf.setMaxRedeliveryCount(maxRedeliveryCount);
+        return this;
+    }
+
+    @Override
+    public ConsumerBuilder<T> deadLetterTopic(String deadLetterTopic) {
+        conf.setDeadLetterTopic(deadLetterTopic);
+        return this;
+    }
+
+    @Override
+    public ConsumerBuilder<T> maxUnackedMessagesPerConsumer(int maxUnackedMessagesPerConsumer) {
+        conf.setMaxUnackedMessagesPerConsumer(maxUnackedMessagesPerConsumer);
+        return this;
+    }
+
+    public ConsumerConfigurationData<T> getConf() {
 	    return conf;
 	}
 }
diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java
index 1c69c754a8..18fa97baa2 100644
--- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java
+++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java
@@ -131,6 +131,10 @@
     private final SubscriptionInitialPosition subscriptionInitialPosition;
     private final ConnectionHandler connectionHandler;
 
+    private final int maxRedeliveryCount;
+    private final String deadLetterTopic;
+    private final int maxUnackedMessagesPerConsumer;
+
     enum SubscriptionMode {
         // Make the subscription to be backed by a durable cursor that will retain messages and persist the current
         // position
@@ -159,6 +163,9 @@
         this.priorityLevel = conf.getPriorityLevel();
         this.readCompacted = conf.isReadCompacted();
         this.subscriptionInitialPosition = conf.getSubscriptionInitialPosition();
+        this.maxRedeliveryCount = conf.getMaxRedeliveryCount();
+        this.deadLetterTopic = conf.getDeadLetterTopic();
+        this.maxUnackedMessagesPerConsumer = conf.getMaxUnackedMessagesPerConsumer();
 
         TopicName topicName = TopicName.get(topic);
         if (topicName.isPersistent()) {
@@ -486,7 +493,9 @@ public void connectionOpened(final ClientCnx cnx) {
         }
 
         ByteBuf request = Commands.newSubscribe(topic, subscription, consumerId, requestId, getSubType(), priorityLevel,
-                consumerName, isDurable, startMessageIdData, metadata, readCompacted, InitialPosition.valueOf(subscriptionInitialPosition.getValue()), schema.getSchemaInfo());
+                consumerName, isDurable, startMessageIdData, metadata, readCompacted,
+                InitialPosition.valueOf(subscriptionInitialPosition.getValue()), schema.getSchemaInfo(),
+                maxRedeliveryCount, deadLetterTopic, maxUnackedMessagesPerConsumer);
         if (startMessageIdData != null) {
             startMessageIdData.recycle();
         }
diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/conf/ConsumerConfigurationData.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/conf/ConsumerConfigurationData.java
index 47ede41de6..6751dc0608 100644
--- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/conf/ConsumerConfigurationData.java
+++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/conf/ConsumerConfigurationData.java
@@ -82,6 +82,12 @@
 
     private int patternAutoDiscoveryPeriod = 1;
 
+    private int maxRedeliveryCount = 0;
+
+    private String deadLetterTopic;
+
+    private int maxUnackedMessagesPerConsumer = 0;
+
     @JsonIgnore
     public String getSingleTopic() {
         checkArgument(topicNames.size() == 1);
diff --git a/pulsar-client/src/test/java/org/apache/pulsar/client/tutorial/SubscriptionWithDeadLetter.java b/pulsar-client/src/test/java/org/apache/pulsar/client/tutorial/SubscriptionWithDeadLetter.java
new file mode 100644
index 0000000000..573899f778
--- /dev/null
+++ b/pulsar-client/src/test/java/org/apache/pulsar/client/tutorial/SubscriptionWithDeadLetter.java
@@ -0,0 +1,119 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.client.tutorial;
+
+import org.apache.pulsar.client.api.*;
+
+import java.util.concurrent.TimeUnit;
+
+public class SubscriptionWithDeadLetter {
+
+    private static final String topic = "persistent://public/default/my-topic";
+
+    private static final int maxRedeliveryCount = 2;
+
+    private static final int sendMessages = 500;
+
+    public static void main(String[] args) throws PulsarClientException {
+
+        PulsarClient client = PulsarClient.builder().serviceUrl("pulsar://127.0.0.1:6650").build();
+
+        Producer<byte[]> producer = client.newProducer(Schema.BYTES)
+                .topic(topic)
+                .create();
+
+        for (int i = 0; i < sendMessages; i++) {
+            producer.send("Hello Pulsar!".getBytes());
+        }
+
+        Consumer<byte[]> consumer = client.newConsumer(Schema.BYTES)
+                .topic(topic)
+                .subscriptionName("my-subscription")
+                .subscriptionType(SubscriptionType.Shared)
+                .ackTimeout(10, TimeUnit.SECONDS)
+                .maxRedeliveryCount(maxRedeliveryCount)
+                .receiverQueueSize(100)
+                .maxUnackedMessagesPerConsumer(100)
+                .subscriptionInitialPosition(SubscriptionInitialPosition.Earliest)
+                .subscribe();
+
+        Consumer<byte[]> deadLetterConsumer = client.newConsumer(Schema.BYTES)
+                .topic("persistent://public/default/my-topic-my-subscription-DLQ")
+                .subscriptionName("my-subscription")
+                .subscribe();
+
+        int totalReceived = 0;
+        do {
+            Message<byte[]> msg = consumer.receive();
+            totalReceived++;
+            System.out.println(new String(msg.getData()));
+        } while (totalReceived < sendMessages * (maxRedeliveryCount + 1));
+
+        int totalInDeadLetter = 0;
+        do {
+            Message<byte[]> msg = deadLetterConsumer.receive();
+            totalInDeadLetter++;
+            System.out.println(new String(msg.getData()));
+        } while (totalInDeadLetter < sendMessages);
+
+        deadLetterConsumer.close();
+        consumer.close();
+
+        // test subscribe changed
+        for (int i = 0; i < sendMessages; i++) {
+            producer.send("Hello Pulsar!".getBytes());
+        }
+
+        Consumer<byte[]> consumer2 = client.newConsumer(Schema.BYTES)
+                .topic(topic)
+                .subscriptionName("my-subscription")
+                .subscriptionType(SubscriptionType.Shared)
+                .ackTimeout(10, TimeUnit.SECONDS)
+                .maxRedeliveryCount(maxRedeliveryCount)
+                .receiverQueueSize(100)
+                .maxUnackedMessagesPerConsumer(100)
+                .deadLetterTopic("persistent://public/default/my-topic-my-subscription-custom-DLQ")
+                .subscriptionInitialPosition(SubscriptionInitialPosition.Earliest)
+                .subscribe();
+
+        Consumer<byte[]> deadLetterConsumer2 = client.newConsumer(Schema.BYTES)
+                .topic("persistent://public/default/my-topic-my-subscription-custom-DLQ")
+                .subscriptionName("my-subscription")
+                .subscribe();
+
+        int totalReceived2 = 0;
+        do {
+            Message<byte[]> msg = consumer2.receive();
+            totalReceived2++;
+            System.out.println(new String(msg.getData()));
+        } while (totalReceived2 < sendMessages * (maxRedeliveryCount + 1));
+
+        int totalInDeadLetter2 = 0;
+        do {
+            Message<byte[]> msg = deadLetterConsumer2.receive();
+            totalInDeadLetter2++;
+            System.out.println(new String(msg.getData()));
+        } while (totalInDeadLetter2 < sendMessages);
+
+        deadLetterConsumer2.close();
+        consumer2.close();
+        producer.close();
+        client.close();
+    }
+}
diff --git a/pulsar-common/src/main/java/org/apache/pulsar/common/api/Commands.java b/pulsar-common/src/main/java/org/apache/pulsar/common/api/Commands.java
index b5e2406c9e..0658fe1ce5 100644
--- a/pulsar-common/src/main/java/org/apache/pulsar/common/api/Commands.java
+++ b/pulsar-common/src/main/java/org/apache/pulsar/common/api/Commands.java
@@ -318,12 +318,14 @@ public static ByteBufPair newSend(long producerId, long sequenceId, int numMessa
     public static ByteBuf newSubscribe(String topic, String subscription, long consumerId, long requestId,
             SubType subType, int priorityLevel, String consumerName) {
         return newSubscribe(topic, subscription, consumerId, requestId, subType, priorityLevel, consumerName,
-                true /* isDurable */, null /* startMessageId */, Collections.emptyMap(), false, InitialPosition.Earliest, null);
+                true /* isDurable */, null /* startMessageId */, Collections.emptyMap(), false, InitialPosition.Earliest,
+                null, 0, null, 0);
     }
 
     public static ByteBuf newSubscribe(String topic, String subscription, long consumerId, long requestId,
             SubType subType, int priorityLevel, String consumerName, boolean isDurable, MessageIdData startMessageId,
-            Map<String, String> metadata, boolean readCompacted, InitialPosition subscriptionInitialPosition, SchemaInfo schemaInfo) {
+            Map<String, String> metadata, boolean readCompacted, InitialPosition subscriptionInitialPosition,
+            SchemaInfo schemaInfo, int maxRedeliveryCount, String deadLetterTopic, int maxUnackedMessagesPerConsumer) {
         CommandSubscribe.Builder subscribeBuilder = CommandSubscribe.newBuilder();
         subscribeBuilder.setTopic(topic);
         subscribeBuilder.setSubscription(subscription);
@@ -335,6 +337,11 @@ public static ByteBuf newSubscribe(String topic, String subscription, long consu
         subscribeBuilder.setDurable(isDurable);
         subscribeBuilder.setReadCompacted(readCompacted);
         subscribeBuilder.setInitialPosition(subscriptionInitialPosition);
+        subscribeBuilder.setMaxRedeliveryCount(maxRedeliveryCount);
+        subscribeBuilder.setMaxUnackedMessagePerConsumer(maxUnackedMessagesPerConsumer);
+        if (deadLetterTopic != null) {
+            subscribeBuilder.setDeadLetterTopic(deadLetterTopic);
+        }
         if (startMessageId != null) {
             subscribeBuilder.setStartMessageId(startMessageId);
         }
diff --git a/pulsar-common/src/main/java/org/apache/pulsar/common/api/proto/PulsarApi.java b/pulsar-common/src/main/java/org/apache/pulsar/common/api/proto/PulsarApi.java
index edc7e9b118..720d4da6d5 100644
--- a/pulsar-common/src/main/java/org/apache/pulsar/common/api/proto/PulsarApi.java
+++ b/pulsar-common/src/main/java/org/apache/pulsar/common/api/proto/PulsarApi.java
@@ -6722,6 +6722,18 @@ public Builder clearProtocolVersion() {
     // optional .pulsar.proto.CommandSubscribe.InitialPosition initialPosition = 13 [default = Latest];
     boolean hasInitialPosition();
     org.apache.pulsar.common.api.proto.PulsarApi.CommandSubscribe.InitialPosition getInitialPosition();
+    
+    // optional int32 maxRedeliveryCount = 14;
+    boolean hasMaxRedeliveryCount();
+    int getMaxRedeliveryCount();
+    
+    // optional string deadLetterTopic = 15;
+    boolean hasDeadLetterTopic();
+    String getDeadLetterTopic();
+    
+    // optional int32 maxUnackedMessagePerConsumer = 16;
+    boolean hasMaxUnackedMessagePerConsumer();
+    int getMaxUnackedMessagePerConsumer();
   }
   public static final class CommandSubscribe extends
       org.apache.pulsar.shaded.com.google.protobuf.v241.GeneratedMessageLite
@@ -7050,6 +7062,58 @@ public boolean hasInitialPosition() {
       return initialPosition_;
     }
     
+    // optional int32 maxRedeliveryCount = 14;
+    public static final int MAXREDELIVERYCOUNT_FIELD_NUMBER = 14;
+    private int maxRedeliveryCount_;
+    public boolean hasMaxRedeliveryCount() {
+      return ((bitField0_ & 0x00001000) == 0x00001000);
+    }
+    public int getMaxRedeliveryCount() {
+      return maxRedeliveryCount_;
+    }
+    
+    // optional string deadLetterTopic = 15;
+    public static final int DEADLETTERTOPIC_FIELD_NUMBER = 15;
+    private java.lang.Object deadLetterTopic_;
+    public boolean hasDeadLetterTopic() {
+      return ((bitField0_ & 0x00002000) == 0x00002000);
+    }
+    public String getDeadLetterTopic() {
+      java.lang.Object ref = deadLetterTopic_;
+      if (ref instanceof String) {
+        return (String) ref;
+      } else {
+        org.apache.pulsar.shaded.com.google.protobuf.v241.ByteString bs = 
+            (org.apache.pulsar.shaded.com.google.protobuf.v241.ByteString) ref;
+        String s = bs.toStringUtf8();
+        if (org.apache.pulsar.shaded.com.google.protobuf.v241.Internal.isValidUtf8(bs)) {
+          deadLetterTopic_ = s;
+        }
+        return s;
+      }
+    }
+    private org.apache.pulsar.shaded.com.google.protobuf.v241.ByteString getDeadLetterTopicBytes() {
+      java.lang.Object ref = deadLetterTopic_;
+      if (ref instanceof String) {
+        org.apache.pulsar.shaded.com.google.protobuf.v241.ByteString b = 
+            org.apache.pulsar.shaded.com.google.protobuf.v241.ByteString.copyFromUtf8((String) ref);
+        deadLetterTopic_ = b;
+        return b;
+      } else {
+        return (org.apache.pulsar.shaded.com.google.protobuf.v241.ByteString) ref;
+      }
+    }
+    
+    // optional int32 maxUnackedMessagePerConsumer = 16;
+    public static final int MAXUNACKEDMESSAGEPERCONSUMER_FIELD_NUMBER = 16;
+    private int maxUnackedMessagePerConsumer_;
+    public boolean hasMaxUnackedMessagePerConsumer() {
+      return ((bitField0_ & 0x00004000) == 0x00004000);
+    }
+    public int getMaxUnackedMessagePerConsumer() {
+      return maxUnackedMessagePerConsumer_;
+    }
+    
     private void initFields() {
       topic_ = "";
       subscription_ = "";
@@ -7064,6 +7128,9 @@ private void initFields() {
       readCompacted_ = false;
       schema_ = org.apache.pulsar.common.api.proto.PulsarApi.Schema.getDefaultInstance();
       initialPosition_ = org.apache.pulsar.common.api.proto.PulsarApi.CommandSubscribe.InitialPosition.Latest;
+      maxRedeliveryCount_ = 0;
+      deadLetterTopic_ = "";
+      maxUnackedMessagePerConsumer_ = 0;
     }
     private byte memoizedIsInitialized = -1;
     public final boolean isInitialized() {
@@ -7159,6 +7226,15 @@ public void writeTo(org.apache.pulsar.common.util.protobuf.ByteBufCodedOutputStr
       if (((bitField0_ & 0x00000800) == 0x00000800)) {
         output.writeEnum(13, initialPosition_.getNumber());
       }
+      if (((bitField0_ & 0x00001000) == 0x00001000)) {
+        output.writeInt32(14, maxRedeliveryCount_);
+      }
+      if (((bitField0_ & 0x00002000) == 0x00002000)) {
+        output.writeBytes(15, getDeadLetterTopicBytes());
+      }
+      if (((bitField0_ & 0x00004000) == 0x00004000)) {
+        output.writeInt32(16, maxUnackedMessagePerConsumer_);
+      }
     }
     
     private int memoizedSerializedSize = -1;
@@ -7219,6 +7295,18 @@ public int getSerializedSize() {
         size += org.apache.pulsar.shaded.com.google.protobuf.v241.CodedOutputStream
           .computeEnumSize(13, initialPosition_.getNumber());
       }
+      if (((bitField0_ & 0x00001000) == 0x00001000)) {
+        size += org.apache.pulsar.shaded.com.google.protobuf.v241.CodedOutputStream
+          .computeInt32Size(14, maxRedeliveryCount_);
+      }
+      if (((bitField0_ & 0x00002000) == 0x00002000)) {
+        size += org.apache.pulsar.shaded.com.google.protobuf.v241.CodedOutputStream
+          .computeBytesSize(15, getDeadLetterTopicBytes());
+      }
+      if (((bitField0_ & 0x00004000) == 0x00004000)) {
+        size += org.apache.pulsar.shaded.com.google.protobuf.v241.CodedOutputStream
+          .computeInt32Size(16, maxUnackedMessagePerConsumer_);
+      }
       memoizedSerializedSize = size;
       return size;
     }
@@ -7358,6 +7446,12 @@ public Builder clear() {
         bitField0_ = (bitField0_ & ~0x00000800);
         initialPosition_ = org.apache.pulsar.common.api.proto.PulsarApi.CommandSubscribe.InitialPosition.Latest;
         bitField0_ = (bitField0_ & ~0x00001000);
+        maxRedeliveryCount_ = 0;
+        bitField0_ = (bitField0_ & ~0x00002000);
+        deadLetterTopic_ = "";
+        bitField0_ = (bitField0_ & ~0x00004000);
+        maxUnackedMessagePerConsumer_ = 0;
+        bitField0_ = (bitField0_ & ~0x00008000);
         return this;
       }
       
@@ -7444,6 +7538,18 @@ public Builder clone() {
           to_bitField0_ |= 0x00000800;
         }
         result.initialPosition_ = initialPosition_;
+        if (((from_bitField0_ & 0x00002000) == 0x00002000)) {
+          to_bitField0_ |= 0x00001000;
+        }
+        result.maxRedeliveryCount_ = maxRedeliveryCount_;
+        if (((from_bitField0_ & 0x00004000) == 0x00004000)) {
+          to_bitField0_ |= 0x00002000;
+        }
+        result.deadLetterTopic_ = deadLetterTopic_;
+        if (((from_bitField0_ & 0x00008000) == 0x00008000)) {
+          to_bitField0_ |= 0x00004000;
+        }
+        result.maxUnackedMessagePerConsumer_ = maxUnackedMessagePerConsumer_;
         result.bitField0_ = to_bitField0_;
         return result;
       }
@@ -7496,6 +7602,15 @@ public Builder mergeFrom(org.apache.pulsar.common.api.proto.PulsarApi.CommandSub
         if (other.hasInitialPosition()) {
           setInitialPosition(other.getInitialPosition());
         }
+        if (other.hasMaxRedeliveryCount()) {
+          setMaxRedeliveryCount(other.getMaxRedeliveryCount());
+        }
+        if (other.hasDeadLetterTopic()) {
+          setDeadLetterTopic(other.getDeadLetterTopic());
+        }
+        if (other.hasMaxUnackedMessagePerConsumer()) {
+          setMaxUnackedMessagePerConsumer(other.getMaxUnackedMessagePerConsumer());
+        }
         return this;
       }
       
@@ -7647,6 +7762,21 @@ public Builder mergeFrom(
               }
               break;
             }
+            case 112: {
+              bitField0_ |= 0x00002000;
+              maxRedeliveryCount_ = input.readInt32();
+              break;
+            }
+            case 122: {
+              bitField0_ |= 0x00004000;
+              deadLetterTopic_ = input.readBytes();
+              break;
+            }
+            case 128: {
+              bitField0_ |= 0x00008000;
+              maxUnackedMessagePerConsumer_ = input.readInt32();
+              break;
+            }
           }
         }
       }
@@ -8089,6 +8219,84 @@ public Builder clearInitialPosition() {
         return this;
       }
       
+      // optional int32 maxRedeliveryCount = 14;
+      private int maxRedeliveryCount_ ;
+      public boolean hasMaxRedeliveryCount() {
+        return ((bitField0_ & 0x00002000) == 0x00002000);
+      }
+      public int getMaxRedeliveryCount() {
+        return maxRedeliveryCount_;
+      }
+      public Builder setMaxRedeliveryCount(int value) {
+        bitField0_ |= 0x00002000;
+        maxRedeliveryCount_ = value;
+        
+        return this;
+      }
+      public Builder clearMaxRedeliveryCount() {
+        bitField0_ = (bitField0_ & ~0x00002000);
+        maxRedeliveryCount_ = 0;
+        
+        return this;
+      }
+      
+      // optional string deadLetterTopic = 15;
+      private java.lang.Object deadLetterTopic_ = "";
+      public boolean hasDeadLetterTopic() {
+        return ((bitField0_ & 0x00004000) == 0x00004000);
+      }
+      public String getDeadLetterTopic() {
+        java.lang.Object ref = deadLetterTopic_;
+        if (!(ref instanceof String)) {
+          String s = ((org.apache.pulsar.shaded.com.google.protobuf.v241.ByteString) ref).toStringUtf8();
+          deadLetterTopic_ = s;
+          return s;
+        } else {
+          return (String) ref;
+        }
+      }
+      public Builder setDeadLetterTopic(String value) {
+        if (value == null) {
+    throw new NullPointerException();
+  }
+  bitField0_ |= 0x00004000;
+        deadLetterTopic_ = value;
+        
+        return this;
+      }
+      public Builder clearDeadLetterTopic() {
+        bitField0_ = (bitField0_ & ~0x00004000);
+        deadLetterTopic_ = getDefaultInstance().getDeadLetterTopic();
+        
+        return this;
+      }
+      void setDeadLetterTopic(org.apache.pulsar.shaded.com.google.protobuf.v241.ByteString value) {
+        bitField0_ |= 0x00004000;
+        deadLetterTopic_ = value;
+        
+      }
+      
+      // optional int32 maxUnackedMessagePerConsumer = 16;
+      private int maxUnackedMessagePerConsumer_ ;
+      public boolean hasMaxUnackedMessagePerConsumer() {
+        return ((bitField0_ & 0x00008000) == 0x00008000);
+      }
+      public int getMaxUnackedMessagePerConsumer() {
+        return maxUnackedMessagePerConsumer_;
+      }
+      public Builder setMaxUnackedMessagePerConsumer(int value) {
+        bitField0_ |= 0x00008000;
+        maxUnackedMessagePerConsumer_ = value;
+        
+        return this;
+      }
+      public Builder clearMaxUnackedMessagePerConsumer() {
+        bitField0_ = (bitField0_ & ~0x00008000);
+        maxUnackedMessagePerConsumer_ = 0;
+        
+        return this;
+      }
+      
       // @@protoc_insertion_point(builder_scope:pulsar.proto.CommandSubscribe)
     }
     
diff --git a/pulsar-common/src/main/proto/PulsarApi.proto b/pulsar-common/src/main/proto/PulsarApi.proto
index 779a27bc1f..7eea11ad56 100644
--- a/pulsar-common/src/main/proto/PulsarApi.proto
+++ b/pulsar-common/src/main/proto/PulsarApi.proto
@@ -229,6 +229,16 @@ message CommandSubscribe {
 	// Signal wthether the subscription will initialize on latest
 	// or not -- earliest
 	optional InitialPosition initialPosition = 13 [default = Latest];
+	// Maximum number of redeliveries.
+	// Message exceeding the maximum number of redeliveries should send to Dead Letter Topic and acknowledged automatic.
+	// Enable this feature by set maxRedeliveryCount > 0
+	optional int32 maxRedeliveryCount = 14;
+	// Name of Dead Letter Topic.
+	// If not set, pulsar broker will generate with topic name and subscription name and suffix with -DLQ
+	optional string deadLetterTopic = 15;
+	//Maximum number of un-acked messages count per consumer.
+	//This config should less than broker config, if not, config will not enable.
+	optional int32 maxUnackedMessagePerConsumer = 16;
 }
 
 message CommandPartitionedTopicMetadata {


 

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
users@infra.apache.org


With regards,
Apache Git Services