You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by si...@apache.org on 2018/09/05 21:25:33 UTC

[incubator-pulsar] branch master updated: PIP-23: Pulsar Java Client Interceptors. (#2471)

This is an automated email from the ASF dual-hosted git repository.

sijie pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-pulsar.git


The following commit(s) were added to refs/heads/master by this push:
     new 7a92111  PIP-23: Pulsar Java Client Interceptors. (#2471)
7a92111 is described below

commit 7a92111fe7824d422d1739abd06d85955b652212
Author: penghui <co...@gmail.com>
AuthorDate: Thu Sep 6 05:25:30 2018 +0800

    PIP-23: Pulsar Java Client Interceptors. (#2471)
    
    ### Motivation
    
    Support user to add interceptors to producer and consumer.
    
    ### Modifications
    
    Add Consumer interceptors.
    ```java
    Message<T> beforeConsume(Message<T> message);
    void onAcknowledge(MessageId messageId, Throwable cause);
    void onAcknowledgeCumulative(MessageId messageId, Throwable cause);
    ```
    Add Producer interceptors.
    ```java
    Message<T> beforeSend(Message<T> message);
    void onSendAcknowledgement(Message<T> message, MessageId msgId, Throwable cause);
    ```
    ### Result
    Users can using interceptors in multiple scenarios, such as for applications to add
    custom logging or processing.
    
    Master Issue: #2476
---
 .../apache/pulsar/client/impl/RawReaderImpl.java   |   2 +-
 .../pulsar/broker/service/PersistentTopicTest.java |   7 +-
 .../pulsar/broker/service/ReplicatorTest.java      |   3 +-
 .../broker/service/v1/V1_ReplicatorTest.java       |   3 +-
 .../apache/pulsar/client/api/InterceptorsTest.java | 359 +++++++++++++++++++++
 .../org/apache/pulsar/client/api/Consumer.java     |   6 +
 .../apache/pulsar/client/api/ConsumerBuilder.java  |   8 +
 .../pulsar/client/api/ConsumerInterceptor.java     |  99 ++++++
 .../apache/pulsar/client/api/ProducerBuilder.java  |   8 +
 .../pulsar/client/api/ProducerInterceptor.java     |  92 ++++++
 .../apache/pulsar/client/impl/ConsumerBase.java    |  25 +-
 .../pulsar/client/impl/ConsumerBuilderImpl.java    |  20 +-
 .../apache/pulsar/client/impl/ConsumerImpl.java    |  40 ++-
 .../pulsar/client/impl/ConsumerInterceptors.java   | 132 ++++++++
 .../org/apache/pulsar/client/impl/MessageImpl.java |   4 +-
 .../client/impl/MultiTopicsConsumerImpl.java       |  12 +-
 .../client/impl/PartitionedProducerImpl.java       |   6 +-
 .../impl/PatternMultiTopicsConsumerImpl.java       |   4 +-
 .../apache/pulsar/client/impl/ProducerBase.java    |  18 +-
 .../pulsar/client/impl/ProducerBuilderImpl.java    |  18 +-
 .../apache/pulsar/client/impl/ProducerImpl.java    |  72 +++--
 .../pulsar/client/impl/ProducerInterceptors.java   | 110 +++++++
 .../pulsar/client/impl/PulsarClientImpl.java       |  45 +--
 .../org/apache/pulsar/client/impl/ReaderImpl.java  |   2 +-
 .../pulsar/client/impl/TopicMessageImpl.java       |   4 +
 .../pulsar/functions/instance/ContextImplTest.java |   2 +-
 .../MultiConsumersOneOutputTopicProducersTest.java |  27 +-
 27 files changed, 1033 insertions(+), 95 deletions(-)

diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/client/impl/RawReaderImpl.java b/pulsar-broker/src/main/java/org/apache/pulsar/client/impl/RawReaderImpl.java
index ae1a4db..99aec93 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/client/impl/RawReaderImpl.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/client/impl/RawReaderImpl.java
@@ -111,7 +111,7 @@ public class RawReaderImpl implements RawReader {
                 consumerFuture,
                 SubscriptionMode.Durable,
                 MessageId.earliest,
-                Schema.BYTES);
+                Schema.BYTES, null);
             incomingRawMessages = new GrowableArrayBlockingQueue<>();
             pendingRawReceives = new ConcurrentLinkedQueue<>();
         }
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/PersistentTopicTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/PersistentTopicTest.java
index f8ac40a..8c175d7 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/PersistentTopicTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/PersistentTopicTest.java
@@ -24,6 +24,7 @@ import static org.apache.pulsar.broker.cache.ConfigurationCacheService.POLICIES;
 import static org.mockito.Matchers.any;
 import static org.mockito.Matchers.anyObject;
 import static org.mockito.Matchers.anyString;
+import static org.mockito.Matchers.eq;
 import static org.mockito.Matchers.matches;
 import static org.mockito.Mockito.atLeast;
 import static org.mockito.Mockito.doAnswer;
@@ -36,7 +37,6 @@ import static org.mockito.Mockito.verify;
 import static org.mockito.Mockito.when;
 import static org.testng.Assert.assertEquals;
 import static org.testng.Assert.assertFalse;
-import static org.testng.Assert.assertNotNull;
 import static org.testng.Assert.assertNull;
 import static org.testng.Assert.assertTrue;
 import static org.testng.Assert.fail;
@@ -80,7 +80,6 @@ import org.apache.pulsar.broker.admin.AdminResource;
 import org.apache.pulsar.broker.cache.ConfigurationCacheService;
 import org.apache.pulsar.broker.cache.LocalZooKeeperCacheService;
 import org.apache.pulsar.broker.namespace.NamespaceService;
-import org.apache.pulsar.broker.service.Topic.PublishContext;
 import org.apache.pulsar.broker.service.nonpersistent.NonPersistentReplicator;
 import org.apache.pulsar.broker.service.persistent.CompactorSubscription;
 import org.apache.pulsar.broker.service.persistent.PersistentDispatcherMultipleConsumers;
@@ -1224,7 +1223,7 @@ public class PersistentTopicTest {
         verify(clientImpl)
             .createProducerAsync(
                 any(ProducerConfigurationData.class),
-                any(Schema.class)
+                any(Schema.class), eq(null)
             );
 
         replicator.disconnect(false);
@@ -1235,7 +1234,7 @@ public class PersistentTopicTest {
         verify(clientImpl, Mockito.times(2))
             .createProducerAsync(
                 any(ProducerConfigurationData.class),
-                any(Schema.class)
+                any(Schema.class), any(null)
             );
     }
 
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/ReplicatorTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/ReplicatorTest.java
index de49925..528ff64 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/ReplicatorTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/ReplicatorTest.java
@@ -18,6 +18,7 @@
  */
 package org.apache.pulsar.broker.service;
 
+import static org.mockito.Matchers.eq;
 import static org.mockito.Mockito.spy;
 import static org.testng.Assert.assertEquals;
 import static org.testng.Assert.assertNull;
@@ -242,7 +243,7 @@ public class ReplicatorTest extends ReplicatorTestBase {
         Thread.sleep(3000);
 
         Mockito.verify(pulsarClient, Mockito.times(1)).createProducerAsync(Mockito.any(ProducerConfigurationData.class),
-                Mockito.any(Schema.class));
+                Mockito.any(Schema.class), eq(null));
 
         client1.shutdown();
     }
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/v1/V1_ReplicatorTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/v1/V1_ReplicatorTest.java
index 0d6d303..92bb641 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/v1/V1_ReplicatorTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/v1/V1_ReplicatorTest.java
@@ -18,6 +18,7 @@
  */
 package org.apache.pulsar.broker.service.v1;
 
+import static org.mockito.Matchers.eq;
 import static org.mockito.Mockito.spy;
 import static org.testng.Assert.assertEquals;
 import static org.testng.Assert.assertNull;
@@ -241,7 +242,7 @@ public class V1_ReplicatorTest extends V1_ReplicatorTestBase {
         Thread.sleep(3000);
 
         Mockito.verify(pulsarClient, Mockito.times(1)).createProducerAsync(Mockito.any(ProducerConfigurationData.class),
-                Mockito.any(Schema.class));
+                Mockito.any(Schema.class), eq(null));
 
         client1.shutdown();
     }
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/InterceptorsTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/InterceptorsTest.java
new file mode 100644
index 0000000..d833846
--- /dev/null
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/InterceptorsTest.java
@@ -0,0 +1,359 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.client.api;
+
+import org.apache.pulsar.client.impl.MessageImpl;
+import org.apache.pulsar.client.impl.TopicMessageImpl;
+import org.apache.pulsar.common.api.proto.PulsarApi;
+import org.junit.Assert;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.testng.annotations.AfterMethod;
+import org.testng.annotations.BeforeMethod;
+import org.testng.annotations.Test;
+
+import java.util.ArrayList;
+import java.util.List;
+
+public class InterceptorsTest extends ProducerConsumerBase {
+
+    private static final Logger log = LoggerFactory.getLogger(InterceptorsTest.class);
+
+    @BeforeMethod
+    @Override
+    protected void setup() throws Exception {
+        super.internalSetup();
+        super.producerBaseSetup();
+    }
+
+    @AfterMethod
+    @Override
+    protected void cleanup() throws Exception {
+        super.internalCleanup();
+    }
+
+    @Test
+    public void testProducerInterceptor() throws PulsarClientException {
+        ProducerInterceptor<String> interceptor1 = new ProducerInterceptor<String>() {
+            @Override
+            public void close() {
+
+            }
+
+            @Override
+            public Message<String> beforeSend(Producer<String> producer, Message<String> message) {
+                MessageImpl<String> msg = (MessageImpl<String>) message;
+                log.info("Before send message: {}", new String(msg.getData()));
+                java.util.List<org.apache.pulsar.common.api.proto.PulsarApi.KeyValue> properties = msg.getMessageBuilder().getPropertiesList();
+                for (int i = 0; i < properties.size(); i++) {
+                    if ("key".equals(properties.get(i).getKey())) {
+                        msg.getMessageBuilder().setProperties(i, PulsarApi.KeyValue.newBuilder().setKey("key").setValue("after").build());
+                    }
+                }
+                return msg;
+            }
+
+            @Override
+            public void onSendAcknowledgement(Producer<String> producer, Message<String> message, MessageId msgId, Throwable cause) {
+                message.getProperties();
+                Assert.assertEquals("complete", message.getProperty("key"));
+                log.info("Send acknowledgement message: {}, msgId: {}", new String(message.getData()), msgId, cause);
+            }
+        };
+
+        ProducerInterceptor<String> interceptor2 = new ProducerInterceptor<String>() {
+            @Override
+            public void close() {
+
+            }
+
+            @Override
+            public Message<String> beforeSend(Producer<String> producer, Message<String> message) {
+                MessageImpl<String> msg = (MessageImpl<String>) message;
+                log.info("Before send message: {}", new String(msg.getData()));
+                java.util.List<org.apache.pulsar.common.api.proto.PulsarApi.KeyValue> properties = msg.getMessageBuilder().getPropertiesList();
+                for (int i = 0; i < properties.size(); i++) {
+                    if ("key".equals(properties.get(i).getKey())) {
+                        msg.getMessageBuilder().setProperties(i, PulsarApi.KeyValue.newBuilder().setKey("key").setValue("complete").build());
+                    }
+                }
+                return msg;
+            }
+
+            @Override
+            public void onSendAcknowledgement(Producer<String> producer, Message<String> message, MessageId msgId, Throwable cause) {
+                message.getProperties();
+                Assert.assertEquals("complete", message.getProperty("key"));
+                log.info("Send acknowledgement message: {}, msgId: {}", new String(message.getData()), msgId, cause);
+            }
+        };
+
+        Producer<String> producer = pulsarClient.newProducer(Schema.STRING)
+                .topic("persistent://my-property/my-ns/my-topic")
+                .intercept(interceptor1, interceptor2)
+                .create();
+
+        MessageId messageId = producer.newMessage().property("key", "before").value("Hello Pulsar!").send();
+        log.info("Send result messageId: {}", messageId);
+        producer.close();
+    }
+
+    @Test
+    public void testConsumerInterceptorWithSingleTopicSubscribe() throws PulsarClientException {
+        ConsumerInterceptor<String> interceptor = new ConsumerInterceptor<String>() {
+            @Override
+            public void close() {
+
+            }
+
+            @Override
+            public Message<String> beforeConsume(Consumer<String> consumer, Message<String> message) {
+                MessageImpl<String> msg = (MessageImpl<String>) message;
+                msg.getMessageBuilder().addProperties(PulsarApi.KeyValue.newBuilder().setKey("beforeConsumer").setValue("1").build());
+                return msg;
+            }
+
+            @Override
+            public void onAcknowledge(Consumer<String> consumer, MessageId messageId, Throwable cause) {
+                log.info("onAcknowledge messageId: {}", messageId, cause);
+            }
+
+            @Override
+            public void onAcknowledgeCumulative(Consumer<String> consumer, MessageId messageId, Throwable cause) {
+                log.info("onAcknowledgeCumulative messageIds: {}", messageId, cause);
+            }
+        };
+
+        Consumer<String> consumer = pulsarClient.newConsumer(Schema.STRING)
+                .topic("persistent://my-property/my-ns/my-topic")
+                .subscriptionType(SubscriptionType.Shared)
+                .intercept(interceptor)
+                .subscriptionName("my-subscription")
+                .subscribe();
+
+        Producer<String> producer = pulsarClient.newProducer(Schema.STRING)
+                .topic("persistent://my-property/my-ns/my-topic")
+                .create();
+
+        producer.newMessage().value("Hello Pulsar!").send();
+
+        Message<String> received = consumer.receive();
+        MessageImpl<String> msg = (MessageImpl<String>) received;
+        boolean haveKey = false;
+        for (PulsarApi.KeyValue keyValue : msg.getMessageBuilder().getPropertiesList()) {
+            if ("beforeConsumer".equals(keyValue.getKey())) {
+                haveKey = true;
+            }
+        }
+        Assert.assertTrue(haveKey);
+        consumer.acknowledge(received);
+        producer.close();
+        consumer.close();
+    }
+
+    @Test
+    public void testConsumerInterceptorWithMultiTopicSubscribe() throws PulsarClientException {
+
+        ConsumerInterceptor<String> interceptor = new ConsumerInterceptor<String>() {
+            @Override
+            public void close() {
+
+            }
+
+            @Override
+            public Message<String> beforeConsume(Consumer<String> consumer, Message<String> message) {
+                MessageImpl<String> msg = (MessageImpl<String>) message;
+                msg.getMessageBuilder().addProperties(PulsarApi.KeyValue.newBuilder().setKey("beforeConsumer").setValue("1").build());
+                return msg;
+            }
+
+            @Override
+            public void onAcknowledge(Consumer<String> consumer, MessageId messageId, Throwable cause) {
+                log.info("onAcknowledge messageId: {}", messageId, cause);
+            }
+
+            @Override
+            public void onAcknowledgeCumulative(Consumer<String> consumer, MessageId messageId, Throwable cause) {
+                log.info("onAcknowledgeCumulative messageIds: {}", messageId, cause);
+            }
+        };
+
+        Producer<String> producer = pulsarClient.newProducer(Schema.STRING)
+                .topic("persistent://my-property/my-ns/my-topic")
+                .create();
+
+        Producer<String> producer1 = pulsarClient.newProducer(Schema.STRING)
+                .topic("persistent://my-property/my-ns/my-topic1")
+                .create();
+
+        Consumer<String> consumer = pulsarClient.newConsumer(Schema.STRING)
+                .topic("persistent://my-property/my-ns/my-topic", "persistent://my-property/my-ns/my-topic1")
+                .subscriptionType(SubscriptionType.Shared)
+                .intercept(interceptor)
+                .subscriptionName("my-subscription")
+                .subscribe();
+
+        producer.newMessage().value("Hello Pulsar!").send();
+        producer1.newMessage().value("Hello Pulsar!").send();
+
+        int keyCount = 0;
+        for (int i = 0; i < 2; i++) {
+            Message<String> received = consumer.receive();
+            MessageImpl<String> msg = (MessageImpl<String>) ((TopicMessageImpl<String>) received).getMessage();
+            for (PulsarApi.KeyValue keyValue : msg.getMessageBuilder().getPropertiesList()) {
+                if ("beforeConsumer".equals(keyValue.getKey())) {
+                    keyCount++;
+                }
+            }
+            consumer.acknowledge(received);
+        }
+        Assert.assertEquals(2, keyCount);
+        producer.close();
+        producer1.close();
+        consumer.close();
+    }
+
+    @Test
+    public void testConsumerInterceptorWithPatternTopicSubscribe() throws PulsarClientException {
+
+        ConsumerInterceptor<String> interceptor = new ConsumerInterceptor<String>() {
+            @Override
+            public void close() {
+
+            }
+
+            @Override
+            public Message<String> beforeConsume(Consumer<String> consumer, Message<String> message) {
+                MessageImpl<String> msg = (MessageImpl<String>) message;
+                msg.getMessageBuilder().addProperties(PulsarApi.KeyValue.newBuilder().setKey("beforeConsumer").setValue("1").build());
+                return msg;
+            }
+
+            @Override
+            public void onAcknowledge(Consumer<String> consumer, MessageId messageId, Throwable cause) {
+                log.info("onAcknowledge messageId: {}", messageId, cause);
+            }
+
+            @Override
+            public void onAcknowledgeCumulative(Consumer<String> consumer, MessageId messageId, Throwable cause) {
+                log.info("onAcknowledgeCumulative messageIds: {}", messageId, cause);
+            }
+        };
+
+        Producer<String> producer = pulsarClient.newProducer(Schema.STRING)
+                .topic("persistent://my-property/my-ns/my-topic")
+                .create();
+
+        Producer<String> producer1 = pulsarClient.newProducer(Schema.STRING)
+                .topic("persistent://my-property/my-ns/my-topic1")
+                .create();
+
+        Consumer<String> consumer = pulsarClient.newConsumer(Schema.STRING)
+                .topicsPattern("persistent://my-property/my-ns/my-.*")
+                .subscriptionType(SubscriptionType.Shared)
+                .intercept(interceptor)
+                .subscriptionName("my-subscription")
+                .subscribe();
+
+        producer.newMessage().value("Hello Pulsar!").send();
+        producer1.newMessage().value("Hello Pulsar!").send();
+
+        int keyCount = 0;
+        for (int i = 0; i < 2; i++) {
+            Message<String> received = consumer.receive();
+            MessageImpl<String> msg = (MessageImpl<String>) ((TopicMessageImpl<String>) received).getMessage();
+            for (PulsarApi.KeyValue keyValue : msg.getMessageBuilder().getPropertiesList()) {
+                if ("beforeConsumer".equals(keyValue.getKey())) {
+                    keyCount++;
+                }
+            }
+            consumer.acknowledge(received);
+        }
+        Assert.assertEquals(2, keyCount);
+        producer.close();
+        producer1.close();
+        consumer.close();
+    }
+
+    @Test
+    public void testConsumerInterceptorForAcknowledgeCumulative() throws PulsarClientException {
+
+        List<MessageId> ackHolder = new ArrayList<>();
+
+        ConsumerInterceptor<String> interceptor = new ConsumerInterceptor<String>() {
+            @Override
+            public void close() {
+
+            }
+
+            @Override
+            public Message<String> beforeConsume(Consumer<String> consumer, Message<String> message) {
+                MessageImpl<String> msg = (MessageImpl<String>) message;
+                msg.getMessageBuilder().addProperties(PulsarApi.KeyValue.newBuilder().setKey("beforeConsumer").setValue("1").build());
+                return msg;
+            }
+
+            @Override
+            public void onAcknowledge(Consumer<String> consumer, MessageId messageId, Throwable cause) {
+                log.info("onAcknowledge messageId: {}", messageId, cause);
+            }
+
+            @Override
+            public void onAcknowledgeCumulative(Consumer<String> consumer, MessageId messageId, Throwable cause) {
+                long acknowledged = ackHolder.stream().filter(m -> (m.compareTo(messageId) <= 0)).count();
+                Assert.assertEquals(acknowledged, 100);
+                ackHolder.clear();
+                log.info("onAcknowledgeCumulative messageIds: {}", messageId, cause);
+            }
+        };
+
+        Consumer<String> consumer = pulsarClient.newConsumer(Schema.STRING)
+                .topic("persistent://my-property/my-ns/my-topic")
+                .subscriptionType(SubscriptionType.Failover)
+                .intercept(interceptor)
+                .subscriptionName("my-subscription")
+                .subscribe();
+
+        Producer<String> producer = pulsarClient.newProducer(Schema.STRING)
+                .topic("persistent://my-property/my-ns/my-topic")
+                .create();
+
+        for (int i = 0; i < 100; i++) {
+            producer.newMessage().value("Hello Pulsar!").send();
+        }
+
+        int keyCount = 0;
+        for (int i = 0; i < 100; i++) {
+            Message<String> received = consumer.receive();
+            MessageImpl<String> msg = (MessageImpl<String>) received;
+            for (PulsarApi.KeyValue keyValue : msg.getMessageBuilder().getPropertiesList()) {
+                if ("beforeConsumer".equals(keyValue.getKey())) {
+                    keyCount++;
+                }
+            }
+            ackHolder.add(received.getMessageId());
+            if (i == 99) {
+                consumer.acknowledgeCumulative(received);
+            }
+        }
+        Assert.assertEquals(100, keyCount);
+        producer.close();
+        consumer.close();
+    }
+}
diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/api/Consumer.java b/pulsar-client/src/main/java/org/apache/pulsar/client/api/Consumer.java
index 69d885f..f5a9266 100644
--- a/pulsar-client/src/main/java/org/apache/pulsar/client/api/Consumer.java
+++ b/pulsar-client/src/main/java/org/apache/pulsar/client/api/Consumer.java
@@ -288,4 +288,10 @@ public interface Consumer<T> extends Closeable {
      * @return Whether the consumer is connected to the broker
      */
     boolean isConnected();
+
+    /**
+     * Get the name of consumer.
+     * @return consumer name.
+     */
+    String getConsumerName();
 }
diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/api/ConsumerBuilder.java b/pulsar-client/src/main/java/org/apache/pulsar/client/api/ConsumerBuilder.java
index 8657859..da1cbb5 100644
--- a/pulsar-client/src/main/java/org/apache/pulsar/client/api/ConsumerBuilder.java
+++ b/pulsar-client/src/main/java/org/apache/pulsar/client/api/ConsumerBuilder.java
@@ -330,4 +330,12 @@ public interface ConsumerBuilder<T> extends Cloneable {
      * Set subscriptionInitialPosition for the consumer
     */
     ConsumerBuilder<T> subscriptionInitialPosition(SubscriptionInitialPosition subscriptionInitialPosition);
+
+    /**
+     * Intercept {@link Consumer}.
+     *
+     * @param interceptors the list of interceptors to intercept the consumer created by this builder.
+     * @return consumer builder.
+     */
+    ConsumerBuilder<T> intercept(ConsumerInterceptor<T> ...interceptors);
 }
diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/api/ConsumerInterceptor.java b/pulsar-client/src/main/java/org/apache/pulsar/client/api/ConsumerInterceptor.java
new file mode 100644
index 0000000..1134d8a
--- /dev/null
+++ b/pulsar-client/src/main/java/org/apache/pulsar/client/api/ConsumerInterceptor.java
@@ -0,0 +1,99 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.client.api;
+
+import java.util.List;
+
+/**
+ * A plugin interface that allows you to intercept (and possibly mutate)
+ * messages received by the consumer.
+ * <p>
+ * A primary use case is to hook into consumer applications for custom
+ * monitoring, logging, etc.
+ * <p>
+ * Exceptions thrown by interceptor methods will be caught, logged, but
+ * not propagated further.
+ */
+public interface ConsumerInterceptor<T> extends AutoCloseable {
+
+    /**
+     * Close the interceptor.
+     */
+    void close();
+
+    /**
+     * This is called just before the message is returned by
+     * {@link Consumer#receive()}, {@link MessageListener#received(Consumer,
+     * Message)} or the {@link java.util.concurrent.CompletableFuture} returned by
+     * {@link Consumer#receiveAsync()} completes.
+     * <p>
+     * This method is allowed to modify message, in which case the new message
+     * will be returned.
+     * <p>
+     * Any exception thrown by this method will be caught by the caller, logged,
+     * but not propagated to client.
+     * <p>
+     * Since the consumer may run multiple interceptors, a particular
+     * interceptor's
+     * <tt>beforeConsume</tt> callback will be called in the order specified by
+     * {@link ConsumerBuilder#intercept(ConsumerInterceptor[])}. The first
+     * interceptor in the list gets the consumed message, the following
+     * interceptor will be passed
+     * the message returned by the previous interceptor, and so on. Since
+     * interceptors are allowed to modify message, interceptors may potentially
+     * get the messages already modified by other interceptors. However building a
+     * pipeline of mutable
+     * interceptors that depend on the output of the previous interceptor is
+     * discouraged, because of potential side-effects caused by interceptors
+     * potentially failing to modify the message and throwing an exception.
+     * if one of interceptors in the list throws an exception from
+     * <tt>beforeConsume</tt>, the exception is caught, logged,
+     * and the next interceptor is called with the message returned by the last
+     * successful interceptor in the list, or otherwise the original consumed
+     * message.
+     *
+     * @param consumer the consumer which contains the interceptor
+     * @param message the message to be consumed by the client.
+     * @return message that is either modified by the interceptor or same message
+     *         passed into the method.
+     */
+    Message<T> beforeConsume(Consumer<T> consumer, Message<T> message);
+
+    /**
+     * This is called consumer sends the acknowledgment to the broker.
+     *
+     * <p>Any exception thrown by this method will be ignored by the caller.
+     *
+     * @param consumer the consumer which contains the interceptor
+     * @param messageId message to ack, null if acknowledge fail.
+     * @param exception the exception on acknowledge.
+     */
+    void onAcknowledge(Consumer<T> consumer, MessageId messageId, Throwable exception);
+
+    /**
+     * This is called consumer send the cumulative acknowledgment to the broker.
+     *
+     * <p>Any exception thrown by this method will be ignored by the caller.
+     *
+     * @param consumer the consumer which contains the interceptor
+     * @param messageId message to ack, null if acknowledge fail.
+     * @param exception the exception on acknowledge.
+     */
+    void onAcknowledgeCumulative(Consumer<T> consumer, MessageId messageId, Throwable exception);
+}
diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/api/ProducerBuilder.java b/pulsar-client/src/main/java/org/apache/pulsar/client/api/ProducerBuilder.java
index 8256b4a..b3aa720 100644
--- a/pulsar-client/src/main/java/org/apache/pulsar/client/api/ProducerBuilder.java
+++ b/pulsar-client/src/main/java/org/apache/pulsar/client/api/ProducerBuilder.java
@@ -318,4 +318,12 @@ public interface ProducerBuilder<T> extends Cloneable {
      * @return
      */
     ProducerBuilder<T> properties(Map<String, String> properties);
+
+    /**
+     * Intercept {@link Producer}.
+     *
+     * @param interceptors the list of interceptors to intercept the producer created by this builder.
+     * @return producer builder.
+     */
+    ProducerBuilder<T> intercept(ProducerInterceptor<T> ... interceptors);
 }
diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/api/ProducerInterceptor.java b/pulsar-client/src/main/java/org/apache/pulsar/client/api/ProducerInterceptor.java
new file mode 100644
index 0000000..b6a0d77
--- /dev/null
+++ b/pulsar-client/src/main/java/org/apache/pulsar/client/api/ProducerInterceptor.java
@@ -0,0 +1,92 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.client.api;
+
+/**
+ * A plugin interface that allows you to intercept (and possibly mutate) the
+ * messages received by the producer before they are published to the Pulsar
+ * brokers.
+ * <p>
+ * Exceptions thrown by ProducerInterceptor methods will be caught, logged, but
+ * not propagated further.
+ * <p>
+ * ProducerInterceptor callbacks may be called from multiple threads. Interceptor
+ * implementation must ensure thread-safety, if needed.
+ */
+public interface ProducerInterceptor<T> extends AutoCloseable {
+
+    /**
+     * Close the interceptor.
+     */
+    void close();
+
+    /**
+     * This is called from {@link Producer#send(Object)} and {@link
+     * Producer#sendAsync(Object)} methods, before
+     * send the message to the brokers. This method is allowed to modify the
+     * record, in which case, the new record
+     * will be returned.
+     * <p>
+     * Any exception thrown by this method will be caught by the caller and
+     * logged, but not propagated further.
+     * <p>
+     * Since the producer may run multiple interceptors, a particular
+     * interceptor's {@link #beforeSend(Producer, Message)} callback will be called in the
+     * order specified by
+     * {@link ProducerBuilder#intercept(ProducerInterceptor[])}.
+     * <p>
+     * The first interceptor in the list gets the message passed from the client,
+     * the following interceptor will be passed the message returned by the
+     * previous interceptor, and so on. Since interceptors are allowed to modify
+     * messages, interceptors may potentially get the message already modified by
+     * other interceptors. However, building a pipeline of mutable interceptors
+     * that depend on the output of the previous interceptor is discouraged,
+     * because of potential side-effects caused by interceptors potentially
+     * failing to modify the message and throwing an exception. If one of the
+     * interceptors in the list throws an exception from
+     * {@link#beforeSend(Message)}, the exception is caught, logged, and the next
+     * interceptor is called with the message returned by the last successful
+     * interceptor in the list, or otherwise the client.
+     *
+     * @param producer the producer which contains the interceptor.
+     * @param message message to send
+     * @return the intercepted message
+     */
+    Message<T> beforeSend(Producer<T> producer, Message<T> message);
+
+    /**
+     * This method is called when the message sent to the broker has been
+     * acknowledged, or when sending the message fails.
+     * This method is generally called just before the user callback is
+     * called, and in additional cases when an exception on the producer side.
+     * <p>
+     * Any exception thrown by this method will be ignored by the caller.
+     * <p>
+     * This method will generally execute in the background I/O thread, so the
+     * implementation should be reasonably fast. Otherwise, sending of messages
+     * from other threads could be delayed.
+     *
+     * @param producer the producer which contains the interceptor.
+     * @param message the message that application sends
+     * @param msgId the message id that assigned by the broker; null if send failed.
+     * @param exception the exception on sending messages, null indicates send has succeed.
+     */
+    void onSendAcknowledgement(Producer<T> producer, Message<T> message, MessageId msgId, Throwable exception);
+
+}
diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerBase.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerBase.java
index a6fda6f..d8a0de3 100644
--- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerBase.java
+++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerBase.java
@@ -60,10 +60,11 @@ public abstract class ConsumerBase<T> extends HandlerState implements Consumer<T
     protected final ConcurrentLinkedQueue<CompletableFuture<Message<T>>> pendingReceives;
     protected int maxReceiverQueueSize;
     protected Schema<T> schema;
