You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by pe...@apache.org on 2022/09/19 02:16:10 UTC
[pulsar] branch master updated: [improve][client-c++] support Exclusive Producer access mode for c++ (#17439)
This is an automated email from the ASF dual-hosted git repository.
penghui pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git
The following commit(s) were added to refs/heads/master by this push:
new 43de9f566d6 [improve][client-c++] support Exclusive Producer access mode for c++ (#17439)
43de9f566d6 is described below
commit 43de9f566d60de9937bdb6a0b11a8137ee3d6df8
Author: Cong Zhao <zh...@apache.org>
AuthorDate: Mon Sep 19 10:16:00 2022 +0800
[improve][client-c++] support Exclusive Producer access mode for c++ (#17439)
---
.../include/pulsar/ProducerConfiguration.h | 26 ++++++++++++++++++++++
pulsar-client-cpp/lib/ClientConnection.cc | 5 +++++
pulsar-client-cpp/lib/ClientConnection.h | 1 +
pulsar-client-cpp/lib/Commands.cc | 7 +++++-
pulsar-client-cpp/lib/Commands.h | 3 ++-
pulsar-client-cpp/lib/PartitionedProducerImpl.cc | 8 ++++---
pulsar-client-cpp/lib/ProducerConfiguration.cc | 8 +++++++
pulsar-client-cpp/lib/ProducerConfigurationImpl.h | 1 +
pulsar-client-cpp/lib/ProducerImpl.cc | 16 +++++++------
pulsar-client-cpp/lib/ProducerImpl.h | 1 +
pulsar-client-cpp/tests/ProducerTest.cc | 24 ++++++++++++++++++++
11 files changed, 88 insertions(+), 12 deletions(-)
diff --git a/pulsar-client-cpp/include/pulsar/ProducerConfiguration.h b/pulsar-client-cpp/include/pulsar/ProducerConfiguration.h
index 7c278dd6e91..fb331ea828c 100644
--- a/pulsar-client-cpp/include/pulsar/ProducerConfiguration.h
+++ b/pulsar-client-cpp/include/pulsar/ProducerConfiguration.h
@@ -79,6 +79,18 @@ class PULSAR_PUBLIC ProducerConfiguration {
*/
KeyBasedBatching
};
+ enum ProducerAccessMode
+ {
+ /**
+ * By default multiple producers can publish on a topic.
+ */
+ Shared = 0,
+
+ /**
+ * Require exclusive access for producer. Fail immediately if there's already a producer connected.
+ */
+ Exclusive = 1
+ };
ProducerConfiguration();
~ProducerConfiguration();
@@ -501,6 +513,20 @@ class PULSAR_PUBLIC ProducerConfiguration {
*/
bool isChunkingEnabled() const;
+ /**
+ * Set the type of access mode that the producer requires on the topic.
+ *
+ * @see ProducerAccessMode
+ * @param accessMode
+ * The type of access to the topic that the producer requires
+ */
+ ProducerConfiguration& setAccessMode(const ProducerAccessMode& accessMode);
+
+ /**
+ * Get the type of access mode that the producer requires on the topic.
+ */
+ ProducerAccessMode getAccessMode() const;
+
friend class PulsarWrapper;
private:
diff --git a/pulsar-client-cpp/lib/ClientConnection.cc b/pulsar-client-cpp/lib/ClientConnection.cc
index 162c148a8dc..20e34586e00 100644
--- a/pulsar-client-cpp/lib/ClientConnection.cc
+++ b/pulsar-client-cpp/lib/ClientConnection.cc
@@ -1073,6 +1073,11 @@ void ClientConnection::handleIncomingCommand() {
if (producerSuccess.has_schema_version()) {
data.schemaVersion = producerSuccess.schema_version();
}
+ if (producerSuccess.has_topic_epoch()) {
+ data.topicEpoch = Optional<uint64_t>::of(producerSuccess.topic_epoch());
+ } else {
+ data.topicEpoch = Optional<uint64_t>::empty();
+ }
requestData.promise.setValue(data);
requestData.timer->cancel();
}
diff --git a/pulsar-client-cpp/lib/ClientConnection.h b/pulsar-client-cpp/lib/ClientConnection.h
index b615eaab2d0..418a5831397 100644
--- a/pulsar-client-cpp/lib/ClientConnection.h
+++ b/pulsar-client-cpp/lib/ClientConnection.h
@@ -77,6 +77,7 @@ struct ResponseData {
std::string producerName;
int64_t lastSequenceId;
std::string schemaVersion;
+ Optional<uint64_t> topicEpoch;
};
typedef std::shared_ptr<std::vector<std::string>> NamespaceTopicsPtr;
diff --git a/pulsar-client-cpp/lib/Commands.cc b/pulsar-client-cpp/lib/Commands.cc
index 3a00d302e86..417e6e31a17 100644
--- a/pulsar-client-cpp/lib/Commands.cc
+++ b/pulsar-client-cpp/lib/Commands.cc
@@ -351,7 +351,8 @@ SharedBuffer Commands::newProducer(const std::string& topic, uint64_t producerId
const std::string& producerName, uint64_t requestId,
const std::map<std::string, std::string>& metadata,
const SchemaInfo& schemaInfo, uint64_t epoch,
- bool userProvidedProducerName, bool encrypted) {
+ bool userProvidedProducerName, bool encrypted,
+ ProducerAccessMode accessMode, Optional<uint64_t> topicEpoch) {
BaseCommand cmd;
cmd.set_type(BaseCommand::PRODUCER);
CommandProducer* producer = cmd.mutable_producer();
@@ -361,6 +362,10 @@ SharedBuffer Commands::newProducer(const std::string& topic, uint64_t producerId
producer->set_epoch(epoch);
producer->set_user_provided_producer_name(userProvidedProducerName);
producer->set_encrypted(encrypted);
+ producer->set_producer_access_mode(accessMode);
+ if (topicEpoch.is_present()) {
+ producer->set_topic_epoch(topicEpoch.value());
+ }
for (std::map<std::string, std::string>::const_iterator it = metadata.begin(); it != metadata.end();
it++) {
diff --git a/pulsar-client-cpp/lib/Commands.h b/pulsar-client-cpp/lib/Commands.h
index 87207e4df22..4ff8674497a 100644
--- a/pulsar-client-cpp/lib/Commands.h
+++ b/pulsar-client-cpp/lib/Commands.h
@@ -100,7 +100,8 @@ class Commands {
const std::string& producerName, uint64_t requestId,
const std::map<std::string, std::string>& metadata,
const SchemaInfo& schemaInfo, uint64_t epoch,
- bool userProvidedProducerName, bool encrypted);
+ bool userProvidedProducerName, bool encrypted,
+ proto::ProducerAccessMode accessMode, Optional<uint64_t> topicEpoch);
static SharedBuffer newAck(uint64_t consumerId, const proto::MessageIdData& messageId,
proto::CommandAck_AckType ackType, int validationError);
diff --git a/pulsar-client-cpp/lib/PartitionedProducerImpl.cc b/pulsar-client-cpp/lib/PartitionedProducerImpl.cc
index bc6dd218e15..469ecc9e793 100644
--- a/pulsar-client-cpp/lib/PartitionedProducerImpl.cc
+++ b/pulsar-client-cpp/lib/PartitionedProducerImpl.cc
@@ -106,7 +106,7 @@ void PartitionedProducerImpl::start() {
// Here we don't need `producersMutex` to protect `producers_`, because `producers_` can only be increased
// when `state_` is Ready
- if (conf_.getLazyStartPartitionedProducers()) {
+ if (conf_.getLazyStartPartitionedProducers() && conf_.getAccessMode() == ProducerConfiguration::Shared) {
// start one producer now, to ensure authz errors occur now
// if the SinglePartition router is used, then this producer will serve
// all non-keyed messages in the future
@@ -398,9 +398,11 @@ void PartitionedProducerImpl::handleGetPartitions(Result result,
topicMetadata_.reset(new TopicMetadataImpl(newNumPartitions));
for (unsigned int i = currentNumPartitions; i < newNumPartitions; i++) {
- auto producer = newInternalProducer(i, conf_.getLazyStartPartitionedProducers());
+ auto lazy = conf_.getLazyStartPartitionedProducers() &&
+ conf_.getAccessMode() == ProducerConfiguration::Shared;
+ auto producer = newInternalProducer(i, lazy);
- if (!conf_.getLazyStartPartitionedProducers()) {
+ if (!lazy) {
producer->start();
}
producers_.push_back(producer);
diff --git a/pulsar-client-cpp/lib/ProducerConfiguration.cc b/pulsar-client-cpp/lib/ProducerConfiguration.cc
index 0ee38ca92e0..4f64870c06a 100644
--- a/pulsar-client-cpp/lib/ProducerConfiguration.cc
+++ b/pulsar-client-cpp/lib/ProducerConfiguration.cc
@@ -258,4 +258,12 @@ ProducerConfiguration& ProducerConfiguration::setChunkingEnabled(bool chunkingEn
bool ProducerConfiguration::isChunkingEnabled() const { return impl_->chunkingEnabled; }
+ProducerConfiguration& ProducerConfiguration::setAccessMode(const ProducerAccessMode& accessMode) {
+ impl_->accessMode = accessMode;
+ return *this;
+}
+ProducerConfiguration::ProducerAccessMode ProducerConfiguration::getAccessMode() const {
+ return impl_->accessMode;
+}
+
} // namespace pulsar
diff --git a/pulsar-client-cpp/lib/ProducerConfigurationImpl.h b/pulsar-client-cpp/lib/ProducerConfigurationImpl.h
index 2ac1ebaa5df..80c6432cfce 100644
--- a/pulsar-client-cpp/lib/ProducerConfigurationImpl.h
+++ b/pulsar-client-cpp/lib/ProducerConfigurationImpl.h
@@ -49,6 +49,7 @@ struct ProducerConfigurationImpl {
ProducerCryptoFailureAction cryptoFailureAction{ProducerCryptoFailureAction::FAIL};
std::map<std::string, std::string> properties;
bool chunkingEnabled{false};
+ ProducerConfiguration::ProducerAccessMode accessMode{ProducerConfiguration::Shared};
};
} // namespace pulsar
diff --git a/pulsar-client-cpp/lib/ProducerImpl.cc b/pulsar-client-cpp/lib/ProducerImpl.cc
index 769a21a91a7..8c87086297f 100644
--- a/pulsar-client-cpp/lib/ProducerImpl.cc
+++ b/pulsar-client-cpp/lib/ProducerImpl.cc
@@ -133,9 +133,10 @@ void ProducerImpl::connectionOpened(const ClientConnectionPtr& cnx) {
ClientImplPtr client = client_.lock();
int requestId = client->newRequestId();
- SharedBuffer cmd = Commands::newProducer(topic_, producerId_, producerName_, requestId,
- conf_.getProperties(), conf_.getSchema(), epoch_,
- userProvidedProducerName_, conf_.isEncryptionEnabled());
+ SharedBuffer cmd = Commands::newProducer(
+ topic_, producerId_, producerName_, requestId, conf_.getProperties(), conf_.getSchema(), epoch_,
+ userProvidedProducerName_, conf_.isEncryptionEnabled(),
+ static_cast<proto::ProducerAccessMode>(conf_.getAccessMode()), topicEpoch);
cnx->sendRequestWithId(cmd, requestId)
.addListener(std::bind(&ProducerImpl::handleCreateProducer, shared_from_this(), cnx,
std::placeholders::_1, std::placeholders::_2));
@@ -145,7 +146,7 @@ void ProducerImpl::connectionFailed(Result result) {
// Keep a reference to ensure object is kept alive
ProducerImplPtr ptr = shared_from_this();
- if (conf_.getLazyStartPartitionedProducers()) {
+ if (conf_.getLazyStartPartitionedProducers() && conf_.getAccessMode() == ProducerConfiguration::Shared) {
// if producers are lazy, then they should always try to restart
// so don't change the state and allow reconnections
return;
@@ -177,6 +178,7 @@ void ProducerImpl::handleCreateProducer(const ClientConnectionPtr& cnx, Result r
producerName_ = responseData.producerName;
schemaVersion_ = responseData.schemaVersion;
producerStr_ = "[" + topic_ + ", " + producerName_ + "] ";
+ topicEpoch = responseData.topicEpoch;
if (lastSequenceIdPublished_ == -1 && conf_.getInitialSequenceId() == -1) {
lastSequenceIdPublished_ = responseData.lastSequenceId;
@@ -204,7 +206,8 @@ void ProducerImpl::handleCreateProducer(const ClientConnectionPtr& cnx, Result r
}
// if the producer is lazy the send timeout timer is already running
- if (!conf_.getLazyStartPartitionedProducers()) {
+ if (!(conf_.getLazyStartPartitionedProducers() &&
+ conf_.getAccessMode() == ProducerConfiguration::Shared)) {
startSendTimeoutTimer();
}
@@ -542,7 +545,6 @@ Result ProducerImpl::canEnqueueRequest(uint32_t payloadSize) {
if (semaphore_ && !semaphore_->tryAcquire()) {
return ResultProducerQueueIsFull;
}
-
if (!memoryLimitController_.tryReserveMemory(payloadSize)) {
if (semaphore_) {
semaphore_->release(1);
@@ -853,7 +855,7 @@ void ProducerImpl::disconnectProducer() {
void ProducerImpl::start() {
HandlerBase::start();
- if (conf_.getLazyStartPartitionedProducers()) {
+ if (conf_.getLazyStartPartitionedProducers() && conf_.getAccessMode() == ProducerConfiguration::Shared) {
// we need to kick it off now as it is possible that the connection may take
// longer than sendTimeout to connect
startSendTimeoutTimer();
diff --git a/pulsar-client-cpp/lib/ProducerImpl.h b/pulsar-client-cpp/lib/ProducerImpl.h
index 43b33933934..74eee61066e 100644
--- a/pulsar-client-cpp/lib/ProducerImpl.h
+++ b/pulsar-client-cpp/lib/ProducerImpl.h
@@ -188,6 +188,7 @@ class ProducerImpl : public HandlerBase,
MemoryLimitController& memoryLimitController_;
const bool chunkingEnabled_;
+ Optional<uint64_t> topicEpoch{Optional<uint64_t>::empty()};
};
struct ProducerImplCmp {
diff --git a/pulsar-client-cpp/tests/ProducerTest.cc b/pulsar-client-cpp/tests/ProducerTest.cc
index 43947ce7958..6315ac16f51 100644
--- a/pulsar-client-cpp/tests/ProducerTest.cc
+++ b/pulsar-client-cpp/tests/ProducerTest.cc
@@ -210,3 +210,27 @@ TEST(ProducerTest, testBacklogQuotasExceeded) {
client.close();
}
+
+TEST(ProducerTest, testExclusiveProducer) {
+ Client client(serviceUrl);
+
+ std::string topicName = "persistent://public/default/testExclusiveProducer";
+
+ Producer producer1;
+ ProducerConfiguration producerConfiguration1;
+ producerConfiguration1.setProducerName("p-name-1");
+ producerConfiguration1.setAccessMode(ProducerConfiguration::Exclusive);
+
+ ASSERT_EQ(ResultOk, client.createProducer(topicName, producerConfiguration1, producer1));
+
+ Producer producer2;
+ ProducerConfiguration producerConfiguration2;
+ producerConfiguration2.setProducerName("p-name-2");
+ producerConfiguration2.setAccessMode(ProducerConfiguration::Exclusive);
+ ASSERT_EQ(ResultProducerFenced, client.createProducer(topicName, producerConfiguration2, producer2));
+
+ Producer producer3;
+ ProducerConfiguration producerConfiguration3;
+ producerConfiguration3.setProducerName("p-name-3");
+ ASSERT_EQ(ResultProducerBusy, client.createProducer(topicName, producerConfiguration3, producer3));
+}