You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by pe...@apache.org on 2020/12/21 13:04:13 UTC
[pulsar] 07/07: [C++] Add consumer's configs for reader (#8905)
This is an automated email from the ASF dual-hosted git repository.
penghui pushed a commit to branch branch-2.7
in repository https://gitbox.apache.org/repos/asf/pulsar.git
commit 4da0bafae1bb94a9856f78243db20ede1b60f85c
Author: Yunze Xu <xy...@163.com>
AuthorDate: Mon Dec 14 18:27:09 2020 -0600
[C++] Add consumer's configs for reader (#8905)
### Motivation
C++ reader cannot set some consumer's configs. Like `CryptoReader`, if the config is not supported for reader, then reader cannot decrypt encrypted messages. Also, reader cannot set other configs like `AckGroupingTimeMs` so that reader cannot control the behavior of `AckGroupingTracker`.
### Modifications
- Add other consumer's configs for reader, except:
- NegativeAckRedeliveryDelayMs: reader has no chance to call `negativeAcknowledge`
- BrokerConsumerStatsCacheTimeInMs: reader has no chance to call `getBrokerConsumerStatsAsync`
- KeySharedPolicy and ConsumerType: reader only supports Exclusive subscription now
- MaxTotalReceiverQueueSizeAcrossPartitions: reader doesn't support partitioned topics now
- PatternAutoDiscoveryPeriod: reader doesn't support regex subscription now
- Add some fields to get the consumer config before reader's internal consumer is created, then add some unit tests for the consumer config.
(cherry picked from commit a3ac12e8cd255bbacd308f6e2c3186e2d3f98850)
---
.../include/pulsar/ReaderConfiguration.h | 93 ++++++++++++++
pulsar-client-cpp/lib/ReaderConfiguration.cc | 76 +++++++++++
pulsar-client-cpp/lib/ReaderConfigurationImpl.h | 20 +--
pulsar-client-cpp/lib/ReaderImpl.cc | 18 +++
pulsar-client-cpp/lib/ReaderImpl.h | 10 +-
pulsar-client-cpp/tests/ReaderConfigurationTest.cc | 141 +++++++++++++++++++++
6 files changed, 347 insertions(+), 11 deletions(-)
diff --git a/pulsar-client-cpp/include/pulsar/ReaderConfiguration.h b/pulsar-client-cpp/include/pulsar/ReaderConfiguration.h
index 25b0ba3..96b36b6 100644
--- a/pulsar-client-cpp/include/pulsar/ReaderConfiguration.h
+++ b/pulsar-client-cpp/include/pulsar/ReaderConfiguration.h
@@ -25,6 +25,8 @@
#include <pulsar/Result.h>
#include <pulsar/Message.h>
#include <pulsar/Schema.h>
+#include <pulsar/CryptoKeyReader.h>
+#include <pulsar/ConsumerCryptoFailureAction.h>
namespace pulsar {
@@ -112,6 +114,97 @@ class PULSAR_PUBLIC ReaderConfiguration {
void setInternalSubscriptionName(std::string internalSubscriptionName);
const std::string& getInternalSubscriptionName() const;
+ /**
+ * Set the timeout in milliseconds for unacknowledged messages, the timeout needs to be greater than
+ * 10 seconds. An Exception is thrown if the given value is less than 10000 (10 seconds).
+ * If a successful acknowledgement is not sent within the timeout all the unacknowledged messages are
+ * redelivered.
+ * @param timeout in milliseconds
+ */
+ void setUnAckedMessagesTimeoutMs(const uint64_t milliSeconds);
+
+ /**
+ * @return the configured timeout in milliseconds for unacked messages.
+ */
+ long getUnAckedMessagesTimeoutMs() const;
+
+ void setTickDurationInMs(const uint64_t milliSeconds);
+
+ long getTickDurationInMs() const;
+
+ /**
+ * Set time window in milliseconds for grouping message ACK requests. An ACK request is not sent
+ * to broker until the time window reaches its end, or the number of grouped messages reaches
+ * limit. Default is 100 milliseconds. If it's set to a non-positive value, ACK requests will be
+ * directly sent to broker without grouping.
+ *
+ * @param ackGroupMillis time of ACK grouping window in milliseconds.
+ */
+ void setAckGroupingTimeMs(long ackGroupingMillis);
+
+ /**
+ * Get grouping time window in milliseconds.
+ *
+ * @return grouping time window in milliseconds.
+ */
+ long getAckGroupingTimeMs() const;
+
+ /**
+ * Set max number of grouped messages within one grouping time window. If it's set to a
+ * non-positive value, number of grouped messages is not limited. Default is 1000.
+ *
+ * @param maxGroupingSize max number of grouped messages with in one grouping time window.
+ */
+ void setAckGroupingMaxSize(long maxGroupingSize);
+
+ /**
+ * Get max number of grouped messages within one grouping time window.
+ *
+ * @return max number of grouped messages within one grouping time window.
+ */
+ long getAckGroupingMaxSize() const;
+
+ bool isEncryptionEnabled() const;
+ const CryptoKeyReaderPtr getCryptoKeyReader() const;
+ ReaderConfiguration& setCryptoKeyReader(CryptoKeyReaderPtr cryptoKeyReader);
+
+ ConsumerCryptoFailureAction getCryptoFailureAction() const;
+ ReaderConfiguration& setCryptoFailureAction(ConsumerCryptoFailureAction action);
+
+ /**
+ * Check whether the message has a specific property attached.
+ *
+ * @param name the name of the property to check
+ * @return true if the message has the specified property
+ * @return false if the property is not defined
+ */
+ bool hasProperty(const std::string& name) const;
+
+ /**
+ * Get the value of a specific property
+ *
+ * @param name the name of the property
+ * @return the value of the property or null if the property was not defined
+ */
+ const std::string& getProperty(const std::string& name) const;
+
+ /**
+ * Get all the properties attached to this producer.
+ */
+ std::map<std::string, std::string>& getProperties() const;
+
+ /**
+ * Sets a new property on a message.
+ * @param name the name of the property
+ * @param value the associated value
+ */
+ ReaderConfiguration& setProperty(const std::string& name, const std::string& value);
+
+ /**
+ * Add all the properties in the provided map
+ */
+ ReaderConfiguration& setProperties(const std::map<std::string, std::string>& properties);
+
private:
std::shared_ptr<ReaderConfigurationImpl> impl_;
};
diff --git a/pulsar-client-cpp/lib/ReaderConfiguration.cc b/pulsar-client-cpp/lib/ReaderConfiguration.cc
index eb18d11..0dfdbed 100644
--- a/pulsar-client-cpp/lib/ReaderConfiguration.cc
+++ b/pulsar-client-cpp/lib/ReaderConfiguration.cc
@@ -20,6 +20,8 @@
namespace pulsar {
+const static std::string emptyString;
+
ReaderConfiguration::ReaderConfiguration() : impl_(std::make_shared<ReaderConfigurationImpl>()) {}
ReaderConfiguration::~ReaderConfiguration() {}
@@ -76,4 +78,78 @@ const std::string& ReaderConfiguration::getInternalSubscriptionName() const {
return impl_->internalSubscriptionName;
}
+void ReaderConfiguration::setUnAckedMessagesTimeoutMs(const uint64_t milliSeconds) {
+ impl_->unAckedMessagesTimeoutMs = milliSeconds;
+}
+
+long ReaderConfiguration::getUnAckedMessagesTimeoutMs() const { return impl_->unAckedMessagesTimeoutMs; }
+
+void ReaderConfiguration::setTickDurationInMs(const uint64_t milliSeconds) {
+ impl_->tickDurationInMs = milliSeconds;
+}
+
+long ReaderConfiguration::getTickDurationInMs() const { return impl_->tickDurationInMs; }
+
+void ReaderConfiguration::setAckGroupingTimeMs(long ackGroupingMillis) {
+ impl_->ackGroupingTimeMs = ackGroupingMillis;
+}
+
+long ReaderConfiguration::getAckGroupingTimeMs() const { return impl_->ackGroupingTimeMs; }
+
+void ReaderConfiguration::setAckGroupingMaxSize(long maxGroupingSize) {
+ impl_->ackGroupingMaxSize = maxGroupingSize;
+}
+
+long ReaderConfiguration::getAckGroupingMaxSize() const { return impl_->ackGroupingMaxSize; }
+
+bool ReaderConfiguration::isEncryptionEnabled() const { return impl_->cryptoKeyReader != nullptr; }
+
+const CryptoKeyReaderPtr ReaderConfiguration::getCryptoKeyReader() const { return impl_->cryptoKeyReader; }
+
+ReaderConfiguration& ReaderConfiguration::setCryptoKeyReader(CryptoKeyReaderPtr cryptoKeyReader) {
+ impl_->cryptoKeyReader = cryptoKeyReader;
+ return *this;
+}
+
+ConsumerCryptoFailureAction ReaderConfiguration::getCryptoFailureAction() const {
+ return impl_->cryptoFailureAction;
+}
+
+ReaderConfiguration& ReaderConfiguration::setCryptoFailureAction(ConsumerCryptoFailureAction action) {
+ impl_->cryptoFailureAction = action;
+ return *this;
+}
+
+bool ReaderConfiguration::hasProperty(const std::string& name) const {
+ const auto& properties = impl_->properties;
+ return properties.find(name) != properties.cend();
+}
+
+const std::string& ReaderConfiguration::getProperty(const std::string& name) const {
+ const auto& properties = impl_->properties;
+ const auto it = properties.find(name);
+ return (it != properties.cend()) ? (it->second) : emptyString;
+}
+
+std::map<std::string, std::string>& ReaderConfiguration::getProperties() const { return impl_->properties; }
+
+ReaderConfiguration& ReaderConfiguration::setProperty(const std::string& name, const std::string& value) {
+ auto& properties = impl_->properties;
+ auto it = properties.find(name);
+ if (it != properties.end()) {
+ it->second = value;
+ } else {
+ properties.emplace(name, value);
+ }
+ return *this;
+}
+
+ReaderConfiguration& ReaderConfiguration::setProperties(
+ const std::map<std::string, std::string>& properties) {
+ for (const auto& kv : properties) {
+ setProperty(kv.first, kv.second);
+ }
+ return *this;
+}
+
} // namespace pulsar
diff --git a/pulsar-client-cpp/lib/ReaderConfigurationImpl.h b/pulsar-client-cpp/lib/ReaderConfigurationImpl.h
index 3b5cc8a..6f38c29 100644
--- a/pulsar-client-cpp/lib/ReaderConfigurationImpl.h
+++ b/pulsar-client-cpp/lib/ReaderConfigurationImpl.h
@@ -25,19 +25,19 @@ namespace pulsar {
struct ReaderConfigurationImpl {
SchemaInfo schemaInfo;
ReaderListener readerListener;
- bool hasReaderListener;
- int receiverQueueSize;
+ bool hasReaderListener{false};
+ int receiverQueueSize{1000};
std::string readerName;
std::string subscriptionRolePrefix;
- bool readCompacted;
+ bool readCompacted{false};
std::string internalSubscriptionName;
- ReaderConfigurationImpl()
- : schemaInfo(),
- hasReaderListener(false),
- receiverQueueSize(1000),
- readerName(),
- subscriptionRolePrefix(),
- readCompacted(false) {}
+ long unAckedMessagesTimeoutMs{0};
+ long tickDurationInMs{1000};
+ long ackGroupingTimeMs{100};
+ long ackGroupingMaxSize{1000};
+ CryptoKeyReaderPtr cryptoKeyReader;
+ ConsumerCryptoFailureAction cryptoFailureAction;
+ std::map<std::string, std::string> properties;
};
} // namespace pulsar
#endif /* LIB_READERCONFIGURATIONIMPL_H_ */
diff --git a/pulsar-client-cpp/lib/ReaderImpl.cc b/pulsar-client-cpp/lib/ReaderImpl.cc
index 2ea3430..bcf707d 100644
--- a/pulsar-client-cpp/lib/ReaderImpl.cc
+++ b/pulsar-client-cpp/lib/ReaderImpl.cc
@@ -23,6 +23,12 @@
namespace pulsar {
+namespace test {
+std::mutex readerConfigTestMutex;
+std::atomic_bool readerConfigTestEnabled{false};
+ConsumerConfiguration consumerConfigOfReader;
+} // namespace test
+
static ResultCallback emptyCallback;
ReaderImpl::ReaderImpl(const ClientImplPtr client, const std::string& topic, const ReaderConfiguration& conf,
@@ -35,6 +41,13 @@ void ReaderImpl::start(const MessageId& startMessageId) {
consumerConf.setReceiverQueueSize(readerConf_.getReceiverQueueSize());
consumerConf.setReadCompacted(readerConf_.isReadCompacted());
consumerConf.setSchema(readerConf_.getSchema());
+ consumerConf.setUnAckedMessagesTimeoutMs(readerConf_.getUnAckedMessagesTimeoutMs());
+ consumerConf.setTickDurationInMs(readerConf_.getTickDurationInMs());
+ consumerConf.setAckGroupingTimeMs(readerConf_.getAckGroupingTimeMs());
+ consumerConf.setAckGroupingMaxSize(readerConf_.getAckGroupingMaxSize());
+ consumerConf.setCryptoKeyReader(readerConf_.getCryptoKeyReader());
+ consumerConf.setCryptoFailureAction(readerConf_.getCryptoFailureAction());
+ consumerConf.setProperties(readerConf_.getProperties());
if (readerConf_.getReaderName().length() > 0) {
consumerConf.setConsumerName(readerConf_.getReaderName());
@@ -57,6 +70,11 @@ void ReaderImpl::start(const MessageId& startMessageId) {
}
}
+ // get the consumer's configuration before created
+ if (test::readerConfigTestEnabled) {
+ test::consumerConfigOfReader = consumerConf.clone();
+ }
+
consumer_ = std::make_shared<ConsumerImpl>(
client_.lock(), topic_, subscription, consumerConf, ExecutorServicePtr(), NonPartitioned,
Commands::SubscriptionModeNonDurable, Optional<MessageId>::of(startMessageId));
diff --git a/pulsar-client-cpp/lib/ReaderImpl.h b/pulsar-client-cpp/lib/ReaderImpl.h
index 4069247..f1ad387 100644
--- a/pulsar-client-cpp/lib/ReaderImpl.h
+++ b/pulsar-client-cpp/lib/ReaderImpl.h
@@ -29,7 +29,15 @@ class ReaderImpl;
typedef std::shared_ptr<ReaderImpl> ReaderImplPtr;
typedef std::weak_ptr<ReaderImpl> ReaderImplWeakPtr;
-class ReaderImpl : public std::enable_shared_from_this<ReaderImpl> {
+namespace test {
+
+extern std::mutex readerConfigTestMutex PULSAR_PUBLIC;
+extern std::atomic_bool readerConfigTestEnabled PULSAR_PUBLIC;
+extern ConsumerConfiguration consumerConfigOfReader PULSAR_PUBLIC;
+
+} // namespace test
+
+class PULSAR_PUBLIC ReaderImpl : public std::enable_shared_from_this<ReaderImpl> {
public:
ReaderImpl(const ClientImplPtr client, const std::string& topic, const ReaderConfiguration& conf,
const ExecutorServicePtr listenerExecutor, ReaderCallback readerCreatedCallback);
diff --git a/pulsar-client-cpp/tests/ReaderConfigurationTest.cc b/pulsar-client-cpp/tests/ReaderConfigurationTest.cc
new file mode 100644
index 0000000..6af4b4a
--- /dev/null
+++ b/pulsar-client-cpp/tests/ReaderConfigurationTest.cc
@@ -0,0 +1,141 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+/**
+ * This test only tests the ConsumerConfiguration used for the Reader's internal consumer.
+ * Because the ReaderConfiguration for Reader itself is meaningless.
+ */
+#include <gtest/gtest.h>
+#include <pulsar/Client.h>
+#include <lib/LogUtils.h>
+#include <lib/ReaderImpl.h>
+
+DECLARE_LOG_OBJECT()
+
+using namespace pulsar;
+
+static const std::string lookupUrl = "pulsar://localhost:6650";
+
+class NoOpsCryptoKeyReader : public CryptoKeyReader {
+ public:
+ Result getPublicKey(const std::string& keyName, std::map<std::string, std::string>& metadata,
+ EncryptionKeyInfo& encKeyInfo) const override {
+ return ResultOk;
+ }
+
+ Result getPrivateKey(const std::string& keyName, std::map<std::string, std::string>& metadata,
+ EncryptionKeyInfo& encKeyInfo) const override {
+ return ResultOk;
+ }
+};
+
+TEST(ReaderConfigurationTest, testDefaultConfig) {
+ const std::string topic = "ReaderConfigurationTest-default-config";
+ Client client(lookupUrl);
+ ReaderConfiguration readerConf;
+ Reader reader;
+
+ std::unique_lock<std::mutex> lock(test::readerConfigTestMutex);
+ test::readerConfigTestEnabled = true;
+ ASSERT_EQ(ResultOk, client.createReader(topic, MessageId::earliest(), readerConf, reader));
+ const auto consumerConf = test::consumerConfigOfReader.clone();
+ test::readerConfigTestEnabled = false;
+ lock.unlock();
+
+ ASSERT_EQ(consumerConf.getConsumerType(), ConsumerExclusive);
+ ASSERT_EQ(consumerConf.getReceiverQueueSize(), 1000);
+ ASSERT_EQ(consumerConf.isReadCompacted(), false);
+ ASSERT_EQ(consumerConf.getSchema().getName(), "BYTES");
+ ASSERT_EQ(consumerConf.getUnAckedMessagesTimeoutMs(), 0);
+ ASSERT_EQ(consumerConf.getTickDurationInMs(), 1000);
+ ASSERT_EQ(consumerConf.getAckGroupingTimeMs(), 100);
+ ASSERT_EQ(consumerConf.getAckGroupingMaxSize(), 1000);
+ ASSERT_EQ(consumerConf.getCryptoKeyReader().get(), nullptr);
+ ASSERT_EQ(consumerConf.getCryptoFailureAction(), ConsumerCryptoFailureAction::FAIL);
+ ASSERT_TRUE(consumerConf.getProperties().empty());
+ ASSERT_TRUE(consumerConf.getConsumerName().empty());
+ ASSERT_FALSE(consumerConf.hasMessageListener());
+
+ client.close();
+}
+
+TEST(ReaderConfigurationTest, testCustomConfig) {
+ const std::string exampleSchema =
+ "{\"type\":\"record\",\"name\":\"Example\",\"namespace\":\"test\","
+ "\"fields\":[{\"name\":\"a\",\"type\":\"int\"},{\"name\":\"b\",\"type\":\"int\"}]}";
+
+ const std::string topic = "ReaderConfigurationTest-custom-config";
+ Client client(lookupUrl);
+
+ const SchemaInfo schema(AVRO, "Avro", exampleSchema, StringMap{{"schema-key", "schema-value"}});
+
+ ProducerConfiguration producerConf;
+ producerConf.setSchema(schema);
+ Producer producer;
+ ASSERT_EQ(ResultOk, client.createProducer(topic, producerConf, producer));
+ ASSERT_FALSE(producer.getSchemaVersion().empty());
+
+ ReaderConfiguration readerConf;
+ readerConf.setSchema(schema);
+ readerConf.setReaderListener([](Reader, const Message&) {});
+ readerConf.setReceiverQueueSize(2000);
+ readerConf.setReaderName("my-reader");
+ readerConf.setReadCompacted(true);
+ readerConf.setUnAckedMessagesTimeoutMs(11000);
+ readerConf.setTickDurationInMs(2000);
+ readerConf.setAckGroupingTimeMs(0);
+ readerConf.setAckGroupingMaxSize(4096);
+ const auto cryptoReader = std::make_shared<NoOpsCryptoKeyReader>();
+ readerConf.setCryptoKeyReader(cryptoReader);
+ readerConf.setCryptoFailureAction(ConsumerCryptoFailureAction::DISCARD);
+ const std::map<std::string, std::string> properties{{"key-1", "value-1"}, {"key-2", "value-2"}};
+ readerConf.setProperties(properties);
+
+ Reader reader;
+ std::unique_lock<std::mutex> lock(test::readerConfigTestMutex);
+ test::readerConfigTestEnabled = true;
+ ASSERT_EQ(ResultOk, client.createReader(topic, MessageId::earliest(), readerConf, reader));
+ const auto consumerConf = test::consumerConfigOfReader.clone();
+ test::readerConfigTestEnabled = false;
+ lock.unlock();
+
+ ASSERT_EQ(consumerConf.getSchema().getName(), schema.getName());
+ ASSERT_EQ(consumerConf.getSchema().getSchemaType(), schema.getSchemaType());
+ ASSERT_EQ(consumerConf.getSchema().getSchema(), schema.getSchema());
+ ASSERT_EQ(consumerConf.getSchema().getProperties(), schema.getProperties());
+
+ ASSERT_EQ(consumerConf.getConsumerType(), ConsumerExclusive);
+ ASSERT_TRUE(consumerConf.hasMessageListener());
+ ASSERT_EQ(consumerConf.getReceiverQueueSize(), 2000);
+ ASSERT_EQ(consumerConf.getConsumerName(), "my-reader");
+ ASSERT_EQ(consumerConf.isReadCompacted(), true);
+ ASSERT_EQ(consumerConf.getUnAckedMessagesTimeoutMs(), 11000);
+ ASSERT_EQ(consumerConf.getTickDurationInMs(), 2000);
+ ASSERT_EQ(consumerConf.getAckGroupingTimeMs(), 0);
+ ASSERT_EQ(consumerConf.getAckGroupingMaxSize(), 4096);
+ ASSERT_EQ(consumerConf.getCryptoKeyReader(), cryptoReader);
+ ASSERT_EQ(consumerConf.getCryptoFailureAction(), ConsumerCryptoFailureAction::DISCARD);
+ ASSERT_EQ(consumerConf.getProperties(), properties);
+ ASSERT_TRUE(consumerConf.hasProperty("key-1"));
+ ASSERT_EQ(consumerConf.getProperty("key-1"), "value-1");
+ ASSERT_TRUE(consumerConf.hasProperty("key-2"));
+ ASSERT_EQ(consumerConf.getProperty("key-2"), "value-2");
+ ASSERT_FALSE(consumerConf.hasProperty("key-3"));
+
+ client.close();
+}