You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by mm...@apache.org on 2020/07/09 17:27:42 UTC

[pulsar] branch master updated: [C++] Add unit tests for key shared consumer (#7487)

This is an automated email from the ASF dual-hosted git repository.

mmerli pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git


The following commit(s) were added to refs/heads/master by this push:
     new 33edec3  [C++] Add unit tests for key shared consumer (#7487)
33edec3 is described below

commit 33edec3c244c69ff83a048861e421a0baf58a718
Author: Yunze Xu <xy...@gmail.com>
AuthorDate: Fri Jul 10 01:27:27 2020 +0800

    [C++] Add unit tests for key shared consumer (#7487)
    
    * Add unit tests for key shared consumer
    
    * Add test for priorities of ordering key and partitioned key
    
    * Remove unused headers and variables
---
 pulsar-client-cpp/tests/KeySharedConsumerTest.cc | 192 +++++++++++++++++++++++
 pulsar-client-cpp/tests/LogHelper.h              |  39 +++++
 2 files changed, 231 insertions(+)

diff --git a/pulsar-client-cpp/tests/KeySharedConsumerTest.cc b/pulsar-client-cpp/tests/KeySharedConsumerTest.cc
new file mode 100644
index 0000000..01e799f
--- /dev/null
+++ b/pulsar-client-cpp/tests/KeySharedConsumerTest.cc
@@ -0,0 +1,192 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+#include <cmath>
+#include <ctime>
+#include <vector>
+#include <map>
+
+#include <gtest/gtest.h>
+#include <pulsar/Client.h>
+#include "lib/LogUtils.h"
+
+#include "HttpHelper.h"
+#include "LogHelper.h"
+
+DECLARE_LOG_OBJECT()
+
+using namespace pulsar;
+
+static std::string lookupUrl = "pulsar://localhost:6650";
+
+class KeySharedConsumerTest : public ::testing::Test {
+   protected:
+    KeySharedConsumerTest() : client(lookupUrl, ClientConfiguration().setPartititionsUpdateInterval(1)) {}
+
+    void TearDown() override { client.close(); }
+
+    void addProducer(const std::string& topicName) {
+        producers.emplace_back();
+        // NOTE: Current C++ producer doesn't support key based batch, so we need to disable batching
+        auto conf = ProducerConfiguration().setBatchingEnabled(false).setPartitionsRoutingMode(
+            ProducerConfiguration::RoundRobinDistribution);
+        ASSERT_EQ(ResultOk, client.createProducer(topicName, conf, producers.back()));
+    }
+
+    ConsumerConfiguration getConsumerConfiguration() {
+        ConsumerConfiguration conf;
+        conf.setConsumerType(ConsumerKeyShared);
+        conf.setPatternAutoDiscoveryPeriod(1);  // find new topics quickly
+        return conf;
+    }
+
+    void addConsumer(const std::string& topicName) {
+        consumers.emplace_back();
+        ASSERT_EQ(ResultOk,
+                  client.subscribe(topicName, subName, getConsumerConfiguration(), consumers.back()));
+    }
+
+    void addRegexConsumer(const std::string& pattern) {
+        consumers.emplace_back();
+        ASSERT_EQ(ResultOk,
+                  client.subscribeWithRegex(pattern, subName, getConsumerConfiguration(), consumers.back()));
+    }
+
+    static constexpr int NUMBER_OF_KEYS = 300;
+
+    static Message newIntMessage(int i, const std::string& key, const char* orderingKey = nullptr) {
+        MessageBuilder builder;
+        if (orderingKey) {
+            builder.setOrderingKey(orderingKey);
+        }
+        return builder.setPartitionKey(key).setContent(std::to_string(i)).build();
+    }
+
+    static void sendCallback(Result result, const MessageId&) { ASSERT_EQ(result, ResultOk); }
+
+    void receiveAndCheckDistribution() {
+        // key is message's ordering key or partitioned key, value is consumer index
+        std::map<std::string, size_t> keyToConsumer;
+        // key is consumer index, value is the number of message received by consumers[key]
+        std::map<size_t, int> messagesPerConsumer;
+
+        int totalMessages = 0;
+
+        for (size_t i = 0; i < consumers.size(); i++) {
+            auto& consumer = consumers[i];
+            int messagesForThisConsumer = 0;
+            while (true) {
+                Message msg;
+                Result result = consumer.receive(msg, 1000);
+                if (result == ResultTimeout) {
+                    messagesPerConsumer[i] = messagesForThisConsumer;
+                    break;
+                }
+
+                ASSERT_EQ(result, ResultOk);
+                totalMessages++;
+                messagesForThisConsumer++;
+                ASSERT_EQ(ResultOk, consumer.acknowledge(msg));
+
+                if (msg.hasPartitionKey() || msg.hasOrderingKey()) {
+                    std::string key = msg.hasOrderingKey() ? msg.getOrderingKey() : msg.getPartitionKey();
+                    auto iter = keyToConsumer.find(key);
+                    if (iter == keyToConsumer.end()) {
+                        keyToConsumer[key] = i;
+                    } else {
+                        ASSERT_EQ(iter->second, i);
+                    }
+                }
+            }
+        }
+
+        const double expectedMessagesPerConsumer = static_cast<double>(totalMessages) / consumers.size();
+        constexpr double PERCENT_ERROR = 0.50;
+        LOG_INFO("messagesPerConsumer: " << messagesPerConsumer);
+        for (const auto& kv : messagesPerConsumer) {
+            int count = kv.second;
+            ASSERT_LT(fabs(count - expectedMessagesPerConsumer), expectedMessagesPerConsumer * PERCENT_ERROR);
+        }
+    }
+
+    Client client;
+    std::vector<Producer> producers;
+    std::vector<Consumer> consumers;
+    const std::string subName = "SubscriptionName";
+};
+
+TEST_F(KeySharedConsumerTest, testNonPartitionedTopic) {
+    const std::string topicName = "KeySharedConsumerTest-non-par-topic" + std::to_string(time(nullptr));
+
+    addProducer(topicName);
+    for (int i = 0; i < 3; i++) {
+        addConsumer(topicName);
+    }
+
+    srand(time(nullptr));
+    for (int i = 0; i < 1000; i++) {
+        std::string key = std::to_string(rand() % NUMBER_OF_KEYS);
+        producers[0].sendAsync(newIntMessage(i, key), sendCallback);
+    }
+    ASSERT_EQ(ResultOk, producers[0].flush());
+
+    receiveAndCheckDistribution();
+}
+
+TEST_F(KeySharedConsumerTest, testMultiTopics) {
+    const std::string topicNamePrefix = "KeySharedConsumerTest-multi-topics" + std::to_string(time(nullptr));
+
+    for (int i = 0; i < 3; i++) {
+        addProducer(topicNamePrefix + std::to_string(i));
+    }
+    for (int i = 0; i < 3; i++) {
+        addRegexConsumer(".*" + topicNamePrefix + ".*");
+    }
+
+    srand(time(nullptr));
+    for (auto& producer : producers) {
+        for (int i = 0; i < 1000; i++) {
+            std::string key = std::to_string(rand() % NUMBER_OF_KEYS);
+            producer.sendAsync(newIntMessage(i, key), sendCallback);
+        }
+        ASSERT_EQ(ResultOk, producer.flush());
+    }
+
+    receiveAndCheckDistribution();
+}
+
+TEST_F(KeySharedConsumerTest, testOrderingKeyPriority) {
+    const std::string topicName =
+        "KeySharedConsumerTest-ordering-key-priority" + std::to_string(time(nullptr));
+
+    addProducer(topicName);
+    for (int i = 0; i < 3; i++) {
+        addConsumer(topicName);
+    }
+
+    srand(time(nullptr));
+    for (int i = 0; i < 1000; i++) {
+        int randomInt = rand();
+        std::string key = std::to_string(randomInt % NUMBER_OF_KEYS);
+        std::string orderingKey = std::to_string((randomInt + 1) % NUMBER_OF_KEYS);
+        producers[0].sendAsync(newIntMessage(i, key, orderingKey.c_str()), sendCallback);
+    }
+    ASSERT_EQ(ResultOk, producers[0].flush());
+
+    receiveAndCheckDistribution();
+}
diff --git a/pulsar-client-cpp/tests/LogHelper.h b/pulsar-client-cpp/tests/LogHelper.h
new file mode 100644
index 0000000..113aa67
--- /dev/null
+++ b/pulsar-client-cpp/tests/LogHelper.h
@@ -0,0 +1,39 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+#pragma once
+
+#include <map>
+#include <sstream>
+#include <string>
+
+template <typename Key, typename Value>
+inline std::ostream& operator<<(std::ostream& os, const std::map<Key, Value>& m) {
+    os << "{";
+    bool first = true;
+    for (const auto& kv : m) {
+        if (!first) {
+            os << ", ";
+        } else {
+            first = false;
+        }
+        os << kv.first << " => " << kv.second;
+    }
+    os << "}";
+    return os;
+}