You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by si...@apache.org on 2018/09/14 07:38:03 UTC

[incubator-pulsar] branch master updated: PIP-22: Dead Letter Topic (#2508)

This is an automated email from the ASF dual-hosted git repository.

sijie pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-pulsar.git


The following commit(s) were added to refs/heads/master by this push:
     new 95fe84c  PIP-22: Dead Letter Topic (#2508)
95fe84c is described below

commit 95fe84c7b43060102bb08f0dfe85dadb152f4542
Author: penghui <co...@gmail.com>
AuthorDate: Fri Sep 14 15:37:58 2018 +0800

    PIP-22: Dead Letter Topic (#2508)
    
    ### Motivation
    
    Fixes #189
    
    When consumer got messages from pulsar, It's difficult to ensure every message can be consume success. Pulsar support message redelivery feature by set acknowledge timeout when create a new consumer. This is a good feature guarantee consumer will not lost messages.
    
    But however, some message will redelivery so many times possible, even to the extent that it can be never stop.
    
    So, It's necessary to support a feature to control it by pulsar. Users can use this feature and customize this feature to control the message redelivery behavior. The feature named Dead Letter Topic.
    
    ### Modifications
    
    Consumer can set maximum number of redeliveries by java client.
    Consumer can set the name of Dead Letter Topic by java client, It’s not necessary.
    Message exceeding the maximum number of redeliveries should send to Dead Letter Topic and acknowledged automatic.
    
    ### Result
    
    If consumer enable future of dead letter topic. When Message exceeding the maximum number of redeliveries, message will send to the Dead Letter Topic and acknowledged automatic.
---
 .../org/apache/pulsar/broker/service/Consumer.java |   3 +-
 .../apache/pulsar/broker/service/Dispatcher.java   |   2 +
 .../broker/service/InMemoryRedeliveryTracker.java  |  58 +++++
 .../pulsar/broker/service/RedeliveryTracker.java   |  36 +++
 .../broker/service/RedeliveryTrackerDisabled.java  |  55 +++++
 .../NonPersistentDispatcherMultipleConsumers.java  |   9 +
 ...onPersistentDispatcherSingleActiveConsumer.java |   9 +
 .../PersistentDispatcherMultipleConsumers.java     |  14 +-
 .../PersistentDispatcherSingleActiveConsumer.java  |  11 +
 .../service/persistent/PersistentSubscription.java |   1 +
 .../apache/pulsar/client/impl/RawReaderImpl.java   |   2 +-
 .../pulsar/client/api/DeadLetterTopicTest.java     | 249 +++++++++++++++++++++
 .../client/impl/CompactedOutBatchMessageTest.java  |   2 +-
 .../apache/pulsar/client/api/ConsumerBuilder.java  |  24 ++
 .../apache/pulsar/client/api/DeadLetterPolicy.java |  32 +++
 .../org/apache/pulsar/client/impl/ClientCnx.java   |   2 +-
 .../pulsar/client/impl/ConsumerBuilderImpl.java    |   7 +
 .../apache/pulsar/client/impl/ConsumerImpl.java    | 111 ++++++++-
 .../impl/conf/ConsumerConfigurationData.java       |   3 +
 .../org/apache/pulsar/common/api/Commands.java     |   5 +-
 .../apache/pulsar/common/api/proto/PulsarApi.java  |  57 +++++
 pulsar-common/src/main/proto/PulsarApi.proto       |   1 +
 22 files changed, 680 insertions(+), 13 deletions(-)

diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/Consumer.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/Consumer.java
index 883bf55..5752034 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/Consumer.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/Consumer.java
@@ -265,7 +265,8 @@ public class Consumer {
                 if (i == (entries.size() - 1)) {
                     promise = writePromise;
                 }
-                ctx.write(Commands.newMessage(consumerId, messageId, metadataAndPayload), promise);
+                int redeliveryCount = subscription.getDispatcher().getRedeliveryTracker().getRedeliveryCount(PositionImpl.get(messageId.getLedgerId(), messageId.getEntryId()));
+                ctx.write(Commands.newMessage(consumerId, messageId, redeliveryCount, metadataAndPayload), promise);
                 messageId.recycle();
                 messageIdBuilder.recycle();
                 entry.release();
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/Dispatcher.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/Dispatcher.java
index 0d7b7d6..43f65cd 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/Dispatcher.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/Dispatcher.java
@@ -70,4 +70,6 @@ public interface Dispatcher {
 
     void addUnAckedMessages(int unAckMessages);
 
+    RedeliveryTracker getRedeliveryTracker();
+
 }
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/InMemoryRedeliveryTracker.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/InMemoryRedeliveryTracker.java
new file mode 100644
index 0000000..99b38cc
--- /dev/null
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/InMemoryRedeliveryTracker.java
@@ -0,0 +1,58 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.broker.service;
+
+import org.apache.bookkeeper.mledger.Position;
+
+import java.util.List;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.atomic.AtomicInteger;
+
+public class InMemoryRedeliveryTracker implements RedeliveryTracker {
+
+    private ConcurrentHashMap<Position, AtomicInteger> trackerCache = new ConcurrentHashMap<>(16);
+
+    @Override
+    public int incrementAndGetRedeliveryCount(Position position) {
+        trackerCache.putIfAbsent(position, new AtomicInteger(0));
+        return trackerCache.get(position).incrementAndGet();
+    }
+
+    @Override
+    public int getRedeliveryCount(Position position) {
+        return trackerCache.getOrDefault(position, new AtomicInteger(0)).get();
+    }
+
+    @Override
+    public void remove(Position position) {
+        trackerCache.remove(position);
+    }
+
+    @Override
+    public void removeBatch(List<Position> positions) {
+        if (positions != null) {
+            positions.forEach(this::remove);
+        }
+    }
+
+    @Override
+    public void clear() {
+        trackerCache.clear();
+    }
+}
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/RedeliveryTracker.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/RedeliveryTracker.java
new file mode 100644
index 0000000..0f2e54a
--- /dev/null
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/RedeliveryTracker.java
@@ -0,0 +1,36 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.broker.service;
+
+import org.apache.bookkeeper.mledger.Position;
+
+import java.util.List;
+
+public interface RedeliveryTracker {
+
+    int incrementAndGetRedeliveryCount(Position position);
+
+    int getRedeliveryCount(Position position);
+
+    void remove(Position position);
+
+    void removeBatch(List<Position> positions);
+
+    void clear();
+}
\ No newline at end of file
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/RedeliveryTrackerDisabled.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/RedeliveryTrackerDisabled.java
new file mode 100644
index 0000000..521417f
--- /dev/null
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/RedeliveryTrackerDisabled.java
@@ -0,0 +1,55 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.broker.service;
+
+import org.apache.bookkeeper.mledger.Position;
+
+import java.util.List;
+
+public class RedeliveryTrackerDisabled implements RedeliveryTracker {
+
+    public static final RedeliveryTrackerDisabled REDELIVERY_TRACKER_DISABLED = new RedeliveryTrackerDisabled();
+
+    private RedeliveryTrackerDisabled() {}
+
+    @Override
+    public int incrementAndGetRedeliveryCount(Position position) {
+        return 0;
+    }
+
+    @Override
+    public int getRedeliveryCount(Position position) {
+        return 0;
+    }
+
+    @Override
+    public void remove(Position position) {
+        // no-op
+    }
+
+    @Override
+    public void removeBatch(List<Position> positions) {
+        // no-op
+    }
+
+    @Override
+    public void clear() {
+        // no-op
+    }
+}
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/nonpersistent/NonPersistentDispatcherMultipleConsumers.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/nonpersistent/NonPersistentDispatcherMultipleConsumers.java
index f40564b..2067a80 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/nonpersistent/NonPersistentDispatcherMultipleConsumers.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/nonpersistent/NonPersistentDispatcherMultipleConsumers.java
@@ -31,6 +31,8 @@ import org.apache.pulsar.broker.service.AbstractDispatcherMultipleConsumers;
 import org.apache.pulsar.broker.service.BrokerServiceException;
 import org.apache.pulsar.broker.service.BrokerServiceException.ConsumerBusyException;
 import org.apache.pulsar.broker.service.Consumer;
+import org.apache.pulsar.broker.service.RedeliveryTracker;
+import org.apache.pulsar.broker.service.RedeliveryTrackerDisabled;
 import org.apache.pulsar.broker.service.Subscription;
 import org.apache.pulsar.common.api.proto.PulsarApi.CommandSubscribe.SubType;
 import org.apache.pulsar.utils.CopyOnWriteArrayList;
@@ -54,6 +56,7 @@ public class NonPersistentDispatcherMultipleConsumers extends AbstractDispatcher
     private volatile int totalAvailablePermits = 0;
 
     private final ServiceConfiguration serviceConfig;
+    private final RedeliveryTracker redeliveryTracker;
 
     public NonPersistentDispatcherMultipleConsumers(NonPersistentTopic topic, Subscription subscription) {
         this.topic = topic;
@@ -61,6 +64,7 @@ public class NonPersistentDispatcherMultipleConsumers extends AbstractDispatcher
         this.name = topic.getName() + " / " + subscription.getName();
         this.msgDrop = new Rate();
         this.serviceConfig = topic.getBrokerService().pulsar().getConfiguration();
+        this.redeliveryTracker = RedeliveryTrackerDisabled.REDELIVERY_TRACKER_DISABLED;
     }
 
     @Override
@@ -179,6 +183,11 @@ public class NonPersistentDispatcherMultipleConsumers extends AbstractDispatcher
     }
 
     @Override
+    public RedeliveryTracker getRedeliveryTracker() {
+        return redeliveryTracker;
+    }
+
+    @Override
     public void sendMessages(List<Entry> entries) {
         Consumer consumer = TOTAL_AVAILABLE_PERMITS_UPDATER.get(this) > 0 ? getNextConsumer() : null;
         if (consumer != null) {
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/nonpersistent/NonPersistentDispatcherSingleActiveConsumer.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/nonpersistent/NonPersistentDispatcherSingleActiveConsumer.java
index 2083cc7..787fb00 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/nonpersistent/NonPersistentDispatcherSingleActiveConsumer.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/nonpersistent/NonPersistentDispatcherSingleActiveConsumer.java
@@ -29,6 +29,8 @@ import org.apache.pulsar.broker.ServiceConfiguration;
 import org.apache.pulsar.broker.admin.AdminResource;
 import org.apache.pulsar.broker.service.AbstractDispatcherSingleActiveConsumer;
 import org.apache.pulsar.broker.service.Consumer;
+import org.apache.pulsar.broker.service.RedeliveryTracker;
+import org.apache.pulsar.broker.service.RedeliveryTrackerDisabled;
 import org.apache.pulsar.broker.service.Subscription;
 import org.apache.pulsar.common.api.proto.PulsarApi.CommandSubscribe.SubType;
 import org.apache.pulsar.common.naming.TopicName;
@@ -40,6 +42,7 @@ public final class NonPersistentDispatcherSingleActiveConsumer extends AbstractD
     private final Rate msgDrop;
     private final Subscription subscription;
     private final ServiceConfiguration serviceConfig;
+    private final RedeliveryTracker redeliveryTracker;
 
     public NonPersistentDispatcherSingleActiveConsumer(SubType subscriptionType, int partitionIndex,
             NonPersistentTopic topic, Subscription subscription) {
@@ -48,6 +51,7 @@ public final class NonPersistentDispatcherSingleActiveConsumer extends AbstractD
         this.subscription = subscription;
         this.msgDrop = new Rate();
         this.serviceConfig = topic.getBrokerService().pulsar().getConfiguration();
+        this.redeliveryTracker = RedeliveryTrackerDisabled.REDELIVERY_TRACKER_DISABLED;
     }
 
     @Override
@@ -118,6 +122,11 @@ public final class NonPersistentDispatcherSingleActiveConsumer extends AbstractD
     }
 
     @Override
+    public RedeliveryTracker getRedeliveryTracker() {
+        return redeliveryTracker;
+    }
+
+    @Override
     protected void scheduleReadOnActiveConsumer() {
         // No-op
     }
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentDispatcherMultipleConsumers.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentDispatcherMultipleConsumers.java
index 17c5db7..5198a13 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentDispatcherMultipleConsumers.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentDispatcherMultipleConsumers.java
@@ -44,6 +44,8 @@ import org.apache.pulsar.broker.service.BrokerServiceException.ConsumerBusyExcep
 import org.apache.pulsar.broker.service.Consumer;
 import org.apache.pulsar.broker.service.Consumer.SendMessageInfo;
 import org.apache.pulsar.broker.service.Dispatcher;
+import org.apache.pulsar.broker.service.RedeliveryTracker;
+import org.apache.pulsar.broker.service.InMemoryRedeliveryTracker;
 import org.apache.pulsar.client.impl.Backoff;
 import org.apache.pulsar.common.api.proto.PulsarApi.CommandSubscribe.SubType;
 import org.apache.pulsar.common.naming.TopicName;
@@ -69,6 +71,7 @@ public class PersistentDispatcherMultipleConsumers  extends AbstractDispatcherMu
 
     private CompletableFuture<Void> closeFuture = null;
     private ConcurrentLongPairSet messagesToReplay;
+    private final RedeliveryTracker redeliveryTracker;
 
     private boolean havePendingRead = false;
     private boolean havePendingReplayRead = false;
@@ -97,6 +100,7 @@ public class PersistentDispatcherMultipleConsumers  extends AbstractDispatcherMu
         this.name = topic.getName() + " / " + Codec.decode(cursor.getName());
         this.topic = topic;
         this.messagesToReplay = new ConcurrentLongPairSet(512, 2);
+        this.redeliveryTracker = new InMemoryRedeliveryTracker();
         this.readBatchSize = MaxReadBatchSize;
         this.maxUnackedMessages = topic.getBrokerService().pulsar().getConfiguration()
                 .getMaxUnackedMessagesPerSubscription();
@@ -556,7 +560,10 @@ public class PersistentDispatcherMultipleConsumers  extends AbstractDispatcherMu
 
     @Override
     public synchronized void redeliverUnacknowledgedMessages(Consumer consumer, List<PositionImpl> positions) {
-        positions.forEach(position -> messagesToReplay.add(position.getLedgerId(), position.getEntryId()));
+        positions.forEach(position -> {
+            messagesToReplay.add(position.getLedgerId(), position.getEntryId());
+            redeliveryTracker.incrementAndGetRedeliveryCount(position);
+        });
         if (log.isDebugEnabled()) {
             log.debug("[{}-{}] Redelivering unacknowledged messages for consumer {}", name, consumer, positions);
         }
@@ -624,5 +631,10 @@ public class PersistentDispatcherMultipleConsumers  extends AbstractDispatcherMu
         return dispatchRateLimiter;
     }
 
+    @Override
+    public RedeliveryTracker getRedeliveryTracker() {
+        return redeliveryTracker;
+    }
+
     private static final Logger log = LoggerFactory.getLogger(PersistentDispatcherMultipleConsumers.class);
 }
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentDispatcherSingleActiveConsumer.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentDispatcherSingleActiveConsumer.java
index 9ab7b87..89dfb47 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentDispatcherSingleActiveConsumer.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentDispatcherSingleActiveConsumer.java
@@ -39,6 +39,8 @@ import org.apache.pulsar.broker.admin.AdminResource;
 import org.apache.pulsar.broker.service.AbstractDispatcherSingleActiveConsumer;
 import org.apache.pulsar.broker.service.Consumer;
 import org.apache.pulsar.broker.service.Dispatcher;
+import org.apache.pulsar.broker.service.RedeliveryTracker;
+import org.apache.pulsar.broker.service.RedeliveryTrackerDisabled;
 import org.apache.pulsar.client.impl.Backoff;
 import org.apache.pulsar.common.api.proto.PulsarApi.CommandSubscribe.SubType;
 import org.apache.pulsar.common.naming.TopicName;
@@ -62,6 +64,8 @@ public final class PersistentDispatcherSingleActiveConsumer extends AbstractDisp
     private final ServiceConfiguration serviceConfig;
     private ScheduledFuture<?> readOnActiveConsumerTask = null;
 
+    private final RedeliveryTracker redeliveryTracker;
+
     public PersistentDispatcherSingleActiveConsumer(ManagedCursor cursor, SubType subscriptionType, int partitionIndex,
             PersistentTopic topic) {
         super(subscriptionType, partitionIndex, topic.getName());
@@ -72,6 +76,7 @@ public final class PersistentDispatcherSingleActiveConsumer extends AbstractDisp
         this.readBatchSize = MaxReadBatchSize;
         this.serviceConfig = topic.getBrokerService().pulsar().getConfiguration();
         this.dispatchRateLimiter = null;
+        this.redeliveryTracker = RedeliveryTrackerDisabled.REDELIVERY_TRACKER_DISABLED;
     }
 
     protected void scheduleReadOnActiveConsumer() {
@@ -307,6 +312,7 @@ public final class PersistentDispatcherSingleActiveConsumer extends AbstractDisp
     @Override
     public void redeliverUnacknowledgedMessages(Consumer consumer, List<PositionImpl> positions) {
         // We cannot redeliver single messages to single consumers to preserve ordering.
+        positions.forEach(redeliveryTracker::incrementAndGetRedeliveryCount);
         redeliverUnacknowledgedMessages(consumer);
     }
 
@@ -485,5 +491,10 @@ public final class PersistentDispatcherSingleActiveConsumer extends AbstractDisp
         // No-op
     }
 
+    @Override
+    public RedeliveryTracker getRedeliveryTracker() {
+        return redeliveryTracker;
+    }
+
     private static final Logger log = LoggerFactory.getLogger(PersistentDispatcherSingleActiveConsumer.class);
 }
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentSubscription.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentSubscription.java
index dac9f05..669c6d4 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentSubscription.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentSubscription.java
@@ -195,6 +195,7 @@ public class PersistentSubscription implements Subscription {
                 log.debug("[{}][{}] Individual acks on {}", topicName, subName, positions);
             }
             cursor.asyncDelete(positions, deleteCallback, positions);
+            dispatcher.getRedeliveryTracker().removeBatch(positions);
         }
 
         if (topic.getManagedLedger().isTerminated() && cursor.getNumberOfEntriesInBacklog() == 0) {
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/client/impl/RawReaderImpl.java b/pulsar-broker/src/main/java/org/apache/pulsar/client/impl/RawReaderImpl.java
index 99aec93..bff1043 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/client/impl/RawReaderImpl.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/client/impl/RawReaderImpl.java
@@ -178,7 +178,7 @@ public class RawReaderImpl implements RawReader {
         }
 
         @Override
-        void messageReceived(MessageIdData messageId, ByteBuf headersAndPayload, ClientCnx cnx) {
+        void messageReceived(MessageIdData messageId, int redeliveryCount, ByteBuf headersAndPayload, ClientCnx cnx) {
             if (log.isDebugEnabled()) {
                 log.debug("[{}][{}] Received raw message: {}/{}/{}", topic, subscription,
                           messageId.getEntryId(), messageId.getLedgerId(), messageId.getPartition());
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/DeadLetterTopicTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/DeadLetterTopicTest.java
new file mode 100644
index 0000000..114152a
--- /dev/null
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/DeadLetterTopicTest.java
@@ -0,0 +1,249 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.client.api;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.testng.annotations.AfterMethod;
+import org.testng.annotations.BeforeMethod;
+import org.testng.annotations.Test;
+
+import java.util.concurrent.TimeUnit;
+
+import static org.junit.Assert.assertNull;
+
+public class DeadLetterTopicTest extends ProducerConsumerBase {
+
+    private static final Logger log = LoggerFactory.getLogger(DeadLetterTopicTest.class);
+
+    @BeforeMethod
+    @Override
+    protected void setup() throws Exception {
+        super.internalSetup();
+        super.producerBaseSetup();
+    }
+
+    @AfterMethod
+    @Override
+    protected void cleanup() throws Exception {
+        super.internalCleanup();
+    }
+
+    @Test
+    public void testDeadLetterTopic() throws Exception {
+        final String topic = "persistent://my-property/my-ns/dead-letter-topic";
+
+        final int maxRedeliveryCount = 2;
+
+        final int sendMessages = 100;
+
+        Producer<byte[]> producer = pulsarClient.newProducer(Schema.BYTES)
+                .topic(topic)
+                .create();
+
+        for (int i = 0; i < sendMessages; i++) {
+            producer.send(String.format("Hello Pulsar [%d]", i).getBytes());
+        }
+
+        producer.close();
+
+
+
+        Consumer<byte[]> consumer = pulsarClient.newConsumer(Schema.BYTES)
+                .topic(topic)
+                .subscriptionName("my-subscription")
+                .subscriptionType(SubscriptionType.Shared)
+                .ackTimeout(3, TimeUnit.SECONDS)
+                .deadLetterPolicy(DeadLetterPolicy.builder().maxRedeliverCount(maxRedeliveryCount).build())
+                .receiverQueueSize(100)
+                .subscriptionInitialPosition(SubscriptionInitialPosition.Earliest)
+                .subscribe();
+
+        Consumer<byte[]> deadLetterConsumer = pulsarClient.newConsumer(Schema.BYTES)
+                .topic("persistent://my-property/my-ns/dead-letter-topic-my-subscription-DLQ")
+                .subscriptionName("my-subscription")
+                .subscribe();
+
+        int totalReceived = 0;
+        do {
+            Message<byte[]> message = consumer.receive();
+            log.info("consumer received message : {} {}", message.getMessageId(), new String(message.getData()));
+            totalReceived++;
+        } while (totalReceived < sendMessages * (maxRedeliveryCount + 1));
+
+        int totalInDeadLetter = 0;
+        do {
+            Message message = deadLetterConsumer.receive();
+            log.info("dead letter consumer received message : {} {}", message.getMessageId(), new String(message.getData()));
+            deadLetterConsumer.acknowledge(message);
+            totalInDeadLetter++;
+        } while (totalInDeadLetter < sendMessages);
+
+        deadLetterConsumer.close();
+        consumer.close();
+
+        Consumer<byte[]> checkConsumer = pulsarClient.newConsumer(Schema.BYTES)
+                .topic(topic)
+                .subscriptionName("my-subscription")
+                .subscriptionType(SubscriptionType.Shared)
+                .subscriptionInitialPosition(SubscriptionInitialPosition.Earliest)
+                .subscribe();
+
+        Message<byte[]> checkMessage = checkConsumer.receive(3, TimeUnit.SECONDS);
+        if (checkMessage != null) {
+            log.info("check consumer received message : {} {}", checkMessage.getMessageId(), new String(checkMessage.getData()));
+        }
+        assertNull(checkMessage);
+
+        checkConsumer.close();
+    }
+
+    @Test
+    public void testDeadLetterTopicWithMultiTopic() throws Exception {
+        final String topic1 = "persistent://my-property/my-ns/dead-letter-topic-1";
+        final String topic2 = "persistent://my-property/my-ns/dead-letter-topic-2";
+
+        final int maxRedeliveryCount = 2;
+
+        int sendMessages = 100;
+
+        Producer<byte[]> producer1 = pulsarClient.newProducer(Schema.BYTES)
+                .topic(topic1)
+                .create();
+
+        Producer<byte[]> producer2 = pulsarClient.newProducer(Schema.BYTES)
+                .topic(topic2)
+                .create();
+
+        for (int i = 0; i < sendMessages; i++) {
+            producer1.send(String.format("Hello Pulsar [%d]", i).getBytes());
+            producer2.send(String.format("Hello Pulsar [%d]", i).getBytes());
+        }
+
+        sendMessages = sendMessages * 2;
+
+        producer1.close();
+        producer2.close();
+
+        Consumer<byte[]> consumer = pulsarClient.newConsumer(Schema.BYTES)
+                .topic(topic1, topic2)
+                .subscriptionName("my-subscription")
+                .subscriptionType(SubscriptionType.Shared)
+                .ackTimeout(3, TimeUnit.SECONDS)
+                .deadLetterPolicy(DeadLetterPolicy.builder().maxRedeliverCount(maxRedeliveryCount).build())
+                .receiverQueueSize(100)
+                .subscriptionInitialPosition(SubscriptionInitialPosition.Earliest)
+                .subscribe();
+
+        Consumer<byte[]> deadLetterConsumer = pulsarClient.newConsumer(Schema.BYTES)
+                .topic("persistent://my-property/my-ns/dead-letter-topic-1-my-subscription-DLQ", "persistent://my-property/my-ns/dead-letter-topic-2-my-subscription-DLQ")
+                .subscriptionName("my-subscription")
+                .subscribe();
+
+        int totalReceived = 0;
+        do {
+            Message<byte[]> message = consumer.receive();
+            log.info("consumer received message : {} {}", message.getMessageId(), new String(message.getData()));
+            totalReceived++;
+        } while (totalReceived < sendMessages * (maxRedeliveryCount + 1));
+
+        int totalInDeadLetter = 0;
+        do {
+            Message message = deadLetterConsumer.receive();
+            log.info("dead letter consumer received message : {} {}", message.getMessageId(), new String(message.getData()));
+            deadLetterConsumer.acknowledge(message);
+            totalInDeadLetter++;
+        } while (totalInDeadLetter < sendMessages);
+
+        deadLetterConsumer.close();
+        consumer.close();
+
+        Consumer<byte[]> checkConsumer = pulsarClient.newConsumer(Schema.BYTES)
+                .topic(topic1, topic2)
+                .subscriptionName("my-subscription")
+                .subscriptionType(SubscriptionType.Shared)
+                .subscriptionInitialPosition(SubscriptionInitialPosition.Earliest)
+                .subscribe();
+
+        Message<byte[]> checkMessage = checkConsumer.receive(3, TimeUnit.SECONDS);
+        if (checkMessage != null) {
+            log.info("check consumer received message : {} {}", checkMessage.getMessageId(), new String(checkMessage.getData()));
+        }
+        assertNull(checkMessage);
+
+        checkConsumer.close();
+    }
+
+    @Test
+    public void testDeadLetterTopicByCustomTopicName() throws Exception {
+        final String topic = "persistent://my-property/my-ns/dead-letter-topic";
+        final int maxRedeliveryCount = 2;
+        final int sendMessages = 100;
+        Producer<byte[]> producer = pulsarClient.newProducer(Schema.BYTES)
+                .topic(topic)
+                .create();
+        for (int i = 0; i < sendMessages; i++) {
+            producer.send(String.format("Hello Pulsar [%d]", i).getBytes());
+        }
+        producer.close();
+        Consumer<byte[]> consumer = pulsarClient.newConsumer(Schema.BYTES)
+                .topic(topic)
+                .subscriptionName("my-subscription")
+                .subscriptionType(SubscriptionType.Shared)
+                .ackTimeout(3, TimeUnit.SECONDS)
+                .receiverQueueSize(100)
+                .deadLetterPolicy(DeadLetterPolicy.builder()
+                        .maxRedeliverCount(maxRedeliveryCount)
+                        .deadLetterTopic("persistent://my-property/my-ns/dead-letter-custom-topic-my-subscription-custom-DLQ")
+                        .build())
+                .subscriptionInitialPosition(SubscriptionInitialPosition.Earliest)
+                .subscribe();
+        Consumer<byte[]> deadLetterConsumer = pulsarClient.newConsumer(Schema.BYTES)
+                .topic("persistent://my-property/my-ns/dead-letter-custom-topic-my-subscription-custom-DLQ")
+                .subscriptionName("my-subscription")
+                .subscribe();
+        int totalReceived = 0;
+        do {
+            Message<byte[]> message = consumer.receive();
+            log.info("consumer received message : {} {}", message.getMessageId(), new String(message.getData()));
+            totalReceived++;
+        } while (totalReceived < sendMessages * (maxRedeliveryCount + 1));
+        int totalInDeadLetter = 0;
+        do {
+            Message message = deadLetterConsumer.receive();
+            log.info("dead letter consumer received message : {} {}", message.getMessageId(), new String(message.getData()));
+            deadLetterConsumer.acknowledge(message);
+            totalInDeadLetter++;
+        } while (totalInDeadLetter < sendMessages);
+        deadLetterConsumer.close();
+        consumer.close();
+        Consumer<byte[]> checkConsumer = pulsarClient.newConsumer(Schema.BYTES)
+                .topic(topic)
+                .subscriptionName("my-subscription")
+                .subscriptionType(SubscriptionType.Shared)
+                .subscriptionInitialPosition(SubscriptionInitialPosition.Earliest)
+                .subscribe();
+        Message<byte[]> checkMessage = checkConsumer.receive(3, TimeUnit.SECONDS);
+        if (checkMessage != null) {
+            log.info("check consumer received message : {} {}", checkMessage.getMessageId(), new String(checkMessage.getData()));
+        }
+        assertNull(checkMessage);
+        checkConsumer.close();
+    }
+}
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/CompactedOutBatchMessageTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/CompactedOutBatchMessageTest.java
index 85dbb24..de1b88a 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/CompactedOutBatchMessageTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/CompactedOutBatchMessageTest.java
@@ -73,7 +73,7 @@ public class CompactedOutBatchMessageTest extends ProducerConsumerBase {
              = (ConsumerImpl<byte[]>) pulsarClient.newConsumer().topic(topic1)
                 .subscriptionName("my-subscriber-name").subscribe()) {
             // shove it in the sideways
-            consumer.receiveIndividualMessagesFromBatch(metadata, batchBuffer,
+            consumer.receiveIndividualMessagesFromBatch(metadata, 0, batchBuffer,
                                                         MessageIdData.newBuilder().setLedgerId(1234)
                                                         .setEntryId(567).build(), consumer.cnx());
             Message<?> m = consumer.receive();
diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/api/ConsumerBuilder.java b/pulsar-client/src/main/java/org/apache/pulsar/client/api/ConsumerBuilder.java
index da1cbb5..d941758 100644
--- a/pulsar-client/src/main/java/org/apache/pulsar/client/api/ConsumerBuilder.java
+++ b/pulsar-client/src/main/java/org/apache/pulsar/client/api/ConsumerBuilder.java
@@ -338,4 +338,28 @@ public interface ConsumerBuilder<T> extends Cloneable {
      * @return consumer builder.
      */
     ConsumerBuilder<T> intercept(ConsumerInterceptor<T> ...interceptors);
+
+    /**
+     * Set dead letter policy for consumer
+     *
+     * By default some message will redelivery so many times possible, even to the extent that it can be never stop.
+     * By using dead letter mechanism messages will has the max redelivery count, when message exceeding the maximum
+     * number of redeliveries, message will send to the Dead Letter Topic and acknowledged automatic.
+     *
+     * You can enable the dead letter mechanism by setting dead letter policy.
+     * example:
+     * <pre>
+     * client.newConsumer()
+     *          .deadLetterPolicy(DeadLetterPolicy.builder().maxRedeliverCount(10).build())
+     *          .subscribe();
+     * </pre>
+     * Default dead letter topic name is {TopicName}-{Subscription}-DLQ.
+     * To setting a custom dead letter topic name
+     * <pre>
+     * client.newConsumer()
+     *          .deadLetterPolicy(DeadLetterPolicy.builder().maxRedeliverCount(10).deadLetterTopic("your-topic-name").build())
+     *          .subscribe();
+     * </pre>
+     */
+    ConsumerBuilder<T> deadLetterPolicy(DeadLetterPolicy deadLetterPolicy);
 }
diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/api/DeadLetterPolicy.java b/pulsar-client/src/main/java/org/apache/pulsar/client/api/DeadLetterPolicy.java
new file mode 100644
index 0000000..52a2a23
--- /dev/null
+++ b/pulsar-client/src/main/java/org/apache/pulsar/client/api/DeadLetterPolicy.java
@@ -0,0 +1,32 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.client.api;
+
+import lombok.Builder;
+import lombok.Data;
+
+@Builder
+@Data
+public class DeadLetterPolicy {
+
+    private int maxRedeliverCount;
+
+    private String deadLetterTopic;
+
+}
diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ClientCnx.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ClientCnx.java
index 7287194..9306a82 100644
--- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ClientCnx.java
+++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ClientCnx.java
@@ -284,7 +284,7 @@ public class ClientCnx extends PulsarHandler {
         }
         ConsumerImpl<?> consumer = consumers.get(cmdMessage.getConsumerId());
         if (consumer != null) {
-            consumer.messageReceived(cmdMessage.getMessageId(), headersAndPayload, this);
+            consumer.messageReceived(cmdMessage.getMessageId(), cmdMessage.getRedeliveryCount(), headersAndPayload, this);
         }
     }
 
diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerBuilderImpl.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerBuilderImpl.java
index 2095bab..103bb5e 100644
--- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerBuilderImpl.java
+++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerBuilderImpl.java
@@ -44,6 +44,7 @@ import org.apache.pulsar.client.api.SubscriptionInitialPosition;
 import org.apache.pulsar.client.api.SubscriptionType;
 import org.apache.pulsar.client.impl.conf.ConfigurationDataUtils;
 import org.apache.pulsar.client.impl.conf.ConsumerConfigurationData;
+import org.apache.pulsar.client.api.DeadLetterPolicy;
 import org.apache.pulsar.common.util.FutureUtil;
 
 import com.google.common.collect.Lists;
@@ -256,6 +257,12 @@ public class ConsumerBuilderImpl<T> implements ConsumerBuilder<T> {
         return this;
     }
 
+    @Override
+    public ConsumerBuilder<T> deadLetterPolicy(DeadLetterPolicy deadLetterPolicy) {
+        conf.setDeadLetterPolicy(deadLetterPolicy);
+        return this;
+    }
+
     public ConsumerConfigurationData<T> getConf() {
 	    return conf;
 	}
diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java
index a0a2319..b7f9918 100644
--- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java
+++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java
@@ -27,14 +27,12 @@ import static org.apache.pulsar.common.api.Commands.readChecksum;
 
 import com.google.common.annotations.VisibleForTesting;
 import com.google.common.collect.Iterables;
-import com.google.common.collect.Maps;
 
 import io.netty.buffer.ByteBuf;
 import io.netty.util.Timeout;
 
 import java.io.IOException;
 import java.util.ArrayList;
-import static java.util.Base64.getEncoder;
 import java.util.Collections;
 import java.util.HashMap;
 import java.util.List;
@@ -43,6 +41,7 @@ import java.util.Objects;
 import java.util.Optional;
 import java.util.Set;
 import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.ExecutionException;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.ScheduledExecutorService;
@@ -54,11 +53,13 @@ import java.util.concurrent.locks.ReentrantReadWriteLock;
 import java.util.stream.Collectors;
 
 
+import org.apache.commons.lang3.StringUtils;
 import org.apache.pulsar.client.api.Consumer;
 import org.apache.pulsar.client.api.ConsumerCryptoFailureAction;
 import org.apache.pulsar.client.api.ConsumerStats;
 import org.apache.pulsar.client.api.Message;
 import org.apache.pulsar.client.api.MessageId;
+import org.apache.pulsar.client.api.Producer;
 import org.apache.pulsar.client.api.PulsarClientException;
 import org.apache.pulsar.client.api.Schema;
 import org.apache.pulsar.client.api.SubscriptionInitialPosition;
@@ -81,6 +82,7 @@ import org.apache.pulsar.common.api.proto.PulsarApi.ProtocolVersion;
 import org.apache.pulsar.common.compression.CompressionCodec;
 import org.apache.pulsar.common.compression.CompressionCodecProvider;
 import org.apache.pulsar.common.naming.TopicName;
+import org.apache.pulsar.client.api.DeadLetterPolicy;
 import org.apache.pulsar.common.util.FutureUtil;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -133,6 +135,12 @@ public class ConsumerImpl<T> extends ConsumerBase<T> implements ConnectionHandle
 
     private final String topicNameWithoutPartition;
 
+    private ConcurrentHashMap<MessageIdImpl, List<MessageImpl<T>>> possibleSendToDeadLetterTopicMessages;
+
+    private DeadLetterPolicy deadLetterPolicy;
+
+    private Producer<T> deadLetterProducer;
+
     enum SubscriptionMode {
         // Make the subscription to be backed by a durable cursor that will retain messages and persist the current
         // position
@@ -205,6 +213,21 @@ public class ConsumerImpl<T> extends ConsumerBase<T> implements ConnectionHandle
                 NonPersistentAcknowledgmentGroupingTracker.of();
         }
 
+        if (conf.getDeadLetterPolicy() != null) {
+            possibleSendToDeadLetterTopicMessages = new ConcurrentHashMap<>();
+            if (StringUtils.isNotBlank(conf.getDeadLetterPolicy().getDeadLetterTopic())) {
+                this.deadLetterPolicy = DeadLetterPolicy.builder()
+                        .maxRedeliverCount(conf.getDeadLetterPolicy().getMaxRedeliverCount())
+                        .deadLetterTopic(conf.getDeadLetterPolicy().getDeadLetterTopic())
+                        .build();
+            } else {
+                this.deadLetterPolicy = DeadLetterPolicy.builder()
+                        .maxRedeliverCount(conf.getDeadLetterPolicy().getMaxRedeliverCount())
+                        .deadLetterTopic(String.format("%s-%s-DLQ", topic, subscription))
+                        .build();
+            }
+        }
+
         topicNameWithoutPartition = topicName.getPartitionedTopicName();
 
         grabCnx();
@@ -233,6 +256,9 @@ public class ConsumerImpl<T> extends ConsumerBase<T> implements ConnectionHandle
             cnx.sendRequestWithId(unsubscribe, requestId).thenRun(() -> {
                 cnx.removeConsumer(consumerId);
                 unAckedMessageTracker.close();
+                if (possibleSendToDeadLetterTopicMessages != null) {
+                    possibleSendToDeadLetterTopicMessages.clear();
+                }
                 client.cleanupConsumer(ConsumerImpl.this);
                 log.info("[{}][{}] Successfully unsubscribed from topic", topic, subscription);
                 unsubscribeFuture.complete(null);
@@ -448,9 +474,16 @@ public class ConsumerImpl<T> extends ConsumerBase<T> implements ConnectionHandle
                 stats.incrementNumAcksSent(batchMessageId.getBatchSize());
                 unAckedMessageTracker.remove(new MessageIdImpl(batchMessageId.getLedgerId(),
                         batchMessageId.getEntryId(), batchMessageId.getPartitionIndex()));
+                if (possibleSendToDeadLetterTopicMessages != null) {
+                    possibleSendToDeadLetterTopicMessages.remove(new MessageIdImpl(batchMessageId.getLedgerId(),
+                            batchMessageId.getEntryId(), batchMessageId.getPartitionIndex()));
+                }
             } else {
                 // increment counter by 1 for non-batch msg
                 unAckedMessageTracker.remove(msgId);
+                if (possibleSendToDeadLetterTopicMessages != null) {
+                    possibleSendToDeadLetterTopicMessages.remove(msgId);
+                }
                 stats.incrementNumAcksSent(1);
             }
             onAcknowledge(messageId, null);
@@ -479,7 +512,9 @@ public class ConsumerImpl<T> extends ConsumerBase<T> implements ConnectionHandle
         synchronized (this) {
             currentSize = incomingMessages.size();
             startMessageId = clearReceiverQueue();
-            unAckedMessageTracker.clear();
+            if (possibleSendToDeadLetterTopicMessages != null) {
+                possibleSendToDeadLetterTopicMessages.clear();
+            }
         }
 
         boolean isDurable = subscriptionMode == SubscriptionMode.Durable;
@@ -623,6 +658,9 @@ public class ConsumerImpl<T> extends ConsumerBase<T> implements ConnectionHandle
     public CompletableFuture<Void> closeAsync() {
         if (getState() == State.Closing || getState() == State.Closed) {
             unAckedMessageTracker.close();
+            if (possibleSendToDeadLetterTopicMessages != null) {
+                possibleSendToDeadLetterTopicMessages.clear();
+            }
             return CompletableFuture.completedFuture(null);
         }
 
@@ -630,6 +668,9 @@ public class ConsumerImpl<T> extends ConsumerBase<T> implements ConnectionHandle
             log.info("[{}] [{}] Closed Consumer (not connected)", topic, subscription);
             setState(State.Closed);
             unAckedMessageTracker.close();
+            if (possibleSendToDeadLetterTopicMessages != null) {
+                possibleSendToDeadLetterTopicMessages.clear();
+            }
             client.cleanupConsumer(this);
             return CompletableFuture.completedFuture(null);
         }
@@ -651,6 +692,9 @@ public class ConsumerImpl<T> extends ConsumerBase<T> implements ConnectionHandle
                 log.info("[{}] [{}] Closed consumer", topic, subscription);
                 setState(State.Closed);
                 unAckedMessageTracker.close();
+                if (possibleSendToDeadLetterTopicMessages != null) {
+                    possibleSendToDeadLetterTopicMessages.clear();
+                }
                 closeFuture.complete(null);
                 client.cleanupConsumer(this);
                 // fail all pending-receive futures to notify application
@@ -697,7 +741,7 @@ public class ConsumerImpl<T> extends ConsumerBase<T> implements ConnectionHandle
         });
     }
 
-    void messageReceived(MessageIdData messageId, ByteBuf headersAndPayload, ClientCnx cnx) {
+    void messageReceived(MessageIdData messageId, int redeliveryCount, ByteBuf headersAndPayload, ClientCnx cnx) {
         if (log.isDebugEnabled()) {
             log.debug("[{}][{}] Received message: {}/{}", topic, subscription, messageId.getLedgerId(),
                     messageId.getEntryId());
@@ -756,6 +800,7 @@ public class ConsumerImpl<T> extends ConsumerBase<T> implements ConnectionHandle
         if (isMessageUndecryptable || (numMessages == 1 && !msgMetadata.hasNumMessagesInBatch())) {
             final MessageImpl<T> message = new MessageImpl<>(msgId, msgMetadata, uncompressedPayload,
                     createEncryptionContext(msgMetadata), cnx, schema);
+
             uncompressedPayload.release();
             msgMetadata.recycle();
 
@@ -765,6 +810,9 @@ public class ConsumerImpl<T> extends ConsumerBase<T> implements ConnectionHandle
                 // if the conf.getReceiverQueueSize() is 0 then discard message if no one is waiting for it.
                 // if asyncReceive is waiting then notify callback without adding to incomingMessages queue
                 unAckedMessageTracker.add((MessageIdImpl) message.getMessageId());
+                if (deadLetterPolicy != null && possibleSendToDeadLetterTopicMessages != null && redeliveryCount >= deadLetterPolicy.getMaxRedeliverCount()) {
+                    possibleSendToDeadLetterTopicMessages.put((MessageIdImpl)message.getMessageId(), Collections.singletonList(message));
+                }
                 if (!pendingReceives.isEmpty()) {
                     notifyPendingReceivedCallback(message, null);
                 } else if (conf.getReceiverQueueSize() != 0 || waitingOnReceiveForZeroQueueSize) {
@@ -791,7 +839,7 @@ public class ConsumerImpl<T> extends ConsumerBase<T> implements ConnectionHandle
                 });
             } else {
                 // handle batch message enqueuing; uncompressed payload has all messages in batch
-                receiveIndividualMessagesFromBatch(msgMetadata, uncompressedPayload, messageId, cnx);
+                receiveIndividualMessagesFromBatch(msgMetadata, redeliveryCount, uncompressedPayload, messageId, cnx);
             }
             uncompressedPayload.release();
             msgMetadata.recycle();
@@ -881,7 +929,7 @@ public class ConsumerImpl<T> extends ConsumerBase<T> implements ConnectionHandle
         });
     }
 
-    void receiveIndividualMessagesFromBatch(MessageMetadata msgMetadata, ByteBuf uncompressedPayload,
+    void receiveIndividualMessagesFromBatch(MessageMetadata msgMetadata, int redeliveryCount, ByteBuf uncompressedPayload,
             MessageIdData messageId, ClientCnx cnx) {
         int batchSize = msgMetadata.getNumMessagesInBatch();
 
@@ -890,7 +938,10 @@ public class ConsumerImpl<T> extends ConsumerBase<T> implements ConnectionHandle
                 getPartitionIndex());
         BatchMessageAcker acker = BatchMessageAcker.newAcker(batchSize);
         unAckedMessageTracker.add(batchMessage);
-
+        List<MessageImpl<T>> possibleToDeadLetter = null;
+        if (deadLetterPolicy != null && redeliveryCount >= deadLetterPolicy.getMaxRedeliverCount()) {
+            possibleToDeadLetter = new ArrayList<>();
+        }
         int skippedMessages = 0;
         try {
             for (int i = 0; i < batchSize; ++i) {
@@ -930,6 +981,9 @@ public class ConsumerImpl<T> extends ConsumerBase<T> implements ConnectionHandle
                 final MessageImpl<T> message = new MessageImpl<>(batchMessageIdImpl, msgMetadata,
                         singleMessageMetadataBuilder.build(), singleMessagePayload,
                         createEncryptionContext(msgMetadata), cnx, schema);
+                if (possibleToDeadLetter != null) {
+                    possibleToDeadLetter.add(message);
+                }
                 lock.readLock().lock();
                 try {
                     if (pendingReceives.isEmpty()) {
@@ -947,6 +1001,10 @@ public class ConsumerImpl<T> extends ConsumerBase<T> implements ConnectionHandle
             log.warn("[{}] [{}] unable to obtain message in batch", subscription, consumerName);
             discardCorruptedMessage(messageId, cnx, ValidationError.BatchDeSerializeError);
         }
+        if (possibleToDeadLetter != null && possibleSendToDeadLetterTopicMessages != null) {
+            possibleSendToDeadLetterTopicMessages.put(batchMessage, possibleToDeadLetter);
+        }
+
         if (log.isDebugEnabled()) {
             log.debug("[{}] [{}] enqueued messages in batch. queue size - {}, available queue size - {}", subscription,
                     consumerName, incomingMessages.size(), incomingMessages.remainingCapacity());
@@ -1184,6 +1242,8 @@ public class ConsumerImpl<T> extends ConsumerBase<T> implements ConnectionHandle
             MessageIdData.Builder builder = MessageIdData.newBuilder();
             batches.forEach(ids -> {
                 List<MessageIdData> messageIdDatas = ids.stream().map(messageId -> {
+                    // process message possible to dead letter topic
+                    processPossibleToDLQ(messageId);
                     // attempt to remove message from batchMessageAckTracker
                     builder.setPartition(messageId.getPartitionIndex());
                     builder.setLedgerId(messageId.getLedgerId());
@@ -1212,6 +1272,43 @@ public class ConsumerImpl<T> extends ConsumerBase<T> implements ConnectionHandle
         }
     }
 
+    private void processPossibleToDLQ(MessageIdImpl messageId) {
+        List<MessageImpl<T>> deadLetterMessages = null;
+        if (possibleSendToDeadLetterTopicMessages != null) {
+            if (messageId instanceof BatchMessageIdImpl) {
+                deadLetterMessages = possibleSendToDeadLetterTopicMessages.get(new MessageIdImpl(messageId.getLedgerId(), messageId.getEntryId(),
+                        getPartitionIndex()));
+            } else {
+                deadLetterMessages = possibleSendToDeadLetterTopicMessages.get(messageId);
+            }
+        }
+        if (deadLetterMessages != null) {
+            if (deadLetterProducer == null) {
+                try {
+                    deadLetterProducer = client.newProducer(schema)
+                            .topic(this.deadLetterPolicy.getDeadLetterTopic())
+                            .blockIfQueueFull(false)
+                            .create();
+                } catch (Exception e) {
+                    log.error("Create dead letter producer exception with topic: {}", deadLetterPolicy.getDeadLetterTopic(), e);
+                }
+            }
+            if (deadLetterProducer != null) {
+                try {
+                    for (MessageImpl<T> message : deadLetterMessages) {
+                        deadLetterProducer.newMessage()
+                                .value(message.getValue())
+                                .properties(message.getProperties())
+                                .send();
+                    }
+                    acknowledge(messageId);
+                } catch (Exception e) {
+                    log.error("Send to dead letter topic exception with topic: {}, messageId: {}", deadLetterProducer.getTopic(), messageId, e);
+                }
+            }
+        }
+    }
+
     @Override
     public void seek(MessageId messageId) throws PulsarClientException {
         try {
diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/conf/ConsumerConfigurationData.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/conf/ConsumerConfigurationData.java
index 47ede41..a0fd493 100644
--- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/conf/ConsumerConfigurationData.java
+++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/conf/ConsumerConfigurationData.java
@@ -38,6 +38,7 @@ import org.apache.pulsar.client.api.CryptoKeyReader;
 import org.apache.pulsar.client.api.MessageListener;
 import org.apache.pulsar.client.api.SubscriptionType;
 import org.apache.pulsar.client.api.SubscriptionInitialPosition;
+import org.apache.pulsar.client.api.DeadLetterPolicy;
 
 @Data
 public class ConsumerConfigurationData<T> implements Serializable, Cloneable {
@@ -82,6 +83,8 @@ public class ConsumerConfigurationData<T> implements Serializable, Cloneable {
 
     private int patternAutoDiscoveryPeriod = 1;
 
+    private DeadLetterPolicy deadLetterPolicy;
+
     @JsonIgnore
     public String getSingleTopic() {
         checkArgument(topicNames.size() == 1);
diff --git a/pulsar-common/src/main/java/org/apache/pulsar/common/api/Commands.java b/pulsar-common/src/main/java/org/apache/pulsar/common/api/Commands.java
index b5e2406..c94482d 100644
--- a/pulsar-common/src/main/java/org/apache/pulsar/common/api/Commands.java
+++ b/pulsar-common/src/main/java/org/apache/pulsar/common/api/Commands.java
@@ -282,10 +282,13 @@ public class Commands {
         }
     }
 
-    public static ByteBufPair newMessage(long consumerId, MessageIdData messageId, ByteBuf metadataAndPayload) {
+    public static ByteBufPair newMessage(long consumerId, MessageIdData messageId, int redeliveryCount, ByteBuf metadataAndPayload) {
         CommandMessage.Builder msgBuilder = CommandMessage.newBuilder();
         msgBuilder.setConsumerId(consumerId);
         msgBuilder.setMessageId(messageId);
+        if (redeliveryCount > 0) {
+            msgBuilder.setRedeliveryCount(redeliveryCount);
+        }
         CommandMessage msg = msgBuilder.build();
         BaseCommand.Builder cmdBuilder = BaseCommand.newBuilder();
         BaseCommand cmd = cmdBuilder.setType(Type.MESSAGE).setMessage(msg).build();
diff --git a/pulsar-common/src/main/java/org/apache/pulsar/common/api/proto/PulsarApi.java b/pulsar-common/src/main/java/org/apache/pulsar/common/api/proto/PulsarApi.java
index edc7e9b..c6ff6c1 100644
--- a/pulsar-common/src/main/java/org/apache/pulsar/common/api/proto/PulsarApi.java
+++ b/pulsar-common/src/main/java/org/apache/pulsar/common/api/proto/PulsarApi.java
@@ -13542,6 +13542,10 @@ public final class PulsarApi {
     // required .pulsar.proto.MessageIdData message_id = 2;
     boolean hasMessageId();
     org.apache.pulsar.common.api.proto.PulsarApi.MessageIdData getMessageId();
+    
+    // optional uint32 redelivery_count = 3 [default = 0];
+    boolean hasRedeliveryCount();
+    int getRedeliveryCount();
   }
   public static final class CommandMessage extends
       org.apache.pulsar.shaded.com.google.protobuf.v241.GeneratedMessageLite
@@ -13598,9 +13602,20 @@ public final class PulsarApi {
       return messageId_;
     }
     
+    // optional uint32 redelivery_count = 3 [default = 0];
+    public static final int REDELIVERY_COUNT_FIELD_NUMBER = 3;
+    private int redeliveryCount_;
+    public boolean hasRedeliveryCount() {
+      return ((bitField0_ & 0x00000004) == 0x00000004);
+    }
+    public int getRedeliveryCount() {
+      return redeliveryCount_;
+    }
+    
     private void initFields() {
       consumerId_ = 0L;
       messageId_ = org.apache.pulsar.common.api.proto.PulsarApi.MessageIdData.getDefaultInstance();
+      redeliveryCount_ = 0;
     }
     private byte memoizedIsInitialized = -1;
     public final boolean isInitialized() {
@@ -13637,6 +13652,9 @@ public final class PulsarApi {
       if (((bitField0_ & 0x00000002) == 0x00000002)) {
         output.writeMessage(2, messageId_);
       }
+      if (((bitField0_ & 0x00000004) == 0x00000004)) {
+        output.writeUInt32(3, redeliveryCount_);
+      }
     }
     
     private int memoizedSerializedSize = -1;
@@ -13653,6 +13671,10 @@ public final class PulsarApi {
         size += org.apache.pulsar.shaded.com.google.protobuf.v241.CodedOutputStream
           .computeMessageSize(2, messageId_);
       }
+      if (((bitField0_ & 0x00000004) == 0x00000004)) {
+        size += org.apache.pulsar.shaded.com.google.protobuf.v241.CodedOutputStream
+          .computeUInt32Size(3, redeliveryCount_);
+      }
       memoizedSerializedSize = size;
       return size;
     }
@@ -13770,6 +13792,8 @@ public final class PulsarApi {
         bitField0_ = (bitField0_ & ~0x00000001);
         messageId_ = org.apache.pulsar.common.api.proto.PulsarApi.MessageIdData.getDefaultInstance();
         bitField0_ = (bitField0_ & ~0x00000002);
+        redeliveryCount_ = 0;
+        bitField0_ = (bitField0_ & ~0x00000004);
         return this;
       }
       
@@ -13811,6 +13835,10 @@ public final class PulsarApi {
           to_bitField0_ |= 0x00000002;
         }
         result.messageId_ = messageId_;
+        if (((from_bitField0_ & 0x00000004) == 0x00000004)) {
+          to_bitField0_ |= 0x00000004;
+        }
+        result.redeliveryCount_ = redeliveryCount_;
         result.bitField0_ = to_bitField0_;
         return result;
       }
@@ -13823,6 +13851,9 @@ public final class PulsarApi {
         if (other.hasMessageId()) {
           mergeMessageId(other.getMessageId());
         }
+        if (other.hasRedeliveryCount()) {
+          setRedeliveryCount(other.getRedeliveryCount());
+        }
         return this;
       }
       
@@ -13879,6 +13910,11 @@ public final class PulsarApi {
               subBuilder.recycle();
               break;
             }
+            case 24: {
+              bitField0_ |= 0x00000004;
+              redeliveryCount_ = input.readUInt32();
+              break;
+            }
           }
         }
       }
@@ -13949,6 +13985,27 @@ public final class PulsarApi {
         return this;
       }
       
+      // optional uint32 redelivery_count = 3 [default = 0];
+      private int redeliveryCount_ ;
+      public boolean hasRedeliveryCount() {
+        return ((bitField0_ & 0x00000004) == 0x00000004);
+      }
+      public int getRedeliveryCount() {
+        return redeliveryCount_;
+      }
+      public Builder setRedeliveryCount(int value) {
+        bitField0_ |= 0x00000004;
+        redeliveryCount_ = value;
+        
+        return this;
+      }
+      public Builder clearRedeliveryCount() {
+        bitField0_ = (bitField0_ & ~0x00000004);
+        redeliveryCount_ = 0;
+        
+        return this;
+      }
+      
       // @@protoc_insertion_point(builder_scope:pulsar.proto.CommandMessage)
     }
     
diff --git a/pulsar-common/src/main/proto/PulsarApi.proto b/pulsar-common/src/main/proto/PulsarApi.proto
index 779a27b..c50f072 100644
--- a/pulsar-common/src/main/proto/PulsarApi.proto
+++ b/pulsar-common/src/main/proto/PulsarApi.proto
@@ -335,6 +335,7 @@ message CommandSendError {
 message CommandMessage {
 	required uint64 consumer_id       = 1;
 	required MessageIdData message_id = 2;
+	optional uint32 redelivery_count  = 3 [default = 0];
 }
 
 message CommandAck {