You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by pe...@apache.org on 2022/09/19 02:16:10 UTC

[pulsar] branch master updated: [improve][client-c++] support Exclusive Producer access mode for c++ (#17439)

This is an automated email from the ASF dual-hosted git repository.

penghui pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git


The following commit(s) were added to refs/heads/master by this push:
     new 43de9f566d6 [improve][client-c++] support Exclusive Producer access mode for c++ (#17439)
43de9f566d6 is described below

commit 43de9f566d60de9937bdb6a0b11a8137ee3d6df8
Author: Cong Zhao <zh...@apache.org>
AuthorDate: Mon Sep 19 10:16:00 2022 +0800

    [improve][client-c++] support Exclusive Producer access mode for c++ (#17439)
---
 .../include/pulsar/ProducerConfiguration.h         | 26 ++++++++++++++++++++++
 pulsar-client-cpp/lib/ClientConnection.cc          |  5 +++++
 pulsar-client-cpp/lib/ClientConnection.h           |  1 +
 pulsar-client-cpp/lib/Commands.cc                  |  7 +++++-
 pulsar-client-cpp/lib/Commands.h                   |  3 ++-
 pulsar-client-cpp/lib/PartitionedProducerImpl.cc   |  8 ++++---
 pulsar-client-cpp/lib/ProducerConfiguration.cc     |  8 +++++++
 pulsar-client-cpp/lib/ProducerConfigurationImpl.h  |  1 +
 pulsar-client-cpp/lib/ProducerImpl.cc              | 16 +++++++------
 pulsar-client-cpp/lib/ProducerImpl.h               |  1 +
 pulsar-client-cpp/tests/ProducerTest.cc            | 24 ++++++++++++++++++++
 11 files changed, 88 insertions(+), 12 deletions(-)

diff --git a/pulsar-client-cpp/include/pulsar/ProducerConfiguration.h b/pulsar-client-cpp/include/pulsar/ProducerConfiguration.h
index 7c278dd6e91..fb331ea828c 100644
--- a/pulsar-client-cpp/include/pulsar/ProducerConfiguration.h
+++ b/pulsar-client-cpp/include/pulsar/ProducerConfiguration.h
@@ -79,6 +79,18 @@ class PULSAR_PUBLIC ProducerConfiguration {
          */
         KeyBasedBatching
     };
+    enum ProducerAccessMode
+    {
+        /**
+         * By default multiple producers can publish on a topic.
+         */
+        Shared = 0,
+
+        /**
+         * Require exclusive access for producer. Fail immediately if there's already a producer connected.
+         */
+        Exclusive = 1
+    };
 
     ProducerConfiguration();
     ~ProducerConfiguration();
@@ -501,6 +513,20 @@ class PULSAR_PUBLIC ProducerConfiguration {
      */
     bool isChunkingEnabled() const;
 
+    /**
+     * Set the type of access mode that the producer requires on the topic.
+     *
+     * @see ProducerAccessMode
+     * @param accessMode
+     *            The type of access to the topic that the producer requires
+     */
+    ProducerConfiguration& setAccessMode(const ProducerAccessMode& accessMode);
+
+    /**
+     * Get the type of access mode that the producer requires on the topic.
+     */
+    ProducerAccessMode getAccessMode() const;
+
     friend class PulsarWrapper;
 
    private:
diff --git a/pulsar-client-cpp/lib/ClientConnection.cc b/pulsar-client-cpp/lib/ClientConnection.cc
index 162c148a8dc..20e34586e00 100644
--- a/pulsar-client-cpp/lib/ClientConnection.cc
+++ b/pulsar-client-cpp/lib/ClientConnection.cc
@@ -1073,6 +1073,11 @@ void ClientConnection::handleIncomingCommand() {
                         if (producerSuccess.has_schema_version()) {
                             data.schemaVersion = producerSuccess.schema_version();
                         }
+                        if (producerSuccess.has_topic_epoch()) {
+                            data.topicEpoch = Optional<uint64_t>::of(producerSuccess.topic_epoch());
+                        } else {
+                            data.topicEpoch = Optional<uint64_t>::empty();
+                        }
                         requestData.promise.setValue(data);
                         requestData.timer->cancel();
                     }
diff --git a/pulsar-client-cpp/lib/ClientConnection.h b/pulsar-client-cpp/lib/ClientConnection.h
index b615eaab2d0..418a5831397 100644
--- a/pulsar-client-cpp/lib/ClientConnection.h
+++ b/pulsar-client-cpp/lib/ClientConnection.h
@@ -77,6 +77,7 @@ struct ResponseData {
     std::string producerName;
     int64_t lastSequenceId;
     std::string schemaVersion;
+    Optional<uint64_t> topicEpoch;
 };
 
 typedef std::shared_ptr<std::vector<std::string>> NamespaceTopicsPtr;
diff --git a/pulsar-client-cpp/lib/Commands.cc b/pulsar-client-cpp/lib/Commands.cc
index 3a00d302e86..417e6e31a17 100644
--- a/pulsar-client-cpp/lib/Commands.cc
+++ b/pulsar-client-cpp/lib/Commands.cc
@@ -351,7 +351,8 @@ SharedBuffer Commands::newProducer(const std::string& topic, uint64_t producerId
                                    const std::string& producerName, uint64_t requestId,
                                    const std::map<std::string, std::string>& metadata,
                                    const SchemaInfo& schemaInfo, uint64_t epoch,
-                                   bool userProvidedProducerName, bool encrypted) {
+                                   bool userProvidedProducerName, bool encrypted,
+                                   ProducerAccessMode accessMode, Optional<uint64_t> topicEpoch) {
     BaseCommand cmd;
     cmd.set_type(BaseCommand::PRODUCER);
     CommandProducer* producer = cmd.mutable_producer();
@@ -361,6 +362,10 @@ SharedBuffer Commands::newProducer(const std::string& topic, uint64_t producerId
     producer->set_epoch(epoch);
     producer->set_user_provided_producer_name(userProvidedProducerName);
     producer->set_encrypted(encrypted);
+    producer->set_producer_access_mode(accessMode);
+    if (topicEpoch.is_present()) {
+        producer->set_topic_epoch(topicEpoch.value());
+    }
 
     for (std::map<std::string, std::string>::const_iterator it = metadata.begin(); it != metadata.end();
          it++) {
diff --git a/pulsar-client-cpp/lib/Commands.h b/pulsar-client-cpp/lib/Commands.h
index 87207e4df22..4ff8674497a 100644
--- a/pulsar-client-cpp/lib/Commands.h
+++ b/pulsar-client-cpp/lib/Commands.h
@@ -100,7 +100,8 @@ class Commands {
                                     const std::string& producerName, uint64_t requestId,
                                     const std::map<std::string, std::string>& metadata,
                                     const SchemaInfo& schemaInfo, uint64_t epoch,
-                                    bool userProvidedProducerName, bool encrypted);
+                                    bool userProvidedProducerName, bool encrypted,
+                                    proto::ProducerAccessMode accessMode, Optional<uint64_t> topicEpoch);
 
     static SharedBuffer newAck(uint64_t consumerId, const proto::MessageIdData& messageId,
                                proto::CommandAck_AckType ackType, int validationError);
diff --git a/pulsar-client-cpp/lib/PartitionedProducerImpl.cc b/pulsar-client-cpp/lib/PartitionedProducerImpl.cc
index bc6dd218e15..469ecc9e793 100644
--- a/pulsar-client-cpp/lib/PartitionedProducerImpl.cc
+++ b/pulsar-client-cpp/lib/PartitionedProducerImpl.cc
@@ -106,7 +106,7 @@ void PartitionedProducerImpl::start() {
     // Here we don't need `producersMutex` to protect `producers_`, because `producers_` can only be increased
     // when `state_` is Ready
 
-    if (conf_.getLazyStartPartitionedProducers()) {
+    if (conf_.getLazyStartPartitionedProducers() && conf_.getAccessMode() == ProducerConfiguration::Shared) {
         // start one producer now, to ensure authz errors occur now
         // if the SinglePartition router is used, then this producer will serve
         // all non-keyed messages in the future
@@ -398,9 +398,11 @@ void PartitionedProducerImpl::handleGetPartitions(Result result,
             topicMetadata_.reset(new TopicMetadataImpl(newNumPartitions));
 
             for (unsigned int i = currentNumPartitions; i < newNumPartitions; i++) {
-                auto producer = newInternalProducer(i, conf_.getLazyStartPartitionedProducers());
+                auto lazy = conf_.getLazyStartPartitionedProducers() &&
+                            conf_.getAccessMode() == ProducerConfiguration::Shared;
+                auto producer = newInternalProducer(i, lazy);
 
-                if (!conf_.getLazyStartPartitionedProducers()) {
+                if (!lazy) {
                     producer->start();
                 }
                 producers_.push_back(producer);
diff --git a/pulsar-client-cpp/lib/ProducerConfiguration.cc b/pulsar-client-cpp/lib/ProducerConfiguration.cc
index 0ee38ca92e0..4f64870c06a 100644
--- a/pulsar-client-cpp/lib/ProducerConfiguration.cc
+++ b/pulsar-client-cpp/lib/ProducerConfiguration.cc
@@ -258,4 +258,12 @@ ProducerConfiguration& ProducerConfiguration::setChunkingEnabled(bool chunkingEn
 
 bool ProducerConfiguration::isChunkingEnabled() const { return impl_->chunkingEnabled; }
 
+ProducerConfiguration& ProducerConfiguration::setAccessMode(const ProducerAccessMode& accessMode) {
+    impl_->accessMode = accessMode;
+    return *this;
+}
+ProducerConfiguration::ProducerAccessMode ProducerConfiguration::getAccessMode() const {
+    return impl_->accessMode;
+}
+
 }  // namespace pulsar
diff --git a/pulsar-client-cpp/lib/ProducerConfigurationImpl.h b/pulsar-client-cpp/lib/ProducerConfigurationImpl.h
index 2ac1ebaa5df..80c6432cfce 100644
--- a/pulsar-client-cpp/lib/ProducerConfigurationImpl.h
+++ b/pulsar-client-cpp/lib/ProducerConfigurationImpl.h
@@ -49,6 +49,7 @@ struct ProducerConfigurationImpl {
     ProducerCryptoFailureAction cryptoFailureAction{ProducerCryptoFailureAction::FAIL};
     std::map<std::string, std::string> properties;
     bool chunkingEnabled{false};
+    ProducerConfiguration::ProducerAccessMode accessMode{ProducerConfiguration::Shared};
 };
 }  // namespace pulsar
 
diff --git a/pulsar-client-cpp/lib/ProducerImpl.cc b/pulsar-client-cpp/lib/ProducerImpl.cc
index 769a21a91a7..8c87086297f 100644
--- a/pulsar-client-cpp/lib/ProducerImpl.cc
+++ b/pulsar-client-cpp/lib/ProducerImpl.cc
@@ -133,9 +133,10 @@ void ProducerImpl::connectionOpened(const ClientConnectionPtr& cnx) {
     ClientImplPtr client = client_.lock();
     int requestId = client->newRequestId();
 
-    SharedBuffer cmd = Commands::newProducer(topic_, producerId_, producerName_, requestId,
-                                             conf_.getProperties(), conf_.getSchema(), epoch_,
-                                             userProvidedProducerName_, conf_.isEncryptionEnabled());
+    SharedBuffer cmd = Commands::newProducer(
+        topic_, producerId_, producerName_, requestId, conf_.getProperties(), conf_.getSchema(), epoch_,
+        userProvidedProducerName_, conf_.isEncryptionEnabled(),
+        static_cast<proto::ProducerAccessMode>(conf_.getAccessMode()), topicEpoch);
     cnx->sendRequestWithId(cmd, requestId)
         .addListener(std::bind(&ProducerImpl::handleCreateProducer, shared_from_this(), cnx,
                                std::placeholders::_1, std::placeholders::_2));
@@ -145,7 +146,7 @@ void ProducerImpl::connectionFailed(Result result) {
     // Keep a reference to ensure object is kept alive
     ProducerImplPtr ptr = shared_from_this();
 
-    if (conf_.getLazyStartPartitionedProducers()) {
+    if (conf_.getLazyStartPartitionedProducers() && conf_.getAccessMode() == ProducerConfiguration::Shared) {
         // if producers are lazy, then they should always try to restart
         // so don't change the state and allow reconnections
         return;
@@ -177,6 +178,7 @@ void ProducerImpl::handleCreateProducer(const ClientConnectionPtr& cnx, Result r
         producerName_ = responseData.producerName;
         schemaVersion_ = responseData.schemaVersion;
         producerStr_ = "[" + topic_ + ", " + producerName_ + "] ";
+        topicEpoch = responseData.topicEpoch;
 
         if (lastSequenceIdPublished_ == -1 && conf_.getInitialSequenceId() == -1) {
             lastSequenceIdPublished_ = responseData.lastSequenceId;
@@ -204,7 +206,8 @@ void ProducerImpl::handleCreateProducer(const ClientConnectionPtr& cnx, Result r
         }
 
         // if the producer is lazy the send timeout timer is already running
-        if (!conf_.getLazyStartPartitionedProducers()) {
+        if (!(conf_.getLazyStartPartitionedProducers() &&
+              conf_.getAccessMode() == ProducerConfiguration::Shared)) {
             startSendTimeoutTimer();
         }
 
@@ -542,7 +545,6 @@ Result ProducerImpl::canEnqueueRequest(uint32_t payloadSize) {
         if (semaphore_ && !semaphore_->tryAcquire()) {
             return ResultProducerQueueIsFull;
         }
-
         if (!memoryLimitController_.tryReserveMemory(payloadSize)) {
             if (semaphore_) {
                 semaphore_->release(1);
@@ -853,7 +855,7 @@ void ProducerImpl::disconnectProducer() {
 void ProducerImpl::start() {
     HandlerBase::start();
 
-    if (conf_.getLazyStartPartitionedProducers()) {
+    if (conf_.getLazyStartPartitionedProducers() && conf_.getAccessMode() == ProducerConfiguration::Shared) {
         // we need to kick it off now as it is possible that the connection may take
         // longer than sendTimeout to connect
         startSendTimeoutTimer();
diff --git a/pulsar-client-cpp/lib/ProducerImpl.h b/pulsar-client-cpp/lib/ProducerImpl.h
index 43b33933934..74eee61066e 100644
--- a/pulsar-client-cpp/lib/ProducerImpl.h
+++ b/pulsar-client-cpp/lib/ProducerImpl.h
@@ -188,6 +188,7 @@ class ProducerImpl : public HandlerBase,
 
     MemoryLimitController& memoryLimitController_;
     const bool chunkingEnabled_;
+    Optional<uint64_t> topicEpoch{Optional<uint64_t>::empty()};
 };
 
 struct ProducerImplCmp {
diff --git a/pulsar-client-cpp/tests/ProducerTest.cc b/pulsar-client-cpp/tests/ProducerTest.cc
index 43947ce7958..6315ac16f51 100644
--- a/pulsar-client-cpp/tests/ProducerTest.cc
+++ b/pulsar-client-cpp/tests/ProducerTest.cc
@@ -210,3 +210,27 @@ TEST(ProducerTest, testBacklogQuotasExceeded) {
 
     client.close();
 }
+
+TEST(ProducerTest, testExclusiveProducer) {
+    Client client(serviceUrl);
+
+    std::string topicName = "persistent://public/default/testExclusiveProducer";
+
+    Producer producer1;
+    ProducerConfiguration producerConfiguration1;
+    producerConfiguration1.setProducerName("p-name-1");
+    producerConfiguration1.setAccessMode(ProducerConfiguration::Exclusive);
+
+    ASSERT_EQ(ResultOk, client.createProducer(topicName, producerConfiguration1, producer1));
+
+    Producer producer2;
+    ProducerConfiguration producerConfiguration2;
+    producerConfiguration2.setProducerName("p-name-2");
+    producerConfiguration2.setAccessMode(ProducerConfiguration::Exclusive);
+    ASSERT_EQ(ResultProducerFenced, client.createProducer(topicName, producerConfiguration2, producer2));
+
+    Producer producer3;
+    ProducerConfiguration producerConfiguration3;
+    producerConfiguration3.setProducerName("p-name-3");
+    ASSERT_EQ(ResultProducerBusy, client.createProducer(topicName, producerConfiguration3, producer3));
+}