You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by zi...@apache.org on 2023/02/20 11:34:11 UTC

[pulsar-client-cpp] branch main updated: [feat] Add producer interceptor (#169)

This is an automated email from the ASF dual-hosted git repository.

zike pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/pulsar-client-cpp.git


The following commit(s) were added to refs/heads/main by this push:
     new 86c915b  [feat] Add producer interceptor (#169)
86c915b is described below

commit 86c915bfe17d940b38110dd22c7a4e9dbe55e5cb
Author: Zike Yang <zi...@apache.org>
AuthorDate: Mon Feb 20 19:34:04 2023 +0800

    [feat] Add producer interceptor (#169)
    
    Master Issue: #150
    
    ### Motivation
    
    This is the producer interceptor implementation of #150.
    
    ### Modifications
    
    * Add ProducerInterceptor interface
    * Add `intercept` and `getInterceptors` in `ProducerConfiguration`.
    * Triggering the interceptor when `beforeSend`, `onSendAcknowledgement`, `onPartitionsChange` and `close`.
---
 include/pulsar/Producer.h              |   1 +
 include/pulsar/ProducerConfiguration.h |   5 +
 include/pulsar/ProducerInterceptor.h   | 114 ++++++++++++++++++++
 lib/ClientImpl.cc                      |  10 +-
 lib/PartitionedProducerImpl.cc         |  11 +-
 lib/PartitionedProducerImpl.h          |   5 +-
 lib/ProducerConfiguration.cc           |  11 ++
 lib/ProducerConfigurationImpl.h        |   1 +
 lib/ProducerImpl.cc                    |  17 ++-
 lib/ProducerImpl.h                     |   5 +-
 lib/ProducerImplBase.h                 |   4 +
 lib/ProducerInterceptors.cc            |  84 +++++++++++++++
 lib/ProducerInterceptors.h             |  56 ++++++++++
 tests/InterceptorsTest.cc              | 186 +++++++++++++++++++++++++++++++++
 14 files changed, 499 insertions(+), 11 deletions(-)

diff --git a/include/pulsar/Producer.h b/include/pulsar/Producer.h
index 955d985..51d2980 100644
--- a/include/pulsar/Producer.h
+++ b/include/pulsar/Producer.h
@@ -166,6 +166,7 @@ class PULSAR_PUBLIC Producer {
     friend class ClientImpl;
     friend class PulsarFriend;
     friend class PulsarWrapper;
+    friend class ProducerImpl;
 
     ProducerImplBasePtr impl_;
 
diff --git a/include/pulsar/ProducerConfiguration.h b/include/pulsar/ProducerConfiguration.h
index 67550cf..e0c824c 100644
--- a/include/pulsar/ProducerConfiguration.h
+++ b/include/pulsar/ProducerConfiguration.h
@@ -23,6 +23,7 @@
 #include <pulsar/Message.h>
 #include <pulsar/MessageRoutingPolicy.h>
 #include <pulsar/ProducerCryptoFailureAction.h>
+#include <pulsar/ProducerInterceptor.h>
 #include <pulsar/Result.h>
 #include <pulsar/Schema.h>
 #include <pulsar/defines.h>
@@ -532,6 +533,10 @@ class PULSAR_PUBLIC ProducerConfiguration {
      */
     ProducerAccessMode getAccessMode() const;
 
+    ProducerConfiguration& intercept(const std::vector<ProducerInterceptorPtr>& interceptors);
+
+    const std::vector<ProducerInterceptorPtr>& getInterceptors() const;
+
    private:
     std::shared_ptr<ProducerConfigurationImpl> impl_;
 
diff --git a/include/pulsar/ProducerInterceptor.h b/include/pulsar/ProducerInterceptor.h
new file mode 100644
index 0000000..45f55b5
--- /dev/null
+++ b/include/pulsar/ProducerInterceptor.h
@@ -0,0 +1,114 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+#ifndef PULSAR_PRODUCER_INTERCEPTOR_H
+#define PULSAR_PRODUCER_INTERCEPTOR_H
+
+#include <pulsar/Message.h>
+#include <pulsar/Result.h>
+#include <pulsar/defines.h>
+
+/**
+ * An interface that allows you to intercept (and possibly mutate) the
+ * messages received by the producer before they are published to the Pulsar
+ * brokers.
+ *
+ * <p>Exceptions thrown by ProducerInterceptor methods will be caught, logged, but
+ * not propagated further.
+ *
+ * <p>ProducerInterceptor callbacks may be called from multiple threads. Interceptor
+ * implementation must ensure thread-safety, if needed.
+ */
+namespace pulsar {
+
+class Producer;
+
+class PULSAR_PUBLIC ProducerInterceptor {
+   public:
+    virtual ~ProducerInterceptor() {}
+
+    /**
+     * Close the interceptor
+     */
+    virtual void close() {}
+
+    /**
+     * This is called from Producer#send and Producer#sendAsync methods, before
+     * send the message to the brokers. This method is allowed to modify the
+     * record, in which case, the new record will be returned.
+     *
+     * <p>Any exception thrown by this method will be caught by the caller and
+     * logged, but not propagated further.
+     *
+     * <p>Since the producer may run multiple interceptors, a particular
+     * interceptor's #beforeSend(Producer, Message) callback will be called in the
+     * order specified by ProducerConfiguration#intercept().
+     *
+     * <p>The first interceptor in the list gets the message passed from the client,
+     * the following interceptor will be passed the message returned by the
+     * previous interceptor, and so on. Since interceptors are allowed to modify
+     * messages, interceptors may potentially get the message already modified by
+     * other interceptors. However, building a pipeline of mutable interceptors
+     * that depend on the output of the previous interceptor is discouraged,
+     * because of potential side-effects caused by interceptors potentially
+     * failing to modify the message and throwing an exception. If one of the
+     * interceptors in the list throws an exception from beforeSend(Message),
+     * the exception is caught, logged, and the next interceptor is called with
+     * the message returned by the last successful interceptor in the list,
+     * or otherwise the client.
+     *
+     * @param producer the producer which contains the interceptor.
+     * @param message message to send.
+     * @return the intercepted message.
+     */
+    virtual Message beforeSend(const Producer& producer, const Message& message) = 0;
+
+    /**
+     * This method is called when the message sent to the broker has been
+     * acknowledged, or when sending the message fails.
+     * This method is generally called just before the user callback is
+     * called.
+     *
+     * <p>Any exception thrown by this method will be ignored by the caller.
+     *
+     * <p>This method will generally execute in the background I/O thread, so the
+     * implementation should be reasonably fast. Otherwise, sending of messages
+     * from other threads could be delayed.
+     *
+     * @param producer the producer which contains the interceptor.
+     * @param result the result for sending messages, ResultOk indicates send has succeed.
+     * @param message the message that application sends.
+     * @param messageID the message id that assigned by the broker.
+     */
+    virtual void onSendAcknowledgement(const Producer& producer, Result result, const Message& message,
+                                       const MessageId& messageID) = 0;
+
+    /**
+     * This method is called when partitions of the topic (partitioned-topic) changes.
+     *
+     * @param topicName topic name
+     * @param partitions new updated partitions
+     */
+    virtual void onPartitionsChange(const std::string& topicName, int partitions) {}
+};
+
+typedef std::shared_ptr<ProducerInterceptor> ProducerInterceptorPtr;
+}  // namespace pulsar
+
+#endif  // PULSAR_PRODUCER_INTERCEPTOR_H
diff --git a/lib/ClientImpl.cc b/lib/ClientImpl.cc
index 7575b28..e009277 100644
--- a/lib/ClientImpl.cc
+++ b/lib/ClientImpl.cc
@@ -33,6 +33,7 @@
 #include "PartitionedProducerImpl.h"
 #include "PatternMultiTopicsConsumerImpl.h"
 #include "ProducerImpl.h"
+#include "ProducerInterceptors.h"
 #include "ReaderImpl.h"
 #include "RetryableLookupService.h"
 #include "TimeUtils.h"
@@ -187,11 +188,14 @@ void ClientImpl::handleCreateProducer(const Result result, const LookupDataResul
                                       CreateProducerCallback callback) {
     if (!result) {
         ProducerImplBasePtr producer;
+
+        auto interceptors = std::make_shared<ProducerInterceptors>(conf.getInterceptors());
+
         if (partitionMetadata->getPartitions() > 0) {
-            producer = std::make_shared<PartitionedProducerImpl>(shared_from_this(), topicName,
-                                                                 partitionMetadata->getPartitions(), conf);
+            producer = std::make_shared<PartitionedProducerImpl>(
+                shared_from_this(), topicName, partitionMetadata->getPartitions(), conf, interceptors);
         } else {
-            producer = std::make_shared<ProducerImpl>(shared_from_this(), *topicName, conf);
+            producer = std::make_shared<ProducerImpl>(shared_from_this(), *topicName, conf, interceptors);
         }
         producer->getProducerCreatedFuture().addListener(
             std::bind(&ClientImpl::handleProducerCreated, shared_from_this(), std::placeholders::_1,
diff --git a/lib/PartitionedProducerImpl.cc b/lib/PartitionedProducerImpl.cc
index a4b5310..e442ce9 100644
--- a/lib/PartitionedProducerImpl.cc
+++ b/lib/PartitionedProducerImpl.cc
@@ -38,13 +38,15 @@ const std::string PartitionedProducerImpl::PARTITION_NAME_SUFFIX = "-partition-"
 
 PartitionedProducerImpl::PartitionedProducerImpl(ClientImplPtr client, const TopicNamePtr topicName,
                                                  const unsigned int numPartitions,
-                                                 const ProducerConfiguration& config)
+                                                 const ProducerConfiguration& config,
+                                                 const ProducerInterceptorsPtr& interceptors)
     : client_(client),
       topicName_(topicName),
       topic_(topicName_->toString()),
       conf_(config),
       topicMetadata_(new TopicMetadataImpl(numPartitions)),
-      flushedPartitions_(0) {
+      flushedPartitions_(0),
+      interceptors_(interceptors) {
     routerPolicy_ = getMessageRouter();
 
     int maxPendingMessagesPerPartition =
@@ -93,7 +95,7 @@ unsigned int PartitionedProducerImpl::getNumPartitionsWithLock() const {
 ProducerImplPtr PartitionedProducerImpl::newInternalProducer(unsigned int partition, bool lazy) {
     using namespace std::placeholders;
     auto client = client_.lock();
-    auto producer = std::make_shared<ProducerImpl>(client, *topicName_, conf_, partition);
+    auto producer = std::make_shared<ProducerImpl>(client, *topicName_, conf_, interceptors_, partition);
     if (!client) {
         return producer;
     }
@@ -227,6 +229,7 @@ void PartitionedProducerImpl::sendAsync(const Message& msg, SendCallback callbac
 // override
 void PartitionedProducerImpl::shutdown() {
     cancelTimers();
+    interceptors_->close();
     auto client = client_.lock();
     if (client) {
         client->cleanupProducer(this);
@@ -446,7 +449,9 @@ void PartitionedProducerImpl::handleGetPartitions(Result result,
                 }
                 producers_.push_back(producer);
             }
+            producersLock.unlock();
             // `runPartitionUpdateTask()` will be called in `handleSinglePartitionProducerCreated()`
+            interceptors_->onPartitionsChange(getTopic(), newNumPartitions);
             return;
         }
     } else {
diff --git a/lib/PartitionedProducerImpl.h b/lib/PartitionedProducerImpl.h
index b9a4b01..25ba9c3 100644
--- a/lib/PartitionedProducerImpl.h
+++ b/lib/PartitionedProducerImpl.h
@@ -27,6 +27,7 @@
 
 #include "LookupDataResult.h"
 #include "ProducerImplBase.h"
+#include "ProducerInterceptors.h"
 
 namespace pulsar {
 
@@ -59,7 +60,7 @@ class PartitionedProducerImpl : public ProducerImplBase,
     typedef std::unique_lock<std::mutex> Lock;
 
     PartitionedProducerImpl(ClientImplPtr ptr, const TopicNamePtr topicName, const unsigned int numPartitions,
-                            const ProducerConfiguration& config);
+                            const ProducerConfiguration& config, const ProducerInterceptorsPtr& interceptors);
     virtual ~PartitionedProducerImpl();
 
     // overrided methods from ProducerImplBase
@@ -130,6 +131,8 @@ class PartitionedProducerImpl : public ProducerImplBase,
     boost::posix_time::time_duration partitionsUpdateInterval_;
     LookupServicePtr lookupServicePtr_;
 
+    ProducerInterceptorsPtr interceptors_;
+
     unsigned int getNumPartitions() const;
     unsigned int getNumPartitionsWithLock() const;
     ProducerImplPtr newInternalProducer(unsigned int partition, bool lazy);
diff --git a/lib/ProducerConfiguration.cc b/lib/ProducerConfiguration.cc
index 67e8b10..0e141ae 100644
--- a/lib/ProducerConfiguration.cc
+++ b/lib/ProducerConfiguration.cc
@@ -16,6 +16,8 @@
  * specific language governing permissions and limitations
  * under the License.
  */
+#include <pulsar/ProducerConfiguration.h>
+
 #include <stdexcept>
 
 #include "ProducerConfigurationImpl.h"
@@ -265,5 +267,14 @@ ProducerConfiguration& ProducerConfiguration::setAccessMode(const ProducerAccess
 ProducerConfiguration::ProducerAccessMode ProducerConfiguration::getAccessMode() const {
     return impl_->accessMode;
 }
+ProducerConfiguration& ProducerConfiguration::intercept(
+    const std::vector<ProducerInterceptorPtr>& interceptors) {
+    impl_->interceptors.insert(impl_->interceptors.end(), interceptors.begin(), interceptors.end());
+    return *this;
+}
+
+const std::vector<ProducerInterceptorPtr>& ProducerConfiguration::getInterceptors() const {
+    return impl_->interceptors;
+}
 
 }  // namespace pulsar
diff --git a/lib/ProducerConfigurationImpl.h b/lib/ProducerConfigurationImpl.h
index 4c8fcdb..c635c48 100644
--- a/lib/ProducerConfigurationImpl.h
+++ b/lib/ProducerConfigurationImpl.h
@@ -51,6 +51,7 @@ struct ProducerConfigurationImpl {
     bool chunkingEnabled{false};
     ProducerConfiguration::ProducerAccessMode accessMode{ProducerConfiguration::Shared};
     std::string initialSubscriptionName;
+    std::vector<ProducerInterceptorPtr> interceptors;
 };
 }  // namespace pulsar
 
diff --git a/lib/ProducerImpl.cc b/lib/ProducerImpl.cc
index 6514b3b..78a5115 100644
--- a/lib/ProducerImpl.cc
+++ b/lib/ProducerImpl.cc
@@ -55,7 +55,8 @@ struct ProducerImpl::PendingCallbacks {
 };
 
 ProducerImpl::ProducerImpl(ClientImplPtr client, const TopicName& topicName,
-                           const ProducerConfiguration& conf, int32_t partition)
+                           const ProducerConfiguration& conf, const ProducerInterceptorsPtr& interceptors,
+                           int32_t partition)
     : HandlerBase(client, (partition < 0) ? topicName.toString() : topicName.getTopicPartitionName(partition),
                   Backoff(milliseconds(client->getClientConfig().getInitialBackoffIntervalMs()),
                           milliseconds(client->getClientConfig().getMaxBackoffIntervalMs()),
@@ -73,7 +74,8 @@ ProducerImpl::ProducerImpl(ClientImplPtr client, const TopicName& topicName,
       sendTimer_(executor_->getIOService()),
       dataKeyRefreshTask_(executor_->getIOService(), 4 * 60 * 60 * 1000),
       memoryLimitController_(client->getMemoryLimitController()),
-      chunkingEnabled_(conf_.isChunkingEnabled() && topicName.isPersistent() && !conf_.getBatchingEnabled()) {
+      chunkingEnabled_(conf_.isChunkingEnabled() && topicName.isPersistent() && !conf_.getBatchingEnabled()),
+      interceptors_(interceptors) {
     LOG_DEBUG("ProducerName - " << producerName_ << " Created producer on topic " << topic_
                                 << " id: " << producerId_);
 
@@ -432,10 +434,17 @@ static SharedBuffer applyCompression(const SharedBuffer& uncompressedPayload,
 void ProducerImpl::sendAsync(const Message& msg, SendCallback callback) {
     producerStatsBasePtr_->messageSent(msg);
 
+    Producer producer = Producer(shared_from_this());
+    auto interceptorMessage = interceptors_->beforeSend(producer, msg);
+
     const auto now = boost::posix_time::microsec_clock::universal_time();
     auto self = shared_from_this();
-    sendAsyncWithStatsUpdate(msg, [this, self, now, callback](Result result, const MessageId& messageId) {
+    sendAsyncWithStatsUpdate(interceptorMessage, [this, self, now, callback, producer, interceptorMessage](
+                                                     Result result, const MessageId& messageId) {
         producerStatsBasePtr_->messageReceived(result, now);
+
+        interceptors_->onSendAcknowledgement(producer, result, interceptorMessage, messageId);
+
         if (callback) {
             callback(result, messageId);
         }
@@ -931,6 +940,7 @@ void ProducerImpl::start() {
 
 void ProducerImpl::shutdown() {
     resetCnx();
+    interceptors_->close();
     auto client = client_.lock();
     if (client) {
         client->cleanupProducer(this);
@@ -979,3 +989,4 @@ void ProducerImpl::asyncWaitSendTimeout(DurationType expiryTime) {
 ProducerImplWeakPtr ProducerImpl::weak_from_this() noexcept { return shared_from_this(); }
 
 }  // namespace pulsar
+/* namespace pulsar */
diff --git a/lib/ProducerImpl.h b/lib/ProducerImpl.h
index 928fca5..b0c9f7c 100644
--- a/lib/ProducerImpl.h
+++ b/lib/ProducerImpl.h
@@ -64,7 +64,8 @@ class ProducerImpl : public HandlerBase,
                      public ProducerImplBase {
    public:
     ProducerImpl(ClientImplPtr client, const TopicName& topic,
-                 const ProducerConfiguration& producerConfiguration, int32_t partition = -1);
+                 const ProducerConfiguration& producerConfiguration,
+                 const ProducerInterceptorsPtr& interceptors, int32_t partition = -1);
     ~ProducerImpl();
 
     // overrided methods from ProducerImplBase
@@ -196,6 +197,8 @@ class ProducerImpl : public HandlerBase,
     MemoryLimitController& memoryLimitController_;
     const bool chunkingEnabled_;
     boost::optional<uint64_t> topicEpoch;
+
+    ProducerInterceptorsPtr interceptors_;
 };
 
 struct ProducerImplCmp {
diff --git a/lib/ProducerImplBase.h b/lib/ProducerImplBase.h
index 0b1622c..95488d8 100644
--- a/lib/ProducerImplBase.h
+++ b/lib/ProducerImplBase.h
@@ -22,6 +22,7 @@
 #include <pulsar/Producer.h>
 
 #include "Future.h"
+#include "ProducerInterceptors.h"
 
 namespace pulsar {
 class ProducerImplBase;
@@ -48,6 +49,9 @@ class ProducerImplBase {
     virtual void flushAsync(FlushCallback callback) = 0;
     virtual bool isConnected() const = 0;
     virtual uint64_t getNumberOfConnectedProducer() = 0;
+
+   protected:
+    ProducerInterceptorsPtr interceptors_;
 };
 }  // namespace pulsar
 #endif  // PULSAR_PRODUCER_IMPL_BASE_HEADER
diff --git a/lib/ProducerInterceptors.cc b/lib/ProducerInterceptors.cc
new file mode 100644
index 0000000..ea35f97
--- /dev/null
+++ b/lib/ProducerInterceptors.cc
@@ -0,0 +1,84 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+#include "ProducerInterceptors.h"
+
+#include <pulsar/Producer.h>
+
+#include "LogUtils.h"
+
+DECLARE_LOG_OBJECT()
+
+namespace pulsar {
+void ProducerInterceptors::onPartitionsChange(const std::string& topicName, int partitions) const {
+    for (const ProducerInterceptorPtr& interceptor : interceptors_) {
+        try {
+            interceptor->onPartitionsChange(topicName, partitions);
+        } catch (const std::exception& e) {
+            LOG_WARN("Error executing interceptor onPartitionsChange callback for topicName: "
+                     << topicName << ", exception: " << e.what());
+        }
+    }
+}
+
+Message ProducerInterceptors::beforeSend(const Producer& producer, const Message& message) {
+    if (interceptors_.empty()) {
+        return message;
+    }
+
+    Message interceptorMessage = message;
+    for (const ProducerInterceptorPtr& interceptor : interceptors_) {
+        try {
+            interceptorMessage = interceptor->beforeSend(producer, interceptorMessage);
+        } catch (const std::exception& e) {
+            LOG_WARN("Error executing interceptor beforeSend callback for topicName: "
+                     << producer.getTopic() << ", exception: " << e.what());
+        }
+    }
+    return interceptorMessage;
+}
+
+void ProducerInterceptors::onSendAcknowledgement(const Producer& producer, Result result,
+                                                 const Message& message, const MessageId& messageID) {
+    for (const ProducerInterceptorPtr& interceptor : interceptors_) {
+        try {
+            interceptor->onSendAcknowledgement(producer, result, message, messageID);
+        } catch (const std::exception& e) {
+            LOG_WARN("Error executing interceptor onSendAcknowledgement callback for topicName: "
+                     << producer.getTopic() << ", exception: " << e.what());
+        }
+    }
+}
+
+void ProducerInterceptors::close() {
+    State state = Ready;
+    if (!state_.compare_exchange_strong(state, Closing)) {
+        return;
+    }
+    for (const ProducerInterceptorPtr& interceptor : interceptors_) {
+        try {
+            interceptor->close();
+        } catch (const std::exception& e) {
+            LOG_WARN("Failed to close producer interceptor: " << e.what());
+        }
+    }
+    state_ = Closed;
+}
+
+}  // namespace pulsar
diff --git a/lib/ProducerInterceptors.h b/lib/ProducerInterceptors.h
new file mode 100644
index 0000000..f83394f
--- /dev/null
+++ b/lib/ProducerInterceptors.h
@@ -0,0 +1,56 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+#pragma once
+
+#include <pulsar/ProducerInterceptor.h>
+
+#include <atomic>
+#include <utility>
+#include <vector>
+
+namespace pulsar {
+
+class ProducerInterceptors {
+   public:
+    explicit ProducerInterceptors(std::vector<ProducerInterceptorPtr> interceptors)
+        : interceptors_(std::move(interceptors)) {}
+
+    void onPartitionsChange(const std::string& topicName, int partitions) const;
+
+    Message beforeSend(const Producer& producer, const Message& message);
+
+    void onSendAcknowledgement(const Producer& producer, Result result, const Message& message,
+                               const MessageId& messageID);
+
+    void close();
+
+   private:
+    enum State
+    {
+        Ready,
+        Closing,
+        Closed
+    };
+    std::vector<ProducerInterceptorPtr> interceptors_;
+    std::atomic<State> state_{Ready};
+};
+
+typedef std::shared_ptr<ProducerInterceptors> ProducerInterceptorsPtr;
+}  // namespace pulsar
diff --git a/tests/InterceptorsTest.cc b/tests/InterceptorsTest.cc
new file mode 100644
index 0000000..ef645c2
--- /dev/null
+++ b/tests/InterceptorsTest.cc
@@ -0,0 +1,186 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+#include <gtest/gtest.h>
+#include <pulsar/Client.h>
+#include <pulsar/ProducerInterceptor.h>
+
+#include <utility>
+
+#include "HttpHelper.h"
+#include "Latch.h"
+
+static const std::string serviceUrl = "pulsar://localhost:6650";
+static const std::string adminUrl = "http://localhost:8080/";
+
+using namespace pulsar;
+
+class TestInterceptor : public ProducerInterceptor {
+   public:
+    TestInterceptor(Latch& latch, Latch& closeLatch) : latch_(latch), closeLatch_(closeLatch) {}
+
+    Message beforeSend(const Producer& producer, const Message& message) override {
+        return MessageBuilder().setProperty("key", "set").setContent(message.getDataAsString()).build();
+    }
+
+    void onSendAcknowledgement(const Producer& producer, Result result, const Message& message,
+                               const MessageId& messageID) override {
+        ASSERT_EQ(result, ResultOk);
+        auto properties = message.getProperties();
+        ASSERT_TRUE(properties.find("key") != properties.end() && properties["key"] == "set");
+        latch_.countdown();
+    }
+
+    void close() override { closeLatch_.countdown(); }
+
+   private:
+    Latch latch_;
+    Latch closeLatch_;
+};
+
+class ExceptionInterceptor : public ProducerInterceptor {
+   public:
+    explicit ExceptionInterceptor(Latch& latch) : latch_(latch) {}
+
+    Message beforeSend(const Producer& producer, const Message& message) override {
+        latch_.countdown();
+        throw std::runtime_error("expected exception");
+    }
+
+    void onSendAcknowledgement(const Producer& producer, Result result, const Message& message,
+                               const MessageId& messageID) override {
+        latch_.countdown();
+        throw std::runtime_error("expected exception");
+    }
+
+    void close() override {
+        latch_.countdown();
+        throw std::runtime_error("expected exception");
+    }
+
+   private:
+    Latch latch_;
+};
+
+class PartitionsChangeInterceptor : public ProducerInterceptor {
+   public:
+    explicit PartitionsChangeInterceptor(Latch& latch) : latch_(latch) {}
+
+    Message beforeSend(const Producer& producer, const Message& message) override { return message; }
+
+    void onSendAcknowledgement(const Producer& producer, Result result, const Message& message,
+                               const MessageId& messageID) override {}
+
+    void onPartitionsChange(const std::string& topicName, int partitions) override {
+        ASSERT_EQ(partitions, 3);
+        latch_.countdown();
+    }
+
+   private:
+    Latch latch_;
+};
+
+void createPartitionedTopic(std::string topic) {
+    std::string topicOperateUrl = adminUrl + "admin/v2/persistent/public/default/" + topic + "/partitions";
+
+    int res = makePutRequest(topicOperateUrl, "2");
+    ASSERT_TRUE(res == 204 || res == 409) << "res: " << res;
+}
+
+class InterceptorsTest : public ::testing::TestWithParam<bool> {};
+
+TEST_P(InterceptorsTest, testProducerInterceptor) {
+    const std::string topic = "InterceptorsTest-testProducerInterceptor-" + std::to_string(time(nullptr));
+
+    if (GetParam()) {
+        createPartitionedTopic(topic);
+    }
+
+    Latch latch(1);
+    Latch closeLatch(1);
+
+    Client client(serviceUrl);
+    ProducerConfiguration conf;
+    conf.intercept({std::make_shared<TestInterceptor>(latch, closeLatch)});
+    Producer producer;
+    client.createProducer(topic, conf, producer);
+
+    Message msg = MessageBuilder().setContent("content").build();
+    Result result = producer.send(msg);
+    ASSERT_EQ(result, ResultOk);
+
+    ASSERT_TRUE(latch.wait(std::chrono::seconds(5)));
+
+    producer.close();
+    ASSERT_TRUE(closeLatch.wait(std::chrono::seconds(5)));
+    client.close();
+}
+
+TEST_P(InterceptorsTest, testProducerInterceptorWithException) {
+    const std::string topic =
+        "InterceptorsTest-testProducerInterceptorWithException-" + std::to_string(time(nullptr));
+
+    if (GetParam()) {
+        createPartitionedTopic(topic);
+    }
+
+    Latch latch(3);
+
+    Client client(serviceUrl);
+    ProducerConfiguration conf;
+    conf.intercept({std::make_shared<ExceptionInterceptor>(latch)});
+    Producer producer;
+    client.createProducer(topic, conf, producer);
+
+    Message msg = MessageBuilder().setContent("content").build();
+    Result result = producer.send(msg);
+    ASSERT_EQ(result, ResultOk);
+
+    producer.close();
+    ASSERT_TRUE(latch.wait(std::chrono::seconds(5)));
+    client.close();
+}
+
+TEST(InterceptorsTest, testProducerInterceptorOnPartitionsChange) {
+    const std::string topic = "public/default/InterceptorsTest-testProducerInterceptorOnPartitionsChange-" +
+                              std::to_string(time(nullptr));
+    std::string topicOperateUrl = adminUrl + "admin/v2/persistent/" + topic + "/partitions";
+
+    int res = makePutRequest(topicOperateUrl, "2");
+    ASSERT_TRUE(res == 204 || res == 409) << "res: " << res;
+
+    Latch latch(1);
+
+    ClientConfiguration clientConf;
+    clientConf.setPartititionsUpdateInterval(1);
+    Client client(serviceUrl, clientConf);
+    ProducerConfiguration conf;
+    conf.intercept({std::make_shared<PartitionsChangeInterceptor>(latch)});
+    Producer producer;
+    client.createProducer(topic, conf, producer);
+
+    res = makePostRequest(topicOperateUrl, "3");  // update partitions to 3
+    ASSERT_TRUE(res == 204 || res == 409) << "res: " << res;
+
+    ASSERT_TRUE(latch.wait(std::chrono::seconds(5)));
+
+    producer.close();
+    client.close();
+}
+
+INSTANTIATE_TEST_CASE_P(Pulsar, InterceptorsTest, ::testing::Values(true, false));