You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@rocketmq.apache.org by li...@apache.org on 2022/07/11 07:19:19 UTC

[rocketmq-clients] branch cpp_dev updated: Prepare to debug

This is an automated email from the ASF dual-hosted git repository.

lizhanhui pushed a commit to branch cpp_dev
in repository https://gitbox.apache.org/repos/asf/rocketmq-clients.git


The following commit(s) were added to refs/heads/cpp_dev by this push:
     new 31a4f4a  Prepare to debug
31a4f4a is described below

commit 31a4f4a1d2717ceae9f205279d70c8cc932c78cc
Author: Li Zhanhui <li...@gmail.com>
AuthorDate: Mon Jul 11 15:19:04 2022 +0800

    Prepare to debug
---
 cpp/src/main/cpp/client/ClientManagerImpl.cpp  |  7 +--
 cpp/src/main/cpp/client/LogInterceptor.cpp     |  4 +-
 cpp/src/main/cpp/rocketmq/ClientImpl.cpp       |  6 +-
 cpp/src/main/cpp/stats/MetricBidiReactor.cpp   | 10 +++-
 cpp/src/main/cpp/stats/OpencensusExporter.cpp  |  2 +
 cpp/src/main/cpp/stats/StdoutHandler.cpp       | 81 ++++++++++++++++++++++++++
 cpp/src/main/cpp/stats/include/StdoutHandler.h | 76 ++++++++++++++++++++++++
 7 files changed, 175 insertions(+), 11 deletions(-)

diff --git a/cpp/src/main/cpp/client/ClientManagerImpl.cpp b/cpp/src/main/cpp/client/ClientManagerImpl.cpp
index 1f9eec6..78b13f7 100644
--- a/cpp/src/main/cpp/client/ClientManagerImpl.cpp
+++ b/cpp/src/main/cpp/client/ClientManagerImpl.cpp
@@ -331,7 +331,7 @@ bool ClientManagerImpl::send(const std::string& target_host, const Metadata& met
     }
 
     if (State::STARTED != client_manager_ptr->state()) {
-      // TODO: Would this leak some memroy?
+      // TODO: Would this leak some memory?
       return;
     }
 
@@ -501,10 +501,7 @@ RpcClientSharedPtr ClientManagerImpl::getRpcClient(const std::string& target_hos
       } else if (!search->second->ok()) {
         SPDLOG_INFO("Prior RPC client to {} is not OK. Re-create one", target_host);
       }
-      std::vector<std::unique_ptr<grpc::experimental::ClientInterceptorFactoryInterface>> interceptor_factories;
-      interceptor_factories.emplace_back(absl::make_unique<LogInterceptorFactory>());
-      auto channel = grpc::experimental::CreateCustomChannelWithInterceptors(
-          target_host, channel_credential_, channel_arguments_, std::move(interceptor_factories));
+      auto channel = createChannel(target_host);
       std::weak_ptr<ClientManager> client_manager(shared_from_this());
       client = std::make_shared<RpcClientImpl>(client_manager, channel, target_host, need_heartbeat);
       rpc_clients_.insert_or_assign(target_host, client);
diff --git a/cpp/src/main/cpp/client/LogInterceptor.cpp b/cpp/src/main/cpp/client/LogInterceptor.cpp
index 09dbc9f..7702864 100644
--- a/cpp/src/main/cpp/client/LogInterceptor.cpp
+++ b/cpp/src/main/cpp/client/LogInterceptor.cpp
@@ -15,13 +15,15 @@
  * limitations under the License.
  */
 #include "LogInterceptor.h"
+
+#include <cstddef>
+
 #include "InterceptorContinuation.h"
 #include "absl/container/flat_hash_map.h"
 #include "absl/strings/str_join.h"
 #include "google/protobuf/message.h"
 #include "rocketmq/Logger.h"
 #include "spdlog/spdlog.h"
-#include <cstddef>
 
 ROCKETMQ_NAMESPACE_BEGIN
 
diff --git a/cpp/src/main/cpp/rocketmq/ClientImpl.cpp b/cpp/src/main/cpp/rocketmq/ClientImpl.cpp
index ec3de9e..25fe498 100644
--- a/cpp/src/main/cpp/rocketmq/ClientImpl.cpp
+++ b/cpp/src/main/cpp/rocketmq/ClientImpl.cpp
@@ -39,6 +39,7 @@
 #include "NamingScheme.h"
 #include "SessionImpl.h"
 #include "Signature.h"
