You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by GitBox <gi...@apache.org> on 2018/01/19 00:39:44 UTC

[GitHub] merlimat closed pull request #1033: Implement TopicMetadata for MessageRouter in C++ client

merlimat closed pull request #1033: Implement TopicMetadata for MessageRouter in C++ client
URL: https://github.com/apache/incubator-pulsar/pull/1033
 
 
   

This pull request was merged from a forked repository.
Because GitHub hides the original diff of a fork's pull request once
it is merged, the diff is reproduced below for the sake of provenance:

diff --git a/build/docker/Dockerfile b/build/docker/Dockerfile
index 9d6de7d2d..89fb3fcae 100644
--- a/build/docker/Dockerfile
+++ b/build/docker/Dockerfile
@@ -25,13 +25,16 @@ ADD protobuf.patch /pulsar
 
 RUN apt-get update
 RUN apt-get install -y maven tig g++ cmake libssl-dev libcurl4-openssl-dev \
-                liblog4cxx-dev libprotobuf-dev libboost-all-dev  libgtest-dev \
+                liblog4cxx-dev libprotobuf-dev libboost-all-dev google-mock libgtest-dev \
                 libjsoncpp-dev libxml2-utils protobuf-compiler wget \
                 curl doxygen openjdk-8-jdk-headless
 
 # Compile and install gtest
 RUN cd /usr/src/gtest && cmake . && make && cp libgtest.a /usr/lib
 
+# Compile and install google-mock
+RUN cd /usr/src/gmock && cmake . && make && cp libgmock.a /usr/lib
+
 # Include gtest parallel to speed up unit tests
 RUN git clone https://github.com/google/gtest-parallel.git
 
