You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@rocketmq.apache.org by li...@apache.org on 2021/10/22 10:17:14 UTC

[rocketmq-client-cpp] branch main updated: Implement trace for standard consumption (#378)

This is an automated email from the ASF dual-hosted git repository.

lizhanhui pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/rocketmq-client-cpp.git


The following commit(s) were added to refs/heads/main by this push:
     new 6548bab  Implement trace for standard consumption (#378)
6548bab is described below

commit 6548babfd7dbcacce86416bfca2850391a7bfd76
Author: aaron ai <ya...@gmail.com>
AuthorDate: Fri Oct 22 18:17:10 2021 +0800

    Implement trace for standard consumption (#378)
---
 src/main/cpp/base/MixAll.cpp                       |  7 +---
 src/main/cpp/base/include/MixAll.h                 |  7 +---
 .../cpp/rocketmq/ConsumeStandardMessageService.cpp | 44 +++++++++++-----------
 src/main/cpp/rocketmq/ProducerImpl.cpp             | 12 +++---
 src/main/cpp/tracing/exporters/OtlpExporter.cpp    |  8 ++--
 5 files changed, 37 insertions(+), 41 deletions(-)

diff --git a/src/main/cpp/base/MixAll.cpp b/src/main/cpp/base/MixAll.cpp
index 93d2d38..ba69ce4 100644
--- a/src/main/cpp/base/MixAll.cpp
+++ b/src/main/cpp/base/MixAll.cpp
@@ -66,6 +66,8 @@ const uint32_t MixAll::DEFAULT_COMPRESS_BODY_THRESHOLD_ = 1024 * 1024 * 4;
 const char* MixAll::HOME_PROFILE_ENV_ = "HOME";
 const char* MixAll::MESSAGE_KEY_SEPARATOR = " ";
 
+const char* MixAll::OTLP_NAME_VALUE = "org.apache.rocketmq.message";
+
 const char* MixAll::TRACE_RESOURCE_ATTRIBUTE_KEY_TELEMETRY_SDK_LANGUAGE = "telemetry.sdk.language";
 const char* MixAll::TRACE_RESOURCE_ATTRIBUTE_VALUE_TELEMETRY_SDK_LANGUAGE = "cpp";
 
@@ -73,11 +75,7 @@ const char* MixAll::TRACE_RESOURCE_ATTRIBUTE_KEY_HOST_NAME = "host.name";
 const char* MixAll::TRACE_RESOURCE_ATTRIBUTE_KEY_SERVICE_NAME = "service.name";
 const char* MixAll::TRACE_RESOURCE_ATTRIBUTE_VALUE_SERVICE_NAME = "rocketmq-client";
 
-// Span name list
-const char* MixAll::SPAN_NAME_SEND_MESSAGE = "SendMessage";
 const char* MixAll::SPAN_NAME_END_TRANSACTION = "EndTransaction";
-const char* MixAll::SPAN_NAME_AWAIT_CONSUMPTION = "AwaitingConsumption";
-const char* MixAll::SPAN_NAME_CONSUME_MESSAGE = "ConsumeMessage";
 const char* MixAll::SPAN_NAME_PULL_MESSAGE = "PullMessage";
 
 // Span attributes follows to the opentelemetry specification, refers to:
@@ -133,7 +131,6 @@ const char* MixAll::SPAN_ATTRIBUTE_VALUE_MESSAGING_SEND_OPERATION = "send";
 const char* MixAll::SPAN_ATTRIBUTE_VALUE_MESSAGING_RECEIVE_OPERATION = "receive";
 const char* MixAll::SPAN_ATTRIBUTE_VALUE_MESSAGING_PROCESS_OPERATION = "process";
 
-const char* MixAll::SPAN_ATTRIBUTE_KEY_HOST_NAME = "host.name";
 const char* MixAll::SPAN_ATTRIBUTE_KEY_TRANSACTION_RESOLUTION = "commitAction";
 
 // Span annotation
diff --git a/src/main/cpp/base/include/MixAll.h b/src/main/cpp/base/include/MixAll.h
index 8a47ecf..d25f4b8 100644
--- a/src/main/cpp/base/include/MixAll.h
+++ b/src/main/cpp/base/include/MixAll.h
@@ -61,6 +61,8 @@ public:
 
   static const char* MESSAGE_KEY_SEPARATOR;
 
+  static const char* OTLP_NAME_VALUE;
+
   static const char* TRACE_RESOURCE_ATTRIBUTE_KEY_TELEMETRY_SDK_LANGUAGE;
   static const char* TRACE_RESOURCE_ATTRIBUTE_VALUE_TELEMETRY_SDK_LANGUAGE;
 
@@ -68,11 +70,7 @@ public:
   static const char* TRACE_RESOURCE_ATTRIBUTE_KEY_SERVICE_NAME;
   static const char* TRACE_RESOURCE_ATTRIBUTE_VALUE_SERVICE_NAME;
 
-  // Tracing span name list
-  static const char* SPAN_NAME_SEND_MESSAGE;
   static const char* SPAN_NAME_END_TRANSACTION;
-  static const char* SPAN_NAME_AWAIT_CONSUMPTION;
-  static const char* SPAN_NAME_CONSUME_MESSAGE;
   static const char* SPAN_NAME_PULL_MESSAGE;
 
   // RocketMQ span attribute name list
@@ -125,7 +123,6 @@ public:
   static const char* SPAN_ATTRIBUTE_VALUE_MESSAGING_RECEIVE_OPERATION;
   static const char* SPAN_ATTRIBUTE_VALUE_MESSAGING_PROCESS_OPERATION;
 
-  static const char* SPAN_ATTRIBUTE_KEY_HOST_NAME;
   static const char* SPAN_ATTRIBUTE_KEY_TRANSACTION_RESOLUTION;
 
   // Tracing annotation
diff --git a/src/main/cpp/rocketmq/ConsumeStandardMessageService.cpp b/src/main/cpp/rocketmq/ConsumeStandardMessageService.cpp
index b13b34a..46ef575 100644
--- a/src/main/cpp/rocketmq/ConsumeStandardMessageService.cpp
+++ b/src/main/cpp/rocketmq/ConsumeStandardMessageService.cpp
@@ -19,6 +19,7 @@
 #include <system_error>
 #include <utility>
 
+#include "TracingUtility.h"
 #include "absl/memory/memory.h"
 #include "absl/strings/str_join.h"
 #include "absl/time/time.h"
@@ -36,6 +37,7 @@
 #include "UtilAll.h"
 #include "rocketmq/ConsumeType.h"
 #include "rocketmq/MQMessage.h"
+#include "rocketmq/MQMessageExt.h"
 #include "rocketmq/MessageListener.h"
 
 ROCKETMQ_NAMESPACE_BEGIN
@@ -139,33 +141,30 @@ void ConsumeStandardMessageService::consumeTask(const ProcessQueueWeakPtr& proce
       auto span_context = opencensus::trace::propagation::FromTraceParentHeader(msg.traceContext());
 
       auto span = opencensus::trace::Span::BlankSpan();
+      std::string span_name = consumer->resourceNamespace() + "/" + msg.getTopic() + " " +
+                              MixAll::SPAN_ATTRIBUTE_VALUE_ROCKETMQ_AWAIT_OPERATION;
       if (span_context.IsValid()) {
-        span = opencensus::trace::Span::StartSpanWithRemoteParent(MixAll::SPAN_NAME_AWAIT_CONSUMPTION, span_context,
-                                                                  &Samplers::always());
+        span = opencensus::trace::Span::StartSpanWithRemoteParent(span_name, span_context, &Samplers::always());
       } else {
-        span = opencensus::trace::Span::StartSpan(MixAll::SPAN_NAME_AWAIT_CONSUMPTION, nullptr, {&Samplers::always()});
+        span = opencensus::trace::Span::StartSpan(span_name, nullptr, {&Samplers::always()});
       }
-
-      span.AddAttribute(MixAll::SPAN_ATTRIBUTE_KEY_ROCKETMQ_ACCESS_KEY,
-                        consumer->credentialsProvider()->getCredentials().accessKey());
-      span.AddAttribute(MixAll::SPAN_ATTRIBUTE_KEY_ROCKETMQ_NAMESPACE, consumer->resourceNamespace());
-      span.AddAttribute(MixAll::SPAN_ATTRIBUTE_KEY_MESSAGING_DESTINATION, msg.getTopic());
-      span.AddAttribute(MixAll::SPAN_ATTRIBUTE_KEY_MESSAGING_ID, msg.getMsgId());
-      span.AddAttribute(MixAll::SPAN_ATTRIBUTE_KEY_ROCKETMQ_CLIENT_GROUP, consumer->getGroupName());
-      span.AddAttribute(MixAll::SPAN_ATTRIBUTE_KEY_ROCKETMQ_TAG, msg.getTags());
+      span.AddAttribute(MixAll::SPAN_ATTRIBUTE_KEY_MESSAGING_OPERATION,
+                        MixAll::SPAN_ATTRIBUTE_VALUE_ROCKETMQ_AWAIT_OPERATION);
+      TracingUtility::addUniversalSpanAttributes(msg, *consumer, span);
       const auto& keys = msg.getKeys();
       span.AddAttribute(MixAll::SPAN_ATTRIBUTE_KEY_ROCKETMQ_KEYS,
                         absl::StrJoin(keys.begin(), keys.end(), MixAll::MESSAGE_KEY_SEPARATOR));
       span.AddAttribute(MixAll::SPAN_ATTRIBUTE_KEY_ROCKETMQ_ATTEMPT, msg.getDeliveryAttempt());
       span.AddAttribute(MixAll::SPAN_ATTRIBUTE_KEY_ROCKETMQ_AVAILABLE_TIMESTAMP,
                         absl::FormatTime(absl::FromUnixMillis(msg.getStoreTimestamp())));
-      span.AddAttribute(MixAll::SPAN_ATTRIBUTE_KEY_HOST_NAME, UtilAll::hostname());
       absl::Time decoded_timestamp = MessageAccessor::decodedTimestamp(msg);
       span.AddAnnotation(
           MixAll::SPAN_ANNOTATION_AWAIT_CONSUMPTION,
           {{MixAll::SPAN_ANNOTATION_ATTR_START_TIME,
             opencensus::trace::AttributeValueRef(absl::ToInt64Milliseconds(decoded_timestamp - absl::UnixEpoch()))}});
       span.End();
+      MessageAccessor::setTraceContext(const_cast<MQMessageExt&>(msg),
+                                       opencensus::trace::propagation::ToTraceParentHeader(span.context()));
     }
   }
 
@@ -175,27 +174,28 @@ void ConsumeStandardMessageService::consumeTask(const ProcessQueueWeakPtr& proce
     for (const auto& msg : msgs) {
       auto span_context = opencensus::trace::propagation::FromTraceParentHeader(msg.traceContext());
       auto span = opencensus::trace::Span::BlankSpan();
+      std::string span_name = consumer->resourceNamespace() + "/" + msg.getTopic() + " " +
+                              MixAll::SPAN_ATTRIBUTE_VALUE_ROCKETMQ_PROCESS_OPERATION;
       if (span_context.IsValid()) {
-        span = opencensus::trace::Span::StartSpanWithRemoteParent(MixAll::SPAN_NAME_CONSUME_MESSAGE, span_context);
+        span = opencensus::trace::Span::StartSpanWithRemoteParent(span_name, span_context);
       } else {
-        span = opencensus::trace::Span::StartSpan(MixAll::SPAN_NAME_CONSUME_MESSAGE);
+        span = opencensus::trace::Span::StartSpan(span_name);
       }
-      span.AddAttribute(MixAll::SPAN_ATTRIBUTE_KEY_ROCKETMQ_ACCESS_KEY,
-                        consumer->credentialsProvider()->getCredentials().accessKey());
-      span.AddAttribute(MixAll::SPAN_ATTRIBUTE_KEY_ROCKETMQ_NAMESPACE, consumer->resourceNamespace());
-      span.AddAttribute(MixAll::SPAN_ATTRIBUTE_KEY_MESSAGING_DESTINATION, msg.getTopic());
-      span.AddAttribute(MixAll::SPAN_ATTRIBUTE_KEY_MESSAGING_ID, msg.getMsgId());
-      span.AddAttribute(MixAll::SPAN_ATTRIBUTE_KEY_ROCKETMQ_CLIENT_GROUP, consumer->getGroupName());
-      span.AddAttribute(MixAll::SPAN_ATTRIBUTE_KEY_ROCKETMQ_TAG, msg.getTags());
+      span.AddAttribute(MixAll::SPAN_ATTRIBUTE_KEY_MESSAGING_OPERATION,
+                        MixAll::SPAN_ATTRIBUTE_VALUE_ROCKETMQ_PROCESS_OPERATION);
+      span.AddAttribute(MixAll::SPAN_ATTRIBUTE_KEY_ROCKETMQ_OPERATION,
+                        MixAll::SPAN_ATTRIBUTE_VALUE_MESSAGING_PROCESS_OPERATION);
+      TracingUtility::addUniversalSpanAttributes(msg, *consumer, span);
       const auto& keys = msg.getKeys();
       span.AddAttribute(MixAll::SPAN_ATTRIBUTE_KEY_ROCKETMQ_KEYS,
                         absl::StrJoin(keys.begin(), keys.end(), MixAll::MESSAGE_KEY_SEPARATOR));
       span.AddAttribute(MixAll::SPAN_ATTRIBUTE_KEY_ROCKETMQ_ATTEMPT, msg.getDeliveryAttempt());
       span.AddAttribute(MixAll::SPAN_ATTRIBUTE_KEY_ROCKETMQ_AVAILABLE_TIMESTAMP,
                         absl::FormatTime(absl::FromUnixMillis(msg.getStoreTimestamp())));
-      span.AddAttribute(MixAll::SPAN_ATTRIBUTE_KEY_HOST_NAME, UtilAll::hostname());
       span.AddAttribute(MixAll::SPAN_ATTRIBUTE_KEY_ROCKETMQ_BATCH_SIZE, msgs.size());
       spans.emplace_back(std::move(span));
+      MessageAccessor::setTraceContext(const_cast<MQMessageExt&>(msg),
+                                       opencensus::trace::propagation::ToTraceParentHeader(span.context()));
     }
   }
 
diff --git a/src/main/cpp/rocketmq/ProducerImpl.cpp b/src/main/cpp/rocketmq/ProducerImpl.cpp
index 870bf48..4780220 100644
--- a/src/main/cpp/rocketmq/ProducerImpl.cpp
+++ b/src/main/cpp/rocketmq/ProducerImpl.cpp
@@ -310,11 +310,6 @@ void ProducerImpl::sendImpl(RetrySendCallback* callback) {
     return;
   }
 
-  SendMessageRequest request;
-  wrapSendMessageRequest(callback->message(), request, callback->messageQueue());
-  Metadata metadata;
-  Signature::sign(this, metadata);
-
   {
     // Trace Send RPC
     auto& message = callback->message();
@@ -348,6 +343,12 @@ void ProducerImpl::sendImpl(RetrySendCallback* callback) {
     callback->message().traceContext(opencensus::trace::propagation::ToTraceParentHeader(span.context()));
     callback->span() = span;
   }
+
+  SendMessageRequest request;
+  wrapSendMessageRequest(callback->message(), request, callback->messageQueue());
+  Metadata metadata;
+  Signature::sign(this, metadata);
+
   client_manager_->send(target, metadata, request, callback);
 }
 
@@ -417,7 +418,6 @@ bool ProducerImpl::endTransaction0(const std::string& target, const std::string&
   span.AddAttribute(MixAll::SPAN_ATTRIBUTE_KEY_ROCKETMQ_NAMESPACE, resourceNamespace());
   span.AddAttribute(MixAll::SPAN_ATTRIBUTE_KEY_ROCKETMQ_CLIENT_GROUP, getGroupName());
   span.AddAttribute(MixAll::SPAN_ATTRIBUTE_KEY_MESSAGING_ID, message_id);
-  span.AddAttribute(MixAll::SPAN_ATTRIBUTE_KEY_HOST_NAME, UtilAll::hostname());
   switch (resolution) {
     case TransactionState::COMMIT:
       span.AddAttribute(MixAll::SPAN_ATTRIBUTE_KEY_TRANSACTION_RESOLUTION, "commit");
diff --git a/src/main/cpp/tracing/exporters/OtlpExporter.cpp b/src/main/cpp/tracing/exporters/OtlpExporter.cpp
index 1ebbc22..e1c1735 100644
--- a/src/main/cpp/tracing/exporters/OtlpExporter.cpp
+++ b/src/main/cpp/tracing/exporters/OtlpExporter.cpp
@@ -15,6 +15,7 @@
  * limitations under the License.
  */
 #include "OtlpExporter.h"
+#include "ClientConfigImpl.h"
 #include "InvocationContext.h"
 #include "MixAll.h"
 #include "Signature.h"
@@ -223,7 +224,7 @@ void OtlpExporterHandler::Export(const std::vector<::opencensus::trace::exporter
   for (const auto& span : spans) {
     auto item = new trace::Span();
 
-    span.context().span_id().CopyTo(trace_id_buf);
+    span.context().trace_id().CopyTo(trace_id_buf);
     item->set_trace_id(&trace_id_buf, TRACE_ID_SIZE);
 
     span.context().span_id().CopyTo(span_id_buf);
@@ -345,8 +346,9 @@ void OtlpExporterHandler::Export(const std::vector<::opencensus::trace::exporter
       item->mutable_status()->set_message(span.status().error_message());
     }
 
-    // item->mutable_status()->set_code()
-
+    instrument_library_span->mutable_instrumentation_library()->mutable_name()->assign(MixAll::OTLP_NAME_VALUE);
+    instrument_library_span->mutable_instrumentation_library()->mutable_version()->assign(
+        ClientConfigImpl::CLIENT_VERSION);
     instrument_library_span->mutable_spans()->AddAllocated(item);
   }
   resource->mutable_instrumentation_library_spans()->AddAllocated(instrument_library_span);