+#include "StdoutHandler.h"
 #include "UtilAll.h"
 #include "absl/strings/numbers.h"
 #include "absl/strings/str_join.h"
@@ -205,11 +206,12 @@ void ClientImpl::start() {
 
 #ifdef DEBUG_METRIC_EXPORTING
   opencensus::stats::StatsExporter::SetInterval(absl::Seconds(1));
+  opencensus::stats::StatsExporter::RegisterPushHandler(absl::make_unique<StdoutHandler>());
 #else
   opencensus::stats::StatsExporter::SetInterval(absl::Minutes(1));
-#endif
-
   opencensus::stats::StatsExporter::RegisterPushHandler(absl::make_unique<OpencensusHandler>(target, client_weak_ptr));
+#endif
+  SPDLOG_INFO("Export client metrics to {}", target);
 }
 
 void ClientImpl::shutdown() {
diff --git a/cpp/src/main/cpp/stats/MetricBidiReactor.cpp b/cpp/src/main/cpp/stats/MetricBidiReactor.cpp
index 62983ee..655c942 100644
--- a/cpp/src/main/cpp/stats/MetricBidiReactor.cpp
+++ b/cpp/src/main/cpp/stats/MetricBidiReactor.cpp
@@ -49,7 +49,7 @@ void MetricBidiReactor::OnReadDone(bool ok) {
     SPDLOG_WARN("Failed to read response");
     return;
   }
-
+  SPDLOG_DEBUG("OnReadDone OK");
   StartRead(&response_);
 }
 
@@ -58,7 +58,8 @@ void MetricBidiReactor::OnWriteDone(bool ok) {
     SPDLOG_WARN("Failed to report metrics");
     return;
   }
-
+  SPDLOG_DEBUG("OnWriteDone OK");
+  fireRead();
   bool expected = true;
   if (inflight_.compare_exchange_strong(expected, false, std::memory_order_relaxed)) {
     fireWrite();
@@ -75,6 +76,10 @@ void MetricBidiReactor::OnDone(const grpc::Status& s) {
     SPDLOG_DEBUG("Bi-directional stream ended. status.code={}, status.message={}", s.error_code(), s.error_message());
   } else {
     SPDLOG_WARN("Bi-directional stream ended. status.code={}, status.message={}", s.error_code(), s.error_message());
+    auto exporter = exporter_.lock();
+    if (exporter) {
+      exporter->resetStream();
+    }
   }
 }
 
@@ -102,7 +107,6 @@ void MetricBidiReactor::fireWrite() {
     request_.CopyFrom(requests_[0]);
     requests_.erase(requests_.begin());
     StartWrite(&request_);
-    fireRead();
   }
 }
 
diff --git a/cpp/src/main/cpp/stats/OpencensusExporter.cpp b/cpp/src/main/cpp/stats/OpencensusExporter.cpp
index cb76298..651965d 100644
--- a/cpp/src/main/cpp/stats/OpencensusExporter.cpp
+++ b/cpp/src/main/cpp/stats/OpencensusExporter.cpp
@@ -30,6 +30,8 @@ OpencensusExporter::OpencensusExporter(std::string endpoints, std::weak_ptr<Clie
   if (client_shared_ptr) {
     auto channel = client_shared_ptr->manager()->createChannel(endpoints);
     stub_ = opencensus::proto::agent::metrics::v1::MetricsService::NewStub(channel);
+  } else {
+    SPDLOG_ERROR("Failed to initialize OpencensusExporter. weak_ptr to Client is nullptr");
   }
 }
 
