You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by mm...@apache.org on 2021/06/10 19:54:36 UTC

[pulsar] branch master updated: [C++] Fix undefined behavior caused by uninitialized variables (#10892)

This is an automated email from the ASF dual-hosted git repository.

mmerli pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git


The following commit(s) were added to refs/heads/master by this push:
     new e77fa36  [C++] Fix undefined behavior caused by uninitialized variables (#10892)
e77fa36 is described below

commit e77fa36e5202d64084515922f7972446633cf5fd
Author: Yunze Xu <xy...@163.com>
AuthorDate: Fri Jun 11 03:54:07 2021 +0800

    [C++] Fix undefined behavior caused by uninitialized variables (#10892)
    
    * Use brace initializer to initialize configurations
    
    * Add config tests
---
 .../include/pulsar/ClientConfiguration.h           |   2 +
 .../include/pulsar/ConsumerConfiguration.h         |   6 +
 .../include/pulsar/ProducerConfiguration.h         |  17 ++-
 pulsar-client-cpp/include/pulsar/Schema.h          |  10 ++
 pulsar-client-cpp/lib/ClientConfigurationImpl.h    |  38 ++----
 pulsar-client-cpp/lib/ConsumerConfigurationImpl.h  |  51 +++-----
 pulsar-client-cpp/lib/ProducerConfigurationImpl.h  |  43 ++-----
 .../tests/ConsumerConfigurationTest.cc             |  99 ++++++++++++++++
 pulsar-client-cpp/tests/NoOpsCryptoKeyReader.h     |  36 ++++++
 .../tests/ProducerConfigurationTest.cc             | 132 +++++++++++++++++++++
 pulsar-client-cpp/tests/ReaderConfigurationTest.cc |  14 +--
 11 files changed, 340 insertions(+), 108 deletions(-)

diff --git a/pulsar-client-cpp/include/pulsar/ClientConfiguration.h b/pulsar-client-cpp/include/pulsar/ClientConfiguration.h
index 71717c2..11bfc43 100644
--- a/pulsar-client-cpp/include/pulsar/ClientConfiguration.h
+++ b/pulsar-client-cpp/include/pulsar/ClientConfiguration.h
@@ -217,6 +217,8 @@ class PULSAR_PUBLIC ClientConfiguration {
     /**
      * Initialize stats interval in seconds. Stats are printed and reset after every `statsIntervalInSeconds`.
      *
+     * Default: 600
+     *
      * Set to 0 means disabling stats collection.
      */
     ClientConfiguration& setStatsIntervalInSeconds(const unsigned int&);
diff --git a/pulsar-client-cpp/include/pulsar/ConsumerConfiguration.h b/pulsar-client-cpp/include/pulsar/ConsumerConfiguration.h
index d0bc5ee..bf7fdcd 100644
--- a/pulsar-client-cpp/include/pulsar/ConsumerConfiguration.h
+++ b/pulsar-client-cpp/include/pulsar/ConsumerConfiguration.h
@@ -187,6 +187,9 @@ class PULSAR_PUBLIC ConsumerConfiguration {
      * 10 seconds. An Exception is thrown if the given value is less than 10000 (10 seconds).
      * If a successful acknowledgement is not sent within the timeout all the unacknowledged messages are
      * redelivered.
+     *
+     * Default: 0, which means the the tracker for unacknowledged messages is disabled.
+     *
      * @param timeout in milliseconds
      */
     void setUnAckedMessagesTimeoutMs(const uint64_t milliSeconds);
@@ -269,6 +272,9 @@ class PULSAR_PUBLIC ConsumerConfiguration {
 
     /**
      * Set the time duration for which the broker side consumer stats will be cached in the client.
+     *
+     * Default: 30000, which means 30 seconds.
+     *
      * @param cacheTimeInMs in milliseconds
      */
     void setBrokerConsumerStatsCacheTimeInMs(const long cacheTimeInMs);
diff --git a/pulsar-client-cpp/include/pulsar/ProducerConfiguration.h b/pulsar-client-cpp/include/pulsar/ProducerConfiguration.h
index 085d342..3306b27 100644
--- a/pulsar-client-cpp/include/pulsar/ProducerConfiguration.h
+++ b/pulsar-client-cpp/include/pulsar/ProducerConfiguration.h
@@ -140,6 +140,8 @@ class PULSAR_PUBLIC ProducerConfiguration {
      * The first message uses (initialSequenceId + 1) as its sequence ID and subsequent messages are assigned
      * incremental sequence IDs.
      *
+     * Default: -1, which means the first message's sequence ID is 0.
+     *
      * @param initialSequenceId the initial sequence ID for the producer.
      * @return
      */
@@ -178,6 +180,8 @@ class PULSAR_PUBLIC ProducerConfiguration {
      * would fail unless blockIfQueueFull is set to true. Use {@link #setBlockIfQueueFull} to change the
      * blocking behavior.
      *
+     * Default: 1000
+     *
      * @param maxPendingMessages max number of pending messages.
      * @return
      */
@@ -194,6 +198,8 @@ class PULSAR_PUBLIC ProducerConfiguration {
      * This setting will be used to lower the max pending messages for each partition
      * ({@link #setMaxPendingMessages(int)}), if the total exceeds the configured value.
      *
+     * Default: 50000
+     *
      * @param maxPendingMessagesAcrossPartitions
      */
     ProducerConfiguration& setMaxPendingMessagesAcrossPartitions(int maxPendingMessagesAcrossPartitions);
@@ -206,6 +212,8 @@ class PULSAR_PUBLIC ProducerConfiguration {
     /**
      * Set the message routing modes for partitioned topics.
      *
+     * Default: UseSinglePartition
+     *
      * @param PartitionsRoutingMode partition routing mode.
      * @return
      */
@@ -233,6 +241,8 @@ class PULSAR_PUBLIC ProducerConfiguration {
      * Set the hashing scheme, which is a standard hashing function available when choosing the partition
      * used for a particular message.
      *
+     * Default: HashingScheme::BoostHash
+     *
      * <p>Standard hashing functions available are:
      * <ul>
      * <li>{@link HashingScheme::JavaStringHash}: Java {@code String.hashCode()} (Default).
@@ -266,8 +276,9 @@ class PULSAR_PUBLIC ProducerConfiguration {
     // Zero queue size feature will not be supported on consumer end if batching is enabled
 
     /**
-     * Control whether automatic batching of messages is enabled or not for the producer. <i>Default value:
-     * false (no automatic batching).</i>
+     * Control whether automatic batching of messages is enabled or not for the producer.
+     *
+     * Default: true
      *
      * When automatic batching is enabled, multiple calls to Producer::sendAsync can result in a single batch
      * to be sent to the broker, leading to better throughput, especially when publishing small messages. If
@@ -343,6 +354,8 @@ class PULSAR_PUBLIC ProducerConfiguration {
     const unsigned long& getBatchingMaxPublishDelayMs() const;
 
     /**
+     * Default: DefaultBatching
+     *
      * @see BatchingType
      */
     ProducerConfiguration& setBatchingType(BatchingType batchingType);
diff --git a/pulsar-client-cpp/include/pulsar/Schema.h b/pulsar-client-cpp/include/pulsar/Schema.h
index b14851e..d8b47f5 100644
--- a/pulsar-client-cpp/include/pulsar/Schema.h
+++ b/pulsar-client-cpp/include/pulsar/Schema.h
@@ -117,6 +117,16 @@ typedef std::map<std::string, std::string> StringMap;
  */
 class PULSAR_PUBLIC SchemaInfo {
    public:
+    /**
+     * The default constructor with following configs:
+     * - schemaType: SchemaType::BYTES
+     * - name: "BYTES"
+     * - schema: ""
+     * - properties: {}
+     *
+     * @see SchemaInfo(SchemaType schemaType, const std::string& name, const std::string& schema, const
+     * StringMap& properties)
+     */
     SchemaInfo();
 
     /**
diff --git a/pulsar-client-cpp/lib/ClientConfigurationImpl.h b/pulsar-client-cpp/lib/ClientConfigurationImpl.h
index 9d36672..631e8ae 100644
--- a/pulsar-client-cpp/lib/ClientConfigurationImpl.h
+++ b/pulsar-client-cpp/lib/ClientConfigurationImpl.h
@@ -24,38 +24,22 @@
 namespace pulsar {
 
 struct ClientConfigurationImpl {
-    AuthenticationPtr authenticationPtr;
-    uint64_t memoryLimit;
-    int ioThreads;
-    int operationTimeoutSeconds;
-    int messageListenerThreads;
-    int concurrentLookupRequest;
+    AuthenticationPtr authenticationPtr{AuthFactory::Disabled()};
+    uint64_t memoryLimit{0ull};
+    int ioThreads{1};
+    int operationTimeoutSeconds{30};
+    int messageListenerThreads{1};
+    int concurrentLookupRequest{50000};
     std::string logConfFilePath;
-    bool useTls;
+    bool useTls{false};
     std::string tlsTrustCertsFilePath;
-    bool tlsAllowInsecureConnection;
-    unsigned int statsIntervalInSeconds;
+    bool tlsAllowInsecureConnection{false};
+    unsigned int statsIntervalInSeconds{600};  // 10 minutes
     std::unique_ptr<LoggerFactory> loggerFactory;
-    bool validateHostName;
-    unsigned int partitionsUpdateInterval;
+    bool validateHostName{false};
+    unsigned int partitionsUpdateInterval{60};  // 1 minute
     std::string listenerName;
 
-    ClientConfigurationImpl()
-        : authenticationPtr(AuthFactory::Disabled()),
-          memoryLimit(0ull),
-          ioThreads(1),
-          operationTimeoutSeconds(30),
-          messageListenerThreads(1),
-          concurrentLookupRequest(50000),
-          logConfFilePath(),
-          useTls(false),
-          tlsAllowInsecureConnection(false),
-          statsIntervalInSeconds(600),  // 10 minutes
-          loggerFactory(),
-          validateHostName(false),
-          partitionsUpdateInterval(60)  // 1 minute
-    {}
-
     std::unique_ptr<LoggerFactory> takeLogger() { return std::move(loggerFactory); }
 };
 }  // namespace pulsar
diff --git a/pulsar-client-cpp/lib/ConsumerConfigurationImpl.h b/pulsar-client-cpp/lib/ConsumerConfigurationImpl.h
index 24d6e65..502c201 100644
--- a/pulsar-client-cpp/lib/ConsumerConfigurationImpl.h
+++ b/pulsar-client-cpp/lib/ConsumerConfigurationImpl.h
@@ -26,48 +26,27 @@
 namespace pulsar {
 struct ConsumerConfigurationImpl {
     SchemaInfo schemaInfo;
-    long unAckedMessagesTimeoutMs;
-    long tickDurationInMs;
+    long unAckedMessagesTimeoutMs{0};
+    long tickDurationInMs{1000};
 
-    long negativeAckRedeliveryDelayMs;
-    long ackGroupingTimeMs;
-    long ackGroupingMaxSize;
-    ConsumerType consumerType;
+    long negativeAckRedeliveryDelayMs{60000};
+    long ackGroupingTimeMs{100};
+    long ackGroupingMaxSize{1000};
+    ConsumerType consumerType{ConsumerExclusive};
     MessageListener messageListener;
-    bool hasMessageListener;
-    int receiverQueueSize;
-    int maxTotalReceiverQueueSizeAcrossPartitions;
+    bool hasMessageListener{false};
+    int receiverQueueSize{1000};
+    int maxTotalReceiverQueueSizeAcrossPartitions{50000};
     std::string consumerName;
-    long brokerConsumerStatsCacheTimeInMs;
+    long brokerConsumerStatsCacheTimeInMs{30 * 1000L};  // 30 seconds
     CryptoKeyReaderPtr cryptoKeyReader;
-    ConsumerCryptoFailureAction cryptoFailureAction;
-    bool readCompacted;
-    InitialPosition subscriptionInitialPosition;
-    int patternAutoDiscoveryPeriod;
-    bool replicateSubscriptionStateEnabled;
+    ConsumerCryptoFailureAction cryptoFailureAction{ConsumerCryptoFailureAction::FAIL};
+    bool readCompacted{false};
+    InitialPosition subscriptionInitialPosition{InitialPosition::InitialPositionLatest};
+    int patternAutoDiscoveryPeriod{60};
+    bool replicateSubscriptionStateEnabled{false};
     std::map<std::string, std::string> properties;
     KeySharedPolicy keySharedPolicy;
-
-    ConsumerConfigurationImpl()
-        : schemaInfo(),
-          unAckedMessagesTimeoutMs(0),
-          tickDurationInMs(1000),
-          negativeAckRedeliveryDelayMs(60000),
-          ackGroupingTimeMs(100),
-          ackGroupingMaxSize(1000),
-          consumerType(ConsumerExclusive),
-          messageListener(),
-          hasMessageListener(false),
-          brokerConsumerStatsCacheTimeInMs(30 * 1000),  // 30 seconds
-          receiverQueueSize(1000),
-          maxTotalReceiverQueueSizeAcrossPartitions(50000),
-          cryptoKeyReader(),
-          cryptoFailureAction(ConsumerCryptoFailureAction::FAIL),
-          readCompacted(false),
-          subscriptionInitialPosition(InitialPosition::InitialPositionLatest),
-          patternAutoDiscoveryPeriod(60),
-          properties(),
-          keySharedPolicy() {}
 };
 }  // namespace pulsar
 #endif /* LIB_CONSUMERCONFIGURATIONIMPL_H_ */
diff --git a/pulsar-client-cpp/lib/ProducerConfigurationImpl.h b/pulsar-client-cpp/lib/ProducerConfigurationImpl.h
index 7c30793..fa6b755 100644
--- a/pulsar-client-cpp/lib/ProducerConfigurationImpl.h
+++ b/pulsar-client-cpp/lib/ProducerConfigurationImpl.h
@@ -30,40 +30,23 @@ struct ProducerConfigurationImpl {
     SchemaInfo schemaInfo;
     Optional<std::string> producerName;
     Optional<int64_t> initialSequenceId;
-    int sendTimeoutMs;
-    CompressionType compressionType;
-    int maxPendingMessages;
-    int maxPendingMessagesAcrossPartitions;
-    ProducerConfiguration::PartitionsRoutingMode routingMode;
+    int sendTimeoutMs{30000};
+    CompressionType compressionType{CompressionNone};
+    int maxPendingMessages{1000};
+    int maxPendingMessagesAcrossPartitions{50000};
+    ProducerConfiguration::PartitionsRoutingMode routingMode{ProducerConfiguration::UseSinglePartition};
     MessageRoutingPolicyPtr messageRouter;
-    ProducerConfiguration::HashingScheme hashingScheme;
-    bool blockIfQueueFull;
-    bool batchingEnabled;
-    unsigned int batchingMaxMessages;
-    unsigned long batchingMaxAllowedSizeInBytes;
-    unsigned long batchingMaxPublishDelayMs;
-    ProducerConfiguration::BatchingType batchingType;
+    ProducerConfiguration::HashingScheme hashingScheme{ProducerConfiguration::BoostHash};
+    bool blockIfQueueFull{false};
+    bool batchingEnabled{true};
+    unsigned int batchingMaxMessages{1000};
+    unsigned long batchingMaxAllowedSizeInBytes{128 * 1024};  // 128 KB
+    unsigned long batchingMaxPublishDelayMs{10};              // 10 milli seconds
+    ProducerConfiguration::BatchingType batchingType{ProducerConfiguration::DefaultBatching};
     CryptoKeyReaderPtr cryptoKeyReader;
     std::set<std::string> encryptionKeys;
-    ProducerCryptoFailureAction cryptoFailureAction;
+    ProducerCryptoFailureAction cryptoFailureAction{ProducerCryptoFailureAction::FAIL};
     std::map<std::string, std::string> properties;
-    ProducerConfigurationImpl()
-        : schemaInfo(),
-          sendTimeoutMs(30000),
-          compressionType(CompressionNone),
-          maxPendingMessages(1000),
-          maxPendingMessagesAcrossPartitions(50000),
-          routingMode(ProducerConfiguration::UseSinglePartition),
-          hashingScheme(ProducerConfiguration::BoostHash),
-          blockIfQueueFull(false),
-          batchingEnabled(true),
-          batchingMaxMessages(1000),
-          batchingMaxAllowedSizeInBytes(128 * 1024),  // 128 KB
-          batchingMaxPublishDelayMs(10),              // 10 milli seconds
-          batchingType(ProducerConfiguration::DefaultBatching),
-          cryptoKeyReader(),
-          encryptionKeys(),
-          cryptoFailureAction(ProducerCryptoFailureAction::FAIL) {}
 };
 }  // namespace pulsar
 
diff --git a/pulsar-client-cpp/tests/ConsumerConfigurationTest.cc b/pulsar-client-cpp/tests/ConsumerConfigurationTest.cc
index 379bcdc..3ab23f7 100644
--- a/pulsar-client-cpp/tests/ConsumerConfigurationTest.cc
+++ b/pulsar-client-cpp/tests/ConsumerConfigurationTest.cc
@@ -19,6 +19,7 @@
 #include <pulsar/Client.h>
 #include <gtest/gtest.h>
 #include <lib/LogUtils.h>
+#include "NoOpsCryptoKeyReader.h"
 
 DECLARE_LOG_OBJECT()
 
@@ -27,6 +28,104 @@ DECLARE_LOG_OBJECT()
 
 using namespace pulsar;
 
+TEST(ConsumerConfigurationTest, testDefaultConfig) {
+    ConsumerConfiguration conf;
+    ASSERT_EQ(conf.getSchema().getSchemaType(), SchemaType::BYTES);
+    ASSERT_EQ(conf.getConsumerType(), ConsumerExclusive);
+    ASSERT_EQ(conf.hasMessageListener(), false);
+    ASSERT_EQ(conf.getReceiverQueueSize(), 1000);
+    ASSERT_EQ(conf.getMaxTotalReceiverQueueSizeAcrossPartitions(), 50000);
+    ASSERT_EQ(conf.getConsumerName(), "");
+    ASSERT_EQ(conf.getUnAckedMessagesTimeoutMs(), 0);
+    ASSERT_EQ(conf.getTickDurationInMs(), 1000);
+    ASSERT_EQ(conf.getNegativeAckRedeliveryDelayMs(), 60000);
+    ASSERT_EQ(conf.getAckGroupingTimeMs(), 100);
+    ASSERT_EQ(conf.getAckGroupingMaxSize(), 1000);
+    ASSERT_EQ(conf.getBrokerConsumerStatsCacheTimeInMs(), 30000);
+    ASSERT_EQ(conf.isReadCompacted(), false);
+    ASSERT_EQ(conf.getPatternAutoDiscoveryPeriod(), 60);
+    ASSERT_EQ(conf.getSubscriptionInitialPosition(), InitialPositionLatest);
+    ASSERT_EQ(conf.getCryptoKeyReader(), CryptoKeyReaderPtr{});
+    ASSERT_EQ(conf.getCryptoFailureAction(), ConsumerCryptoFailureAction::FAIL);
+    ASSERT_EQ(conf.isEncryptionEnabled(), false);
+    ASSERT_EQ(conf.isReplicateSubscriptionStateEnabled(), false);
+    ASSERT_EQ(conf.getProperties().empty(), true);
+}
+
+TEST(ConsumerConfigurationTest, testCustomConfig) {
+    ConsumerConfiguration conf;
+
+    const std::string exampleSchema =
+        "{\"type\":\"record\",\"name\":\"Example\",\"namespace\":\"test\","
+        "\"fields\":[{\"name\":\"a\",\"type\":\"int\"},{\"name\":\"b\",\"type\":\"int\"}]}";
+    const SchemaInfo schema(AVRO, "Avro", exampleSchema, StringMap{{"schema-key", "schema-value"}});
+
+    conf.setSchema(schema);
+    ASSERT_EQ(conf.getSchema().getName(), schema.getName());
+    ASSERT_EQ(conf.getSchema().getSchemaType(), schema.getSchemaType());
+    ASSERT_EQ(conf.getSchema().getSchema(), schema.getSchema());
+    ASSERT_EQ(conf.getSchema().getProperties(), schema.getProperties());
+
+    conf.setConsumerType(ConsumerKeyShared);
+    ASSERT_EQ(conf.getConsumerType(), ConsumerKeyShared);
+
+    conf.setMessageListener([](Consumer consumer, const Message& msg) {});
+    ASSERT_EQ(conf.hasMessageListener(), true);
+
+    conf.setReceiverQueueSize(2000);
+    ASSERT_EQ(conf.getReceiverQueueSize(), 2000);
+
+    conf.setMaxTotalReceiverQueueSizeAcrossPartitions(100000);
+    ASSERT_EQ(conf.getMaxTotalReceiverQueueSizeAcrossPartitions(), 100000);
+
+    conf.setConsumerName("consumer");
+    ASSERT_EQ(conf.getConsumerName(), "consumer");
+
+    conf.setUnAckedMessagesTimeoutMs(20000);
+    ASSERT_EQ(conf.getUnAckedMessagesTimeoutMs(), 20000);
+
+    conf.setTickDurationInMs(2000);
+    ASSERT_EQ(conf.getTickDurationInMs(), 2000);
+
+    conf.setNegativeAckRedeliveryDelayMs(10000);
+    ASSERT_EQ(conf.getNegativeAckRedeliveryDelayMs(), 10000);
+
+    conf.setAckGroupingTimeMs(200);
+    ASSERT_EQ(conf.getAckGroupingTimeMs(), 200);
+
+    conf.setAckGroupingMaxSize(2000);
+    ASSERT_EQ(conf.getAckGroupingMaxSize(), 2000);
+
+    conf.setBrokerConsumerStatsCacheTimeInMs(60000);
+    ASSERT_EQ(conf.getBrokerConsumerStatsCacheTimeInMs(), 60000);
+
+    conf.setReadCompacted(true);
+    ASSERT_EQ(conf.isReadCompacted(), true);
+
+    conf.setPatternAutoDiscoveryPeriod(120);
+    ASSERT_EQ(conf.getPatternAutoDiscoveryPeriod(), 120);
+
+    conf.setSubscriptionInitialPosition(InitialPositionEarliest);
+    ASSERT_EQ(conf.getSubscriptionInitialPosition(), InitialPositionEarliest);
+
+    const auto cryptoKeyReader = std::make_shared<NoOpsCryptoKeyReader>();
+    conf.setCryptoKeyReader(cryptoKeyReader);
+    ASSERT_EQ(conf.getCryptoKeyReader(), cryptoKeyReader);
+    // NOTE: once CryptoKeyReader was set, the isEncryptionEnabled() would return true, it's different from
+    // ProducerConfiguration
+    ASSERT_EQ(conf.isEncryptionEnabled(), true);
+
+    conf.setCryptoFailureAction(ConsumerCryptoFailureAction::CONSUME);
+    ASSERT_EQ(conf.getCryptoFailureAction(), ConsumerCryptoFailureAction::CONSUME);
+
+    conf.setReplicateSubscriptionStateEnabled(true);
+    ASSERT_EQ(conf.isReplicateSubscriptionStateEnabled(), true);
+
+    conf.setProperty("k1", "v1");
+    ASSERT_EQ(conf.getProperties()["k1"], "v1");
+    ASSERT_EQ(conf.hasProperty("k1"), true);
+}
+
 TEST(ConsumerConfigurationTest, testReadCompactPersistentExclusive) {
     std::string lookupUrl = "pulsar://localhost:6650";
     std::string topicName = "persist-topic";
diff --git a/pulsar-client-cpp/tests/NoOpsCryptoKeyReader.h b/pulsar-client-cpp/tests/NoOpsCryptoKeyReader.h
new file mode 100644
index 0000000..e152690
--- /dev/null
+++ b/pulsar-client-cpp/tests/NoOpsCryptoKeyReader.h
@@ -0,0 +1,36 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+#include <pulsar/CryptoKeyReader.h>
+
+namespace pulsar {
+
+class NoOpsCryptoKeyReader : public CryptoKeyReader {
+   public:
+    Result getPublicKey(const std::string& keyName, std::map<std::string, std::string>& metadata,
+                        EncryptionKeyInfo& encKeyInfo) const override {
+        return ResultOk;
+    }
+
+    Result getPrivateKey(const std::string& keyName, std::map<std::string, std::string>& metadata,
+                         EncryptionKeyInfo& encKeyInfo) const override {
+        return ResultOk;
+    }
+};
+
+}  // namespace pulsar
diff --git a/pulsar-client-cpp/tests/ProducerConfigurationTest.cc b/pulsar-client-cpp/tests/ProducerConfigurationTest.cc
new file mode 100644
index 0000000..b88f6e4
--- /dev/null
+++ b/pulsar-client-cpp/tests/ProducerConfigurationTest.cc
@@ -0,0 +1,132 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+#include <gtest/gtest.h>
+#include <pulsar/ProducerConfiguration.h>
+#include "NoOpsCryptoKeyReader.h"
+
+using namespace pulsar;
+
+TEST(ProducerConfigurationTest, testDefaultConfig) {
+    ProducerConfiguration conf;
+    ASSERT_EQ(conf.getProducerName(), "");
+    ASSERT_EQ(conf.getSchema().getName(), "BYTES");
+    ASSERT_EQ(conf.getSchema().getSchemaType(), SchemaType::BYTES);
+    ASSERT_EQ(conf.getSendTimeout(), 30000);
+    ASSERT_EQ(conf.getInitialSequenceId(), -1ll);
+    ASSERT_EQ(conf.getCompressionType(), CompressionType::CompressionNone);
+    ASSERT_EQ(conf.getMaxPendingMessages(), 1000);
+    ASSERT_EQ(conf.getMaxPendingMessagesAcrossPartitions(), 50000);
+    ASSERT_EQ(conf.getPartitionsRoutingMode(), ProducerConfiguration::UseSinglePartition);
+    ASSERT_EQ(conf.getMessageRouterPtr(), MessageRoutingPolicyPtr{});
+    ASSERT_EQ(conf.getHashingScheme(), ProducerConfiguration::BoostHash);
+    ASSERT_EQ(conf.getBlockIfQueueFull(), false);
+    ASSERT_EQ(conf.getBatchingEnabled(), true);
+    ASSERT_EQ(conf.getBatchingMaxMessages(), 1000);
+    ASSERT_EQ(conf.getBatchingMaxAllowedSizeInBytes(), 128 * 1024);
+    ASSERT_EQ(conf.getBatchingMaxPublishDelayMs(), 10);
+    ASSERT_EQ(conf.getBatchingType(), ProducerConfiguration::DefaultBatching);
+    ASSERT_EQ(conf.getCryptoKeyReader(), CryptoKeyReaderPtr{});
+    ASSERT_EQ(conf.getCryptoFailureAction(), ProducerCryptoFailureAction::FAIL);
+    ASSERT_EQ(conf.isEncryptionEnabled(), false);
+    ASSERT_EQ(conf.getEncryptionKeys(), std::set<std::string>{});
+    ASSERT_EQ(conf.getProperties().empty(), true);
+}
+
+class MockMessageRoutingPolicy : public MessageRoutingPolicy {
+   public:
+    int getPartition(const Message& msg) override { return 0; }
+    int getPartition(const Message& msg, const TopicMetadata& topicMetadata) override { return 0; }
+};
+
+TEST(ProducerConfigurationTest, testCustomConfig) {
+    ProducerConfiguration conf;
+
+    conf.setProducerName("producer");
+    ASSERT_EQ(conf.getProducerName(), "producer");
+
+    const std::string exampleSchema =
+        "{\"type\":\"record\",\"name\":\"Example\",\"namespace\":\"test\","
+        "\"fields\":[{\"name\":\"a\",\"type\":\"int\"},{\"name\":\"b\",\"type\":\"int\"}]}";
+    const SchemaInfo schema(AVRO, "Avro", exampleSchema, StringMap{{"schema-key", "schema-value"}});
+
+    conf.setSchema(schema);
+    ASSERT_EQ(conf.getSchema().getName(), schema.getName());
+    ASSERT_EQ(conf.getSchema().getSchemaType(), schema.getSchemaType());
+    ASSERT_EQ(conf.getSchema().getSchema(), schema.getSchema());
+    ASSERT_EQ(conf.getSchema().getProperties(), schema.getProperties());
+
+    conf.setSendTimeout(0);
+    ASSERT_EQ(conf.getSendTimeout(), 0);
+
+    conf.setInitialSequenceId(100ll);
+    ASSERT_EQ(conf.getInitialSequenceId(), 100ll);
+
+    conf.setCompressionType(CompressionType::CompressionLZ4);
+    ASSERT_EQ(conf.getCompressionType(), CompressionType::CompressionLZ4);
+
+    conf.setMaxPendingMessages(2000);
+    ASSERT_EQ(conf.getMaxPendingMessages(), 2000);
+
+    conf.setMaxPendingMessagesAcrossPartitions(100000);
+    ASSERT_EQ(conf.getMaxPendingMessagesAcrossPartitions(), 100000);
+
+    conf.setPartitionsRoutingMode(ProducerConfiguration::RoundRobinDistribution);
+    ASSERT_EQ(conf.getPartitionsRoutingMode(), ProducerConfiguration::RoundRobinDistribution);
+
+    const auto router = std::make_shared<MockMessageRoutingPolicy>();
+    conf.setMessageRouter(router);
+    ASSERT_EQ(conf.getPartitionsRoutingMode(), ProducerConfiguration::CustomPartition);
+    ASSERT_EQ(conf.getMessageRouterPtr(), router);
+
+    conf.setHashingScheme(ProducerConfiguration::JavaStringHash);
+    ASSERT_EQ(conf.getHashingScheme(), ProducerConfiguration::JavaStringHash);
+
+    conf.setBlockIfQueueFull(true);
+    ASSERT_EQ(conf.getBlockIfQueueFull(), true);
+
+    conf.setBatchingEnabled(false);
+    ASSERT_EQ(conf.getBatchingEnabled(), false);
+
+    conf.setBatchingMaxMessages(2000);
+    ASSERT_EQ(conf.getBatchingMaxMessages(), 2000);
+
+    conf.setBatchingMaxAllowedSizeInBytes(1024);
+    ASSERT_EQ(conf.getBatchingMaxAllowedSizeInBytes(), 1024);
+
+    conf.setBatchingMaxPublishDelayMs(1);
+    ASSERT_EQ(conf.getBatchingMaxPublishDelayMs(), 1);
+
+    conf.setBatchingType(ProducerConfiguration::KeyBasedBatching);
+    ASSERT_EQ(conf.getBatchingType(), ProducerConfiguration::KeyBasedBatching);
+
+    const auto cryptoKeyReader = std::make_shared<NoOpsCryptoKeyReader>();
+    conf.setCryptoKeyReader(cryptoKeyReader);
+    ASSERT_EQ(conf.getCryptoKeyReader(), cryptoKeyReader);
+
+    conf.setCryptoFailureAction(pulsar::ProducerCryptoFailureAction::SEND);
+    ASSERT_EQ(conf.getCryptoFailureAction(), ProducerCryptoFailureAction::SEND);
+
+    conf.addEncryptionKey("key");
+    ASSERT_EQ(conf.getEncryptionKeys(), std::set<std::string>{"key"});
+    ASSERT_EQ(conf.isEncryptionEnabled(), true);
+
+    conf.setProperty("k1", "v1");
+    ASSERT_EQ(conf.getProperties()["k1"], "v1");
+    ASSERT_EQ(conf.hasProperty("k1"), true);
+}
diff --git a/pulsar-client-cpp/tests/ReaderConfigurationTest.cc b/pulsar-client-cpp/tests/ReaderConfigurationTest.cc
index 6af4b4a..ccbfa2d 100644
--- a/pulsar-client-cpp/tests/ReaderConfigurationTest.cc
+++ b/pulsar-client-cpp/tests/ReaderConfigurationTest.cc
@@ -24,6 +24,7 @@
 #include <pulsar/Client.h>
 #include <lib/LogUtils.h>
 #include <lib/ReaderImpl.h>
+#include "NoOpsCryptoKeyReader.h"
 
 DECLARE_LOG_OBJECT()
 
@@ -31,19 +32,6 @@ using namespace pulsar;
 
 static const std::string lookupUrl = "pulsar://localhost:6650";
 
-class NoOpsCryptoKeyReader : public CryptoKeyReader {
-   public:
-    Result getPublicKey(const std::string& keyName, std::map<std::string, std::string>& metadata,
-                        EncryptionKeyInfo& encKeyInfo) const override {
-        return ResultOk;
-    }
-
-    Result getPrivateKey(const std::string& keyName, std::map<std::string, std::string>& metadata,
-                         EncryptionKeyInfo& encKeyInfo) const override {
-        return ResultOk;
-    }
-};
-
 TEST(ReaderConfigurationTest, testDefaultConfig) {
     const std::string topic = "ReaderConfigurationTest-default-config";
     Client client(lookupUrl);