+    protected final ConsumerInterceptors<T> interceptors;
 
     protected ConsumerBase(PulsarClientImpl client, String topic, ConsumerConfigurationData<T> conf,
                            int receiverQueueSize, ExecutorService listenerExecutor,
-                           CompletableFuture<Consumer<T>> subscribeFuture, Schema<T> schema) {
+                           CompletableFuture<Consumer<T>> subscribeFuture, Schema<T> schema, ConsumerInterceptors interceptors) {
         super(client, topic);
         this.maxReceiverQueueSize = receiverQueueSize;
         this.subscription = conf.getSubscriptionName();
@@ -81,6 +82,7 @@ public abstract class ConsumerBase<T> extends HandlerState implements Consumer<T
         this.listenerExecutor = listenerExecutor;
         this.pendingReceives = Queues.newConcurrentLinkedQueue();
         this.schema = schema;
+        this.interceptors = interceptors;
     }
 
     @Override
@@ -335,6 +337,7 @@ public abstract class ConsumerBase<T> extends HandlerState implements Consumer<T
         return subscription;
     }
 
+    @Override
     public String getConsumerName() {
         return this.consumerName;
     }
@@ -360,4 +363,24 @@ public abstract class ConsumerBase<T> extends HandlerState implements Consumer<T
         this.maxReceiverQueueSize = newSize;
     }
 
+    protected Message<T> beforeConsume(Message<T> message) {
+        if (interceptors != null) {
+            return interceptors.beforeConsume(this, message);
+        } else {
+            return message;
+        }
+    }
+
+    protected void onAcknowledge(MessageId messageId, Throwable exception) {
+        if (interceptors != null) {
+            interceptors.onAcknowledge(this, messageId, exception);
+        }
+    }
+
+    protected void onAcknowledgeCumulative(MessageId messageId, Throwable exception) {
+        if (interceptors != null) {
+            interceptors.onAcknowledgeCumulative(this, messageId, exception);
+        }
+    }
+
 }
diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerBuilderImpl.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerBuilderImpl.java
index f0067f7..2095bab 100644
--- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerBuilderImpl.java
+++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerBuilderImpl.java
@@ -20,6 +20,8 @@ package org.apache.pulsar.client.impl;
 
 import static com.google.common.base.Preconditions.checkArgument;
 
+import java.util.ArrayList;
+import java.util.Arrays;
 import java.util.List;
 import java.util.Map;
 import java.util.concurrent.CompletableFuture;
@@ -32,6 +34,7 @@ import org.apache.pulsar.client.api.Consumer;
 import org.apache.pulsar.client.api.ConsumerBuilder;
 import org.apache.pulsar.client.api.ConsumerCryptoFailureAction;
 import org.apache.pulsar.client.api.ConsumerEventListener;
+import org.apache.pulsar.client.api.ConsumerInterceptor;
 import org.apache.pulsar.client.api.CryptoKeyReader;
 import org.apache.pulsar.client.api.MessageListener;
 import org.apache.pulsar.client.api.PulsarClientException;
@@ -52,6 +55,7 @@ public class ConsumerBuilderImpl<T> implements ConsumerBuilder<T> {
     private final PulsarClientImpl client;
     private ConsumerConfigurationData<T> conf;
     private final Schema<T> schema;
+    private List<ConsumerInterceptor<T>> interceptorList;
 
     private static long MIN_ACK_TIMEOUT_MILLIS = 1000;
 
@@ -104,8 +108,9 @@ public class ConsumerBuilderImpl<T> implements ConsumerBuilder<T> {
             return FutureUtil.failedFuture(
                     new InvalidConfigurationException("Subscription name must be set on the consumer builder"));
         }
-
-        return client.subscribeAsync(conf, schema);
+        return interceptorList == null || interceptorList.size() == 0 ?
+                client.subscribeAsync(conf, schema, null) :
+                client.subscribeAsync(conf, schema, new ConsumerInterceptors<>(interceptorList));
     }
 
     @Override
@@ -242,7 +247,16 @@ public class ConsumerBuilderImpl<T> implements ConsumerBuilder<T> {
 		return this;
 	}
 
-	public ConsumerConfigurationData<T> getConf() {
+    @Override
+    public ConsumerBuilder<T> intercept(ConsumerInterceptor<T>... interceptors) {
+        if (interceptorList == null) {
+            interceptorList = new ArrayList<>();
+        }
+        interceptorList.addAll(Arrays.asList(interceptors));
+        return this;
+    }
+
+    public ConsumerConfigurationData<T> getConf() {
 	    return conf;
 	}
 }
diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java
index fe37b69..a0a2319 100644
--- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java
+++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java
@@ -143,14 +143,14 @@ public class ConsumerImpl<T> extends ConsumerBase<T> implements ConnectionHandle
     }
 
     ConsumerImpl(PulsarClientImpl client, String topic, ConsumerConfigurationData<T> conf,
-            ExecutorService listenerExecutor, int partitionIndex, CompletableFuture<Consumer<T>> subscribeFuture, Schema<T> schema) {
-        this(client, topic, conf, listenerExecutor, partitionIndex, subscribeFuture, SubscriptionMode.Durable, null, schema);
+            ExecutorService listenerExecutor, int partitionIndex, CompletableFuture<Consumer<T>> subscribeFuture, Schema<T> schema, ConsumerInterceptors interceptors) {
+        this(client, topic, conf, listenerExecutor, partitionIndex, subscribeFuture, SubscriptionMode.Durable, null, schema, interceptors);
     }
 
     ConsumerImpl(PulsarClientImpl client, String topic, ConsumerConfigurationData<T> conf,
                  ExecutorService listenerExecutor, int partitionIndex, CompletableFuture<Consumer<T>> subscribeFuture,
-                 SubscriptionMode subscriptionMode, MessageId startMessageId, Schema<T> schema) {
-        super(client, topic, conf, conf.getReceiverQueueSize(), listenerExecutor, subscribeFuture, schema);
+                 SubscriptionMode subscriptionMode, MessageId startMessageId, Schema<T> schema, ConsumerInterceptors interceptors) {
+        super(client, topic, conf, conf.getReceiverQueueSize(), listenerExecutor, subscribeFuture, schema, interceptors);
         this.consumerId = client.newConsumerId();
         this.subscriptionMode = subscriptionMode;
         this.startMessageId = startMessageId != null ? new BatchMessageIdImpl((MessageIdImpl) startMessageId) : null;
@@ -263,8 +263,9 @@ public class ConsumerImpl<T> extends ConsumerBase<T> implements ConnectionHandle
         Message<T> message;
         try {
             message = incomingMessages.take();
-            messageProcessed(message);
-            return message;
+            Message<T> interceptMsg = beforeConsume(message);
+            messageProcessed(interceptMsg);
+            return interceptMsg;
         } catch (InterruptedException e) {
             Thread.currentThread().interrupt();
             stats.incrementNumReceiveFailed();
@@ -293,8 +294,9 @@ public class ConsumerImpl<T> extends ConsumerBase<T> implements ConnectionHandle
         if (message == null && conf.getReceiverQueueSize() == 0) {
             sendFlowPermitsToBroker(cnx(), 1);
         } else if (message != null) {
-            messageProcessed(message);
-            result.complete(message);
+            Message<T> interceptMsg = beforeConsume(message);
+            messageProcessed(interceptMsg);
+            result.complete(interceptMsg);
         }
 
         return result;
@@ -352,10 +354,11 @@ public class ConsumerImpl<T> extends ConsumerBase<T> implements ConnectionHandle
         Message<T> message;
         try {
             message = incomingMessages.poll(timeout, unit);
-            if (message != null) {
-                messageProcessed(message);
+            Message<T> interceptMsg = beforeConsume(message);
+            if (interceptMsg != null) {
+                messageProcessed(interceptMsg);
             }
-            return message;
+            return interceptMsg;
         } catch (InterruptedException e) {
             Thread.currentThread().interrupt();
 
@@ -410,7 +413,13 @@ public class ConsumerImpl<T> extends ConsumerBase<T> implements ConnectionHandle
         checkArgument(messageId instanceof MessageIdImpl);
         if (getState() != State.Ready && getState() != State.Connecting) {
             stats.incrementNumAcksFailed();
-            return FutureUtil.failedFuture(new PulsarClientException("Consumer not ready. State: " + getState()));
+            PulsarClientException exception = new PulsarClientException("Consumer not ready. State: " + getState());
+            if (AckType.Individual.equals(ackType)) {
+                onAcknowledge(messageId, exception);
+            } else if (AckType.Cumulative.equals(ackType)) {
+                onAcknowledgeCumulative(messageId, exception);
+            }
+            return FutureUtil.failedFuture(exception);
         }
 
         if (messageId instanceof BatchMessageIdImpl) {
@@ -444,7 +453,9 @@ public class ConsumerImpl<T> extends ConsumerBase<T> implements ConnectionHandle
                 unAckedMessageTracker.remove(msgId);
                 stats.incrementNumAcksSent(1);
             }
+            onAcknowledge(messageId, null);
         } else if (ackType == AckType.Cumulative) {
+            onAcknowledgeCumulative(messageId, null);
             stats.incrementNumAcksSent(unAckedMessageTracker.removeMessagesTill(msgId));
         }
 
@@ -837,9 +848,10 @@ public class ConsumerImpl<T> extends ConsumerBase<T> implements ConnectionHandle
                         receivedFuture.complete(message);
                     } else {
                         // increase permits for available message-queue
-                        messageProcessed(message);
+                        Message<T> interceptMsg = beforeConsume(message);
+                        messageProcessed(interceptMsg);
                         // return message to receivedCallback
-                        listenerExecutor.execute(() -> receivedFuture.complete(message));
+                        listenerExecutor.execute(() -> receivedFuture.complete(interceptMsg));
                     }
                 }
             } else {
diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerInterceptors.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerInterceptors.java
new file mode 100644
index 0000000..a0d30fc
--- /dev/null
+++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerInterceptors.java
@@ -0,0 +1,132 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.client.impl;
+
+import org.apache.pulsar.client.api.Consumer;
+import org.apache.pulsar.client.api.ConsumerInterceptor;
+import org.apache.pulsar.client.api.Message;
+import org.apache.pulsar.client.api.MessageId;
+import org.apache.pulsar.client.api.MessageListener;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.Closeable;
+import java.io.IOException;
+import java.util.List;
+
+/**
+ * A container that hold the list {@link org.apache.pulsar.client.api.ConsumerInterceptor} and wraps calls to the chain
+ * of custom interceptors.
+ */
+public class ConsumerInterceptors<T> implements Closeable {
+
+    private static final Logger log = LoggerFactory.getLogger(ConsumerInterceptors.class);
+
+    private final List<ConsumerInterceptor<T>> interceptors;
+
+    public ConsumerInterceptors(List<ConsumerInterceptor<T>> interceptors) {
+        this.interceptors = interceptors;
+    }
+
+    /**
+     * This is called just before the message is returned by {@link Consumer#receive()},
+     * {@link MessageListener#received(Consumer, Message)} or the {@link java.util.concurrent.CompletableFuture}
+     * returned by {@link Consumer#receiveAsync()} completes.
+     * <p>
+     * This method calls {@link ConsumerInterceptor#beforeConsume(Consumer, Message)} for each interceptor. Messages returned
+     * from each interceptor get passed to beforeConsume() of the next interceptor in the chain of interceptors.
+     * <p>
+     * This method does not throw exceptions. If any of the interceptors in the chain throws an exception, it gets
+     * caught and logged, and next interceptor in int the chain is called with 'messages' returned by the previous
+     * successful interceptor beforeConsume call.
+     *
+     * @param consumer the consumer which contains the interceptors
+     * @param message message to be consume by the client.
+     * @return messages that are either modified by interceptors or same as messages passed to this method.
+     */
+    public Message<T> beforeConsume(Consumer<T> consumer, Message<T> message) {
+        Message<T> interceptorMessage = message;
+        for (int i = 0; i < interceptors.size(); i++) {
+            try {
+                interceptorMessage = interceptors.get(i).beforeConsume(consumer, interceptorMessage);
+            } catch (Exception e) {
+                if (consumer != null) {
+                    log.warn("Error executing interceptor beforeConsume callback topic: {} consumerName: {}", consumer.getTopic(), consumer.getConsumerName(), e);
+                } else {
+                    log.warn("Error executing interceptor beforeConsume callback", e);
+                }
+            }
+        }
+        return interceptorMessage;
+    }
+
+    /**
+     * This is called when acknowledge request return from the broker.
+     * <p>
+     * This method calls {@link ConsumerInterceptor#onAcknowledge(Consumer, MessageId, Throwable)} method for each interceptor.
+     * <p>
+     * This method does not throw exceptions. Exceptions thrown by any of interceptors in the chain are logged, but not propagated.
+     *
+     * @param consumer the consumer which contains the interceptors
+     * @param messageId message to acknowledge.
+     * @param exception exception returned by broker.
+     */
+    public void onAcknowledge(Consumer<T> consumer, MessageId messageId, Throwable exception) {
+        for (int i = 0; i < interceptors.size(); i++) {
+            try {
+                interceptors.get(i).onAcknowledge(consumer, messageId, exception);
+            } catch (Exception e) {
+                log.warn("Error executing interceptor onAcknowledge callback ", e);
+            }
+        }
+    }
+
+    /**
+     * This is called when acknowledge cumulative request return from the broker.
+     * <p>
+     * This method calls {@link ConsumerInterceptor#onAcknowledgeCumulative(Consumer, MessageId, Throwable)} (Message, Throwable)} method for each interceptor.
+     * <p>
+     * This method does not throw exceptions. Exceptions thrown by any of interceptors in the chain are logged, but not propagated.
+     *
+     * @param consumer the consumer which contains the interceptors
+     * @param messageId messages to acknowledge.
+     * @param exception exception returned by broker.
+     */
+    public void onAcknowledgeCumulative(Consumer<T> consumer, MessageId messageId, Throwable exception) {
+        for (int i = 0; i < interceptors.size(); i++) {
+            try {
+                interceptors.get(i).onAcknowledgeCumulative(consumer, messageId, exception);
+            } catch (Exception e) {
+                log.warn("Error executing interceptor onAcknowledgeCumulative callback ", e);
+            }
+        }
+    }
+
+    @Override
+    public void close() throws IOException {
+        for (int i = 0; i < interceptors.size(); i++) {
+            try {
+                interceptors.get(i).close();
+            } catch (Exception e) {
+                log.error("Fail to close consumer interceptor ", e);
+            }
+        }
+    }
+
+}
diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/MessageImpl.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/MessageImpl.java
index 96b68c1..97e2247 100644
--- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/MessageImpl.java
+++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/MessageImpl.java
@@ -241,7 +241,7 @@ public class MessageImpl<T> implements Message<T> {
         return null;
     }
 
-    ByteBuf getDataBuffer() {
+    public ByteBuf getDataBuffer() {
         return payload;
     }
 
@@ -274,7 +274,7 @@ public class MessageImpl<T> implements Message<T> {
         return properties.get(name);
     }
 
-    MessageMetadata.Builder getMessageBuilder() {
+    public MessageMetadata.Builder getMessageBuilder() {
         return msgMetadataBuilder;
     }
 
diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/MultiTopicsConsumerImpl.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/MultiTopicsConsumerImpl.java
index f1cb9cf..75fdac6 100644
--- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/MultiTopicsConsumerImpl.java
+++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/MultiTopicsConsumerImpl.java
@@ -85,9 +85,9 @@ public class MultiTopicsConsumerImpl<T> extends ConsumerBase<T> {
     private final ConsumerConfigurationData<T> internalConfig;
 
     MultiTopicsConsumerImpl(PulsarClientImpl client, ConsumerConfigurationData<T> conf, ExecutorService listenerExecutor,
-                            CompletableFuture<Consumer<T>> subscribeFuture, Schema<T> schema) {
+                            CompletableFuture<Consumer<T>> subscribeFuture, Schema<T> schema, ConsumerInterceptors<T> interceptors) {
         super(client, "TopicsConsumerFakeTopicName" + ConsumerName.generateRandomName(), conf,
-                Math.max(2, conf.getReceiverQueueSize()), listenerExecutor, subscribeFuture, schema);
+                Math.max(2, conf.getReceiverQueueSize()), listenerExecutor, subscribeFuture, schema, interceptors);
 
         checkArgument(conf.getReceiverQueueSize() > 0,
             "Receiver queue size needs to be greater than 0 for Topics Consumer");
@@ -632,7 +632,7 @@ public class MultiTopicsConsumerImpl<T> extends ConsumerBase<T> {
                                                                            ExecutorService listenerExecutor,
                                                                            CompletableFuture<Consumer<T>> subscribeFuture,
                                                                            int numPartitions,
-                                                                           Schema<T> schema) {
+                                                                           Schema<T> schema, ConsumerInterceptors<T> interceptors) {
         checkArgument(conf.getTopicNames().size() == 1, "Should have only 1 topic for partitioned consumer");
 
         // get topic name, then remove it from conf, so constructor will create a consumer with no topic.
@@ -641,7 +641,7 @@ public class MultiTopicsConsumerImpl<T> extends ConsumerBase<T> {
         cloneConf.getTopicNames().remove(topicName);
 
         CompletableFuture<Consumer> future = new CompletableFuture<>();
-        MultiTopicsConsumerImpl consumer = new MultiTopicsConsumerImpl(client, cloneConf, listenerExecutor, future, schema);
+        MultiTopicsConsumerImpl consumer = new MultiTopicsConsumerImpl(client, cloneConf, listenerExecutor, future, schema, interceptors);
 
         future.thenCompose(c -> ((MultiTopicsConsumerImpl)c).subscribeAsync(topicName, numPartitions))
             .thenRun(()-> subscribeFuture.complete(consumer))
@@ -695,7 +695,7 @@ public class MultiTopicsConsumerImpl<T> extends ConsumerBase<T> {
                         String partitionName = TopicName.get(topicName).getPartition(partitionIndex).toString();
                         CompletableFuture<Consumer<T>> subFuture = new CompletableFuture<>();
                         ConsumerImpl<T> newConsumer = new ConsumerImpl<>(client, partitionName, configurationData,
-                            client.externalExecutorProvider().getExecutor(), partitionIndex, subFuture, schema);
+                            client.externalExecutorProvider().getExecutor(), partitionIndex, subFuture, schema, interceptors);
                         consumers.putIfAbsent(newConsumer.getTopic(), newConsumer);
                         return subFuture;
                     })
@@ -706,7 +706,7 @@ public class MultiTopicsConsumerImpl<T> extends ConsumerBase<T> {
 
             CompletableFuture<Consumer<T>> subFuture = new CompletableFuture<>();
             ConsumerImpl<T> newConsumer = new ConsumerImpl<>(client, topicName, internalConfig,
-                client.externalExecutorProvider().getExecutor(), 0, subFuture, schema);
+                client.externalExecutorProvider().getExecutor(), 0, subFuture, schema, interceptors);
             consumers.putIfAbsent(newConsumer.getTopic(), newConsumer);
 
             futureList = Collections.singletonList(subFuture);
diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/PartitionedProducerImpl.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/PartitionedProducerImpl.java
index 3e159e8..12ecf2b 100644
--- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/PartitionedProducerImpl.java
+++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/PartitionedProducerImpl.java
@@ -53,8 +53,8 @@ public class PartitionedProducerImpl<T> extends ProducerBase<T> {
     private final TopicMetadata topicMetadata;
 
     public PartitionedProducerImpl(PulsarClientImpl client, String topic, ProducerConfigurationData conf, int numPartitions,
-            CompletableFuture<Producer<T>> producerCreatedFuture, Schema<T> schema) {
-        super(client, topic, conf, producerCreatedFuture, schema);
+            CompletableFuture<Producer<T>> producerCreatedFuture, Schema<T> schema, ProducerInterceptors<T> interceptors) {
+        super(client, topic, conf, producerCreatedFuture, schema, interceptors);
         this.producers = Lists.newArrayListWithCapacity(numPartitions);
         this.topicMetadata = new TopicMetadataImpl(numPartitions);
         this.routerPolicy = getMessageRouter();
@@ -111,7 +111,7 @@ public class PartitionedProducerImpl<T> extends ProducerBase<T> {
         for (int partitionIndex = 0; partitionIndex < topicMetadata.numPartitions(); partitionIndex++) {
             String partitionName = TopicName.get(topic).getPartition(partitionIndex).toString();
             ProducerImpl<T> producer = new ProducerImpl<>(client, partitionName, conf, new CompletableFuture<>(),
-                    partitionIndex, schema);
+                    partitionIndex, schema, interceptors);
             producers.add(producer);
             producer.producerCreatedFuture().handle((prod, createException) -> {
                 if (createException != null) {
diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/PatternMultiTopicsConsumerImpl.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/PatternMultiTopicsConsumerImpl.java
index d0b0c60..3f6dfec 100644
--- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/PatternMultiTopicsConsumerImpl.java
+++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/PatternMultiTopicsConsumerImpl.java
@@ -51,8 +51,8 @@ public class PatternMultiTopicsConsumerImpl<T> extends MultiTopicsConsumerImpl<T
                                           ConsumerConfigurationData<T> conf,
                                           ExecutorService listenerExecutor,
                                           CompletableFuture<Consumer<T>> subscribeFuture,
-                                          Schema<T> schema) {
-        super(client, conf, listenerExecutor, subscribeFuture, schema);
+                                          Schema<T> schema, ConsumerInterceptors<T> interceptors) {
+        super(client, conf, listenerExecutor, subscribeFuture, schema, interceptors);
         this.topicsPattern = topicsPattern;
 
         if (this.namespaceName == null) {
diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ProducerBase.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ProducerBase.java
index eb02b6b..39e632c 100644
--- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ProducerBase.java
+++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ProducerBase.java
@@ -36,13 +36,15 @@ public abstract class ProducerBase<T> extends HandlerState implements Producer<T
     protected final CompletableFuture<Producer<T>> producerCreatedFuture;
     protected final ProducerConfigurationData conf;
     protected final Schema<T> schema;
+    protected final ProducerInterceptors<T> interceptors;
 
     protected ProducerBase(PulsarClientImpl client, String topic, ProducerConfigurationData conf,
-            CompletableFuture<Producer<T>> producerCreatedFuture, Schema<T> schema) {
+            CompletableFuture<Producer<T>> producerCreatedFuture, Schema<T> schema, ProducerInterceptors<T> interceptors) {
         super(client, topic);
         this.producerCreatedFuture = producerCreatedFuture;
         this.conf = conf;
         this.schema = schema;
+        this.interceptors = interceptors;
     }
 
     @Override
@@ -148,6 +150,20 @@ public abstract class ProducerBase<T> extends HandlerState implements Producer<T
         return producerCreatedFuture;
     }
 
+    protected Message<T> beforeSend(Message<T> message) {
+        if (interceptors != null) {
+            return interceptors.beforeSend(this, message);
+        } else {
+            return message;
+        }
+    }
+
+    protected void onSendAcknowledgement(Message<T> message, MessageId msgId, Throwable exception) {
+        if (interceptors != null) {
+            interceptors.onSendAcknowledgement(this, message, msgId, exception);
+        }
+    }
+
     @Override
     public String toString() {
         return "ProducerBase{" + "topic='" + topic + '\'' + '}';
diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ProducerBuilderImpl.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ProducerBuilderImpl.java
index ff391be..834e0c0 100644
--- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ProducerBuilderImpl.java
+++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ProducerBuilderImpl.java
@@ -20,6 +20,9 @@ package org.apache.pulsar.client.impl;
 
 import com.google.common.annotations.VisibleForTesting;
 
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.List;
 import java.util.Map;
 import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.ExecutionException;
@@ -33,6 +36,7 @@ import org.apache.pulsar.client.api.MessageRoutingMode;
 import org.apache.pulsar.client.api.Producer;
 import org.apache.pulsar.client.api.ProducerBuilder;
 import org.apache.pulsar.client.api.ProducerCryptoFailureAction;
+import org.apache.pulsar.client.api.ProducerInterceptor;
 import org.apache.pulsar.client.api.PulsarClientException;
 import org.apache.pulsar.client.api.Schema;
 import org.apache.pulsar.client.impl.conf.ConfigurationDataUtils;
@@ -46,6 +50,7 @@ public class ProducerBuilderImpl<T> implements ProducerBuilder<T> {
     private final PulsarClientImpl client;
     private ProducerConfigurationData conf;
     private Schema<T> schema;
+    private List<ProducerInterceptor<T>> interceptorList;
 
     @VisibleForTesting
     public ProducerBuilderImpl(PulsarClientImpl client, Schema<T> schema) {
@@ -97,7 +102,9 @@ public class ProducerBuilderImpl<T> implements ProducerBuilder<T> {
                     .failedFuture(new IllegalArgumentException("Topic name must be set on the producer builder"));
         }
 
-        return client.createProducerAsync(conf, schema);
+        return interceptorList == null || interceptorList.size() == 0 ?
+                client.createProducerAsync(conf, schema, null) :
+                client.createProducerAsync(conf, schema, new ProducerInterceptors<>(interceptorList));
     }
 
     @Override
@@ -226,4 +233,13 @@ public class ProducerBuilderImpl<T> implements ProducerBuilder<T> {
         conf.getProperties().putAll(properties);
         return this;
     }
+
+    @Override
+    public ProducerBuilder<T> intercept(ProducerInterceptor<T>... interceptors) {
+        if (interceptorList == null) {
+            interceptorList = new ArrayList<>();
+        }
+        interceptorList.addAll(Arrays.asList(interceptors));
+        return this;
+    }
 }
diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ProducerImpl.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ProducerImpl.java
index c3428e2..9401e7c 100644
--- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ProducerImpl.java
+++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ProducerImpl.java
@@ -121,8 +121,9 @@ public class ProducerImpl<T> extends ProducerBase<T> implements TimerTask, Conne
             .newUpdater(ProducerImpl.class, "msgIdGenerator");
 
     public ProducerImpl(PulsarClientImpl client, String topic, ProducerConfigurationData conf,
-                        CompletableFuture<Producer<T>> producerCreatedFuture, int partitionIndex, Schema<T> schema) {
-        super(client, topic, conf, producerCreatedFuture, schema);
+                        CompletableFuture<Producer<T>> producerCreatedFuture, int partitionIndex, Schema<T> schema,
+                        ProducerInterceptors<T> interceptors) {
+        super(client, topic, conf, producerCreatedFuture, schema, interceptors);
         this.producerId = client.newProducerId();
         this.producerName = conf.getProducerName();
         this.partitionIndex = partitionIndex;
@@ -205,9 +206,16 @@ public class ProducerImpl<T> extends ProducerBase<T> implements TimerTask, Conne
 
     @Override
     CompletableFuture<MessageId> internalSendAsync(Message<T> message) {
+
         CompletableFuture<MessageId> future = new CompletableFuture<>();
 
-        sendAsync(message, new SendCallback() {
+        MessageImpl<T> interceptorMessage = (MessageImpl<T>) beforeSend(message);
+        //Retain the buffer used by interceptors callback to get message. Buffer will release after complete interceptors.
+        interceptorMessage.getDataBuffer().retain();
+        if (interceptors != null) {
+            interceptorMessage.getProperties();
+        }
+        sendAsync(interceptorMessage, new SendCallback() {
             SendCallback nextCallback = null;
             MessageImpl<?> nextMsg = null;
             long createdAt = System.nanoTime();
@@ -229,25 +237,40 @@ public class ProducerImpl<T> extends ProducerBase<T> implements TimerTask, Conne
 
             @Override
             public void sendComplete(Exception e) {
-                if (e != null) {
-                    stats.incrementSendFailed();
-                    future.completeExceptionally(e);
-                } else {
-                    future.complete(message.getMessageId());
-                    stats.incrementNumAcksReceived(System.nanoTime() - createdAt);
-                }
-                while (nextCallback != null) {
-                    SendCallback sendCallback = nextCallback;
-                    MessageImpl<?> msg = nextMsg;
+                try {
                     if (e != null) {
                         stats.incrementSendFailed();
-                        sendCallback.getFuture().completeExceptionally(e);
+                        onSendAcknowledgement(interceptorMessage, null, e);
+                        future.completeExceptionally(e);
                     } else {
-                        sendCallback.getFuture().complete(msg.getMessageId());
+                        onSendAcknowledgement(interceptorMessage, interceptorMessage.getMessageId(), null);
+                        future.complete(interceptorMessage.getMessageId());
                         stats.incrementNumAcksReceived(System.nanoTime() - createdAt);
                     }
-                    nextMsg = nextCallback.getNextMessage();
-                    nextCallback = nextCallback.getNextSendCallback();
+                } finally {
+                    interceptorMessage.getDataBuffer().release();
+                }
+
+                while (nextCallback != null) {
+                    SendCallback sendCallback = nextCallback;
+                    MessageImpl<?> msg = nextMsg;
+                    //Retain the buffer used by interceptors callback to get message. Buffer will release after complete interceptors.
+                    try {
+                        msg.getDataBuffer().retain();
+                        if (e != null) {
+                            stats.incrementSendFailed();
+                            onSendAcknowledgement((Message<T>) msg, null, e);
+                            sendCallback.getFuture().completeExceptionally(e);
+                        } else {
+                            onSendAcknowledgement((Message<T>) msg, msg.getMessageId(), null);
+                            sendCallback.getFuture().complete(msg.getMessageId());
+                            stats.incrementNumAcksReceived(System.nanoTime() - createdAt);
+                        }
+                        nextMsg = nextCallback.getNextMessage();
+                        nextCallback = nextCallback.getNextSendCallback();
+                    } finally {
+                        msg.getDataBuffer().release();
+                    }
                 }
             }
 
@@ -292,14 +315,18 @@ public class ProducerImpl<T> extends ProducerBase<T> implements TimerTask, Conne
             String compressedStr = (!isBatchMessagingEnabled() && conf.getCompressionType() != CompressionType.NONE)
                     ? "Compressed"
                     : "";
-            callback.sendComplete(new PulsarClientException.InvalidMessageException(
-                    format("%s Message payload size %d cannot exceed %d bytes", compressedStr, compressedSize,
-                            PulsarDecoder.MaxMessageSize)));
+            PulsarClientException.InvalidMessageException invalidMessageException =
+                    new PulsarClientException.InvalidMessageException(
+                            format("%s Message payload size %d cannot exceed %d bytes", compressedStr, compressedSize,
+                                    PulsarDecoder.MaxMessageSize));
+            callback.sendComplete(invalidMessageException);
             return;
         }
 
         if (!msg.isReplicated() && msgMetadataBuilder.hasProducerName()) {
-            callback.sendComplete(new PulsarClientException.InvalidMessageException("Cannot re-use the same message"));
+            PulsarClientException.InvalidMessageException invalidMessageException =
+                    new PulsarClientException.InvalidMessageException("Cannot re-use the same message");
+            callback.sendComplete(invalidMessageException);
             compressedPayload.release();
             return;
         }
@@ -463,8 +490,7 @@ public class ProducerImpl<T> extends ProducerBase<T> implements TimerTask, Conne
                 semaphore.acquire();
             } else {
                 if (!semaphore.tryAcquire()) {
-                    callback.sendComplete(
-                            new PulsarClientException.ProducerQueueIsFullError("Producer send queue is full"));
+                    callback.sendComplete(new PulsarClientException.ProducerQueueIsFullError("Producer send queue is full"));
                     return false;
                 }
             }
diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ProducerInterceptors.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ProducerInterceptors.java
new file mode 100644
index 0000000..59d1ad3
--- /dev/null
+++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ProducerInterceptors.java
@@ -0,0 +1,110 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.client.impl;
+
+import org.apache.pulsar.client.api.Message;
+import org.apache.pulsar.client.api.MessageId;
+import org.apache.pulsar.client.api.Producer;
+import org.apache.pulsar.client.api.ProducerInterceptor;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.Closeable;
+import java.io.IOException;
+import java.util.List;
+
+/**
+ * A container that holds the list{@link org.apache.pulsar.client.api.ProducerInterceptor}
+ * and wraps calls to the chain of custom interceptors.
+ */
+public class ProducerInterceptors<T> implements Closeable {
+
+    private static final Logger log = LoggerFactory.getLogger(ProducerInterceptors.class);
+
+    private final List<ProducerInterceptor<T>> interceptors;
+
+    public ProducerInterceptors(List<ProducerInterceptor<T>> interceptors) {
+        this.interceptors = interceptors;
+    }
+
+    /**
+     * This is called when client sends message to pulsar broker, before key and value gets serialized.
+     * The method calls {@link ProducerInterceptor#beforeSend(Producer,Message)} method. Message returned from
+     * first interceptor's beforeSend() is passed to the second interceptor beforeSend(), and so on in the
+     * interceptor chain. The message returned from the last interceptor is returned from this method.
+     *
+     * This method does not throw exceptions. Exceptions thrown by any interceptor methods are caught and ignored.
+     * If a interceptor in the middle of the chain, that normally modifies the message, throws an exception,
+     * the next interceptor in the chain will be called with a message returned by the previous interceptor that did
+     * not throw an exception.
+     *
+     * @param producer the producer which contains the interceptor.
+     * @param message the message from client
+     * @return the message to send to topic/partition
+     */
+    public Message<T> beforeSend(Producer<T> producer, Message<T> message) {
+        Message<T> interceptorMessage = message;
+        for (int i = 0; i < interceptors.size(); i++) {
+            try {
+                interceptorMessage = interceptors.get(i).beforeSend(producer, interceptorMessage);
+            } catch (Exception e) {
+                if (message != null && producer != null) {
+                    log.warn("Error executing interceptor beforeSend callback for messageId: {}, topicName:{} ", message.getMessageId(), producer.getTopic(), e);
+                } else {
+                    log.warn("Error Error executing interceptor beforeSend callback ", e);
+                }
+            }
+        }
+        return interceptorMessage;
+    }
+
+    /**
+     * This method is called when the message send to the broker has been acknowledged, or when sending the record fails
+     * before it gets send to the broker.
+     * This method calls {@link ProducerInterceptor#onSendAcknowledgement(Producer, Message, MessageId, Throwable)} method for
+     * each interceptor.
+     *
+     * This method does not throw exceptions. Exceptions thrown by any of interceptor methods are caught and ignored.
+     *
+     * @param producer the producer which contains the interceptor.
+     * @param message The message returned from the last interceptor is returned from {@link ProducerInterceptor#beforeSend(Producer, Message)}
+     * @param msgId The message id that broker returned. Null if has error occurred.
+     * @param exception The exception thrown during processing of this message. Null if no error occurred.
+     */
+    public void onSendAcknowledgement(Producer<T> producer, Message<T> message, MessageId msgId, Throwable exception) {
+        for (int i = 0; i < interceptors.size(); i++) {
+            try {
+                interceptors.get(i).onSendAcknowledgement(producer, message, msgId, exception);
+            } catch (Exception e) {
+                log.warn("Error executing interceptor onSendAcknowledgement callback ", e);
+            }
+        }
+    }
+
+    @Override
+    public void close() throws IOException {
+        for (int i = 0; i < interceptors.size(); i++) {
+            try {
+                interceptors.get(i).close();
+            } catch (Exception e) {
+                log.error("Fail to close producer interceptor ", e);
+            }
+        }
+    }
+}
diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/PulsarClientImpl.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/PulsarClientImpl.java
index 4013068..b87b9bd 100644
--- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/PulsarClientImpl.java
+++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/PulsarClientImpl.java
@@ -238,10 +238,15 @@ public class PulsarClientImpl implements PulsarClient {
     }
 
     public CompletableFuture<Producer<byte[]>> createProducerAsync(ProducerConfigurationData conf) {
-        return createProducerAsync(conf, Schema.BYTES);
+        return createProducerAsync(conf, Schema.BYTES, null);
     }
 
-    public <T> CompletableFuture<Producer<T>> createProducerAsync(ProducerConfigurationData conf, Schema<T> schema) {
+    public <T> CompletableFuture<Producer<T>> createProducerAsync(ProducerConfigurationData conf,  Schema<T> schema) {
+        return createProducerAsync(conf, schema, null);
+    }
+
+    public <T> CompletableFuture<Producer<T>> createProducerAsync(ProducerConfigurationData conf, Schema<T> schema,
+          ProducerInterceptors<T> interceptors) {
         if (conf == null) {
             return FutureUtil.failedFuture(
                     new PulsarClientException.InvalidConfigurationException("Producer configuration undefined"));
@@ -273,9 +278,9 @@ public class PulsarClientImpl implements PulsarClient {
             ProducerBase<T> producer;
             if (metadata.partitions > 1) {
                 producer = new PartitionedProducerImpl<>(PulsarClientImpl.this, topic, conf, metadata.partitions,
-                        producerCreatedFuture, schema);
+                        producerCreatedFuture, schema, interceptors);
             } else {
-                producer = new ProducerImpl<>(PulsarClientImpl.this, topic, conf, producerCreatedFuture, -1, schema);
+                producer = new ProducerImpl<>(PulsarClientImpl.this, topic, conf, producerCreatedFuture, -1, schema, interceptors);
             }
 
             synchronized (producers) {
@@ -336,10 +341,10 @@ public class PulsarClientImpl implements PulsarClient {
     }
 
     public CompletableFuture<Consumer<byte[]>> subscribeAsync(ConsumerConfigurationData<byte[]> conf) {
-        return subscribeAsync(conf, Schema.BYTES);
+        return subscribeAsync(conf, Schema.BYTES, null);
     }
 
-    public <T> CompletableFuture<Consumer<T>> subscribeAsync(ConsumerConfigurationData<T> conf, Schema<T> schema) {
+    public <T> CompletableFuture<Consumer<T>> subscribeAsync(ConsumerConfigurationData<T> conf, Schema<T> schema, ConsumerInterceptors<T> interceptors) {
         if (state.get() != State.Open) {
             return FutureUtil.failedFuture(new PulsarClientException.AlreadyClosedException("Client already closed"));
         }
@@ -377,15 +382,15 @@ public class PulsarClientImpl implements PulsarClient {
                 return FutureUtil
                     .failedFuture(new IllegalArgumentException("Topic names list must be null when use topicsPattern"));
             }
-            return patternTopicSubscribeAsync(conf, schema);
+            return patternTopicSubscribeAsync(conf, schema, interceptors);
         } else if (conf.getTopicNames().size() == 1) {
-            return singleTopicSubscribeAsync(conf, schema);
+            return singleTopicSubscribeAsync(conf, schema, interceptors);
         } else {
-            return multiTopicSubscribeAsync(conf, schema);
+            return multiTopicSubscribeAsync(conf, schema, interceptors);
         }
     }
 
-    private <T> CompletableFuture<Consumer<T>> singleTopicSubscribeAsync(ConsumerConfigurationData<T> conf, Schema<T> schema) {
+    private <T> CompletableFuture<Consumer<T>> singleTopicSubscribeAsync(ConsumerConfigurationData<T> conf, Schema<T> schema, ConsumerInterceptors<T> interceptors) {
         if (schema instanceof AutoSchema) {
             AutoSchema autoSchema = (AutoSchema) schema;
             return lookup.getSchema(TopicName.get(conf.getSingleTopic()))
@@ -395,20 +400,20 @@ public class PulsarClientImpl implements PulsarClient {
                             log.info("Auto detected schema for topic {} : {}",
                                 conf.getSingleTopic(), new String(schemaInfoOptional.get().getSchema(), UTF_8));
                             autoSchema.setSchema(genericSchema);
-                            return doSingleTopicSubscribeAsync(conf, schema);
+                            return doSingleTopicSubscribeAsync(conf, schema, interceptors);
                         } else {
                             return FutureUtil.failedFuture(
                                 new PulsarClientException.LookupException("Currently schema detection only works for topics with avro schemas"));
                         }
                     });
         } else {
-            return doSingleTopicSubscribeAsync(conf, schema);
+            return doSingleTopicSubscribeAsync(conf, schema, interceptors);
         }
     }
 
 
 
-    private <T> CompletableFuture<Consumer<T>> doSingleTopicSubscribeAsync(ConsumerConfigurationData<T> conf, Schema<T> schema) {
+    private <T> CompletableFuture<Consumer<T>> doSingleTopicSubscribeAsync(ConsumerConfigurationData<T> conf, Schema<T> schema, ConsumerInterceptors<T> interceptors) {
         CompletableFuture<Consumer<T>> consumerSubscribedFuture = new CompletableFuture<>();
 
         String topic = conf.getSingleTopic();
@@ -423,10 +428,10 @@ public class PulsarClientImpl implements PulsarClient {
             ExecutorService listenerThread = externalExecutorProvider.getExecutor();
             if (metadata.partitions > 1) {
                 consumer = MultiTopicsConsumerImpl.createPartitionedConsumer(PulsarClientImpl.this, conf,
-                    listenerThread, consumerSubscribedFuture, metadata.partitions, schema);
+                    listenerThread, consumerSubscribedFuture, metadata.partitions, schema, interceptors);
             } else {
                 consumer = new ConsumerImpl<>(PulsarClientImpl.this, topic, conf, listenerThread, -1,
-                        consumerSubscribedFuture, schema);
+                        consumerSubscribedFuture, schema, interceptors);
             }
 
             synchronized (consumers) {
@@ -441,11 +446,11 @@ public class PulsarClientImpl implements PulsarClient {
         return consumerSubscribedFuture;
     }
 
-    private <T> CompletableFuture<Consumer<T>> multiTopicSubscribeAsync(ConsumerConfigurationData<T> conf, Schema<T> schema) {
+    private <T> CompletableFuture<Consumer<T>> multiTopicSubscribeAsync(ConsumerConfigurationData<T> conf, Schema<T> schema, ConsumerInterceptors<T> interceptors) {
         CompletableFuture<Consumer<T>> consumerSubscribedFuture = new CompletableFuture<>();
 
         ConsumerBase<T> consumer = new MultiTopicsConsumerImpl<>(PulsarClientImpl.this, conf,
-                externalExecutorProvider.getExecutor(), consumerSubscribedFuture, schema);
+                externalExecutorProvider.getExecutor(), consumerSubscribedFuture, schema, interceptors);
 
         synchronized (consumers) {
             consumers.put(consumer, Boolean.TRUE);
@@ -455,10 +460,10 @@ public class PulsarClientImpl implements PulsarClient {
     }
 
     public CompletableFuture<Consumer<byte[]>> patternTopicSubscribeAsync(ConsumerConfigurationData<byte[]> conf) {
-        return patternTopicSubscribeAsync(conf, Schema.BYTES);
+        return patternTopicSubscribeAsync(conf, Schema.BYTES, null);
     }
 
-    private <T> CompletableFuture<Consumer<T>> patternTopicSubscribeAsync(ConsumerConfigurationData<T> conf, Schema<T> schema) {
+    private <T> CompletableFuture<Consumer<T>> patternTopicSubscribeAsync(ConsumerConfigurationData<T> conf, Schema<T> schema, ConsumerInterceptors interceptors) {
         String regex = conf.getTopicsPattern().pattern();
         TopicName destination = TopicName.get(regex);
         NamespaceName namespaceName = destination.getNamespaceObject();
@@ -479,7 +484,7 @@ public class PulsarClientImpl implements PulsarClient {
                     conf,
                     externalExecutorProvider.getExecutor(),
                     consumerSubscribedFuture,
-                    schema);
+                    schema, interceptors);
 
                 synchronized (consumers) {
                     consumers.put(consumer, Boolean.TRUE);
diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ReaderImpl.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ReaderImpl.java
index aafd125..388e5c4 100644
--- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ReaderImpl.java
+++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ReaderImpl.java
@@ -85,7 +85,7 @@ public class ReaderImpl<T> implements Reader<T> {
 
         final int partitionIdx = TopicName.getPartitionIndex(readerConfiguration.getTopicName());
         consumer = new ConsumerImpl<>(client, readerConfiguration.getTopicName(), consumerConfiguration, listenerExecutor,
-                partitionIdx, consumerFuture, SubscriptionMode.NonDurable, readerConfiguration.getStartMessageId(), schema);
+                partitionIdx, consumerFuture, SubscriptionMode.NonDurable, readerConfiguration.getStartMessageId(), schema, null);
     }
 
     @Override
diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/TopicMessageImpl.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/TopicMessageImpl.java
index eae02b0..230a022 100644
--- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/TopicMessageImpl.java
+++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/TopicMessageImpl.java
@@ -127,4 +127,8 @@ public class TopicMessageImpl<T> implements Message<T> {
     public Optional<EncryptionContext> getEncryptionCtx() {
         return msg.getEncryptionCtx();
     }
+
+    public Message<T> getMessage() {
+        return msg;
+    }
 }
diff --git a/pulsar-functions/instance/src/test/java/org/apache/pulsar/functions/instance/ContextImplTest.java b/pulsar-functions/instance/src/test/java/org/apache/pulsar/functions/instance/ContextImplTest.java
index 4380c63..f5108fc 100644
--- a/pulsar-functions/instance/src/test/java/org/apache/pulsar/functions/instance/ContextImplTest.java
+++ b/pulsar-functions/instance/src/test/java/org/apache/pulsar/functions/instance/ContextImplTest.java
@@ -64,7 +64,7 @@ public class ContextImplTest {
         logger = mock(Logger.class);
         client = mock(PulsarClientImpl.class);
         when(client.newProducer()).thenReturn(new ProducerBuilderImpl(client, Schema.BYTES));
-        when(client.createProducerAsync(Matchers.any(ProducerConfigurationData.class), Matchers.any(Schema.class)))
+        when(client.createProducerAsync(Matchers.any(ProducerConfigurationData.class), Matchers.any(Schema.class), eq(null)))
                 .thenReturn(CompletableFuture.completedFuture(producer));
         when(client.getSchema(anyString())).thenReturn(CompletableFuture.completedFuture(Optional.empty()));
         when(producer.sendAsync(anyString())).thenReturn(CompletableFuture.completedFuture(null));
diff --git a/pulsar-functions/instance/src/test/java/org/apache/pulsar/functions/instance/producers/MultiConsumersOneOutputTopicProducersTest.java b/pulsar-functions/instance/src/test/java/org/apache/pulsar/functions/instance/producers/MultiConsumersOneOutputTopicProducersTest.java
index 89efcf5..399a4c7 100644
--- a/pulsar-functions/instance/src/test/java/org/apache/pulsar/functions/instance/producers/MultiConsumersOneOutputTopicProducersTest.java
+++ b/pulsar-functions/instance/src/test/java/org/apache/pulsar/functions/instance/producers/MultiConsumersOneOutputTopicProducersTest.java
@@ -21,7 +21,8 @@ package org.apache.pulsar.functions.instance.producers;
 import static org.mockito.Mockito.mock;
 import static org.mockito.Mockito.times;
 import static org.mockito.Mockito.verify;
-import static org.mockito.Mockito.*;
+import static org.mockito.Mockito.when;
+import static org.mockito.Mockito.any;
 import static org.testng.Assert.assertFalse;
 import static org.testng.Assert.assertSame;
 import static org.testng.Assert.assertTrue;
@@ -40,6 +41,7 @@ import org.apache.pulsar.client.api.MessageRoutingMode;
 import org.apache.pulsar.client.api.Producer;
 import org.apache.pulsar.client.api.ProducerBuilder;
 import org.apache.pulsar.client.api.ProducerCryptoFailureAction;
+import org.apache.pulsar.client.api.ProducerInterceptor;
 import org.apache.pulsar.client.api.PulsarClient;
 import org.apache.pulsar.client.api.PulsarClientException;
 import org.apache.pulsar.client.api.Schema;
@@ -190,6 +192,11 @@ public class MultiConsumersOneOutputTopicProducersTest {
         public ProducerBuilder<byte[]> properties(Map<String, String> properties) {
             return this;
         }
+
+        @Override
+        public ProducerBuilder<byte[]> intercept(ProducerInterceptor<byte[]>... interceptors) {
+            return null;
+        }
     }
 
     @BeforeMethod
@@ -197,7 +204,7 @@ public class MultiConsumersOneOutputTopicProducersTest {
         this.mockClient = mock(PulsarClient.class);
 
         when(mockClient.newProducer(any(Schema.class)))
-            .thenReturn(new MockProducerBuilder());
+                .thenReturn(new MockProducerBuilder());
 
         producers = new MultiConsumersOneOuputTopicProducers<byte[]>(mockClient, TEST_OUTPUT_TOPIC, Schema.BYTES, "test");
         producers.initialize();
@@ -206,12 +213,12 @@ public class MultiConsumersOneOutputTopicProducersTest {
     private Producer<byte[]> createMockProducer(String topic) {
         Producer<byte[]> producer = mock(Producer.class);
         when(producer.closeAsync())
-            .thenAnswer(invocationOnMock -> {
-                synchronized (mockProducers) {
-                    mockProducers.remove(topic);
-                }
-                return FutureUtils.Void();
-            });
+                .thenAnswer(invocationOnMock -> {
+                    synchronized (mockProducers) {
+                        mockProducers.remove(topic);
+                    }
+                    return FutureUtils.Void();
+                });
         return producer;
     }
 
@@ -224,13 +231,13 @@ public class MultiConsumersOneOutputTopicProducersTest {
 
         assertSame(mockProducers.get(producerName), producer);
         verify(mockClient, times(1))
-            .newProducer(Schema.BYTES);
+                .newProducer(Schema.BYTES);
         assertTrue(producers.getProducers().containsKey(producerName));
 
         // second get will not create a new producer
         assertSame(mockProducers.get(producerName), producer);
         verify(mockClient, times(1))
-            .newProducer(Schema.BYTES);
+                .newProducer(Schema.BYTES);
         assertTrue(producers.getProducers().containsKey(producerName));
 
         // close