You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@rocketmq.apache.org by li...@apache.org on 2022/07/11 07:19:19 UTC
[rocketmq-clients] branch cpp_dev updated: Prepare to debug
This is an automated email from the ASF dual-hosted git repository.
lizhanhui pushed a commit to branch cpp_dev
in repository https://gitbox.apache.org/repos/asf/rocketmq-clients.git
The following commit(s) were added to refs/heads/cpp_dev by this push:
new 31a4f4a Prepare to debug
31a4f4a is described below
commit 31a4f4a1d2717ceae9f205279d70c8cc932c78cc
Author: Li Zhanhui <li...@gmail.com>
AuthorDate: Mon Jul 11 15:19:04 2022 +0800
Prepare to debug
---
cpp/src/main/cpp/client/ClientManagerImpl.cpp | 7 +--
cpp/src/main/cpp/client/LogInterceptor.cpp | 4 +-
cpp/src/main/cpp/rocketmq/ClientImpl.cpp | 6 +-
cpp/src/main/cpp/stats/MetricBidiReactor.cpp | 10 +++-
cpp/src/main/cpp/stats/OpencensusExporter.cpp | 2 +
cpp/src/main/cpp/stats/StdoutHandler.cpp | 81 ++++++++++++++++++++++++++
cpp/src/main/cpp/stats/include/StdoutHandler.h | 76 ++++++++++++++++++++++++
7 files changed, 175 insertions(+), 11 deletions(-)
diff --git a/cpp/src/main/cpp/client/ClientManagerImpl.cpp b/cpp/src/main/cpp/client/ClientManagerImpl.cpp
index 1f9eec6..78b13f7 100644
--- a/cpp/src/main/cpp/client/ClientManagerImpl.cpp
+++ b/cpp/src/main/cpp/client/ClientManagerImpl.cpp
@@ -331,7 +331,7 @@ bool ClientManagerImpl::send(const std::string& target_host, const Metadata& met
}
if (State::STARTED != client_manager_ptr->state()) {
- // TODO: Would this leak some memroy?
+ // TODO: Would this leak some memory?
return;
}
@@ -501,10 +501,7 @@ RpcClientSharedPtr ClientManagerImpl::getRpcClient(const std::string& target_hos
} else if (!search->second->ok()) {
SPDLOG_INFO("Prior RPC client to {} is not OK. Re-create one", target_host);
}
- std::vector<std::unique_ptr<grpc::experimental::ClientInterceptorFactoryInterface>> interceptor_factories;
- interceptor_factories.emplace_back(absl::make_unique<LogInterceptorFactory>());
- auto channel = grpc::experimental::CreateCustomChannelWithInterceptors(
- target_host, channel_credential_, channel_arguments_, std::move(interceptor_factories));
+ auto channel = createChannel(target_host);
std::weak_ptr<ClientManager> client_manager(shared_from_this());
client = std::make_shared<RpcClientImpl>(client_manager, channel, target_host, need_heartbeat);
rpc_clients_.insert_or_assign(target_host, client);
diff --git a/cpp/src/main/cpp/client/LogInterceptor.cpp b/cpp/src/main/cpp/client/LogInterceptor.cpp
index 09dbc9f..7702864 100644
--- a/cpp/src/main/cpp/client/LogInterceptor.cpp
+++ b/cpp/src/main/cpp/client/LogInterceptor.cpp
@@ -15,13 +15,15 @@
* limitations under the License.
*/
#include "LogInterceptor.h"
+
+#include <cstddef>
+
#include "InterceptorContinuation.h"
#include "absl/container/flat_hash_map.h"
#include "absl/strings/str_join.h"
#include "google/protobuf/message.h"
#include "rocketmq/Logger.h"
#include "spdlog/spdlog.h"
-#include <cstddef>
ROCKETMQ_NAMESPACE_BEGIN
diff --git a/cpp/src/main/cpp/rocketmq/ClientImpl.cpp b/cpp/src/main/cpp/rocketmq/ClientImpl.cpp
index ec3de9e..25fe498 100644
--- a/cpp/src/main/cpp/rocketmq/ClientImpl.cpp
+++ b/cpp/src/main/cpp/rocketmq/ClientImpl.cpp
@@ -39,6 +39,7 @@
#include "NamingScheme.h"
#include "SessionImpl.h"
#include "Signature.h"
+#include "StdoutHandler.h"
#include "UtilAll.h"
#include "absl/strings/numbers.h"
#include "absl/strings/str_join.h"
@@ -205,11 +206,12 @@ void ClientImpl::start() {
#ifdef DEBUG_METRIC_EXPORTING
opencensus::stats::StatsExporter::SetInterval(absl::Seconds(1));
+ opencensus::stats::StatsExporter::RegisterPushHandler(absl::make_unique<StdoutHandler>());
#else
opencensus::stats::StatsExporter::SetInterval(absl::Minutes(1));
-#endif
-
opencensus::stats::StatsExporter::RegisterPushHandler(absl::make_unique<OpencensusHandler>(target, client_weak_ptr));
+#endif
+ SPDLOG_INFO("Export client metrics to {}", target);
}
void ClientImpl::shutdown() {
diff --git a/cpp/src/main/cpp/stats/MetricBidiReactor.cpp b/cpp/src/main/cpp/stats/MetricBidiReactor.cpp
index 62983ee..655c942 100644
--- a/cpp/src/main/cpp/stats/MetricBidiReactor.cpp
+++ b/cpp/src/main/cpp/stats/MetricBidiReactor.cpp
@@ -49,7 +49,7 @@ void MetricBidiReactor::OnReadDone(bool ok) {
SPDLOG_WARN("Failed to read response");
return;
}
-
+ SPDLOG_DEBUG("OnReadDone OK");
StartRead(&response_);
}
@@ -58,7 +58,8 @@ void MetricBidiReactor::OnWriteDone(bool ok) {
SPDLOG_WARN("Failed to report metrics");
return;
}
-
+ SPDLOG_DEBUG("OnWriteDone OK");
+ fireRead();
bool expected = true;
if (inflight_.compare_exchange_strong(expected, false, std::memory_order_relaxed)) {
fireWrite();
@@ -75,6 +76,10 @@ void MetricBidiReactor::OnDone(const grpc::Status& s) {
SPDLOG_DEBUG("Bi-directional stream ended. status.code={}, status.message={}", s.error_code(), s.error_message());
} else {
SPDLOG_WARN("Bi-directional stream ended. status.code={}, status.message={}", s.error_code(), s.error_message());
+ auto exporter = exporter_.lock();
+ if (exporter) {
+ exporter->resetStream();
+ }
}
}
@@ -102,7 +107,6 @@ void MetricBidiReactor::fireWrite() {
request_.CopyFrom(requests_[0]);
requests_.erase(requests_.begin());
StartWrite(&request_);
- fireRead();
}
}
diff --git a/cpp/src/main/cpp/stats/OpencensusExporter.cpp b/cpp/src/main/cpp/stats/OpencensusExporter.cpp
index cb76298..651965d 100644
--- a/cpp/src/main/cpp/stats/OpencensusExporter.cpp
+++ b/cpp/src/main/cpp/stats/OpencensusExporter.cpp
@@ -30,6 +30,8 @@ OpencensusExporter::OpencensusExporter(std::string endpoints, std::weak_ptr<Clie
if (client_shared_ptr) {
auto channel = client_shared_ptr->manager()->createChannel(endpoints);
stub_ = opencensus::proto::agent::metrics::v1::MetricsService::NewStub(channel);
+ } else {
+ SPDLOG_ERROR("Failed to initialize OpencensusExporter. weak_ptr to Client is nullptr");
}
}
diff --git a/cpp/src/main/cpp/stats/StdoutHandler.cpp b/cpp/src/main/cpp/stats/StdoutHandler.cpp
new file mode 100644
index 0000000..9d18d78
--- /dev/null
+++ b/cpp/src/main/cpp/stats/StdoutHandler.cpp
@@ -0,0 +1,81 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#include "StdoutHandler.h"
+
+#include "rocketmq/RocketMQ.h"
+
+ROCKETMQ_NAMESPACE_BEGIN
+
+void StdoutHandler::ExportViewData(
+ const std::vector<std::pair<opencensus::stats::ViewDescriptor, opencensus::stats::ViewData>>& data) {
+ for (const auto& datum : data) {
+ const auto& view_data = datum.second;
+ const auto& descriptor = datum.first;
+ auto start_times = view_data.start_times();
+ auto columns = descriptor.columns();
+
+ switch (view_data.type()) {
+ case opencensus::stats::ViewData::Type::kInt64: {
+ auto data_map = view_data.int_data();
+ for (const auto& entry : data_map) {
+ absl::Time time = start_times[entry.first];
+ std::string line;
+ line.append(absl::FormatTime(time)).append(" ");
+ line.append(descriptor.name());
+ line.append("{");
+ for (std::size_t i = 0; i < columns.size(); i++) {
+ line.append(columns[i].name()).append("=").append(entry.first[i]);
+ if (i < columns.size() - 1) {
+ line.append(", ");
+ } else {
+ line.append("} ==> ");
+ }
+ }
+ line.append(std::to_string(entry.second));
+ println(line);
+ }
+ break;
+ }
+ case opencensus::stats::ViewData::Type::kDouble: {
+ exportDatum(datum.first, view_data.start_time(), view_data.end_time(), view_data.double_data());
+ break;
+ }
+ case opencensus::stats::ViewData::Type::kDistribution: {
+ for (const auto& entry : view_data.distribution_data()) {
+ std::string line(descriptor.name());
+ line.append("{");
+ for (std::size_t i = 0; i < columns.size(); i++) {
+ line.append(columns[i].name()).append("=").append(entry.first[i]);
+ if (i < columns.size() - 1) {
+ line.append(", ");
+ } else {
+ line.append("} ==> ");
+ }
+ }
+ line.append(entry.second.DebugString());
+ println(line);
+
+ println(absl::StrJoin(entry.second.bucket_boundaries().lower_boundaries(), ","));
+ }
+ break;
+ }
+ }
+ }
+}
+
+ROCKETMQ_NAMESPACE_END
\ No newline at end of file
diff --git a/cpp/src/main/cpp/stats/include/StdoutHandler.h b/cpp/src/main/cpp/stats/include/StdoutHandler.h
new file mode 100644
index 0000000..1bfc3dc
--- /dev/null
+++ b/cpp/src/main/cpp/stats/include/StdoutHandler.h
@@ -0,0 +1,76 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+#pragma once
+
+#include <mutex>
+#include <string>
+
+#include "opencensus/stats/stats.h"
+#include "rocketmq/RocketMQ.h"
+
+ROCKETMQ_NAMESPACE_BEGIN
+
+class StdoutHandler : public opencensus::stats::StatsExporter::Handler {
+public:
+ void ExportViewData(
+ const std::vector<std::pair<opencensus::stats::ViewDescriptor, opencensus::stats::ViewData>>& data) override;
+
+private:
+ template <typename T>
+ void exportDatum(const opencensus::stats::ViewDescriptor& descriptor,
+ absl::Time start_time,
+ absl::Time end_time,
+ const opencensus::stats::ViewData::DataMap<T>& data) {
+ if (data.empty()) {
+ // std::cout << "No data for " << descriptor.name() << std::endl;
+ return;
+ }
+
+ for (const auto& row : data) {
+ for (std::size_t column = 0; column < descriptor.columns().size(); column++) {
+ std::cout << descriptor.name() << "[" << descriptor.columns()[column].name() << "=" << row.first[column] << "]"
+ << dataToString(row.second) << std::endl;
+ }
+ }
+ }
+
+ std::mutex console_mtx_;
+
+ void println(const std::string& line) {
+ std::lock_guard<std::mutex> lk(console_mtx_);
+ std::cout << line << std::endl;
+ }
+
+ // Functions to format data for different aggregation types.
+ std::string dataToString(double data) {
+ return absl::StrCat(": ", data, "\n");
+ }
+ std::string dataToString(int64_t data) {
+ return absl::StrCat(": ", data, "\n");
+ }
+ std::string dataToString(const opencensus::stats::Distribution& data) {
+ std::string output = "\n";
+ std::vector<std::string> lines = absl::StrSplit(data.DebugString(), '\n');
+ // Add indent.
+ for (const auto& line : lines) {
+ absl::StrAppend(&output, " ", line, "\n");
+ }
+ return output;
+ }
+};
+
+ROCKETMQ_NAMESPACE_END
\ No newline at end of file