You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@rocketmq.apache.org by li...@apache.org on 2021/10/21 02:51:48 UTC
[rocketmq-client-cpp] branch main updated: Export trace-id and
span-id using raw data (#374)
This is an automated email from the ASF dual-hosted git repository.
lizhanhui pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/rocketmq-client-cpp.git
The following commit(s) were added to refs/heads/main by this push:
new 26223ed Export trace-id and span-id using raw data (#374)
26223ed is described below
commit 26223ed6d876671ef6af017031e43d8c7501b537
Author: aaron ai <ya...@gmail.com>
AuthorDate: Thu Oct 21 10:50:57 2021 +0800
Export trace-id and span-id using raw data (#374)
---
src/main/cpp/rocketmq/ClientImpl.cpp | 23 ++++++++++++++++
src/main/cpp/rocketmq/include/ClientImpl.h | 5 ++++
src/main/cpp/tracing/exporters/OtlpExporter.cpp | 32 ++++++++++++++++++----
.../cpp/tracing/exporters/include/OtlpExporter.h | 6 ++++
4 files changed, 60 insertions(+), 6 deletions(-)
diff --git a/src/main/cpp/rocketmq/ClientImpl.cpp b/src/main/cpp/rocketmq/ClientImpl.cpp
index 7996f03..049569a 100644
--- a/src/main/cpp/rocketmq/ClientImpl.cpp
+++ b/src/main/cpp/rocketmq/ClientImpl.cpp
@@ -297,6 +297,28 @@ void ClientImpl::onTopicRouteReady(const std::string& topic, const std::error_co
}
}
+/**
+ * Collect the service address of every partition that is hosted on a master
+ * broker and whose permission is not NONE, across all cached topic routes,
+ * and hand the de-duplicated list to the trace exporter.
+ */
+void ClientImpl::updateTraceHosts() {
+  absl::flat_hash_set<std::string> hosts;
+  {
+    // Hold the route-table lock only while reading the table; the exporter
+    // acquires its own mutex inside updateHosts(), so call it after this
+    // scope ends to avoid nesting the two locks.
+    absl::MutexLock lk(&topic_route_table_mtx_);
+    for (const auto& item : topic_route_table_) {
+      for (const auto& partition : item.second->partitions()) {
+        if (Permission::NONE == partition.permission()) {
+          continue;
+        }
+        if (MixAll::MASTER_BROKER_ID != partition.broker().id()) {
+          continue;
+        }
+        // The set already ignores duplicates; no prior lookup is required.
+        hosts.insert(partition.asMessageQueue().serviceAddress());
+      }
+    }
+  }
+  std::vector<std::string> host_list(hosts.begin(), hosts.end());
+  SPDLOG_DEBUG("Trace candidate hosts size={}", host_list.size());
+  // updateHosts() takes its argument by value, so move instead of copying.
+  exporter_->updateHosts(std::move(host_list));
+}
+
void ClientImpl::updateRouteCache(const std::string& topic, const std::error_code& ec, const TopicRouteDataPtr& route) {
if (ec || !route || route->partitions().empty()) {
SPDLOG_WARN("Yuck! route for {} is invalid. Cause: {}", topic, ec.message());
@@ -339,6 +361,7 @@ void ClientImpl::updateRouteCache(const std::string& topic, const std::error_cod
std::set_difference(hosts.begin(), hosts.end(), existed_hosts.begin(), existed_hosts.end(),
std::inserter(new_hosts, new_hosts.begin()));
}
+ updateTraceHosts();
for (const auto& endpoints : new_hosts) {
pollCommand(endpoints);
}
diff --git a/src/main/cpp/rocketmq/include/ClientImpl.h b/src/main/cpp/rocketmq/include/ClientImpl.h
index 80f1f8c..036179b 100644
--- a/src/main/cpp/rocketmq/include/ClientImpl.h
+++ b/src/main/cpp/rocketmq/include/ClientImpl.h
@@ -178,6 +178,11 @@ private:
LOCKS_EXCLUDED(inflight_route_requests_mtx_);
/**
+ * Update Trace candidate hosts.
+ */
+ void updateTraceHosts() LOCKS_EXCLUDED(topic_route_table_mtx_);
+
+ /**
* Update local cache for the topic. Note, route differences are logged in
* INFO level since route bears fundamental importance.
*
diff --git a/src/main/cpp/tracing/exporters/OtlpExporter.cpp b/src/main/cpp/tracing/exporters/OtlpExporter.cpp
index 41796ba..836ceff 100644
--- a/src/main/cpp/tracing/exporters/OtlpExporter.cpp
+++ b/src/main/cpp/tracing/exporters/OtlpExporter.cpp
@@ -65,6 +65,9 @@ void ExportClient::asyncExport(const collector_trace::ExportTraceServiceRequest&
invocation_context);
}
+const int OtlpExporterHandler::SPAN_ID_SIZE = 8;
+const int OtlpExporterHandler::TRACE_ID_SIZE = 16;
+
OtlpExporterHandler::OtlpExporterHandler(std::weak_ptr<OtlpExporter> exporter)
: exporter_(std::move(exporter)), completion_queue_(std::make_shared<CompletionQueue>()) {
auto exp = exporter_.lock();
@@ -198,11 +201,21 @@ void OtlpExporterHandler::Export(const std::vector<::opencensus::trace::exporter
auto resource = new trace::ResourceSpans();
auto instrument_library_span = new trace::InstrumentationLibrarySpans();
+
+ // Scratch buffers that receive the raw (binary) ids before they are copied into the protobuf message.
+ uint8_t span_id_buf[SPAN_ID_SIZE];
+ uint8_t trace_id_buf[TRACE_ID_SIZE];
for (const auto& span : spans) {
auto item = new trace::Span();
- item->set_trace_id(span.context().trace_id().ToHex());
- item->set_span_id(span.context().span_id().ToHex());
- item->set_parent_span_id(span.parent_span_id().ToHex());
+
+ // The removed line exported trace_id(); the raw copy must read the same id, not span_id().
+ span.context().trace_id().CopyTo(trace_id_buf);
+ item->set_trace_id(&trace_id_buf, TRACE_ID_SIZE);
+
+ span.context().span_id().CopyTo(span_id_buf);
+ item->set_span_id(&span_id_buf, SPAN_ID_SIZE);
+
+ span.parent_span_id().CopyTo(span_id_buf);
+ item->set_parent_span_id(&span_id_buf, SPAN_ID_SIZE);
+
item->set_name(span.name().data());
item->set_start_time_unix_nano(absl::ToUnixNanos(span.start_time()));
@@ -260,8 +273,13 @@ void OtlpExporterHandler::Export(const std::vector<::opencensus::trace::exporter
for (const auto& link : span.links()) {
auto span_link = new trace::Span::Link();
- span_link->set_trace_id(link.trace_id().ToHex());
- span_link->set_span_id(link.span_id().ToHex());
+
+ // Write the ids onto the link itself (as the removed lines did), not onto the enclosing span.
+ link.trace_id().CopyTo(trace_id_buf);
+ span_link->set_trace_id(&trace_id_buf, TRACE_ID_SIZE);
+
+ link.span_id().CopyTo(span_id_buf);
+ span_link->set_span_id(&span_id_buf, SPAN_ID_SIZE);
+
for (const auto& attribute : link.attributes()) {
auto kv = new common::KeyValue();
kv->set_key(attribute.first);
@@ -331,7 +349,9 @@ void OtlpExporterHandler::Export(const std::vector<::opencensus::trace::exporter
if (invocation_context->status.ok()) {
SPDLOG_DEBUG("Export tracing spans OK");
} else {
- SPDLOG_WARN("Failed to export tracing spans to {}", invocation_context->remote_address);
+ SPDLOG_WARN("Failed to export tracing spans to {}, gRPC code:{}, gRPC error message: {}",
+ invocation_context->remote_address, invocation_context->status.error_code(),
+ invocation_context->status.error_message());
}
};
invocation_context->callback = callback;
diff --git a/src/main/cpp/tracing/exporters/include/OtlpExporter.h b/src/main/cpp/tracing/exporters/include/OtlpExporter.h
index c00b8ec..3b6755b 100644
--- a/src/main/cpp/tracing/exporters/include/OtlpExporter.h
+++ b/src/main/cpp/tracing/exporters/include/OtlpExporter.h
@@ -59,6 +59,9 @@ public:
}
void updateHosts(std::vector<std::string> hosts) LOCKS_EXCLUDED(hosts_mtx_) {
+ if (hosts.empty()) {
+ return;
+ }
absl::MutexLock lk(&hosts_mtx_);
hosts_ = std::move(hosts);
}
@@ -114,6 +117,9 @@ private:
class OtlpExporterHandler : public ::opencensus::trace::exporter::SpanExporter::Handler {
public:
+ static const int SPAN_ID_SIZE;
+ static const int TRACE_ID_SIZE;
+
OtlpExporterHandler(std::weak_ptr<OtlpExporter> exporter);
void Export(const std::vector<::opencensus::trace::exporter::SpanData>& spans) override;