You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@rocketmq.apache.org by li...@apache.org on 2021/10/25 11:43:21 UTC

[rocketmq-client-cpp] branch main updated: Fill 'messaging.rocketmq.message_keys' attribute in message tracing with value of array (#383)

This is an automated email from the ASF dual-hosted git repository.

lizhanhui pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/rocketmq-client-cpp.git


The following commit(s) were added to refs/heads/main by this push:
     new b066034  Fill 'messaging.rocketmq.message_keys' attribute in message tracing with value of array (#383)
b066034 is described below

commit b066034388747552091a8f63a951391f5fd34d3e
Author: aaron ai <ya...@gmail.com>
AuthorDate: Mon Oct 25 19:43:14 2021 +0800

    Fill 'messaging.rocketmq.message_keys' attribute in message tracing with value of array (#383)
---
 src/main/cpp/base/MixAll.cpp                       |  5 +++--
 src/main/cpp/base/include/MixAll.h                 |  1 +
 .../cpp/rocketmq/ConsumeFifoMessageService.cpp     |  6 ------
 .../cpp/rocketmq/ConsumeStandardMessageService.cpp |  6 ------
 src/main/cpp/rocketmq/ProducerImpl.cpp             |  5 -----
 .../rocketmq/include/ConsumeFifoMessageService.h   |  4 ++--
 src/main/cpp/tracing/TracingUtility.cpp            |  7 +++++++
 src/main/cpp/tracing/exporters/OtlpExporter.cpp    | 23 ++++++++++++++++++++++
 8 files changed, 36 insertions(+), 21 deletions(-)

diff --git a/src/main/cpp/base/MixAll.cpp b/src/main/cpp/base/MixAll.cpp
index ba69ce4..4691e69 100644
--- a/src/main/cpp/base/MixAll.cpp
+++ b/src/main/cpp/base/MixAll.cpp
@@ -84,8 +84,8 @@ const char* MixAll::SPAN_NAME_PULL_MESSAGE = "PullMessage";
 // RocketMQ span attribute name list
 const char* MixAll::SPAN_ATTRIBUTE_KEY_ROCKETMQ_OPERATION = "messaging.rocketmq.operation";
 const char* MixAll::SPAN_ATTRIBUTE_KEY_ROCKETMQ_NAMESPACE = "messaging.rocketmq.namespace";
-const char* MixAll::SPAN_ATTRIBUTE_KEY_ROCKETMQ_TAG = "messaging.rocketmq.tag";
-const char* MixAll::SPAN_ATTRIBUTE_KEY_ROCKETMQ_KEYS = "messaging.rocketmq.keys";
+const char* MixAll::SPAN_ATTRIBUTE_KEY_ROCKETMQ_TAG = "messaging.rocketmq.message_tag";
+const char* MixAll::SPAN_ATTRIBUTE_KEY_ROCKETMQ_KEYS = "messaging.rocketmq.message_keys";
 const char* MixAll::SPAN_ATTRIBUTE_KEY_ROCKETMQ_CLIENT_ID = "messaging.rocketmq.client_id";
 const char* MixAll::SPAN_ATTRIBUTE_KEY_ROCKETMQ_MESSAGE_TYPE = "messaging.rocketmq.message_type";
 const char* MixAll::SPAN_ATTRIBUTE_KEY_ROCKETMQ_CLIENT_GROUP = "messaging.rocketmq.client_group";
@@ -135,6 +135,7 @@ const char* MixAll::SPAN_ATTRIBUTE_KEY_TRANSACTION_RESOLUTION = "commitAction";
 
 // Span annotation
 const char* MixAll::SPAN_ANNOTATION_AWAIT_CONSUMPTION = "__await_consumption";
+const char* MixAll::SPAN_ANNOTATION_MESSAGE_KEYS = "__message_keys";
 const char* MixAll::SPAN_ANNOTATION_ATTR_START_TIME = "__start_time";
 
 bool MixAll::validate(const MQMessage& message) {
diff --git a/src/main/cpp/base/include/MixAll.h b/src/main/cpp/base/include/MixAll.h
index d25f4b8..30ff613 100644
--- a/src/main/cpp/base/include/MixAll.h
+++ b/src/main/cpp/base/include/MixAll.h
@@ -127,6 +127,7 @@ public:
 
   // Tracing annotation
   static const char* SPAN_ANNOTATION_AWAIT_CONSUMPTION;
+  static const char* SPAN_ANNOTATION_MESSAGE_KEYS;
   static const char* SPAN_ANNOTATION_ATTR_START_TIME;
 
   template <typename Rep, typename Period>
diff --git a/src/main/cpp/rocketmq/ConsumeFifoMessageService.cpp b/src/main/cpp/rocketmq/ConsumeFifoMessageService.cpp
index 58ee2ec..76db7a5 100644
--- a/src/main/cpp/rocketmq/ConsumeFifoMessageService.cpp
+++ b/src/main/cpp/rocketmq/ConsumeFifoMessageService.cpp
@@ -145,9 +145,6 @@ void ConsumeFifoMessageService::consumeTask(const ProcessQueueWeakPtr& process_q
     span.AddAttribute(MixAll::SPAN_ATTRIBUTE_KEY_ROCKETMQ_OPERATION,
                       MixAll::SPAN_ATTRIBUTE_VALUE_ROCKETMQ_AWAIT_OPERATION);
     TracingUtility::addUniversalSpanAttributes(message, *consumer, span);
-    const auto& keys = message.getKeys();
-    span.AddAttribute(MixAll::SPAN_ATTRIBUTE_KEY_ROCKETMQ_KEYS,
-                      absl::StrJoin(keys.begin(), keys.end(), MixAll::MESSAGE_KEY_SEPARATOR));
     span.AddAttribute(MixAll::SPAN_ATTRIBUTE_KEY_ROCKETMQ_AVAILABLE_TIMESTAMP, message.getStoreTimestamp());
     absl::Time decoded_timestamp = MessageAccessor::decodedTimestamp(message);
     span.AddAnnotation(
@@ -173,9 +170,6 @@ void ConsumeFifoMessageService::consumeTask(const ProcessQueueWeakPtr& process_q
   span.AddAttribute(MixAll::SPAN_ATTRIBUTE_KEY_ROCKETMQ_OPERATION,
                     MixAll::SPAN_ATTRIBUTE_VALUE_MESSAGING_PROCESS_OPERATION);
   TracingUtility::addUniversalSpanAttributes(message, *consumer, span);
-  const auto& keys = message.getKeys();
-  span.AddAttribute(MixAll::SPAN_ATTRIBUTE_KEY_ROCKETMQ_KEYS,
-                    absl::StrJoin(keys.begin(), keys.end(), MixAll::MESSAGE_KEY_SEPARATOR));
   span.AddAttribute(MixAll::SPAN_ATTRIBUTE_KEY_ROCKETMQ_ATTEMPT, message.getDeliveryAttempt());
   span.AddAttribute(MixAll::SPAN_ATTRIBUTE_KEY_ROCKETMQ_AVAILABLE_TIMESTAMP, message.getStoreTimestamp());
   MessageAccessor::setTraceContext(const_cast<MQMessageExt&>(message),
diff --git a/src/main/cpp/rocketmq/ConsumeStandardMessageService.cpp b/src/main/cpp/rocketmq/ConsumeStandardMessageService.cpp
index 956a4cf..e31b2ed 100644
--- a/src/main/cpp/rocketmq/ConsumeStandardMessageService.cpp
+++ b/src/main/cpp/rocketmq/ConsumeStandardMessageService.cpp
@@ -153,9 +153,6 @@ void ConsumeStandardMessageService::consumeTask(const ProcessQueueWeakPtr& proce
       span.AddAttribute(MixAll::SPAN_ATTRIBUTE_KEY_ROCKETMQ_OPERATION,
                         MixAll::SPAN_ATTRIBUTE_VALUE_ROCKETMQ_AWAIT_OPERATION);
       TracingUtility::addUniversalSpanAttributes(msg, *consumer, span);
-      const auto& keys = msg.getKeys();
-      span.AddAttribute(MixAll::SPAN_ATTRIBUTE_KEY_ROCKETMQ_KEYS,
-                        absl::StrJoin(keys.begin(), keys.end(), MixAll::MESSAGE_KEY_SEPARATOR));
       span.AddAttribute(MixAll::SPAN_ATTRIBUTE_KEY_ROCKETMQ_AVAILABLE_TIMESTAMP, msg.getStoreTimestamp());
       absl::Time decoded_timestamp = MessageAccessor::decodedTimestamp(msg);
       span.AddAnnotation(
@@ -186,9 +183,6 @@ void ConsumeStandardMessageService::consumeTask(const ProcessQueueWeakPtr& proce
       span.AddAttribute(MixAll::SPAN_ATTRIBUTE_KEY_ROCKETMQ_OPERATION,
                         MixAll::SPAN_ATTRIBUTE_VALUE_MESSAGING_PROCESS_OPERATION);
       TracingUtility::addUniversalSpanAttributes(msg, *consumer, span);
-      const auto& keys = msg.getKeys();
-      span.AddAttribute(MixAll::SPAN_ATTRIBUTE_KEY_ROCKETMQ_KEYS,
-                        absl::StrJoin(keys.begin(), keys.end(), MixAll::MESSAGE_KEY_SEPARATOR));
       span.AddAttribute(MixAll::SPAN_ATTRIBUTE_KEY_ROCKETMQ_ATTEMPT, msg.getDeliveryAttempt());
       span.AddAttribute(MixAll::SPAN_ATTRIBUTE_KEY_ROCKETMQ_AVAILABLE_TIMESTAMP, msg.getStoreTimestamp());
       span.AddAttribute(MixAll::SPAN_ATTRIBUTE_KEY_ROCKETMQ_BATCH_SIZE, msgs.size());
diff --git a/src/main/cpp/rocketmq/ProducerImpl.cpp b/src/main/cpp/rocketmq/ProducerImpl.cpp
index d22f6dc..df3fb75 100644
--- a/src/main/cpp/rocketmq/ProducerImpl.cpp
+++ b/src/main/cpp/rocketmq/ProducerImpl.cpp
@@ -329,11 +329,6 @@ void ProducerImpl::sendImpl(RetrySendCallback* callback) {
     span.AddAttribute(MixAll::SPAN_ATTRIBUTE_KEY_ROCKETMQ_OPERATION,
                       MixAll::SPAN_ATTRIBUTE_VALUE_MESSAGING_SEND_OPERATION);
     TracingUtility::addUniversalSpanAttributes(message, *this, span);
-    const auto& keys = callback->message().getKeys();
-    if (!keys.empty()) {
-      span.AddAttribute(MixAll::SPAN_ATTRIBUTE_KEY_ROCKETMQ_KEYS,
-                        absl::StrJoin(keys.begin(), keys.end(), MixAll::MESSAGE_KEY_SEPARATOR));
-    }
     // Note: attempt-time is 0-based
     span.AddAttribute(MixAll::SPAN_ATTRIBUTE_KEY_ROCKETMQ_ATTEMPT, 1 + callback->attemptTime());
 
diff --git a/src/main/cpp/rocketmq/include/ConsumeFifoMessageService.h b/src/main/cpp/rocketmq/include/ConsumeFifoMessageService.h
index a3712d8..ea85888 100644
--- a/src/main/cpp/rocketmq/include/ConsumeFifoMessageService.h
+++ b/src/main/cpp/rocketmq/include/ConsumeFifoMessageService.h
@@ -28,8 +28,8 @@ public:
 
   /**
    * @brief Entry of ConsumeMessageService
-   * 
-   * @param process_queue 
+   *
+   * @param process_queue
    */
   void submitConsumeTask(const ProcessQueueWeakPtr& process_queue) override;
 
diff --git a/src/main/cpp/tracing/TracingUtility.cpp b/src/main/cpp/tracing/TracingUtility.cpp
index 6446e3f..d5b7be0 100644
--- a/src/main/cpp/tracing/TracingUtility.cpp
+++ b/src/main/cpp/tracing/TracingUtility.cpp
@@ -16,6 +16,7 @@
  */
 #include "TracingUtility.h"
 #include "MixAll.h"
+#include "absl/strings/str_join.h"
 #include "rocketmq/CredentialsProvider.h"
 #include "spdlog/spdlog.h"
 
@@ -26,6 +27,12 @@ void TracingUtility::addUniversalSpanAttributes(const MQMessage& message, Client
   span.AddAttribute(MixAll::SPAN_ATTRIBUTE_KEY_MESSAGING_ID, message.getMsgId());
   span.AddAttribute(MixAll::SPAN_ATTRIBUTE_KEY_MESSAGING_PAYLOAD_SIZE_BYTES, message.getBody().length());
   span.AddAttribute(MixAll::SPAN_ATTRIBUTE_KEY_ROCKETMQ_TAG, message.getTags());
+  const std::vector<std::string>& keys = message.getKeys();
+  if (!keys.empty()) {
+    span.AddAnnotation(MixAll::SPAN_ANNOTATION_MESSAGE_KEYS,
+                       {{MixAll::SPAN_ANNOTATION_MESSAGE_KEYS,
+                         absl::StrJoin(keys.begin(), keys.end(), MixAll::MESSAGE_KEY_SEPARATOR)}});
+  }
   switch (message.messageType()) {
     case MessageType::FIFO:
       span.AddAttribute(MixAll::SPAN_ATTRIBUTE_KEY_ROCKETMQ_MESSAGE_TYPE,
diff --git a/src/main/cpp/tracing/exporters/OtlpExporter.cpp b/src/main/cpp/tracing/exporters/OtlpExporter.cpp
index e1c1735..1d24de4 100644
--- a/src/main/cpp/tracing/exporters/OtlpExporter.cpp
+++ b/src/main/cpp/tracing/exporters/OtlpExporter.cpp
@@ -20,6 +20,7 @@
 #include "MixAll.h"
 #include "Signature.h"
 #include "UtilAll.h"
+#include "absl/strings/str_split.h"
 #include "fmt/format.h"
 #include "opentelemetry/proto/collector/trace/v1/trace_service.pb.h"
 #include "opentelemetry/proto/common/v1/common.pb.h"
@@ -263,6 +264,28 @@ void OtlpExporterHandler::Export(const std::vector<::opencensus::trace::exporter
           }
           continue;
         }
+        if (annotation.event().description() == MixAll::SPAN_ANNOTATION_MESSAGE_KEYS) {
+          for (const auto& attr : annotation.event().attributes()) {
+            if (attr.first == MixAll::SPAN_ANNOTATION_MESSAGE_KEYS) {
+              assert(attr.second.type() == opencensus::trace::AttributeValueRef::Type::kString);
+              std::string message_keys = attr.second.string_value();
+              std::vector<std::string> key_list = absl::StrSplit(message_keys, MixAll::MESSAGE_KEY_SEPARATOR);
+              auto key_kv = new common::KeyValue();
+              key_kv->set_key(MixAll::SPAN_ATTRIBUTE_KEY_ROCKETMQ_KEYS);
+              auto key_value = new common::AnyValue();
+              auto key_array_value = new common::ArrayValue();
+              for (const auto& key : key_list) {
+                auto value = new common::AnyValue();
+                value->set_string_value(key);
+                key_array_value->mutable_values()->AddAllocated(value);
+              }
+              key_value->set_allocated_array_value(key_array_value);
+              key_kv->set_allocated_value(key_value);
+              item->mutable_attributes()->AddAllocated(key_kv);
+            }
+          }
+          continue;
+        }
         auto ev = new trace::Span::Event();
         ev->set_time_unix_nano(absl::ToUnixNanos(annotation.timestamp()));
         auto attrs = annotation.event().attributes();