You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@rocketmq.apache.org by li...@apache.org on 2022/07/11 03:13:04 UTC
[rocketmq-clients] 01/02: Support client metrics exporting
This is an automated email from the ASF dual-hosted git repository.
lizhanhui pushed a commit to branch cpp_dev
in repository https://gitbox.apache.org/repos/asf/rocketmq-clients.git
commit aad2ba8c53ab9a51bb9fa668bf073d12136b2101
Author: Li Zhanhui <li...@gmail.com>
AuthorDate: Mon Jul 11 11:08:31 2022 +0800
Support client metrics exporting
---
cpp/src/main/cpp/client/include/ClientManager.h | 6 ++++
cpp/src/main/cpp/rocketmq/ClientImpl.cpp | 40 ++++++++++++++++++++++
cpp/src/main/cpp/rocketmq/include/ClientImpl.h | 1 +
cpp/src/main/cpp/stats/OpencensusExporter.cpp | 8 ++++-
cpp/src/main/cpp/stats/include/Exporter.h | 30 ----------------
.../main/cpp/stats/include/OpencensusExporter.h | 7 ++--
6 files changed, 58 insertions(+), 34 deletions(-)
diff --git a/cpp/src/main/cpp/client/include/ClientManager.h b/cpp/src/main/cpp/client/include/ClientManager.h
index 99ec6ad..2ec5b2e 100644
--- a/cpp/src/main/cpp/client/include/ClientManager.h
+++ b/cpp/src/main/cpp/client/include/ClientManager.h
@@ -50,6 +50,12 @@ public:
virtual std::shared_ptr<RpcClient> getRpcClient(const std::string& target_host, bool need_heartbeat) = 0;
+ /**
+ * @brief Create a Channel object
+ *
+ * @param target_host gRPC naming targets, following https://github.com/grpc/grpc/blob/master/doc/naming.md
+ * @return std::shared_ptr<grpc::Channel>
+ */
virtual std::shared_ptr<grpc::Channel> createChannel(const std::string& target_host) = 0;
virtual void resolveRoute(const std::string& target_host, const Metadata& metadata, const QueryRouteRequest& request,
diff --git a/cpp/src/main/cpp/rocketmq/ClientImpl.cpp b/cpp/src/main/cpp/rocketmq/ClientImpl.cpp
index 8275922..a5e7b62 100644
--- a/cpp/src/main/cpp/rocketmq/ClientImpl.cpp
+++ b/cpp/src/main/cpp/rocketmq/ClientImpl.cpp
@@ -16,6 +16,8 @@
*/
#include "ClientImpl.h"
+#include <apache/rocketmq/v2/definition.pb.h>
+
#include <algorithm>
#include <atomic>
#include <chrono>
@@ -34,6 +36,7 @@
#include "InvocationContext.h"
#include "LoggerImpl.h"
#include "MessageExt.h"
+#include "MetricBidiReactor.h"
#include "NamingScheme.h"
#include "SessionImpl.h"
#include "Signature.h"
@@ -41,6 +44,7 @@
#include "absl/strings/numbers.h"
#include "absl/strings/str_join.h"
#include "absl/strings/str_split.h"
+#include "opencensus/stats/stats.h"
#include "rocketmq/Message.h"
#include "rocketmq/MessageListener.h"
@@ -165,6 +169,42 @@ void ClientImpl::start() {
route_update_handle_ = client_manager_->getScheduler()->schedule(route_update_functor, UPDATE_ROUTE_TASK_NAME,
std::chrono::seconds(10), std::chrono::seconds(30));
+
+ auto endpoints = client_config_.metric.endpoints;
+ std::string target;
+ switch (endpoints.scheme()) {
+ case rmq::AddressScheme::IPv4: {
+ target.append("ipv4:");
+ break;
+ }
+ case rmq::AddressScheme::IPv6: {
+ target.append("ipv6:");
+ break;
+ }
+ case rmq::AddressScheme::DOMAIN_NAME: {
+ target.append("dns:");
+ break;
+ }
+ default: {
+ SPDLOG_ERROR("Unknown address scheme");
+ }
+ }
+
+ bool first = true;
+ for (const auto& address : endpoints.addresses()) {
+ if (!first) {
+ target.push_back(',');
+ } else {
+ first = false;
+ }
+ target.append(address.host());
+ target.push_back(':');
+ target.append(std::to_string(address.port()));
+ }
+
+ std::weak_ptr<Client> client_weak_ptr(self());
+ opencensus::stats::StatsExporter::SetInterval(absl::Minutes(1));
+ opencensus::stats::StatsExporter::RegisterPushHandler(absl::make_unique<OpencensusExporter>(target, client_weak_ptr));
}
void ClientImpl::shutdown() {
diff --git a/cpp/src/main/cpp/rocketmq/include/ClientImpl.h b/cpp/src/main/cpp/rocketmq/include/ClientImpl.h
index c136cc9..e820351 100644
--- a/cpp/src/main/cpp/rocketmq/include/ClientImpl.h
+++ b/cpp/src/main/cpp/rocketmq/include/ClientImpl.h
@@ -30,6 +30,7 @@
#include "InvocationContext.h"
#include "MessageExt.h"
#include "NameServerResolver.h"
+#include "OpencensusExporter.h"
#include "RpcClient.h"
#include "Session.h"
#include "TelemetryBidiReactor.h"
diff --git a/cpp/src/main/cpp/stats/OpencensusExporter.cpp b/cpp/src/main/cpp/stats/OpencensusExporter.cpp
index 0bf13a4..cb76298 100644
--- a/cpp/src/main/cpp/stats/OpencensusExporter.cpp
+++ b/cpp/src/main/cpp/stats/OpencensusExporter.cpp
@@ -17,6 +17,7 @@
#include "OpencensusExporter.h"
+#include "ClientManager.h"
#include "MetricBidiReactor.h"
#include "google/protobuf/util/time_util.h"
@@ -25,6 +26,11 @@ ROCKETMQ_NAMESPACE_BEGIN
namespace opencensus_proto = opencensus::proto::metrics::v1;
OpencensusExporter::OpencensusExporter(std::string endpoints, std::weak_ptr<Client> client) : client_(client) {
+ auto client_shared_ptr = client.lock();
+ if (client_shared_ptr) {
+ auto channel = client_shared_ptr->manager()->createChannel(endpoints);
+ stub_ = opencensus::proto::agent::metrics::v1::MetricsService::NewStub(channel);
+ }
}
void OpencensusExporter::wrap(const MetricData& data, ExportMetricsServiceRequest& request) {
@@ -171,7 +177,7 @@ void OpencensusExporter::wrap(const MetricData& data, ExportMetricsServiceReques
}
}
-void OpencensusExporter::exportMetrics(
+void OpencensusExporter::ExportViewData(
const std::vector<std::pair<opencensus::stats::ViewDescriptor, opencensus::stats::ViewData>>& data) {
opencensus::proto::agent::metrics::v1::ExportMetricsServiceRequest request;
wrap(data, request);
diff --git a/cpp/src/main/cpp/stats/include/Exporter.h b/cpp/src/main/cpp/stats/include/Exporter.h
deleted file mode 100644
index 7f2f24a..0000000
--- a/cpp/src/main/cpp/stats/include/Exporter.h
+++ /dev/null
@@ -1,30 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-#pragma once
-
-#include "opencensus/stats/stats.h"
-#include "rocketmq/RocketMQ.h"
-
-ROCKETMQ_NAMESPACE_BEGIN
-
-class Exporter {
-public:
- virtual void exportMetrics(
- const std::vector<std::pair<opencensus::stats::ViewDescriptor, opencensus::stats::ViewData>>& data) = 0;
-};
-
-ROCKETMQ_NAMESPACE_END
\ No newline at end of file
diff --git a/cpp/src/main/cpp/stats/include/OpencensusExporter.h b/cpp/src/main/cpp/stats/include/OpencensusExporter.h
index 7920ff5..161843c 100644
--- a/cpp/src/main/cpp/stats/include/OpencensusExporter.h
+++ b/cpp/src/main/cpp/stats/include/OpencensusExporter.h
@@ -17,9 +17,9 @@
#pragma once
#include "Client.h"
-#include "Exporter.h"
#include "grpcpp/grpcpp.h"
#include "opencensus/proto/agent/metrics/v1/metrics_service.grpc.pb.h"
+#include "opencensus/stats/stats.h"
ROCKETMQ_NAMESPACE_BEGIN
@@ -30,11 +30,12 @@ using StubPtr = std::unique_ptr<Stub>;
using MetricData = std::vector<std::pair<opencensus::stats::ViewDescriptor, opencensus::stats::ViewData>>;
using ExportMetricsServiceRequest = opencensus::proto::agent::metrics::v1::ExportMetricsServiceRequest;
-class OpencensusExporter : public Exporter, public std::enable_shared_from_this<OpencensusExporter> {
+class OpencensusExporter : public opencensus::stats::StatsExporter::Handler,
+ public std::enable_shared_from_this<OpencensusExporter> {
public:
OpencensusExporter(std::string endpoints, std::weak_ptr<Client> client);
- void exportMetrics(const MetricData& data) override;
+ void ExportViewData(const MetricData& data) override;
static void wrap(const MetricData& data, ExportMetricsServiceRequest& request);