You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@rocketmq.apache.org by li...@apache.org on 2022/07/11 03:13:04 UTC

[rocketmq-clients] 01/02: Support client metrics exporting

This is an automated email from the ASF dual-hosted git repository.

lizhanhui pushed a commit to branch cpp_dev
in repository https://gitbox.apache.org/repos/asf/rocketmq-clients.git

commit aad2ba8c53ab9a51bb9fa668bf073d12136b2101
Author: Li Zhanhui <li...@gmail.com>
AuthorDate: Mon Jul 11 11:08:31 2022 +0800

    Support client metrics exporting
---
 cpp/src/main/cpp/client/include/ClientManager.h    |  6 ++++
 cpp/src/main/cpp/rocketmq/ClientImpl.cpp           | 40 ++++++++++++++++++++++
 cpp/src/main/cpp/rocketmq/include/ClientImpl.h     |  1 +
 cpp/src/main/cpp/stats/OpencensusExporter.cpp      |  8 ++++-
 cpp/src/main/cpp/stats/include/Exporter.h          | 30 ----------------
 .../main/cpp/stats/include/OpencensusExporter.h    |  7 ++--
 6 files changed, 58 insertions(+), 34 deletions(-)

diff --git a/cpp/src/main/cpp/client/include/ClientManager.h b/cpp/src/main/cpp/client/include/ClientManager.h
index 99ec6ad..2ec5b2e 100644
--- a/cpp/src/main/cpp/client/include/ClientManager.h
+++ b/cpp/src/main/cpp/client/include/ClientManager.h
@@ -50,6 +50,12 @@ public:
 
   virtual std::shared_ptr<RpcClient> getRpcClient(const std::string& target_host, bool need_heartbeat) = 0;
 
