You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by pe...@apache.org on 2020/12/21 13:04:13 UTC

[pulsar] 07/07: [C++] Add consumer's configs for reader (#8905)

This is an automated email from the ASF dual-hosted git repository.

penghui pushed a commit to branch branch-2.7
in repository https://gitbox.apache.org/repos/asf/pulsar.git

commit 4da0bafae1bb94a9856f78243db20ede1b60f85c
Author: Yunze Xu <xy...@163.com>
AuthorDate: Mon Dec 14 18:27:09 2020 -0600

    [C++] Add consumer's configs for reader (#8905)
    
    ### Motivation
    
    C++ reader cannot set some consumer's configs. Like `CryptoReader`, if the config is not supported for reader, then reader cannot decrypt encrypted messages. Also, reader cannot set other configs like `AckGroupingTimeMs`  so that reader cannot control the behavior of `AckGroupingTracker`.
    
    ### Modifications
    
    - Add other consumer's configs for reader, except:
        - NegativeAckRedeliveryDelayMs: reader has no chance to call `negativeAcknowledge`
        - BrokerConsumerStatsCacheTimeInMs: reader has no chance to call `getBrokerConsumerStatsAsync`
        - KeySharedPolicy and ConsumerType: reader only supports Exclusive subscription now
        - MaxTotalReceiverQueueSizeAcrossPartitions: reader doesn't support partitioned topics now
        - PatternAutoDiscoveryPeriod: reader doesn't support regex subscription now
    - Add some fields to get the consumer config before reader's internal consumer is created, then add some unit tests for the consumer config.
    
    (cherry picked from commit a3ac12e8cd255bbacd308f6e2c3186e2d3f98850)
---
 .../include/pulsar/ReaderConfiguration.h           |  93 ++++++++++++++
 pulsar-client-cpp/lib/ReaderConfiguration.cc       |  76 +++++++++++
 pulsar-client-cpp/lib/ReaderConfigurationImpl.h    |  20 +--
 pulsar-client-cpp/lib/ReaderImpl.cc                |  18 +++
 pulsar-client-cpp/lib/ReaderImpl.h                 |  10 +-
 pulsar-client-cpp/tests/ReaderConfigurationTest.cc | 141 +++++++++++++++++++++
 6 files changed, 347 insertions(+), 11 deletions(-)

diff --git a/pulsar-client-cpp/include/pulsar/ReaderConfiguration.h b/pulsar-client-cpp/include/pulsar/ReaderConfiguration.h
index 25b0ba3..96b36b6 100644
--- a/pulsar-client-cpp/include/pulsar/ReaderConfiguration.h
+++ b/pulsar-client-cpp/include/pulsar/ReaderConfiguration.h
@@ -25,6 +25,8 @@
 #include <pulsar/Result.h>
 #include <pulsar/Message.h>
 #include <pulsar/Schema.h>
