You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@rocketmq.apache.org by li...@apache.org on 2022/07/15 07:59:58 UTC

[rocketmq-clients] 01/01: Use ClientManager per Client strategy

This is an automated email from the ASF dual-hosted git repository.

lizhanhui pushed a commit to branch cpp_dev
in repository https://gitbox.apache.org/repos/asf/rocketmq-clients.git

commit 7257f3dbf566270b299a9b5439f026c1eb10ceea
Author: Li Zhanhui <li...@gmail.com>
AuthorDate: Fri Jul 15 15:59:31 2022 +0800

    Use ClientManager per Client strategy
---
 cpp/examples/ExampleProducer.cpp                 |  4 +-
 cpp/source/client/ClientManagerFactory.cpp       | 62 ------------------------
 cpp/source/client/include/ClientManagerFactory.h | 57 ----------------------
 cpp/source/rocketmq/ClientImpl.cpp               |  8 +--
 cpp/source/rocketmq/PushConsumerImpl.cpp         |  1 -
 cpp/source/rocketmq/include/ClientImpl.h         |  9 ++++
 6 files changed, 15 insertions(+), 126 deletions(-)

diff --git a/cpp/examples/ExampleProducer.cpp b/cpp/examples/ExampleProducer.cpp
index fa86af7..9bdb9b4 100644
--- a/cpp/examples/ExampleProducer.cpp
+++ b/cpp/examples/ExampleProducer.cpp
@@ -47,8 +47,8 @@ std::string randomString(std::string::size_type len) {
 }
 
 int main(int argc, char* argv[]) {
-  const char* topic = "cpp_sdk_standard";
-  const char* name_server = "11.166.42.94:8081";
+  const char* topic = "lingchu_normal_topic";
+  const char* name_server = "121.196.167.124:8081";
 
   auto producer =
       Producer::newBuilder().withConfiguration(Configuration::newBuilder().withEndpoints(name_server).build()).build();
diff --git a/cpp/source/client/ClientManagerFactory.cpp b/cpp/source/client/ClientManagerFactory.cpp
deleted file mode 100644
index 3cacaae..0000000
--- a/cpp/source/client/ClientManagerFactory.cpp
+++ /dev/null
@@ -1,62 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-#include "ClientManagerFactory.h"
-#include "ClientManagerImpl.h"
-
-ROCKETMQ_NAMESPACE_BEGIN
-
-ClientManagerFactory& ClientManagerFactory::getInstance() {
-  static ClientManagerFactory instance;
-  return instance;
-}
-
-ClientManagerPtr ClientManagerFactory::getClientManager(const ClientConfig& client_config) {
-  {
-    absl::MutexLock lock(&client_manager_table_mtx_);
-    auto search = client_manager_table_.find(client_config.resource_namespace);
-    if (search != client_manager_table_.end()) {
-      ClientManagerPtr client_manager = search->second.lock();
-      if (client_manager) {
-        SPDLOG_DEBUG("Re-use existing client_manager[resource_namespace={}]", client_config.resource_namespace);
-        return client_manager;
-      } else {
-        client_manager_table_.erase(client_config.resource_namespace);
-      }
-    }
-    ClientManagerPtr client_manager = std::make_shared<ClientManagerImpl>(client_config.resource_namespace);
-    std::weak_ptr<ClientManager> client_instance_weak_ptr(client_manager);
-    client_manager_table_.insert_or_assign(client_config.resource_namespace, client_instance_weak_ptr);
-    SPDLOG_INFO("Created a new client manager[resource_namespace={}]", client_config.resource_namespace);
-    return client_manager;
-  }
-}
-
-void ClientManagerFactory::addClientManager(const std::string& resource_namespace,
-                                            const ClientManagerPtr& client_manager) {
-  absl::MutexLock lk(&client_manager_table_mtx_);
-  client_manager_table_.insert_or_assign(resource_namespace, std::weak_ptr<ClientManager>(client_manager));
-}
-
-ClientManagerFactory::ClientManagerFactory() : admin_server_(admin::AdminFacade::getServer()) {
-  admin_server_.start();
-}
-
-ClientManagerFactory::~ClientManagerFactory() {
-  admin_server_.stop();
-}
-
-ROCKETMQ_NAMESPACE_END
\ No newline at end of file
diff --git a/cpp/source/client/include/ClientManagerFactory.h b/cpp/source/client/include/ClientManagerFactory.h
deleted file mode 100644
index ec6488d..0000000
--- a/cpp/source/client/include/ClientManagerFactory.h
+++ /dev/null
@@ -1,57 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-#pragma once
-
-#include <string>
-#include <thread>
-
-#include "absl/base/thread_annotations.h"
-#include "absl/container/flat_hash_map.h"
-#include "absl/synchronization/mutex.h"
-
-#include "ClientConfig.h"
-#include "ClientManager.h"
-#include "rocketmq/AdminServer.h"
-
-ROCKETMQ_NAMESPACE_BEGIN
-
-class ClientManagerFactory {
-public:
-  static ClientManagerFactory& getInstance();
-
-  ClientManagerPtr getClientManager(const ClientConfig& client_config) LOCKS_EXCLUDED(client_manager_table_mtx_);
-
-  // For test purpose only
-  void addClientManager(const std::string& resource_namespace, const ClientManagerPtr& client_manager)
-      LOCKS_EXCLUDED(client_manager_table_mtx_);
-
-private:
-  ClientManagerFactory();
-
-  virtual ~ClientManagerFactory();
-
-  /**
-   * Client Id --> Client Instance
-   */
-  absl::flat_hash_map<std::string, std::weak_ptr<ClientManager>>
-      client_manager_table_ GUARDED_BY(client_manager_table_mtx_);
-  absl::Mutex client_manager_table_mtx_; // protects client_manager_table_
-
-  rocketmq::admin::AdminServer& admin_server_;
-};
-
-ROCKETMQ_NAMESPACE_END
\ No newline at end of file
diff --git a/cpp/source/rocketmq/ClientImpl.cpp b/cpp/source/rocketmq/ClientImpl.cpp
index 190b9c2..c578eb7 100644
--- a/cpp/source/rocketmq/ClientImpl.cpp
+++ b/cpp/source/rocketmq/ClientImpl.cpp
@@ -16,8 +16,6 @@
  */
 #include "ClientImpl.h"
 
-#include <apache/rocketmq/v2/definition.pb.h>
-
 #include <algorithm>
 #include <atomic>
 #include <chrono>
@@ -31,7 +29,7 @@
 #include <system_error>
 #include <utility>
 
-#include "ClientManagerFactory.h"
+#include "ClientManagerImpl.h"
 #include "InvocationContext.h"
 #include "LoggerImpl.h"
 #include "MessageExt.h"
@@ -111,7 +109,9 @@ void ClientImpl::start() {
 
   client_config_.client_id = clientId();
 
-  client_manager_ = ClientManagerFactory::getInstance().getClientManager(client_config_);
+  if (!client_manager_) {
+    client_manager_ = std::make_shared<ClientManagerImpl>(client_config_.resource_namespace);
+  }
   client_manager_->start();
 
   const auto& endpoint = name_server_resolver_->resolve();
diff --git a/cpp/source/rocketmq/PushConsumerImpl.cpp b/cpp/source/rocketmq/PushConsumerImpl.cpp
index a1bfe90..55e9728 100644
--- a/cpp/source/rocketmq/PushConsumerImpl.cpp
+++ b/cpp/source/rocketmq/PushConsumerImpl.cpp
@@ -23,7 +23,6 @@
 #include <system_error>
 
 #include "AsyncReceiveMessageCallback.h"
-#include "ClientManagerFactory.h"
 #include "ConsumeMessageServiceImpl.h"
 #include "MixAll.h"
 #include "ProcessQueueImpl.h"
diff --git a/cpp/source/rocketmq/include/ClientImpl.h b/cpp/source/rocketmq/include/ClientImpl.h
index c095c56..07f1efe 100644
--- a/cpp/source/rocketmq/include/ClientImpl.h
+++ b/cpp/source/rocketmq/include/ClientImpl.h
@@ -44,6 +44,15 @@ class ClientImpl : virtual public Client {
 public:
   explicit ClientImpl(absl::string_view group_name);
 
+  /**
+   * @brief Inject a client manager instance. Intended for testing purposes only.
+   *
+   * @param client_manager Client manager to use; start() creates its own only when none has been assigned.
+   */
+  void clientManager(std::shared_ptr<ClientManager> client_manager) {
+    client_manager_ = std::move(client_manager);
+  }
+
   virtual void start();
 
   virtual void shutdown();