You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@rocketmq.apache.org by li...@apache.org on 2021/10/21 02:51:48 UTC

[rocketmq-client-cpp] branch main updated: Export trace-id and span-id using raw data (#374)

This is an automated email from the ASF dual-hosted git repository.

lizhanhui pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/rocketmq-client-cpp.git


The following commit(s) were added to refs/heads/main by this push:
     new 26223ed  Export trace-id and span-id using raw data (#374)
26223ed is described below

commit 26223ed6d876671ef6af017031e43d8c7501b537
Author: aaron ai <ya...@gmail.com>
AuthorDate: Thu Oct 21 10:50:57 2021 +0800

    Export trace-id and span-id using raw data (#374)
---
 src/main/cpp/rocketmq/ClientImpl.cpp               | 23 ++++++++++++++++
 src/main/cpp/rocketmq/include/ClientImpl.h         |  5 ++++
 src/main/cpp/tracing/exporters/OtlpExporter.cpp    | 32 ++++++++++++++++++----
 .../cpp/tracing/exporters/include/OtlpExporter.h   |  6 ++++
 4 files changed, 60 insertions(+), 6 deletions(-)

diff --git a/src/main/cpp/rocketmq/ClientImpl.cpp b/src/main/cpp/rocketmq/ClientImpl.cpp
index 7996f03..049569a 100644
--- a/src/main/cpp/rocketmq/ClientImpl.cpp
+++ b/src/main/cpp/rocketmq/ClientImpl.cpp
@@ -297,6 +297,28 @@ void ClientImpl::onTopicRouteReady(const std::string& topic, const std::error_co
   }
 }
 
+void ClientImpl::updateTraceHosts() {
+  absl::flat_hash_set<std::string> hosts;
+  absl::MutexLock lk(&topic_route_table_mtx_);
+  for (const auto& item : topic_route_table_) {
+    for (const auto& partition : item.second->partitions()) {
+      if (Permission::NONE == partition.permission()) {
+        continue;
+      }
+      if (MixAll::MASTER_BROKER_ID != partition.broker().id()) {
+        continue;
+      }
+      std::string endpoint = partition.asMessageQueue().serviceAddress();
+      if (!hosts.contains(endpoint)) {
+        hosts.emplace(std::move(endpoint));
+      }
+    }
+  }
+  std::vector<std::string> host_list(hosts.begin(), hosts.end());
+  SPDLOG_DEBUG("Trace candidate hosts size={}", host_list.size());
+  exporter_->updateHosts(host_list);
+}
+
 void ClientImpl::updateRouteCache(const std::string& topic, const std::error_code& ec, const TopicRouteDataPtr& route) {
   if (ec || !route || route->partitions().empty()) {
     SPDLOG_WARN("Yuck! route for {} is invalid. Cause: {}", topic, ec.message());
@@ -339,6 +361,7 @@ void ClientImpl::updateRouteCache(const std::string& topic, const std::error_cod
     std::set_difference(hosts.begin(), hosts.end(), existed_hosts.begin(), existed_hosts.end(),
                         std::inserter(new_hosts, new_hosts.begin()));
   }
+  updateTraceHosts();
   for (const auto& endpoints : new_hosts) {
     pollCommand(endpoints);
   }
diff --git a/src/main/cpp/rocketmq/include/ClientImpl.h b/src/main/cpp/rocketmq/include/ClientImpl.h
index 80f1f8c..036179b 100644
--- a/src/main/cpp/rocketmq/include/ClientImpl.h
+++ b/src/main/cpp/rocketmq/include/ClientImpl.h
@@ -178,6 +178,11 @@ private:
       LOCKS_EXCLUDED(inflight_route_requests_mtx_);
 
   /**
+   * Update Trace candidate hosts.
+   */
+  void updateTraceHosts() LOCKS_EXCLUDED(topic_route_table_mtx_);
+
+  /**
    * Update local cache for the topic. Note, route differences are logged in
    * INFO level since route bears fundamental importance.
    *
diff --git a/src/main/cpp/tracing/exporters/OtlpExporter.cpp b/src/main/cpp/tracing/exporters/OtlpExporter.cpp
index 41796ba..836ceff 100644
--- a/src/main/cpp/tracing/exporters/OtlpExporter.cpp
+++ b/src/main/cpp/tracing/exporters/OtlpExporter.cpp
@@ -65,6 +65,9 @@ void ExportClient::asyncExport(const collector_trace::ExportTraceServiceRequest&
                                               invocation_context);
 }
 
+const int OtlpExporterHandler::SPAN_ID_SIZE = 8;
+const int OtlpExporterHandler::TRACE_ID_SIZE = 16;
+
 OtlpExporterHandler::OtlpExporterHandler(std::weak_ptr<OtlpExporter> exporter)
     : exporter_(std::move(exporter)), completion_queue_(std::make_shared<CompletionQueue>()) {
   auto exp = exporter_.lock();
@@ -198,11 +201,21 @@ void OtlpExporterHandler::Export(const std::vector<::opencensus::trace::exporter
 
   auto resource = new trace::ResourceSpans();
   auto instrument_library_span = new trace::InstrumentationLibrarySpans();
+
+  uint8_t span_id_buf[SPAN_ID_SIZE];
+  uint8_t trace_id_buf[TRACE_ID_SIZE];
   for (const auto& span : spans) {
     auto item = new trace::Span();
-    item->set_trace_id(span.context().trace_id().ToHex());
-    item->set_span_id(span.context().span_id().ToHex());
-    item->set_parent_span_id(span.parent_span_id().ToHex());
+
+    span.context().span_id().CopyTo(trace_id_buf);
+    item->set_trace_id(&trace_id_buf, TRACE_ID_SIZE);
+
+    span.context().span_id().CopyTo(span_id_buf);
+    item->set_span_id(&span_id_buf, SPAN_ID_SIZE);
+
+    span.parent_span_id().CopyTo(span_id_buf);
+    item->set_parent_span_id(&span_id_buf, SPAN_ID_SIZE);
+
     item->set_name(span.name().data());
 
     item->set_start_time_unix_nano(absl::ToUnixNanos(span.start_time()));
@@ -260,8 +273,13 @@ void OtlpExporterHandler::Export(const std::vector<::opencensus::trace::exporter
 
     for (const auto& link : span.links()) {
       auto span_link = new trace::Span::Link();
-      span_link->set_trace_id(link.trace_id().ToHex());
-      span_link->set_span_id(link.span_id().ToHex());
+
+      link.trace_id().CopyTo(trace_id_buf);
+      item->set_trace_id(&trace_id_buf, TRACE_ID_SIZE);
+
+      link.span_id().CopyTo(span_id_buf);
+      item->set_trace_id(&span_id_buf, SPAN_ID_SIZE);
+
       for (const auto& attribute : link.attributes()) {
         auto kv = new common::KeyValue();
         kv->set_key(attribute.first);
@@ -331,7 +349,9 @@ void OtlpExporterHandler::Export(const std::vector<::opencensus::trace::exporter
     if (invocation_context->status.ok()) {
       SPDLOG_DEBUG("Export tracing spans OK");
     } else {
-      SPDLOG_WARN("Failed to export tracing spans to {}", invocation_context->remote_address);
+      SPDLOG_WARN("Failed to export tracing spans to {}, gRPC code:{}, gRPC error message: {}",
+                  invocation_context->remote_address, invocation_context->status.error_code(),
+                  invocation_context->status.error_message());
     }
   };
   invocation_context->callback = callback;
diff --git a/src/main/cpp/tracing/exporters/include/OtlpExporter.h b/src/main/cpp/tracing/exporters/include/OtlpExporter.h
index c00b8ec..3b6755b 100644
--- a/src/main/cpp/tracing/exporters/include/OtlpExporter.h
+++ b/src/main/cpp/tracing/exporters/include/OtlpExporter.h
@@ -59,6 +59,9 @@ public:
   }
 
   void updateHosts(std::vector<std::string> hosts) LOCKS_EXCLUDED(hosts_mtx_) {
+    if (hosts.empty()) {
+      return;
+    }
     absl::MutexLock lk(&hosts_mtx_);
     hosts_ = std::move(hosts);
   }
@@ -114,6 +117,9 @@ private:
 
 class OtlpExporterHandler : public ::opencensus::trace::exporter::SpanExporter::Handler {
 public:
+  static const int SPAN_ID_SIZE;
+  static const int TRACE_ID_SIZE;
+
   OtlpExporterHandler(std::weak_ptr<OtlpExporter> exporter);
 
   void Export(const std::vector<::opencensus::trace::exporter::SpanData>& spans) override;