+#include <pulsar/CryptoKeyReader.h>
+#include <pulsar/ConsumerCryptoFailureAction.h>
 
 namespace pulsar {
 
@@ -112,6 +114,97 @@ class PULSAR_PUBLIC ReaderConfiguration {
     void setInternalSubscriptionName(std::string internalSubscriptionName);
     const std::string& getInternalSubscriptionName() const;
 
+    /**
+     * Set the timeout in milliseconds for unacknowledged messages, the timeout needs to be greater than
+     * 10 seconds. An Exception is thrown if the given value is less than 10000 (10 seconds).
+     * If a successful acknowledgement is not sent within the timeout all the unacknowledged messages are
+     * redelivered.
+     * @param timeout in milliseconds
+     */
+    void setUnAckedMessagesTimeoutMs(const uint64_t milliSeconds);
+
+    /**
+     * @return the configured timeout in milliseconds for unacked messages.
+     */
+    long getUnAckedMessagesTimeoutMs() const;
+
+    void setTickDurationInMs(const uint64_t milliSeconds);
+
+    long getTickDurationInMs() const;
+
+    /**
+     * Set time window in milliseconds for grouping message ACK requests. An ACK request is not sent
+     * to broker until the time window reaches its end, or the number of grouped messages reaches
+     * limit. Default is 100 milliseconds. If it's set to a non-positive value, ACK requests will be
+     * directly sent to broker without grouping.
+     *
+     * @param ackGroupMillis time of ACK grouping window in milliseconds.
+     */
+    void setAckGroupingTimeMs(long ackGroupingMillis);
+
+    /**
+     * Get grouping time window in milliseconds.
+     *
+     * @return grouping time window in milliseconds.
+     */
+    long getAckGroupingTimeMs() const;
+
+    /**
+     * Set max number of grouped messages within one grouping time window. If it's set to a
+     * non-positive value, number of grouped messages is not limited. Default is 1000.
+     *
+     * @param maxGroupingSize max number of grouped messages with in one grouping time window.
+     */
+    void setAckGroupingMaxSize(long maxGroupingSize);
+
+    /**
+     * Get max number of grouped messages within one grouping time window.
+     *
+     * @return max number of grouped messages within one grouping time window.
+     */
+    long getAckGroupingMaxSize() const;
+
+    bool isEncryptionEnabled() const;
+    const CryptoKeyReaderPtr getCryptoKeyReader() const;
+    ReaderConfiguration& setCryptoKeyReader(CryptoKeyReaderPtr cryptoKeyReader);
+
+    ConsumerCryptoFailureAction getCryptoFailureAction() const;
+    ReaderConfiguration& setCryptoFailureAction(ConsumerCryptoFailureAction action);
+
+    /**
+     * Check whether the message has a specific property attached.
+     *
+     * @param name the name of the property to check
+     * @return true if the message has the specified property
+     * @return false if the property is not defined
+     */
+    bool hasProperty(const std::string& name) const;
+
+    /**
+     * Get the value of a specific property
+     *
+     * @param name the name of the property
+     * @return the value of the property or null if the property was not defined
+     */
+    const std::string& getProperty(const std::string& name) const;
+
+    /**
+     * Get all the properties attached to this producer.
+     */
+    std::map<std::string, std::string>& getProperties() const;
+
+    /**
+     * Sets a new property on a message.
+     * @param name   the name of the property
+     * @param value  the associated value
+     */
+    ReaderConfiguration& setProperty(const std::string& name, const std::string& value);
+
+    /**
+     * Add all the properties in the provided map
+     */
+    ReaderConfiguration& setProperties(const std::map<std::string, std::string>& properties);
+
    private:
     std::shared_ptr<ReaderConfigurationImpl> impl_;
 };
diff --git a/pulsar-client-cpp/lib/ReaderConfiguration.cc b/pulsar-client-cpp/lib/ReaderConfiguration.cc
index eb18d11..0dfdbed 100644
--- a/pulsar-client-cpp/lib/ReaderConfiguration.cc
+++ b/pulsar-client-cpp/lib/ReaderConfiguration.cc
@@ -20,6 +20,8 @@
 
 namespace pulsar {
 
+const static std::string emptyString;
+
 ReaderConfiguration::ReaderConfiguration() : impl_(std::make_shared<ReaderConfigurationImpl>()) {}
 
 ReaderConfiguration::~ReaderConfiguration() {}
@@ -76,4 +78,78 @@ const std::string& ReaderConfiguration::getInternalSubscriptionName() const {
     return impl_->internalSubscriptionName;
 }
 
+void ReaderConfiguration::setUnAckedMessagesTimeoutMs(const uint64_t milliSeconds) {
+    impl_->unAckedMessagesTimeoutMs = milliSeconds;
+}
+
+long ReaderConfiguration::getUnAckedMessagesTimeoutMs() const { return impl_->unAckedMessagesTimeoutMs; }
+
+void ReaderConfiguration::setTickDurationInMs(const uint64_t milliSeconds) {
+    impl_->tickDurationInMs = milliSeconds;
+}
+
+long ReaderConfiguration::getTickDurationInMs() const { return impl_->tickDurationInMs; }
+
+void ReaderConfiguration::setAckGroupingTimeMs(long ackGroupingMillis) {
+    impl_->ackGroupingTimeMs = ackGroupingMillis;
+}
+
+long ReaderConfiguration::getAckGroupingTimeMs() const { return impl_->ackGroupingTimeMs; }
+
+void ReaderConfiguration::setAckGroupingMaxSize(long maxGroupingSize) {
+    impl_->ackGroupingMaxSize = maxGroupingSize;
+}
+
+long ReaderConfiguration::getAckGroupingMaxSize() const { return impl_->ackGroupingMaxSize; }
+
+bool ReaderConfiguration::isEncryptionEnabled() const { return impl_->cryptoKeyReader != nullptr; }
+
+const CryptoKeyReaderPtr ReaderConfiguration::getCryptoKeyReader() const { return impl_->cryptoKeyReader; }
+
+ReaderConfiguration& ReaderConfiguration::setCryptoKeyReader(CryptoKeyReaderPtr cryptoKeyReader) {
+    impl_->cryptoKeyReader = cryptoKeyReader;
+    return *this;
+}
+
+ConsumerCryptoFailureAction ReaderConfiguration::getCryptoFailureAction() const {
+    return impl_->cryptoFailureAction;
+}
+
+ReaderConfiguration& ReaderConfiguration::setCryptoFailureAction(ConsumerCryptoFailureAction action) {
+    impl_->cryptoFailureAction = action;
+    return *this;
+}
+
+bool ReaderConfiguration::hasProperty(const std::string& name) const {
+    const auto& properties = impl_->properties;
+    return properties.find(name) != properties.cend();
+}
+
+const std::string& ReaderConfiguration::getProperty(const std::string& name) const {
+    const auto& properties = impl_->properties;
+    const auto it = properties.find(name);
+    return (it != properties.cend()) ? (it->second) : emptyString;
+}
+
+std::map<std::string, std::string>& ReaderConfiguration::getProperties() const { return impl_->properties; }
+
+ReaderConfiguration& ReaderConfiguration::setProperty(const std::string& name, const std::string& value) {
+    auto& properties = impl_->properties;
+    auto it = properties.find(name);
+    if (it != properties.end()) {
+        it->second = value;
+    } else {
+        properties.emplace(name, value);
+    }
+    return *this;
+}
+
+ReaderConfiguration& ReaderConfiguration::setProperties(
+    const std::map<std::string, std::string>& properties) {
+    for (const auto& kv : properties) {
+        setProperty(kv.first, kv.second);
+    }
+    return *this;
+}
+
 }  // namespace pulsar
diff --git a/pulsar-client-cpp/lib/ReaderConfigurationImpl.h b/pulsar-client-cpp/lib/ReaderConfigurationImpl.h
index 3b5cc8a..6f38c29 100644
--- a/pulsar-client-cpp/lib/ReaderConfigurationImpl.h
+++ b/pulsar-client-cpp/lib/ReaderConfigurationImpl.h
@@ -25,19 +25,19 @@ namespace pulsar {
 struct ReaderConfigurationImpl {
     SchemaInfo schemaInfo;
     ReaderListener readerListener;
-    bool hasReaderListener;
-    int receiverQueueSize;
+    bool hasReaderListener{false};
+    int receiverQueueSize{1000};
     std::string readerName;
     std::string subscriptionRolePrefix;
-    bool readCompacted;
+    bool readCompacted{false};
     std::string internalSubscriptionName;
-    ReaderConfigurationImpl()
-        : schemaInfo(),
-          hasReaderListener(false),
-          receiverQueueSize(1000),
-          readerName(),
-          subscriptionRolePrefix(),
-          readCompacted(false) {}
+    long unAckedMessagesTimeoutMs{0};
+    long tickDurationInMs{1000};
+    long ackGroupingTimeMs{100};
+    long ackGroupingMaxSize{1000};
+    CryptoKeyReaderPtr cryptoKeyReader;
+    ConsumerCryptoFailureAction cryptoFailureAction;
+    std::map<std::string, std::string> properties;
 };
 }  // namespace pulsar
 #endif /* LIB_READERCONFIGURATIONIMPL_H_ */
diff --git a/pulsar-client-cpp/lib/ReaderImpl.cc b/pulsar-client-cpp/lib/ReaderImpl.cc
index 2ea3430..bcf707d 100644
--- a/pulsar-client-cpp/lib/ReaderImpl.cc
+++ b/pulsar-client-cpp/lib/ReaderImpl.cc
@@ -23,6 +23,12 @@
 
 namespace pulsar {
 
+namespace test {
+std::mutex readerConfigTestMutex;
+std::atomic_bool readerConfigTestEnabled{false};
+ConsumerConfiguration consumerConfigOfReader;
+}  // namespace test
+
 static ResultCallback emptyCallback;
 
 ReaderImpl::ReaderImpl(const ClientImplPtr client, const std::string& topic, const ReaderConfiguration& conf,
@@ -35,6 +41,13 @@ void ReaderImpl::start(const MessageId& startMessageId) {
     consumerConf.setReceiverQueueSize(readerConf_.getReceiverQueueSize());
     consumerConf.setReadCompacted(readerConf_.isReadCompacted());
     consumerConf.setSchema(readerConf_.getSchema());
+    consumerConf.setUnAckedMessagesTimeoutMs(readerConf_.getUnAckedMessagesTimeoutMs());
+    consumerConf.setTickDurationInMs(readerConf_.getTickDurationInMs());
+    consumerConf.setAckGroupingTimeMs(readerConf_.getAckGroupingTimeMs());
+    consumerConf.setAckGroupingMaxSize(readerConf_.getAckGroupingMaxSize());
+    consumerConf.setCryptoKeyReader(readerConf_.getCryptoKeyReader());
+    consumerConf.setCryptoFailureAction(readerConf_.getCryptoFailureAction());
+    consumerConf.setProperties(readerConf_.getProperties());
 
     if (readerConf_.getReaderName().length() > 0) {
         consumerConf.setConsumerName(readerConf_.getReaderName());
@@ -57,6 +70,11 @@ void ReaderImpl::start(const MessageId& startMessageId) {
         }
     }
 
+    // get the consumer's configuration before created
+    if (test::readerConfigTestEnabled) {
+        test::consumerConfigOfReader = consumerConf.clone();
+    }
+
     consumer_ = std::make_shared<ConsumerImpl>(
         client_.lock(), topic_, subscription, consumerConf, ExecutorServicePtr(), NonPartitioned,
         Commands::SubscriptionModeNonDurable, Optional<MessageId>::of(startMessageId));
diff --git a/pulsar-client-cpp/lib/ReaderImpl.h b/pulsar-client-cpp/lib/ReaderImpl.h
index 4069247..f1ad387 100644
--- a/pulsar-client-cpp/lib/ReaderImpl.h
+++ b/pulsar-client-cpp/lib/ReaderImpl.h
@@ -29,7 +29,15 @@ class ReaderImpl;
 typedef std::shared_ptr<ReaderImpl> ReaderImplPtr;
 typedef std::weak_ptr<ReaderImpl> ReaderImplWeakPtr;
 
-class ReaderImpl : public std::enable_shared_from_this<ReaderImpl> {
+namespace test {
+
+extern std::mutex readerConfigTestMutex PULSAR_PUBLIC;
+extern std::atomic_bool readerConfigTestEnabled PULSAR_PUBLIC;
+extern ConsumerConfiguration consumerConfigOfReader PULSAR_PUBLIC;
+
+}  // namespace test
+
+class PULSAR_PUBLIC ReaderImpl : public std::enable_shared_from_this<ReaderImpl> {
    public:
     ReaderImpl(const ClientImplPtr client, const std::string& topic, const ReaderConfiguration& conf,
                const ExecutorServicePtr listenerExecutor, ReaderCallback readerCreatedCallback);
diff --git a/pulsar-client-cpp/tests/ReaderConfigurationTest.cc b/pulsar-client-cpp/tests/ReaderConfigurationTest.cc
new file mode 100644
index 0000000..6af4b4a
--- /dev/null
+++ b/pulsar-client-cpp/tests/ReaderConfigurationTest.cc
@@ -0,0 +1,141 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+/**
+ * This test only tests the ConsumerConfiguration used for the Reader's internal consumer.
+ * Because the ReaderConfiguration for Reader itself is meaningless.
+ */
+#include <gtest/gtest.h>
+#include <pulsar/Client.h>
+#include <lib/LogUtils.h>
+#include <lib/ReaderImpl.h>
+
+DECLARE_LOG_OBJECT()
+
+using namespace pulsar;
+
+static const std::string lookupUrl = "pulsar://localhost:6650";
+
+class NoOpsCryptoKeyReader : public CryptoKeyReader {
+   public:
+    Result getPublicKey(const std::string& keyName, std::map<std::string, std::string>& metadata,
+                        EncryptionKeyInfo& encKeyInfo) const override {
+        return ResultOk;
+    }
+
+    Result getPrivateKey(const std::string& keyName, std::map<std::string, std::string>& metadata,
+                         EncryptionKeyInfo& encKeyInfo) const override {
+        return ResultOk;
+    }
+};
+
+TEST(ReaderConfigurationTest, testDefaultConfig) {
+    const std::string topic = "ReaderConfigurationTest-default-config";
+    Client client(lookupUrl);
+    ReaderConfiguration readerConf;
+    Reader reader;
+
+    std::unique_lock<std::mutex> lock(test::readerConfigTestMutex);
+    test::readerConfigTestEnabled = true;
+    ASSERT_EQ(ResultOk, client.createReader(topic, MessageId::earliest(), readerConf, reader));
+    const auto consumerConf = test::consumerConfigOfReader.clone();
+    test::readerConfigTestEnabled = false;
+    lock.unlock();
+
+    ASSERT_EQ(consumerConf.getConsumerType(), ConsumerExclusive);
+    ASSERT_EQ(consumerConf.getReceiverQueueSize(), 1000);
+    ASSERT_EQ(consumerConf.isReadCompacted(), false);
+    ASSERT_EQ(consumerConf.getSchema().getName(), "BYTES");
+    ASSERT_EQ(consumerConf.getUnAckedMessagesTimeoutMs(), 0);
+    ASSERT_EQ(consumerConf.getTickDurationInMs(), 1000);
+    ASSERT_EQ(consumerConf.getAckGroupingTimeMs(), 100);
+    ASSERT_EQ(consumerConf.getAckGroupingMaxSize(), 1000);
+    ASSERT_EQ(consumerConf.getCryptoKeyReader().get(), nullptr);
+    ASSERT_EQ(consumerConf.getCryptoFailureAction(), ConsumerCryptoFailureAction::FAIL);
+    ASSERT_TRUE(consumerConf.getProperties().empty());
+    ASSERT_TRUE(consumerConf.getConsumerName().empty());
+    ASSERT_FALSE(consumerConf.hasMessageListener());
+
+    client.close();
+}
+
+TEST(ReaderConfigurationTest, testCustomConfig) {
+    const std::string exampleSchema =
+        "{\"type\":\"record\",\"name\":\"Example\",\"namespace\":\"test\","
+        "\"fields\":[{\"name\":\"a\",\"type\":\"int\"},{\"name\":\"b\",\"type\":\"int\"}]}";
+
+    const std::string topic = "ReaderConfigurationTest-custom-config";
+    Client client(lookupUrl);
+
+    const SchemaInfo schema(AVRO, "Avro", exampleSchema, StringMap{{"schema-key", "schema-value"}});
+
+    ProducerConfiguration producerConf;
+    producerConf.setSchema(schema);
+    Producer producer;
+    ASSERT_EQ(ResultOk, client.createProducer(topic, producerConf, producer));
+    ASSERT_FALSE(producer.getSchemaVersion().empty());
+
+    ReaderConfiguration readerConf;
+    readerConf.setSchema(schema);
+    readerConf.setReaderListener([](Reader, const Message&) {});
+    readerConf.setReceiverQueueSize(2000);
+    readerConf.setReaderName("my-reader");
+    readerConf.setReadCompacted(true);
+    readerConf.setUnAckedMessagesTimeoutMs(11000);
+    readerConf.setTickDurationInMs(2000);
+    readerConf.setAckGroupingTimeMs(0);
+    readerConf.setAckGroupingMaxSize(4096);
+    const auto cryptoReader = std::make_shared<NoOpsCryptoKeyReader>();
+    readerConf.setCryptoKeyReader(cryptoReader);
+    readerConf.setCryptoFailureAction(ConsumerCryptoFailureAction::DISCARD);
+    const std::map<std::string, std::string> properties{{"key-1", "value-1"}, {"key-2", "value-2"}};
+    readerConf.setProperties(properties);
+
+    Reader reader;
+    std::unique_lock<std::mutex> lock(test::readerConfigTestMutex);
+    test::readerConfigTestEnabled = true;
+    ASSERT_EQ(ResultOk, client.createReader(topic, MessageId::earliest(), readerConf, reader));
+    const auto consumerConf = test::consumerConfigOfReader.clone();
+    test::readerConfigTestEnabled = false;
+    lock.unlock();
+
+    ASSERT_EQ(consumerConf.getSchema().getName(), schema.getName());
+    ASSERT_EQ(consumerConf.getSchema().getSchemaType(), schema.getSchemaType());
+    ASSERT_EQ(consumerConf.getSchema().getSchema(), schema.getSchema());
+    ASSERT_EQ(consumerConf.getSchema().getProperties(), schema.getProperties());
+
+    ASSERT_EQ(consumerConf.getConsumerType(), ConsumerExclusive);
+    ASSERT_TRUE(consumerConf.hasMessageListener());
+    ASSERT_EQ(consumerConf.getReceiverQueueSize(), 2000);
+    ASSERT_EQ(consumerConf.getConsumerName(), "my-reader");
+    ASSERT_EQ(consumerConf.isReadCompacted(), true);
+    ASSERT_EQ(consumerConf.getUnAckedMessagesTimeoutMs(), 11000);
+    ASSERT_EQ(consumerConf.getTickDurationInMs(), 2000);
+    ASSERT_EQ(consumerConf.getAckGroupingTimeMs(), 0);
+    ASSERT_EQ(consumerConf.getAckGroupingMaxSize(), 4096);
+    ASSERT_EQ(consumerConf.getCryptoKeyReader(), cryptoReader);
+    ASSERT_EQ(consumerConf.getCryptoFailureAction(), ConsumerCryptoFailureAction::DISCARD);
+    ASSERT_EQ(consumerConf.getProperties(), properties);
+    ASSERT_TRUE(consumerConf.hasProperty("key-1"));
+    ASSERT_EQ(consumerConf.getProperty("key-1"), "value-1");
+    ASSERT_TRUE(consumerConf.hasProperty("key-2"));
+    ASSERT_EQ(consumerConf.getProperty("key-2"), "value-2");
+    ASSERT_FALSE(consumerConf.hasProperty("key-3"));
+
+    client.close();
+}