+  /**
+   * @brief Create a gRPC channel for the given target.
+   *
+   * @param target_host gRPC target name, formatted per https://github.com/grpc/grpc/blob/master/doc/naming.md
+   * @return std::shared_ptr<grpc::Channel> The channel created for target_host.
+   */
   virtual std::shared_ptr<grpc::Channel> createChannel(const std::string& target_host) = 0;
 
   virtual void resolveRoute(const std::string& target_host, const Metadata& metadata, const QueryRouteRequest& request,
diff --git a/cpp/src/main/cpp/rocketmq/ClientImpl.cpp b/cpp/src/main/cpp/rocketmq/ClientImpl.cpp
index 8275922..a5e7b62 100644
--- a/cpp/src/main/cpp/rocketmq/ClientImpl.cpp
+++ b/cpp/src/main/cpp/rocketmq/ClientImpl.cpp
@@ -16,6 +16,8 @@
  */
 #include "ClientImpl.h"
 
+#include <apache/rocketmq/v2/definition.pb.h>
+
 #include <algorithm>
 #include <atomic>
 #include <chrono>
@@ -34,6 +36,7 @@
 #include "InvocationContext.h"
 #include "LoggerImpl.h"
 #include "MessageExt.h"
+#include "MetricBidiReactor.h"
 #include "NamingScheme.h"
 #include "SessionImpl.h"
 #include "Signature.h"
@@ -41,6 +44,7 @@
 #include "absl/strings/numbers.h"
 #include "absl/strings/str_join.h"
 #include "absl/strings/str_split.h"
+#include "opencensus/stats/stats.h"
 #include "rocketmq/Message.h"
 #include "rocketmq/MessageListener.h"
 
@@ -165,6 +169,42 @@ void ClientImpl::start() {
 
   route_update_handle_ = client_manager_->getScheduler()->schedule(route_update_functor, UPDATE_ROUTE_TASK_NAME,
                                                                    std::chrono::seconds(10), std::chrono::seconds(30));
+
+  auto endpoints = client_config_.metric.endpoints;
+  std::string target;
+  switch (endpoints.scheme()) {
+    case rmq::AddressScheme::IPv4: {
+      target.append("ipv4:");
+      break;
+    }
+    case rmq::AddressScheme::IPv6: {
+      target.append("ipv6:");
+      break;
+    }
+    case rmq::AddressScheme::DOMAIN_NAME: {
+      target.append("dns:");
+      break;
+    }
+    default: {
+      SPDLOG_ERROR("Unknown address scheme");
+    }
+  }
+
+  bool first = true;
+  for (const auto& address : endpoints.addresses()) {
+    if (!first) {
+      target.push_back(',');
+    } else {
+      first = false;
+    }
+    target.append(address.host());
+    target.push_back(':');
+    target.append(std::to_string(address.port()));
+  }
+
+  std::weak_ptr<Client> client_weak_ptr(self());
+  opencensus::stats::StatsExporter::SetInterval(absl::Minutes(1));
+  opencensus::stats::StatsExporter::RegisterPushHandler(absl::make_unique<OpencensusExporter>(target, client_weak_ptr));
 }
 
 void ClientImpl::shutdown() {
diff --git a/cpp/src/main/cpp/rocketmq/include/ClientImpl.h b/cpp/src/main/cpp/rocketmq/include/ClientImpl.h
index c136cc9..e820351 100644
--- a/cpp/src/main/cpp/rocketmq/include/ClientImpl.h
+++ b/cpp/src/main/cpp/rocketmq/include/ClientImpl.h
@@ -30,6 +30,7 @@
 #include "InvocationContext.h"
 #include "MessageExt.h"
 #include "NameServerResolver.h"
+#include "OpencensusExporter.h"
 #include "RpcClient.h"
 #include "Session.h"
 #include "TelemetryBidiReactor.h"
diff --git a/cpp/src/main/cpp/stats/OpencensusExporter.cpp b/cpp/src/main/cpp/stats/OpencensusExporter.cpp
index 0bf13a4..cb76298 100644
--- a/cpp/src/main/cpp/stats/OpencensusExporter.cpp
+++ b/cpp/src/main/cpp/stats/OpencensusExporter.cpp
@@ -17,6 +17,7 @@
 
 #include "OpencensusExporter.h"
 
+#include "ClientManager.h"
 #include "MetricBidiReactor.h"
 #include "google/protobuf/util/time_util.h"
 
@@ -25,6 +26,11 @@ ROCKETMQ_NAMESPACE_BEGIN
 namespace opencensus_proto = opencensus::proto::metrics::v1;
 
 OpencensusExporter::OpencensusExporter(std::string endpoints, std::weak_ptr<Client> client) : client_(client) {
+  auto client_shared_ptr = client.lock();
+  if (client_shared_ptr) {
+    auto channel = client_shared_ptr->manager()->createChannel(endpoints);
+    stub_ = opencensus::proto::agent::metrics::v1::MetricsService::NewStub(channel);
+  }
 }
 
 void OpencensusExporter::wrap(const MetricData& data, ExportMetricsServiceRequest& request) {
@@ -171,7 +177,7 @@ void OpencensusExporter::wrap(const MetricData& data, ExportMetricsServiceReques
   }
 }
 
-void OpencensusExporter::exportMetrics(
+void OpencensusExporter::ExportViewData(
     const std::vector<std::pair<opencensus::stats::ViewDescriptor, opencensus::stats::ViewData>>& data) {
   opencensus::proto::agent::metrics::v1::ExportMetricsServiceRequest request;
   wrap(data, request);
diff --git a/cpp/src/main/cpp/stats/include/Exporter.h b/cpp/src/main/cpp/stats/include/Exporter.h
deleted file mode 100644
index 7f2f24a..0000000
--- a/cpp/src/main/cpp/stats/include/Exporter.h
+++ /dev/null
@@ -1,30 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-#pragma once
-
-#include "opencensus/stats/stats.h"
-#include "rocketmq/RocketMQ.h"
-
-ROCKETMQ_NAMESPACE_BEGIN
-
-class Exporter {
-public:
-  virtual void exportMetrics(
-      const std::vector<std::pair<opencensus::stats::ViewDescriptor, opencensus::stats::ViewData>>& data) = 0;
-};
-
-ROCKETMQ_NAMESPACE_END
\ No newline at end of file
diff --git a/cpp/src/main/cpp/stats/include/OpencensusExporter.h b/cpp/src/main/cpp/stats/include/OpencensusExporter.h
index 7920ff5..161843c 100644
--- a/cpp/src/main/cpp/stats/include/OpencensusExporter.h
+++ b/cpp/src/main/cpp/stats/include/OpencensusExporter.h
@@ -17,9 +17,9 @@
 #pragma once
 
 #include "Client.h"
-#include "Exporter.h"
 #include "grpcpp/grpcpp.h"
 #include "opencensus/proto/agent/metrics/v1/metrics_service.grpc.pb.h"
+#include "opencensus/stats/stats.h"
 
 ROCKETMQ_NAMESPACE_BEGIN
 
@@ -30,11 +30,12 @@ using StubPtr = std::unique_ptr<Stub>;
 using MetricData = std::vector<std::pair<opencensus::stats::ViewDescriptor, opencensus::stats::ViewData>>;
 using ExportMetricsServiceRequest = opencensus::proto::agent::metrics::v1::ExportMetricsServiceRequest;
 
-class OpencensusExporter : public Exporter, public std::enable_shared_from_this<OpencensusExporter> {
+class OpencensusExporter : public opencensus::stats::StatsExporter::Handler,
+                           public std::enable_shared_from_this<OpencensusExporter> {
 public:
   OpencensusExporter(std::string endpoints, std::weak_ptr<Client> client);
 
-  void exportMetrics(const MetricData& data) override;
+  void ExportViewData(const MetricData& data) override;
 
   static void wrap(const MetricData& data, ExportMetricsServiceRequest& request);