You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by zi...@apache.org on 2023/02/20 11:34:11 UTC
[pulsar-client-cpp] branch main updated: [feat] Add producer interceptor (#169)
This is an automated email from the ASF dual-hosted git repository.
zike pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/pulsar-client-cpp.git
The following commit(s) were added to refs/heads/main by this push:
new 86c915b [feat] Add producer interceptor (#169)
86c915b is described below
commit 86c915bfe17d940b38110dd22c7a4e9dbe55e5cb
Author: Zike Yang <zi...@apache.org>
AuthorDate: Mon Feb 20 19:34:04 2023 +0800
[feat] Add producer interceptor (#169)
Master Issue: #150
### Motivation
This is the producer interceptor implementation of #150.
### Modifications
* Add ProducerInterceptor interface
* Add `intercept` and `getInterceptors` in `ProducerConfiguration`.
* Triggering the interceptor when `beforeSend`, `onSendAcknowledgement`, `onPartitionsChange` and `close`.
---
include/pulsar/Producer.h | 1 +
include/pulsar/ProducerConfiguration.h | 5 +
include/pulsar/ProducerInterceptor.h | 114 ++++++++++++++++++++
lib/ClientImpl.cc | 10 +-
lib/PartitionedProducerImpl.cc | 11 +-
lib/PartitionedProducerImpl.h | 5 +-
lib/ProducerConfiguration.cc | 11 ++
lib/ProducerConfigurationImpl.h | 1 +
lib/ProducerImpl.cc | 17 ++-
lib/ProducerImpl.h | 5 +-
lib/ProducerImplBase.h | 4 +
lib/ProducerInterceptors.cc | 84 +++++++++++++++
lib/ProducerInterceptors.h | 56 ++++++++++
tests/InterceptorsTest.cc | 186 +++++++++++++++++++++++++++++++++
14 files changed, 499 insertions(+), 11 deletions(-)
diff --git a/include/pulsar/Producer.h b/include/pulsar/Producer.h
index 955d985..51d2980 100644
--- a/include/pulsar/Producer.h
+++ b/include/pulsar/Producer.h
@@ -166,6 +166,7 @@ class PULSAR_PUBLIC Producer {
friend class ClientImpl;
friend class PulsarFriend;
friend class PulsarWrapper;
+ friend class ProducerImpl;
ProducerImplBasePtr impl_;
diff --git a/include/pulsar/ProducerConfiguration.h b/include/pulsar/ProducerConfiguration.h
index 67550cf..e0c824c 100644
--- a/include/pulsar/ProducerConfiguration.h
+++ b/include/pulsar/ProducerConfiguration.h
@@ -23,6 +23,7 @@
#include <pulsar/Message.h>
#include <pulsar/MessageRoutingPolicy.h>
#include <pulsar/ProducerCryptoFailureAction.h>
+#include <pulsar/ProducerInterceptor.h>
#include <pulsar/Result.h>
#include <pulsar/Schema.h>
#include <pulsar/defines.h>
@@ -532,6 +533,10 @@ class PULSAR_PUBLIC ProducerConfiguration {
*/
ProducerAccessMode getAccessMode() const;
+ ProducerConfiguration& intercept(const std::vector<ProducerInterceptorPtr>& interceptors);
+
+ const std::vector<ProducerInterceptorPtr>& getInterceptors() const;
+
private:
std::shared_ptr<ProducerConfigurationImpl> impl_;
diff --git a/include/pulsar/ProducerInterceptor.h b/include/pulsar/ProducerInterceptor.h
new file mode 100644
index 0000000..45f55b5
--- /dev/null
+++ b/include/pulsar/ProducerInterceptor.h
@@ -0,0 +1,114 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+#ifndef PULSAR_PRODUCER_INTERCEPTOR_H
+#define PULSAR_PRODUCER_INTERCEPTOR_H
+
+#include <pulsar/Message.h>
+#include <pulsar/Result.h>
+#include <pulsar/defines.h>
+
+/**
+ * An interface that allows you to intercept (and possibly mutate) the
+ * messages received by the producer before they are published to the Pulsar
+ * brokers.
+ *
+ * <p>Exceptions thrown by ProducerInterceptor methods will be caught, logged, but
+ * not propagated further.
+ *
+ * <p>ProducerInterceptor callbacks may be called from multiple threads. Interceptor
+ * implementation must ensure thread-safety, if needed.
+ */
+namespace pulsar {
+
+class Producer;
+
+class PULSAR_PUBLIC ProducerInterceptor {
+ public:
+ virtual ~ProducerInterceptor() {}
+
+ /**
+ * Close the interceptor
+ */
+ virtual void close() {}
+
+ /**
+ * This is called from Producer#send and Producer#sendAsync methods, before
+ * send the message to the brokers. This method is allowed to modify the
+ * record, in which case, the new record will be returned.
+ *
+ * <p>Any exception thrown by this method will be caught by the caller and
+ * logged, but not propagated further.
+ *
+ * <p>Since the producer may run multiple interceptors, a particular
+ * interceptor's #beforeSend(Producer, Message) callback will be called in the
+ * order specified by ProducerConfiguration#intercept().
+ *
+ * <p>The first interceptor in the list gets the message passed from the client,
+ * the following interceptor will be passed the message returned by the
+ * previous interceptor, and so on. Since interceptors are allowed to modify
+ * messages, interceptors may potentially get the message already modified by
+ * other interceptors. However, building a pipeline of mutable interceptors
+ * that depend on the output of the previous interceptor is discouraged,
+ * because of potential side-effects caused by interceptors potentially
+ * failing to modify the message and throwing an exception. If one of the
+ * interceptors in the list throws an exception from beforeSend(Message),
+ * the exception is caught, logged, and the next interceptor is called with
+ * the message returned by the last successful interceptor in the list,
+ * or otherwise the client.
+ *
+ * @param producer the producer which contains the interceptor.
+ * @param message message to send.
+ * @return the intercepted message.
+ */
+ virtual Message beforeSend(const Producer& producer, const Message& message) = 0;
+
+ /**
+ * This method is called when the message sent to the broker has been
+ * acknowledged, or when sending the message fails.
+ * This method is generally called just before the user callback is
+ * called.
+ *
+ * <p>Any exception thrown by this method will be ignored by the caller.
+ *
+ * <p>This method will generally execute in the background I/O thread, so the
+ * implementation should be reasonably fast. Otherwise, sending of messages
+ * from other threads could be delayed.
+ *
+ * @param producer the producer which contains the interceptor.
+ * @param result the result for sending messages, ResultOk indicates send has succeed.
+ * @param message the message that application sends.
+ * @param messageID the message id that assigned by the broker.
+ */
+ virtual void onSendAcknowledgement(const Producer& producer, Result result, const Message& message,
+ const MessageId& messageID) = 0;
+
+ /**
+ * This method is called when partitions of the topic (partitioned-topic) changes.
+ *
+ * @param topicName topic name
+ * @param partitions new updated partitions
+ */
+ virtual void onPartitionsChange(const std::string& topicName, int partitions) {}
+};
+
+typedef std::shared_ptr<ProducerInterceptor> ProducerInterceptorPtr;
+} // namespace pulsar
+
+#endif // PULSAR_PRODUCER_INTERCEPTOR_H
diff --git a/lib/ClientImpl.cc b/lib/ClientImpl.cc
index 7575b28..e009277 100644
--- a/lib/ClientImpl.cc
+++ b/lib/ClientImpl.cc
@@ -33,6 +33,7 @@
#include "PartitionedProducerImpl.h"
#include "PatternMultiTopicsConsumerImpl.h"
#include "ProducerImpl.h"
+#include "ProducerInterceptors.h"
#include "ReaderImpl.h"
#include "RetryableLookupService.h"
#include "TimeUtils.h"
@@ -187,11 +188,14 @@ void ClientImpl::handleCreateProducer(const Result result, const LookupDataResul
CreateProducerCallback callback) {
if (!result) {
ProducerImplBasePtr producer;
+
+ auto interceptors = std::make_shared<ProducerInterceptors>(conf.getInterceptors());
+
if (partitionMetadata->getPartitions() > 0) {
- producer = std::make_shared<PartitionedProducerImpl>(shared_from_this(), topicName,
- partitionMetadata->getPartitions(), conf);
+ producer = std::make_shared<PartitionedProducerImpl>(
+ shared_from_this(), topicName, partitionMetadata->getPartitions(), conf, interceptors);
} else {
- producer = std::make_shared<ProducerImpl>(shared_from_this(), *topicName, conf);
+ producer = std::make_shared<ProducerImpl>(shared_from_this(), *topicName, conf, interceptors);
}
producer->getProducerCreatedFuture().addListener(
std::bind(&ClientImpl::handleProducerCreated, shared_from_this(), std::placeholders::_1,
diff --git a/lib/PartitionedProducerImpl.cc b/lib/PartitionedProducerImpl.cc
index a4b5310..e442ce9 100644
--- a/lib/PartitionedProducerImpl.cc
+++ b/lib/PartitionedProducerImpl.cc
@@ -38,13 +38,15 @@ const std::string PartitionedProducerImpl::PARTITION_NAME_SUFFIX = "-partition-"
PartitionedProducerImpl::PartitionedProducerImpl(ClientImplPtr client, const TopicNamePtr topicName,
const unsigned int numPartitions,
- const ProducerConfiguration& config)
+ const ProducerConfiguration& config,
+ const ProducerInterceptorsPtr& interceptors)
: client_(client),
topicName_(topicName),
topic_(topicName_->toString()),
conf_(config),
topicMetadata_(new TopicMetadataImpl(numPartitions)),
- flushedPartitions_(0) {
+ flushedPartitions_(0),
+ interceptors_(interceptors) {
routerPolicy_ = getMessageRouter();
int maxPendingMessagesPerPartition =
@@ -93,7 +95,7 @@ unsigned int PartitionedProducerImpl::getNumPartitionsWithLock() const {
ProducerImplPtr PartitionedProducerImpl::newInternalProducer(unsigned int partition, bool lazy) {
using namespace std::placeholders;
auto client = client_.lock();
- auto producer = std::make_shared<ProducerImpl>(client, *topicName_, conf_, partition);
+ auto producer = std::make_shared<ProducerImpl>(client, *topicName_, conf_, interceptors_, partition);
if (!client) {
return producer;
}
@@ -227,6 +229,7 @@ void PartitionedProducerImpl::sendAsync(const Message& msg, SendCallback callbac
// override
void PartitionedProducerImpl::shutdown() {
cancelTimers();
+ interceptors_->close();
auto client = client_.lock();
if (client) {
client->cleanupProducer(this);
@@ -446,7 +449,9 @@ void PartitionedProducerImpl::handleGetPartitions(Result result,
}
producers_.push_back(producer);
}
+ producersLock.unlock();
// `runPartitionUpdateTask()` will be called in `handleSinglePartitionProducerCreated()`
+ interceptors_->onPartitionsChange(getTopic(), newNumPartitions);
return;
}
} else {
diff --git a/lib/PartitionedProducerImpl.h b/lib/PartitionedProducerImpl.h
index b9a4b01..25ba9c3 100644
--- a/lib/PartitionedProducerImpl.h
+++ b/lib/PartitionedProducerImpl.h
@@ -27,6 +27,7 @@
#include "LookupDataResult.h"
#include "ProducerImplBase.h"
+#include "ProducerInterceptors.h"
namespace pulsar {
@@ -59,7 +60,7 @@ class PartitionedProducerImpl : public ProducerImplBase,
typedef std::unique_lock<std::mutex> Lock;
PartitionedProducerImpl(ClientImplPtr ptr, const TopicNamePtr topicName, const unsigned int numPartitions,
- const ProducerConfiguration& config);
+ const ProducerConfiguration& config, const ProducerInterceptorsPtr& interceptors);
virtual ~PartitionedProducerImpl();
// overrided methods from ProducerImplBase
@@ -130,6 +131,8 @@ class PartitionedProducerImpl : public ProducerImplBase,
boost::posix_time::time_duration partitionsUpdateInterval_;
LookupServicePtr lookupServicePtr_;
+ ProducerInterceptorsPtr interceptors_;
+
unsigned int getNumPartitions() const;
unsigned int getNumPartitionsWithLock() const;
ProducerImplPtr newInternalProducer(unsigned int partition, bool lazy);
diff --git a/lib/ProducerConfiguration.cc b/lib/ProducerConfiguration.cc
index 67e8b10..0e141ae 100644
--- a/lib/ProducerConfiguration.cc
+++ b/lib/ProducerConfiguration.cc
@@ -16,6 +16,8 @@
* specific language governing permissions and limitations
* under the License.
*/
+#include <pulsar/ProducerConfiguration.h>
+
#include <stdexcept>
#include "ProducerConfigurationImpl.h"
@@ -265,5 +267,14 @@ ProducerConfiguration& ProducerConfiguration::setAccessMode(const ProducerAccess
ProducerConfiguration::ProducerAccessMode ProducerConfiguration::getAccessMode() const {
return impl_->accessMode;
}
+ProducerConfiguration& ProducerConfiguration::intercept(
+ const std::vector<ProducerInterceptorPtr>& interceptors) {
+ impl_->interceptors.insert(impl_->interceptors.end(), interceptors.begin(), interceptors.end());
+ return *this;
+}
+
+const std::vector<ProducerInterceptorPtr>& ProducerConfiguration::getInterceptors() const {
+ return impl_->interceptors;
+}
} // namespace pulsar
diff --git a/lib/ProducerConfigurationImpl.h b/lib/ProducerConfigurationImpl.h
index 4c8fcdb..c635c48 100644
--- a/lib/ProducerConfigurationImpl.h
+++ b/lib/ProducerConfigurationImpl.h
@@ -51,6 +51,7 @@ struct ProducerConfigurationImpl {
bool chunkingEnabled{false};
ProducerConfiguration::ProducerAccessMode accessMode{ProducerConfiguration::Shared};
std::string initialSubscriptionName;
+ std::vector<ProducerInterceptorPtr> interceptors;
};
} // namespace pulsar
diff --git a/lib/ProducerImpl.cc b/lib/ProducerImpl.cc
index 6514b3b..78a5115 100644
--- a/lib/ProducerImpl.cc
+++ b/lib/ProducerImpl.cc
@@ -55,7 +55,8 @@ struct ProducerImpl::PendingCallbacks {
};
ProducerImpl::ProducerImpl(ClientImplPtr client, const TopicName& topicName,
- const ProducerConfiguration& conf, int32_t partition)
+ const ProducerConfiguration& conf, const ProducerInterceptorsPtr& interceptors,
+ int32_t partition)
: HandlerBase(client, (partition < 0) ? topicName.toString() : topicName.getTopicPartitionName(partition),
Backoff(milliseconds(client->getClientConfig().getInitialBackoffIntervalMs()),
milliseconds(client->getClientConfig().getMaxBackoffIntervalMs()),
@@ -73,7 +74,8 @@ ProducerImpl::ProducerImpl(ClientImplPtr client, const TopicName& topicName,
sendTimer_(executor_->getIOService()),
dataKeyRefreshTask_(executor_->getIOService(), 4 * 60 * 60 * 1000),
memoryLimitController_(client->getMemoryLimitController()),
- chunkingEnabled_(conf_.isChunkingEnabled() && topicName.isPersistent() && !conf_.getBatchingEnabled()) {
+ chunkingEnabled_(conf_.isChunkingEnabled() && topicName.isPersistent() && !conf_.getBatchingEnabled()),
+ interceptors_(interceptors) {
LOG_DEBUG("ProducerName - " << producerName_ << " Created producer on topic " << topic_
<< " id: " << producerId_);
@@ -432,10 +434,17 @@ static SharedBuffer applyCompression(const SharedBuffer& uncompressedPayload,
void ProducerImpl::sendAsync(const Message& msg, SendCallback callback) {
producerStatsBasePtr_->messageSent(msg);
+ Producer producer = Producer(shared_from_this());
+ auto interceptorMessage = interceptors_->beforeSend(producer, msg);
+
const auto now = boost::posix_time::microsec_clock::universal_time();
auto self = shared_from_this();
- sendAsyncWithStatsUpdate(msg, [this, self, now, callback](Result result, const MessageId& messageId) {
+ sendAsyncWithStatsUpdate(interceptorMessage, [this, self, now, callback, producer, interceptorMessage](
+ Result result, const MessageId& messageId) {
producerStatsBasePtr_->messageReceived(result, now);
+
+ interceptors_->onSendAcknowledgement(producer, result, interceptorMessage, messageId);
+
if (callback) {
callback(result, messageId);
}
@@ -931,6 +940,7 @@ void ProducerImpl::start() {
void ProducerImpl::shutdown() {
resetCnx();
+ interceptors_->close();
auto client = client_.lock();
if (client) {
client->cleanupProducer(this);
@@ -979,3 +989,4 @@ void ProducerImpl::asyncWaitSendTimeout(DurationType expiryTime) {
ProducerImplWeakPtr ProducerImpl::weak_from_this() noexcept { return shared_from_this(); }
} // namespace pulsar
+/* namespace pulsar */
diff --git a/lib/ProducerImpl.h b/lib/ProducerImpl.h
index 928fca5..b0c9f7c 100644
--- a/lib/ProducerImpl.h
+++ b/lib/ProducerImpl.h
@@ -64,7 +64,8 @@ class ProducerImpl : public HandlerBase,
public ProducerImplBase {
public:
ProducerImpl(ClientImplPtr client, const TopicName& topic,
- const ProducerConfiguration& producerConfiguration, int32_t partition = -1);
+ const ProducerConfiguration& producerConfiguration,
+ const ProducerInterceptorsPtr& interceptors, int32_t partition = -1);
~ProducerImpl();
// overrided methods from ProducerImplBase
@@ -196,6 +197,8 @@ class ProducerImpl : public HandlerBase,
MemoryLimitController& memoryLimitController_;
const bool chunkingEnabled_;
boost::optional<uint64_t> topicEpoch;
+
+ ProducerInterceptorsPtr interceptors_;
};
struct ProducerImplCmp {
diff --git a/lib/ProducerImplBase.h b/lib/ProducerImplBase.h
index 0b1622c..95488d8 100644
--- a/lib/ProducerImplBase.h
+++ b/lib/ProducerImplBase.h
@@ -22,6 +22,7 @@
#include <pulsar/Producer.h>
#include "Future.h"
+#include "ProducerInterceptors.h"
namespace pulsar {
class ProducerImplBase;
@@ -48,6 +49,9 @@ class ProducerImplBase {
virtual void flushAsync(FlushCallback callback) = 0;
virtual bool isConnected() const = 0;
virtual uint64_t getNumberOfConnectedProducer() = 0;
+
+ protected:
+ ProducerInterceptorsPtr interceptors_;
};
} // namespace pulsar
#endif // PULSAR_PRODUCER_IMPL_BASE_HEADER
diff --git a/lib/ProducerInterceptors.cc b/lib/ProducerInterceptors.cc
new file mode 100644
index 0000000..ea35f97
--- /dev/null
+++ b/lib/ProducerInterceptors.cc
@@ -0,0 +1,84 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+#include "ProducerInterceptors.h"
+
+#include <pulsar/Producer.h>
+
+#include "LogUtils.h"
+
+DECLARE_LOG_OBJECT()
+
+namespace pulsar {
+void ProducerInterceptors::onPartitionsChange(const std::string& topicName, int partitions) const {
+ for (const ProducerInterceptorPtr& interceptor : interceptors_) {
+ try {
+ interceptor->onPartitionsChange(topicName, partitions);
+ } catch (const std::exception& e) {
+ LOG_WARN("Error executing interceptor onPartitionsChange callback for topicName: "
+ << topicName << ", exception: " << e.what());
+ }
+ }
+}
+
+Message ProducerInterceptors::beforeSend(const Producer& producer, const Message& message) {
+ if (interceptors_.empty()) {
+ return message;
+ }
+
+ Message interceptorMessage = message;
+ for (const ProducerInterceptorPtr& interceptor : interceptors_) {
+ try {
+ interceptorMessage = interceptor->beforeSend(producer, interceptorMessage);
+ } catch (const std::exception& e) {
+ LOG_WARN("Error executing interceptor beforeSend callback for topicName: "
+ << producer.getTopic() << ", exception: " << e.what());
+ }
+ }
+ return interceptorMessage;
+}
+
+void ProducerInterceptors::onSendAcknowledgement(const Producer& producer, Result result,
+ const Message& message, const MessageId& messageID) {
+ for (const ProducerInterceptorPtr& interceptor : interceptors_) {
+ try {
+ interceptor->onSendAcknowledgement(producer, result, message, messageID);
+ } catch (const std::exception& e) {
+ LOG_WARN("Error executing interceptor onSendAcknowledgement callback for topicName: "
+ << producer.getTopic() << ", exception: " << e.what());
+ }
+ }
+}
+
+void ProducerInterceptors::close() {
+ State state = Ready;
+ if (!state_.compare_exchange_strong(state, Closing)) {
+ return;
+ }
+ for (const ProducerInterceptorPtr& interceptor : interceptors_) {
+ try {
+ interceptor->close();
+ } catch (const std::exception& e) {
+ LOG_WARN("Failed to close producer interceptor: " << e.what());
+ }
+ }
+ state_ = Closed;
+}
+
+} // namespace pulsar
diff --git a/lib/ProducerInterceptors.h b/lib/ProducerInterceptors.h
new file mode 100644
index 0000000..f83394f
--- /dev/null
+++ b/lib/ProducerInterceptors.h
@@ -0,0 +1,56 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+#pragma once
+
+#include <pulsar/ProducerInterceptor.h>
+
+#include <atomic>
+#include <utility>
+#include <vector>
+
+namespace pulsar {
+
+class ProducerInterceptors {
+ public:
+ explicit ProducerInterceptors(std::vector<ProducerInterceptorPtr> interceptors)
+ : interceptors_(std::move(interceptors)) {}
+
+ void onPartitionsChange(const std::string& topicName, int partitions) const;
+
+ Message beforeSend(const Producer& producer, const Message& message);
+
+ void onSendAcknowledgement(const Producer& producer, Result result, const Message& message,
+ const MessageId& messageID);
+
+ void close();
+
+ private:
+ enum State
+ {
+ Ready,
+ Closing,
+ Closed
+ };
+ std::vector<ProducerInterceptorPtr> interceptors_;
+ std::atomic<State> state_{Ready};
+};
+
+typedef std::shared_ptr<ProducerInterceptors> ProducerInterceptorsPtr;
+} // namespace pulsar
diff --git a/tests/InterceptorsTest.cc b/tests/InterceptorsTest.cc
new file mode 100644
index 0000000..ef645c2
--- /dev/null
+++ b/tests/InterceptorsTest.cc
@@ -0,0 +1,186 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+#include <gtest/gtest.h>
+#include <pulsar/Client.h>
+#include <pulsar/ProducerInterceptor.h>
+
+#include <utility>
+
+#include "HttpHelper.h"
+#include "Latch.h"
+
+static const std::string serviceUrl = "pulsar://localhost:6650";
+static const std::string adminUrl = "http://localhost:8080/";
+
+using namespace pulsar;
+
+class TestInterceptor : public ProducerInterceptor {
+ public:
+ TestInterceptor(Latch& latch, Latch& closeLatch) : latch_(latch), closeLatch_(closeLatch) {}
+
+ Message beforeSend(const Producer& producer, const Message& message) override {
+ return MessageBuilder().setProperty("key", "set").setContent(message.getDataAsString()).build();
+ }
+
+ void onSendAcknowledgement(const Producer& producer, Result result, const Message& message,
+ const MessageId& messageID) override {
+ ASSERT_EQ(result, ResultOk);
+ auto properties = message.getProperties();
+ ASSERT_TRUE(properties.find("key") != properties.end() && properties["key"] == "set");
+ latch_.countdown();
+ }
+
+ void close() override { closeLatch_.countdown(); }
+
+ private:
+ Latch latch_;
+ Latch closeLatch_;
+};
+
+class ExceptionInterceptor : public ProducerInterceptor {
+ public:
+ explicit ExceptionInterceptor(Latch& latch) : latch_(latch) {}
+
+ Message beforeSend(const Producer& producer, const Message& message) override {
+ latch_.countdown();
+ throw std::runtime_error("expected exception");
+ }
+
+ void onSendAcknowledgement(const Producer& producer, Result result, const Message& message,
+ const MessageId& messageID) override {
+ latch_.countdown();
+ throw std::runtime_error("expected exception");
+ }
+
+ void close() override {
+ latch_.countdown();
+ throw std::runtime_error("expected exception");
+ }
+
+ private:
+ Latch latch_;
+};
+
+class PartitionsChangeInterceptor : public ProducerInterceptor {
+ public:
+ explicit PartitionsChangeInterceptor(Latch& latch) : latch_(latch) {}
+
+ Message beforeSend(const Producer& producer, const Message& message) override { return message; }
+
+ void onSendAcknowledgement(const Producer& producer, Result result, const Message& message,
+ const MessageId& messageID) override {}
+
+ void onPartitionsChange(const std::string& topicName, int partitions) override {
+ ASSERT_EQ(partitions, 3);
+ latch_.countdown();
+ }
+
+ private:
+ Latch latch_;
+};
+
+void createPartitionedTopic(std::string topic) {
+ std::string topicOperateUrl = adminUrl + "admin/v2/persistent/public/default/" + topic + "/partitions";
+
+ int res = makePutRequest(topicOperateUrl, "2");
+ ASSERT_TRUE(res == 204 || res == 409) << "res: " << res;
+}
+
+class InterceptorsTest : public ::testing::TestWithParam<bool> {};
+
+TEST_P(InterceptorsTest, testProducerInterceptor) {
+ const std::string topic = "InterceptorsTest-testProducerInterceptor-" + std::to_string(time(nullptr));
+
+ if (GetParam()) {
+ createPartitionedTopic(topic);
+ }
+
+ Latch latch(1);
+ Latch closeLatch(1);
+
+ Client client(serviceUrl);
+ ProducerConfiguration conf;
+ conf.intercept({std::make_shared<TestInterceptor>(latch, closeLatch)});
+ Producer producer;
+ client.createProducer(topic, conf, producer);
+
+ Message msg = MessageBuilder().setContent("content").build();
+ Result result = producer.send(msg);
+ ASSERT_EQ(result, ResultOk);
+
+ ASSERT_TRUE(latch.wait(std::chrono::seconds(5)));
+
+ producer.close();
+ ASSERT_TRUE(closeLatch.wait(std::chrono::seconds(5)));
+ client.close();
+}
+
+TEST_P(InterceptorsTest, testProducerInterceptorWithException) {
+ const std::string topic =
+ "InterceptorsTest-testProducerInterceptorWithException-" + std::to_string(time(nullptr));
+
+ if (GetParam()) {
+ createPartitionedTopic(topic);
+ }
+
+ Latch latch(3);
+
+ Client client(serviceUrl);
+ ProducerConfiguration conf;
+ conf.intercept({std::make_shared<ExceptionInterceptor>(latch)});
+ Producer producer;
+ client.createProducer(topic, conf, producer);
+
+ Message msg = MessageBuilder().setContent("content").build();
+ Result result = producer.send(msg);
+ ASSERT_EQ(result, ResultOk);
+
+ producer.close();
+ ASSERT_TRUE(latch.wait(std::chrono::seconds(5)));
+ client.close();
+}
+
+TEST(InterceptorsTest, testProducerInterceptorOnPartitionsChange) {
+ const std::string topic = "public/default/InterceptorsTest-testProducerInterceptorOnPartitionsChange-" +
+ std::to_string(time(nullptr));
+ std::string topicOperateUrl = adminUrl + "admin/v2/persistent/" + topic + "/partitions";
+
+ int res = makePutRequest(topicOperateUrl, "2");
+ ASSERT_TRUE(res == 204 || res == 409) << "res: " << res;
+
+ Latch latch(1);
+
+ ClientConfiguration clientConf;
+ clientConf.setPartititionsUpdateInterval(1);
+ Client client(serviceUrl, clientConf);
+ ProducerConfiguration conf;
+ conf.intercept({std::make_shared<PartitionsChangeInterceptor>(latch)});
+ Producer producer;
+ client.createProducer(topic, conf, producer);
+
+ res = makePostRequest(topicOperateUrl, "3"); // update partitions to 3
+ ASSERT_TRUE(res == 204 || res == 409) << "res: " << res;
+
+ ASSERT_TRUE(latch.wait(std::chrono::seconds(5)));
+
+ producer.close();
+ client.close();
+}
+
+INSTANTIATE_TEST_CASE_P(Pulsar, InterceptorsTest, ::testing::Values(true, false));