diff --git a/pulsar-client-cpp/CMakeLists.txt b/pulsar-client-cpp/CMakeLists.txt
index 61281ee81..9a7fbad10 100644
--- a/pulsar-client-cpp/CMakeLists.txt
+++ b/pulsar-client-cpp/CMakeLists.txt
@@ -121,6 +121,7 @@ find_package(Boost REQUIRED COMPONENTS program_options filesystem regex
 
 find_package(OpenSSL REQUIRED)
 find_path(GTEST_INCLUDE_PATH gtest/gtest.h)
+find_path(GMOCK_INCLUDE_PATH gmock/gmock.h)
 find_path(JSON_INCLUDE_PATH jsoncpp)
 find_path(LOG4CXX_INCLUDE_PATH log4cxx/logger.h)
 
@@ -146,6 +147,7 @@ include_directories(
   ${PROTOBUF_INCLUDE_DIR}
   ${LOG4CXX_INCLUDE_PATH}
   ${GTEST_INCLUDE_PATH}
+  ${GMOCK_INCLUDE_PATH}
   ${JSON_INCLUDE_PATH}
 )
 
diff --git a/pulsar-client-cpp/include/pulsar/DeprecatedException.h b/pulsar-client-cpp/include/pulsar/DeprecatedException.h
new file mode 100644
index 000000000..123edaa5f
--- /dev/null
+++ b/pulsar-client-cpp/include/pulsar/DeprecatedException.h
@@ -0,0 +1,35 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+#ifndef DEPRECATED_EXCEPTION_HPP_
+#define DEPRECATED_EXCEPTION_HPP_
+
+#include <stdexcept>
+#include <string>
+
+namespace pulsar {
+    class DeprecatedException: public std::runtime_error {
+    public:
+        explicit DeprecatedException(const std::string& __arg);
+
+    private:
+        static const std::string message_prefix;
+    };
+}
+
+#endif //DEPRECATED_EXCEPTION_HPP_
diff --git a/pulsar-client-cpp/include/pulsar/Message.h b/pulsar-client-cpp/include/pulsar/Message.h
index 14042554b..9c134242c 100644
--- a/pulsar-client-cpp/include/pulsar/Message.h
+++ b/pulsar-client-cpp/include/pulsar/Message.h
@@ -39,6 +39,7 @@ class MessageBuilder;
 class MessageImpl;
 class PulsarWrapper;
 
+// TODO: When releasing 2.0.0, make all methods virtual and create the virtual destructor for Google Mock tests
 class Message {
  public:
     typedef std::map<std::string, std::string> StringMap;
diff --git a/pulsar-client-cpp/include/pulsar/MessageRoutingPolicy.h b/pulsar-client-cpp/include/pulsar/MessageRoutingPolicy.h
index ad6ca3700..3b72e86be 100644
--- a/pulsar-client-cpp/include/pulsar/MessageRoutingPolicy.h
+++ b/pulsar-client-cpp/include/pulsar/MessageRoutingPolicy.h
@@ -18,7 +18,10 @@
  */
 #ifndef PULSAR_MESSAGE_ROUTING_POLICY_HEADER_
 #define PULSAR_MESSAGE_ROUTING_POLICY_HEADER_
-#include "Message.h"
+
+#include <pulsar/DeprecatedException.h>
+#include <pulsar/Message.h>
+#include <pulsar/TopicMetadata.h>
 #include <boost/shared_ptr.hpp>
 
 #pragma GCC visibility push(default)
@@ -33,7 +36,17 @@ class MessageRoutingPolicy {
  public:
     virtual ~MessageRoutingPolicy() {}
 
-    virtual int getPartition(const Message& msg) = 0;
+    /** @deprecated
+       Use int getPartition(const Message& msg, const TopicMetadata& topicMetadata)
+    */
+    virtual int getPartition(const Message& msg) {
+        throw DeprecatedException("Use int getPartition(const Message& msg,"
+                                          " const TopicMetadata& topicMetadata)");
+    }
+
+    virtual int getPartition(const Message& msg, const TopicMetadata& topicMetadata) {
+        return getPartition(msg);
+    }
 };
 
 typedef boost::shared_ptr<MessageRoutingPolicy> MessageRoutingPolicyPtr;
diff --git a/pulsar-client-cpp/include/pulsar/TopicMetadata.h b/pulsar-client-cpp/include/pulsar/TopicMetadata.h
new file mode 100644
index 000000000..d541ca9ec
--- /dev/null
+++ b/pulsar-client-cpp/include/pulsar/TopicMetadata.h
@@ -0,0 +1,32 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+#ifndef TOPIC_METADATA_HPP_
+#define TOPIC_METADATA_HPP_
+
+namespace pulsar {
+/**
+ * Metadata of a topic that can be used for message routing.
+ */
+class TopicMetadata {
+public:
+    virtual int getNumPartitions() const = 0;
+};
+}
+
+#endif /* TOPIC_METADATA_HPP_ */
diff --git a/pulsar-client-cpp/lib/DeprecatedException.cc b/pulsar-client-cpp/lib/DeprecatedException.cc
new file mode 100644
index 000000000..8b788bf27
--- /dev/null
+++ b/pulsar-client-cpp/lib/DeprecatedException.cc
@@ -0,0 +1,28 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+#include <pulsar/DeprecatedException.h>
+
+namespace pulsar {
+    const std::string DeprecatedException::message_prefix = "Deprecated: ";
+
+    DeprecatedException::DeprecatedException(const std::string& __arg)
+            : std::runtime_error(message_prefix + __arg) {
+
+    }
+}
\ No newline at end of file
diff --git a/pulsar-client-cpp/lib/PartitionedProducerImpl.cc b/pulsar-client-cpp/lib/PartitionedProducerImpl.cc
index 18e53311a..ec919fb8b 100644
--- a/pulsar-client-cpp/lib/PartitionedProducerImpl.cc
+++ b/pulsar-client-cpp/lib/PartitionedProducerImpl.cc
@@ -16,12 +16,14 @@
  * specific language governing permissions and limitations
  * under the License.
  */
+#include <cstdlib>
 #include "PartitionedProducerImpl.h"
 #include "LogUtils.h"
 #include <boost/bind.hpp>
 #include <sstream>
 #include "RoundRobinMessageRouter.h"
 #include "SinglePartitionMessageRouter.h"
+#include "TopicMetadataImpl.h"
 #include "DestinationName.h"
 #include "MessageImpl.h"
 
@@ -37,18 +39,25 @@ namespace pulsar {
                                                      const ProducerConfiguration& config):client_(client),
                                                                                           destinationName_(destinationName),
                                                                                           topic_(destinationName_->toString()),
-                                                                                          numPartitions_(numPartitions),
                                                                                           conf_(config),
-                                                                                          state_(Pending)
+                                                                                          state_(Pending),
+                                                                                          topicMetadata_(new TopicMetadataImpl(numPartitions))
     {
         numProducersCreated_ = 0;
         cleanup_ = false;
-        if(config.getPartitionsRoutingMode() == ProducerConfiguration::RoundRobinDistribution) {
-            routerPolicy_ = boost::make_shared<RoundRobinMessageRouter>(numPartitions);
-        } else if (config.getPartitionsRoutingMode() == ProducerConfiguration::UseSinglePartition) {
-            routerPolicy_ = boost::make_shared<SinglePartitionMessageRouter>(numPartitions);
-        } else {
-            routerPolicy_ = config.getMessageRouterPtr();
+        routerPolicy_ = getMessageRouter();
+    }
+
+    MessageRoutingPolicyPtr PartitionedProducerImpl::getMessageRouter() {
+        switch (conf_.getPartitionsRoutingMode()) {
+            case ProducerConfiguration::RoundRobinDistribution:
+                return boost::make_shared<RoundRobinMessageRouter>();
+            case ProducerConfiguration::CustomPartition:
+                return conf_.getMessageRouterPtr();
+            case ProducerConfiguration::UseSinglePartition:
+            default:
+                unsigned int random = rand();
+                return boost::make_shared<SinglePartitionMessageRouter>(random % topicMetadata_->getNumPartitions());
         }
     }
 
@@ -63,7 +72,7 @@ namespace pulsar {
     void PartitionedProducerImpl::start() {
         boost::shared_ptr<ProducerImpl> producer;
         // create producer per partition
-        for (unsigned int i = 0; i < numPartitions_; i++) {
+        for (unsigned int i = 0; i < topicMetadata_->getNumPartitions(); i++) {
             std::string topicPartitionName = destinationName_->getTopicPartitionName(i);
             producer = boost::make_shared<ProducerImpl>(client_, topicPartitionName, conf_);
             producer->getProducerCreatedFuture().addListener(boost::bind(&PartitionedProducerImpl::handleSinglePartitionProducerCreated,
@@ -89,7 +98,7 @@ namespace pulsar {
             // Ignore, we have already informed client that producer creation failed
             return;
         }
-        assert(numProducersCreated_ <= numPartitions_);
+        assert(numProducersCreated_ <= topicMetadata_->getNumPartitions());
         if (result != ResultOk) {
             state_ = Failed;
             lock.unlock();
@@ -99,9 +108,9 @@ namespace pulsar {
             return;
         }
 
-        assert(partitionIndex <= numPartitions_);
+        assert(partitionIndex <= topicMetadata_->getNumPartitions());
         numProducersCreated_++;
-        if(numProducersCreated_ == numPartitions_) {
+        if(numProducersCreated_ == topicMetadata_->getNumPartitions()) {
             lock.unlock();
             partitionedProducerCreatedPromise_.setValue(shared_from_this());
         }
@@ -110,8 +119,8 @@ namespace pulsar {
     //override
     void PartitionedProducerImpl::sendAsync(const Message& msg, SendCallback callback) {
         //get partition for this message from router policy
-        short partition = (short)(routerPolicy_->getPartition(msg));
-        if (partition >= numPartitions_ || partition >= producers_.size()) {
+        short partition = (short)(routerPolicy_->getPartition(msg, *topicMetadata_));
+        if (partition >= topicMetadata_->getNumPartitions() || partition >= producers_.size()) {
             LOG_ERROR("Got Invalid Partition for message from Router Policy, Partition - " << partition);
             //change me: abort or notify failure in callback?
             //          change to appropriate error if callback
@@ -196,7 +205,7 @@ namespace pulsar {
             }
             return;
         }
-        assert (partitionIndex < numPartitions_);
+        assert (partitionIndex < topicMetadata_->getNumPartitions());
         if(numProducersCreated_ > 0) {
             numProducersCreated_--;
         }
diff --git a/pulsar-client-cpp/lib/PartitionedProducerImpl.h b/pulsar-client-cpp/lib/PartitionedProducerImpl.h
index f6c7f4a0f..8bc453c2c 100644
--- a/pulsar-client-cpp/lib/PartitionedProducerImpl.h
+++ b/pulsar-client-cpp/lib/PartitionedProducerImpl.h
@@ -21,8 +21,10 @@
 #include "DestinationName.h"
 #include <vector>
 #include <boost/shared_ptr.hpp>
+#include <boost/scoped_ptr.hpp>
 #include <boost/thread/mutex.hpp>
 #include <pulsar/MessageRoutingPolicy.h>
+#include <pulsar/TopicMetadata.h>
 
 namespace pulsar {
 
@@ -88,7 +90,7 @@ namespace pulsar {
     const DestinationNamePtr destinationName_;
     const std::string topic_;
 
-    const unsigned int numPartitions_;
+    boost::scoped_ptr<TopicMetadata> topicMetadata_;
 
     unsigned int numProducersCreated_;
 
@@ -112,6 +114,8 @@ namespace pulsar {
 
     // only set this promise to value, when producers on all partitions are created.
     Promise<Result, ProducerImplBaseWeakPtr> partitionedProducerCreatedPromise_;
+
+      MessageRoutingPolicyPtr getMessageRouter();
   };
 
 } //ends namespace Pulsar
diff --git a/pulsar-client-cpp/lib/RoundRobinMessageRouter.cc b/pulsar-client-cpp/lib/RoundRobinMessageRouter.cc
index 4a31c2b9d..c4dceffe8 100644
--- a/pulsar-client-cpp/lib/RoundRobinMessageRouter.cc
+++ b/pulsar-client-cpp/lib/RoundRobinMessageRouter.cc
@@ -17,25 +17,24 @@
  * under the License.
  */
 #include "RoundRobinMessageRouter.h"
-#include <boost/algorithm/string.hpp>
 
 namespace pulsar {
-    RoundRobinMessageRouter::RoundRobinMessageRouter(unsigned int numPartitions):prevPartition_(0), numPartitions_(numPartitions) {
+    RoundRobinMessageRouter::RoundRobinMessageRouter():prevPartition_(0) {
     }
 
     RoundRobinMessageRouter::~RoundRobinMessageRouter() {
     }
 
     //override
-    int RoundRobinMessageRouter::getPartition(const Message& msg) {
+    int RoundRobinMessageRouter::getPartition(const Message& msg, const TopicMetadata& topicMetadata) {
         //if message has a key, hash the key and return the partition
         if (msg.hasPartitionKey()) {
             static StringHash hash;
-            return hash(msg.getPartitionKey()) % numPartitions_;
+            return hash(msg.getPartitionKey()) % topicMetadata.getNumPartitions();
         } else {
             Lock lock(mutex_);
             //else pick the next partition
-            return prevPartition_++ % numPartitions_;
+            return prevPartition_++ % topicMetadata.getNumPartitions();
         }
     }
 
diff --git a/pulsar-client-cpp/lib/RoundRobinMessageRouter.h b/pulsar-client-cpp/lib/RoundRobinMessageRouter.h
index 996f20720..06c56e0f6 100644
--- a/pulsar-client-cpp/lib/RoundRobinMessageRouter.h
+++ b/pulsar-client-cpp/lib/RoundRobinMessageRouter.h
@@ -20,19 +20,19 @@
 #define PULSAR_RR_MESSAGE_ROUTER_HEADER_
 
 #include <pulsar/MessageRoutingPolicy.h>
+#include <pulsar/TopicMetadata.h>
 #include <boost/functional/hash.hpp>
 #include <boost/thread/mutex.hpp>
 
 namespace pulsar {
     class RoundRobinMessageRouter : public MessageRoutingPolicy {
     public:
-        RoundRobinMessageRouter (unsigned int numPartitions);
+        RoundRobinMessageRouter ();
         virtual ~RoundRobinMessageRouter();
-        virtual int getPartition(const Message& msg);
+        virtual int getPartition(const Message& msg, const TopicMetadata& topicMetadata);
     private:
         boost::mutex mutex_;
         unsigned int prevPartition_;
-        unsigned int numPartitions_;
     };
     typedef boost::hash<std::string> StringHash;
     typedef boost::unique_lock<boost::mutex> Lock;
diff --git a/pulsar-client-cpp/lib/SinglePartitionMessageRouter.cc b/pulsar-client-cpp/lib/SinglePartitionMessageRouter.cc
index 5529b2f97..a83b9f37d 100644
--- a/pulsar-client-cpp/lib/SinglePartitionMessageRouter.cc
+++ b/pulsar-client-cpp/lib/SinglePartitionMessageRouter.cc
@@ -17,21 +17,19 @@
  * under the License.
  */
 #include "SinglePartitionMessageRouter.h"
-#include <cstdlib> // rand()
-#include <boost/algorithm/string.hpp>
+
 namespace pulsar {
     SinglePartitionMessageRouter::~SinglePartitionMessageRouter(){}
-    SinglePartitionMessageRouter::SinglePartitionMessageRouter(unsigned int numPartitions):numPartitions_(numPartitions) {
-        unsigned int random = rand();
-        selectedSinglePartition_ = random % numPartitions_;
+    SinglePartitionMessageRouter::SinglePartitionMessageRouter(const int partitionIndex) {
+        selectedSinglePartition_ = partitionIndex;
     }
 
     //override
-    int SinglePartitionMessageRouter::getPartition(const Message& msg) {
+    int SinglePartitionMessageRouter::getPartition(const Message& msg, const TopicMetadata& topicMetadata) {
         //if message has a key, hash the key and return the partition
         if (msg.hasPartitionKey()) {
             StringHash hash;
-            return hash(msg.getPartitionKey()) % numPartitions_;
+            return hash(msg.getPartitionKey()) % topicMetadata.getNumPartitions();
         } else {
             //else pick the next partition
             return selectedSinglePartition_;
diff --git a/pulsar-client-cpp/lib/SinglePartitionMessageRouter.h b/pulsar-client-cpp/lib/SinglePartitionMessageRouter.h
index f38e81fb1..833eac69a 100644
--- a/pulsar-client-cpp/lib/SinglePartitionMessageRouter.h
+++ b/pulsar-client-cpp/lib/SinglePartitionMessageRouter.h
@@ -20,18 +20,18 @@
 #define PULSAR_SINGLE_PARTITION_MESSAGE_ROUTER_HEADER_
 
 #include <pulsar/MessageRoutingPolicy.h>
+#include <pulsar/TopicMetadata.h>
 #include <boost/functional/hash.hpp>
 
 namespace pulsar {
 
     class SinglePartitionMessageRouter : public MessageRoutingPolicy {
   public:
-	SinglePartitionMessageRouter(unsigned int numPartitions);
+		explicit SinglePartitionMessageRouter(int partitionIndex);
         typedef boost::hash<std::string> StringHash;
         virtual ~SinglePartitionMessageRouter();
-        virtual int getPartition(const Message& msg);
+        virtual int getPartition(const Message& msg, const TopicMetadata& topicMetadata);
     private:
-	unsigned int numPartitions_;
 	int selectedSinglePartition_;
     };
 
diff --git a/pulsar-client-cpp/lib/TopicMetadataImpl.cc b/pulsar-client-cpp/lib/TopicMetadataImpl.cc
new file mode 100644
index 000000000..c48e4cf86
--- /dev/null
+++ b/pulsar-client-cpp/lib/TopicMetadataImpl.cc
@@ -0,0 +1,30 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+#include "TopicMetadataImpl.h"
+
+namespace pulsar {
+    TopicMetadataImpl::TopicMetadataImpl(const int numPartitions) : numPartitions_(numPartitions) {
+
+    }
+
+    int TopicMetadataImpl::getNumPartitions() const {
+        return numPartitions_;
+    }
+}
\ No newline at end of file
diff --git a/pulsar-client-cpp/lib/TopicMetadataImpl.h b/pulsar-client-cpp/lib/TopicMetadataImpl.h
new file mode 100644
index 000000000..2734cf3a8
--- /dev/null
+++ b/pulsar-client-cpp/lib/TopicMetadataImpl.h
@@ -0,0 +1,35 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+#ifndef TOPIC_METADATA_IMPL_HPP_
+#define TOPIC_METADATA_IMPL_HPP_
+
+#include <pulsar/TopicMetadata.h>
+
+namespace pulsar {
+class TopicMetadataImpl : public TopicMetadata {
+public:
+    TopicMetadataImpl(const int numPartitions);
+    virtual int getNumPartitions() const;
+
+private:
+    int numPartitions_;
+};
+}
+
+#endif /* TOPIC_METADATA_IMPL_HPP_ */
\ No newline at end of file
diff --git a/pulsar-client-cpp/tests/CMakeLists.txt b/pulsar-client-cpp/tests/CMakeLists.txt
index 264794529..3f27ffed1 100644
--- a/pulsar-client-cpp/tests/CMakeLists.txt
+++ b/pulsar-client-cpp/tests/CMakeLists.txt
@@ -17,7 +17,7 @@
 # under the License.
 #
 
-find_library(GTEST_LIBRARY_PATH gtest)
+find_library(GMOCK_LIBRARY_PATH gmock)
 
 file(GLOB TEST_SOURCES *.cc)
 
@@ -25,4 +25,4 @@ add_executable(main ${TEST_SOURCES})
 
 target_include_directories(main PRIVATE ${CMAKE_SOURCE_DIR}/lib)
 
-target_link_libraries(main ${CLIENT_LIBS} ${GTEST_LIBRARY_PATH} ztsClient)
+target_link_libraries(main ${CLIENT_LIBS} ${GMOCK_LIBRARY_PATH} ztsClient)
diff --git a/pulsar-client-cpp/tests/CustomRoutingPolicy.h b/pulsar-client-cpp/tests/CustomRoutingPolicy.h
index 9e8608877..1a8c3bce6 100644
--- a/pulsar-client-cpp/tests/CustomRoutingPolicy.h
+++ b/pulsar-client-cpp/tests/CustomRoutingPolicy.h
@@ -21,9 +21,16 @@
 
 #include <cstdlib> // rand()
 #include <boost/algorithm/string.hpp>
+#include <pulsar/DeprecatedException.h>
+
 namespace pulsar {
 class CustomRoutingPolicy : public MessageRoutingPolicy {
+    /** @deprecated */
     int getPartition(const Message& msg) {
+        throw DeprecatedException("Use getPartition(const Message&, const TopicMetadata&) instead.");
+    }
+
+    int getPartition(const Message& msg, const TopicMetadata& topicMetadata) {
         return 0;
     }
 };
diff --git a/pulsar-client-cpp/tests/RoundRobinMessageRouterTest.cc b/pulsar-client-cpp/tests/RoundRobinMessageRouterTest.cc
new file mode 100644
index 000000000..2cab6d491
--- /dev/null
+++ b/pulsar-client-cpp/tests/RoundRobinMessageRouterTest.cc
@@ -0,0 +1,73 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+#include <pulsar/Client.h>
+#include <gtest/gtest.h>
+#include <gmock/gmock.h>
+
+#include "tests/mocks/GMockMessage.h"
+
+#include "../lib/RoundRobinMessageRouter.h"
+#include "../lib/TopicMetadataImpl.h"
+
+using ::testing::AtLeast;
+using ::testing::Return;
+using ::testing::ReturnRef;
+
+using namespace pulsar;
+
+// TODO: Edit Message class to suit Google Mock and enable these tests when 2.0.0 release.
+
+TEST(RoundRobinMessageRouterTest, DISABLED_getPartitionWithoutPartitionKey) {
+    const int numPartitions1 = 5;
+    const int numPartitions2 = 3;
+
+    RoundRobinMessageRouter router1;
+    RoundRobinMessageRouter router2;
+
+    GMockMessage message;
+    EXPECT_CALL(message, hasPartitionKey()).Times(20).WillRepeatedly(Return(false));
+    EXPECT_CALL(message, getPartitionKey()).Times(0);
+    for (int i = 0; i < 10; i++) {
+        ASSERT_EQ(i % numPartitions1, router1.getPartition(message, TopicMetadataImpl(numPartitions1)));
+        ASSERT_EQ(i % numPartitions2, router2.getPartition(message, TopicMetadataImpl(numPartitions2)));
+    }
+}
+
+TEST(RoundRobinMessageRouterTest, DISABLED_getPartitionWithPartitionKey) {
+    const int numPartitons = 1234;
+
+    RoundRobinMessageRouter router;
+
+    std::string partitionKey1 = "key1";
+    std::string partitionKey2 = "key2";
+
+    GMockMessage message1;
+    EXPECT_CALL(message1, hasPartitionKey()).Times(1).WillOnce(Return(true));
+    EXPECT_CALL(message1, getPartitionKey()).Times(1).WillOnce(ReturnRef(partitionKey1));
+
+    GMockMessage message2;
+    EXPECT_CALL(message2, hasPartitionKey()).Times(1).WillOnce(Return(true));
+    EXPECT_CALL(message2, getPartitionKey()).Times(1).WillOnce(ReturnRef(partitionKey2));
+
+    auto expectedParrtition1 = static_cast<const int>(boost::hash<std::string>()(partitionKey1) % numPartitons);
+    auto expectedParrtition2 = static_cast<const int>(boost::hash<std::string>()(partitionKey2) % numPartitons);
+
+    ASSERT_EQ(expectedParrtition1, router.getPartition(message1, TopicMetadataImpl(numPartitons)));
+    ASSERT_EQ(expectedParrtition2, router.getPartition(message2, TopicMetadataImpl(numPartitons)));
+}
\ No newline at end of file
diff --git a/pulsar-client-cpp/tests/SinglePartitionMessageRouterTest.cc b/pulsar-client-cpp/tests/SinglePartitionMessageRouterTest.cc
new file mode 100644
index 000000000..bb3fe1c38
--- /dev/null
+++ b/pulsar-client-cpp/tests/SinglePartitionMessageRouterTest.cc
@@ -0,0 +1,69 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+#include <pulsar/Client.h>
+#include <gtest/gtest.h>
+#include <gmock/gmock.h>
+
+#include "tests/mocks/GMockMessage.h"
+
+#include "../lib/SinglePartitionMessageRouter.h"
+#include "../lib/TopicMetadataImpl.h"
+
+using ::testing::AtLeast;
+using ::testing::Return;
+using ::testing::ReturnRef;
+
+using namespace pulsar;
+
+// TODO: Edit Message class to suit Google Mock and enable these tests when 2.0.0 release.
+
+TEST(SinglePartitionMessageRouterTest, DISABLED_getPartitionWithoutPartitionKey) {
+    const int selectedPartition = 1234;
+
+    SinglePartitionMessageRouter router(selectedPartition);
+
+    GMockMessage message;
+    EXPECT_CALL(message, hasPartitionKey()).Times(1).WillOnce(Return(false));
+    EXPECT_CALL(message, getPartitionKey()).Times(0);
+
+    ASSERT_EQ(selectedPartition, router.getPartition(message, TopicMetadataImpl(1)));
+}
+
+TEST(SinglePartitionMessageRouterTest, DISABLED_getPartitionWithPartitionKey) {
+    const int numPartitons = 1234;
+
+    SinglePartitionMessageRouter router(1);
+
+    std::string partitionKey1 = "key1";
+    std::string partitionKey2 = "key2";
+
+    GMockMessage message1;
+    EXPECT_CALL(message1, hasPartitionKey()).Times(1).WillOnce(Return(true));
+    EXPECT_CALL(message1, getPartitionKey()).Times(1).WillOnce(ReturnRef(partitionKey1));
+
+    GMockMessage message2;
+    EXPECT_CALL(message2, hasPartitionKey()).Times(1).WillOnce(Return(true));
+    EXPECT_CALL(message2, getPartitionKey()).Times(1).WillOnce(ReturnRef(partitionKey2));
+
+    auto expectedParrtition1 = static_cast<const int>(boost::hash<std::string>()(partitionKey1) % numPartitons);
+    auto expectedParrtition2 = static_cast<const int>(boost::hash<std::string>()(partitionKey2) % numPartitons);
+
+    ASSERT_EQ(expectedParrtition1, router.getPartition(message1, TopicMetadataImpl(numPartitons)));
+    ASSERT_EQ(expectedParrtition2, router.getPartition(message2, TopicMetadataImpl(numPartitons)));
+}
\ No newline at end of file
diff --git a/pulsar-client-cpp/tests/TopicMetadataImplTest.cc b/pulsar-client-cpp/tests/TopicMetadataImplTest.cc
new file mode 100644
index 000000000..091dd2659
--- /dev/null
+++ b/pulsar-client-cpp/tests/TopicMetadataImplTest.cc
@@ -0,0 +1,29 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+#include <pulsar/Client.h>
+#include <gtest/gtest.h>
+
+#include "../lib/TopicMetadataImpl.h"
+
+using namespace pulsar;
+
+TEST(TopicMetadataImplTest, numPartitions) {
+    TopicMetadataImpl topicMetadata(1234);
+    ASSERT_EQ(1234, topicMetadata.getNumPartitions());
+}
diff --git a/pulsar-client-cpp/tests/main.cc b/pulsar-client-cpp/tests/main.cc
index 3aa455410..a600af357 100644
--- a/pulsar-client-cpp/tests/main.cc
+++ b/pulsar-client-cpp/tests/main.cc
@@ -17,10 +17,10 @@
  * under the License.
  */
 #include <LogUtils.h>
-#include <gtest/gtest.h>
+#include <gmock/gmock.h>
 
 int main(int argc, char **argv) {
     LogUtils::init("log4cxx.conf");
-    ::testing::InitGoogleTest(&argc, argv);
+    ::testing::InitGoogleMock(&argc, argv);
     return RUN_ALL_TESTS();
 }
diff --git a/pulsar-client-cpp/tests/mocks/GMockMessage.h b/pulsar-client-cpp/tests/mocks/GMockMessage.h
new file mode 100644
index 000000000..29e3eb04e
--- /dev/null
+++ b/pulsar-client-cpp/tests/mocks/GMockMessage.h
@@ -0,0 +1,36 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+#ifndef MOCK_MESSAGE_HPP_
+#define MOCK_MESSAGE_HPP_
+
+#include <gmock/gmock.h>
+#include <pulsar/Message.h>
+
+namespace pulsar {
+    // TODO: For the mock tests, we need to make all methods and destructor virtual in Message class
+    class GMockMessage : public Message {
+    public:
+        MOCK_CONST_METHOD0(hasPartitionKey, bool());
+
+        MOCK_CONST_METHOD0(getPartitionKey, const std::string&());
+    };
+}
+
+#endif // MOCK_MESSAGE_HPP_


 

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
users@infra.apache.org


With regards,
Apache Git Services