You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@rocketmq.apache.org by li...@apache.org on 2022/07/15 07:59:58 UTC
[rocketmq-clients] 01/01: Use ClientManager per Client strategy
This is an automated email from the ASF dual-hosted git repository.
lizhanhui pushed a commit to branch cpp_dev
in repository https://gitbox.apache.org/repos/asf/rocketmq-clients.git
commit 7257f3dbf566270b299a9b5439f026c1eb10ceea
Author: Li Zhanhui <li...@gmail.com>
AuthorDate: Fri Jul 15 15:59:31 2022 +0800
Use ClientManager per Client strategy
---
cpp/examples/ExampleProducer.cpp | 4 +-
cpp/source/client/ClientManagerFactory.cpp | 62 ------------------------
cpp/source/client/include/ClientManagerFactory.h | 57 ----------------------
cpp/source/rocketmq/ClientImpl.cpp | 8 +--
cpp/source/rocketmq/PushConsumerImpl.cpp | 1 -
cpp/source/rocketmq/include/ClientImpl.h | 9 ++++
6 files changed, 15 insertions(+), 126 deletions(-)
diff --git a/cpp/examples/ExampleProducer.cpp b/cpp/examples/ExampleProducer.cpp
index fa86af7..9bdb9b4 100644
--- a/cpp/examples/ExampleProducer.cpp
+++ b/cpp/examples/ExampleProducer.cpp
@@ -47,8 +47,8 @@ std::string randomString(std::string::size_type len) {
}
int main(int argc, char* argv[]) {
- const char* topic = "cpp_sdk_standard";
- const char* name_server = "11.166.42.94:8081";
+ const char* topic = "lingchu_normal_topic";
+ const char* name_server = "121.196.167.124:8081";
auto producer =
Producer::newBuilder().withConfiguration(Configuration::newBuilder().withEndpoints(name_server).build()).build();
diff --git a/cpp/source/client/ClientManagerFactory.cpp b/cpp/source/client/ClientManagerFactory.cpp
deleted file mode 100644
index 3cacaae..0000000
--- a/cpp/source/client/ClientManagerFactory.cpp
+++ /dev/null
@@ -1,62 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-#include "ClientManagerFactory.h"
-#include "ClientManagerImpl.h"
-
-ROCKETMQ_NAMESPACE_BEGIN
-
-ClientManagerFactory& ClientManagerFactory::getInstance() {
- static ClientManagerFactory instance;
- return instance;
-}
-
-ClientManagerPtr ClientManagerFactory::getClientManager(const ClientConfig& client_config) {
- {
- absl::MutexLock lock(&client_manager_table_mtx_);
- auto search = client_manager_table_.find(client_config.resource_namespace);
- if (search != client_manager_table_.end()) {
- ClientManagerPtr client_manager = search->second.lock();
- if (client_manager) {
- SPDLOG_DEBUG("Re-use existing client_manager[resource_namespace={}]", client_config.resource_namespace);
- return client_manager;
- } else {
- client_manager_table_.erase(client_config.resource_namespace);
- }
- }
- ClientManagerPtr client_manager = std::make_shared<ClientManagerImpl>(client_config.resource_namespace);
- std::weak_ptr<ClientManager> client_instance_weak_ptr(client_manager);
- client_manager_table_.insert_or_assign(client_config.resource_namespace, client_instance_weak_ptr);
- SPDLOG_INFO("Created a new client manager[resource_namespace={}]", client_config.resource_namespace);
- return client_manager;
- }
-}
-
-void ClientManagerFactory::addClientManager(const std::string& resource_namespace,
- const ClientManagerPtr& client_manager) {
- absl::MutexLock lk(&client_manager_table_mtx_);
- client_manager_table_.insert_or_assign(resource_namespace, std::weak_ptr<ClientManager>(client_manager));
-}
-
-ClientManagerFactory::ClientManagerFactory() : admin_server_(admin::AdminFacade::getServer()) {
- admin_server_.start();
-}
-
-ClientManagerFactory::~ClientManagerFactory() {
- admin_server_.stop();
-}
-
-ROCKETMQ_NAMESPACE_END
\ No newline at end of file
diff --git a/cpp/source/client/include/ClientManagerFactory.h b/cpp/source/client/include/ClientManagerFactory.h
deleted file mode 100644
index ec6488d..0000000
--- a/cpp/source/client/include/ClientManagerFactory.h
+++ /dev/null
@@ -1,57 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-#pragma once
-
-#include <string>
-#include <thread>
-
-#include "absl/base/thread_annotations.h"
-#include "absl/container/flat_hash_map.h"
-#include "absl/synchronization/mutex.h"
-
-#include "ClientConfig.h"
-#include "ClientManager.h"
-#include "rocketmq/AdminServer.h"
-
-ROCKETMQ_NAMESPACE_BEGIN
-
-class ClientManagerFactory {
-public:
- static ClientManagerFactory& getInstance();
-
- ClientManagerPtr getClientManager(const ClientConfig& client_config) LOCKS_EXCLUDED(client_manager_table_mtx_);
-
- // For test purpose only
- void addClientManager(const std::string& resource_namespace, const ClientManagerPtr& client_manager)
- LOCKS_EXCLUDED(client_manager_table_mtx_);
-
-private:
- ClientManagerFactory();
-
- virtual ~ClientManagerFactory();
-
- /**
- * Client Id --> Client Instance
- */
- absl::flat_hash_map<std::string, std::weak_ptr<ClientManager>>
- client_manager_table_ GUARDED_BY(client_manager_table_mtx_);
- absl::Mutex client_manager_table_mtx_; // protects client_manager_table_
-
- rocketmq::admin::AdminServer& admin_server_;
-};
-
-ROCKETMQ_NAMESPACE_END
\ No newline at end of file
diff --git a/cpp/source/rocketmq/ClientImpl.cpp b/cpp/source/rocketmq/ClientImpl.cpp
index 190b9c2..c578eb7 100644
--- a/cpp/source/rocketmq/ClientImpl.cpp
+++ b/cpp/source/rocketmq/ClientImpl.cpp
@@ -16,8 +16,6 @@
*/
#include "ClientImpl.h"
-#include <apache/rocketmq/v2/definition.pb.h>
-
#include <algorithm>
#include <atomic>
#include <chrono>
@@ -31,7 +29,7 @@
#include <system_error>
#include <utility>
-#include "ClientManagerFactory.h"
+#include "ClientManagerImpl.h"
#include "InvocationContext.h"
#include "LoggerImpl.h"
#include "MessageExt.h"
@@ -111,7 +109,9 @@ void ClientImpl::start() {
client_config_.client_id = clientId();
- client_manager_ = ClientManagerFactory::getInstance().getClientManager(client_config_);
+ if (!client_manager_) {
+ client_manager_ = std::make_shared<ClientManagerImpl>(client_config_.resource_namespace);
+ }
client_manager_->start();
const auto& endpoint = name_server_resolver_->resolve();
diff --git a/cpp/source/rocketmq/PushConsumerImpl.cpp b/cpp/source/rocketmq/PushConsumerImpl.cpp
index a1bfe90..55e9728 100644
--- a/cpp/source/rocketmq/PushConsumerImpl.cpp
+++ b/cpp/source/rocketmq/PushConsumerImpl.cpp
@@ -23,7 +23,6 @@
#include <system_error>
#include "AsyncReceiveMessageCallback.h"
-#include "ClientManagerFactory.h"
#include "ConsumeMessageServiceImpl.h"
#include "MixAll.h"
#include "ProcessQueueImpl.h"
diff --git a/cpp/source/rocketmq/include/ClientImpl.h b/cpp/source/rocketmq/include/ClientImpl.h
index c095c56..07f1efe 100644
--- a/cpp/source/rocketmq/include/ClientImpl.h
+++ b/cpp/source/rocketmq/include/ClientImpl.h
@@ -44,6 +44,15 @@ class ClientImpl : virtual public Client {
public:
explicit ClientImpl(absl::string_view group_name);
+ /**
+ * @brief Inject a pre-built client manager; intended for tests only.
+ *
+ * @param client_manager Manager to use instead of the one start() would otherwise create.
+ */
+ void clientManager(std::shared_ptr<ClientManager> client_manager) {
+ client_manager_ = std::move(client_manager);
+ }
+
virtual void start();
virtual void shutdown();