diff --git a/cpp/src/main/cpp/stats/StdoutHandler.cpp b/cpp/src/main/cpp/stats/StdoutHandler.cpp
new file mode 100644
index 0000000..9d18d78
--- /dev/null
+++ b/cpp/src/main/cpp/stats/StdoutHandler.cpp
@@ -0,0 +1,81 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#include "StdoutHandler.h"
+
+#include "rocketmq/RocketMQ.h"
+
+ROCKETMQ_NAMESPACE_BEGIN
+
+void StdoutHandler::ExportViewData(
+    const std::vector<std::pair<opencensus::stats::ViewDescriptor, opencensus::stats::ViewData>>& data) {
+  for (const auto& datum : data) {
+    const auto& view_data = datum.second;
+    const auto& descriptor = datum.first;
+    auto start_times = view_data.start_times();
+    auto columns = descriptor.columns();
+
+    switch (view_data.type()) {
+      case opencensus::stats::ViewData::Type::kInt64: {
+        auto data_map = view_data.int_data();
+        for (const auto& entry : data_map) {
+          absl::Time time = start_times[entry.first];
+          std::string line;
+          line.append(absl::FormatTime(time)).append(" ");
+          line.append(descriptor.name());
+          line.append("{");
+          for (std::size_t i = 0; i < columns.size(); i++) {
+            line.append(columns[i].name()).append("=").append(entry.first[i]);
+            if (i < columns.size() - 1) {
+              line.append(", ");
+            } else {
+              line.append("} ==> ");
+            }
+          }
+          line.append(std::to_string(entry.second));
+          println(line);
+        }
+        break;
+      }
+      case opencensus::stats::ViewData::Type::kDouble: {
+        exportDatum(datum.first, view_data.start_time(), view_data.end_time(), view_data.double_data());
+        break;
+      }
+      case opencensus::stats::ViewData::Type::kDistribution: {
+        for (const auto& entry : view_data.distribution_data()) {
+          std::string line(descriptor.name());
+          line.append("{");
+          for (std::size_t i = 0; i < columns.size(); i++) {
+            line.append(columns[i].name()).append("=").append(entry.first[i]);
+            if (i < columns.size() - 1) {
+              line.append(", ");
+            } else {
+              line.append("} ==> ");
+            }
+          }
+          line.append(entry.second.DebugString());
+          println(line);
+
+          println(absl::StrJoin(entry.second.bucket_boundaries().lower_boundaries(), ","));
+        }
+        break;
+      }
+    }
+  }
+}
+
+ROCKETMQ_NAMESPACE_END
\ No newline at end of file
diff --git a/cpp/src/main/cpp/stats/include/StdoutHandler.h b/cpp/src/main/cpp/stats/include/StdoutHandler.h
new file mode 100644
index 0000000..1bfc3dc
--- /dev/null
+++ b/cpp/src/main/cpp/stats/include/StdoutHandler.h
@@ -0,0 +1,76 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+#pragma once
+
+#include <mutex>
+#include <string>
+
+#include "opencensus/stats/stats.h"
+#include "rocketmq/RocketMQ.h"
+
+ROCKETMQ_NAMESPACE_BEGIN
+
+class StdoutHandler : public opencensus::stats::StatsExporter::Handler {
+public:
+  void ExportViewData(
+      const std::vector<std::pair<opencensus::stats::ViewDescriptor, opencensus::stats::ViewData>>& data) override;
+
+private:
+  template <typename T>
+  void exportDatum(const opencensus::stats::ViewDescriptor& descriptor,
+                   absl::Time start_time,
+                   absl::Time end_time,
+                   const opencensus::stats::ViewData::DataMap<T>& data) {
+    if (data.empty()) {
+      // std::cout << "No data for " << descriptor.name() << std::endl;
+      return;
+    }
+
+    for (const auto& row : data) {
+      for (std::size_t column = 0; column < descriptor.columns().size(); column++) {
+        std::cout << descriptor.name() << "[" << descriptor.columns()[column].name() << "=" << row.first[column] << "]"
+                  << dataToString(row.second) << std::endl;
+      }
+    }
+  }
+
+  std::mutex console_mtx_;
+
+  void println(const std::string& line) {
+    std::lock_guard<std::mutex> lk(console_mtx_);
+    std::cout << line << std::endl;
+  }
+
+  // Functions to format data for different aggregation types.
+  std::string dataToString(double data) {
+    return absl::StrCat(": ", data, "\n");
+  }
+  std::string dataToString(int64_t data) {
+    return absl::StrCat(": ", data, "\n");
+  }
+  std::string dataToString(const opencensus::stats::Distribution& data) {
+    std::string output = "\n";
+    std::vector<std::string> lines = absl::StrSplit(data.DebugString(), '\n');
+    // Add indent.
+    for (const auto& line : lines) {
+      absl::StrAppend(&output, "    ", line, "\n");
+    }
+    return output;
+  }
+};
+
+ROCKETMQ_NAMESPACE_END
\ No newline at end of file