You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@rocketmq.apache.org by li...@apache.org on 2021/10/21 10:42:00 UTC

[rocketmq-client-cpp] branch main updated: Update rocketmq trace attributes key name (#376)

This is an automated email from the ASF dual-hosted git repository.

lizhanhui pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/rocketmq-client-cpp.git


The following commit(s) were added to refs/heads/main by this push:
     new 9555dae  Update rocketmq trace attributes key name (#376)
9555dae is described below

commit 9555dae37c08d9cf1139da27cb6c1aba9f3c6e8f
Author: aaron ai <ya...@gmail.com>
AuthorDate: Thu Oct 21 18:41:13 2021 +0800

    Update rocketmq trace attributes key name (#376)
---
 src/main/cpp/base/MixAll.cpp                       | 54 ++++++++++++-----
 src/main/cpp/base/include/MixAll.h                 | 51 +++++++++++-----
 src/main/cpp/client/ClientManagerImpl.cpp          |  4 +-
 src/main/cpp/rocketmq/BUILD.bazel                  |  1 +
 .../cpp/rocketmq/ConsumeStandardMessageService.cpp | 42 ++++++-------
 src/main/cpp/rocketmq/ProducerImpl.cpp             | 41 +++++++------
 src/main/cpp/scheduler/include/Scheduler.h         |  2 +-
 src/main/cpp/scheduler/include/SchedulerImpl.h     |  1 -
 src/main/cpp/{rocketmq => tracing}/BUILD.bazel     | 14 ++---
 src/main/cpp/tracing/TracingUtility.cpp            | 70 ++++++++++++++++++++++
 src/main/cpp/tracing/exporters/OtlpExporter.cpp    | 19 +++++-
 .../include/TracingUtility.h}                      | 27 ++-------
 src/test/cpp/ut/scheduler/SchedulerTest.cpp        |  7 ++-
 13 files changed, 224 insertions(+), 109 deletions(-)

diff --git a/src/main/cpp/base/MixAll.cpp b/src/main/cpp/base/MixAll.cpp
index 9a86e33..7006410 100644
--- a/src/main/cpp/base/MixAll.cpp
+++ b/src/main/cpp/base/MixAll.cpp
@@ -73,21 +73,45 @@ const char* MixAll::SPAN_NAME_AWAIT_CONSUMPTION = "AwaitingConsumption";
 const char* MixAll::SPAN_NAME_CONSUME_MESSAGE = "ConsumeMessage";
 const char* MixAll::SPAN_NAME_PULL_MESSAGE = "PullMessage";
 
-// Span attribute name list
-const char* MixAll::SPAN_ATTRIBUTE_ACCESS_KEY = "ak";
-const char* MixAll::SPAN_ATTRIBUTE_ARN = "setResourceNamespace";
-const char* MixAll::SPAN_ATTRIBUTE_KEYS = "keys";
-const char* MixAll::SPAN_ATTRIBUTE_MESSAGE_TYPE = "msgType";
-const char* MixAll::SPAN_ATTRIBUTE_DELIVERY_TIMESTAMP = "deliveryTimestamp";
-const char* MixAll::SPAN_ATTRIBUTE_TOPIC = "topic";
-const char* MixAll::SPAN_ATTRIBUTE_GROUP = "group";
-const char* MixAll::SPAN_ATTRIBUTE_MESSAGE_ID = "msgId";
-const char* MixAll::SPAN_ATTRIBUTE_TAG = "tags";
-const char* MixAll::SPAN_ATTRIBUTE_HOST = "host";
-const char* MixAll::SPAN_ATTRIBUTE_ATTEMPT_TIME = "attempt";
-const char* MixAll::SPAN_ATTRIBUTE_TRANSACTION_RESOLUTION = "commitAction";
-const char* MixAll::SPAN_ATTRIBUTE_AVAILABLE_TIMESTAMP = "availableTimestamp";
-const char* MixAll::SPAN_ATTRIBUTE_BATCH_SIZE = "batchSize";
+// Span attributes follows to the opentelemetry specification, refers to:
+// https://github.com/open-telemetry/opentelemetry-specification
+
+// RocketMQ span attribute name list
+const char* MixAll::SPAN_ATTRIBUTE_KEY_ROCKETMQ_NAMESPACE = "messaging.rocketmq.namespace";
+const char* MixAll::SPAN_ATTRIBUTE_KEY_ROCKETMQ_TAG = "messaging.rocketmq.tag";
+const char* MixAll::SPAN_ATTRIBUTE_KEY_ROCKETMQ_KEYS = "messaging.rocketmq.keys";
+const char* MixAll::SPAN_ATTRIBUTE_KEY_ROCKETMQ_CLIENT_ID = "messaging.rocketmq.client_id";
+const char* MixAll::SPAN_ATTRIBUTE_KEY_ROCKETMQ_MESSAGE_TYPE = "messaging.rocketmq.message_type";
+const char* MixAll::SPAN_ATTRIBUTE_KEY_ROCKETMQ_CLIENT_GROUP = "messaging.rocketmq.client_group";
+const char* MixAll::SPAN_ATTRIBUTE_KEY_ROCKETMQ_ATTEMPT = "messaging.rocketmq.attempt";
+const char* MixAll::SPAN_ATTRIBUTE_KEY_ROCKETMQ_BATCH_SIZE = "messaging.rocketmq.batch_size";
+const char* MixAll::SPAN_ATTRIBUTE_KEY_ROCKETMQ_DELIVERY_TIMESTAMP = "messaging.rocketmq.delivery_timestamp";
+const char* MixAll::SPAN_ATTRIBUTE_KEY_ROCKETMQ_AVAILABLE_TIMESTAMP = "messaging.rocketmq.available_timestamp";
+const char* MixAll::SPAN_ATTRIBUTE_KEY_ROCKETMQ_ACCESS_KEY = "messaging.rocketmq.access_key";
+
+const char* MixAll::SPAN_ATTRIBUTE_VALUE_ROCKETMQ_MESSAGING_SYSTEM = "rocketmq";
+const char* MixAll::SPAN_ATTRIBUTE_VALUE_DESTINATION_KIND = "topic";
+const char* MixAll::SPAN_ATTRIBUTE_VALUE_MESSAGING_PROTOCOL = "RMQ-gRPC";
+const char* MixAll::SPAN_ATTRIBUTE_VALUE_MESSAGING_PROTOCOL_VERSION = "v1";
+
+const char* MixAll::SPAN_ATTRIBUTE_VALUE_ROCKETMQ_NORMAL_MESSAGE = "normal";
+const char* MixAll::SPAN_ATTRIBUTE_VALUE_ROCKETMQ_FIFO_MESSAGE = "fifo";
+const char* MixAll::SPAN_ATTRIBUTE_VALUE_ROCKETMQ_DELAY_MESSAGE = "delay";
+const char* MixAll::SPAN_ATTRIBUTE_VALUE_ROCKETMQ_TRANSACTION_MESSAGE = "transaction";
+
+// Messaging span attribute name list
+const char* MixAll::SPAN_ATTRIBUTE_KEY_MESSAGING_SYSTEM = "messaging.system";
+const char* MixAll::SPAN_ATTRIBUTE_KEY_MESSAGING_DESTINATION = "messaging.destination";
+const char* MixAll::SPAN_ATTRIBUTE_KEY_MESSAGING_DESTINATION_KIND = "messaging.destination_kind";
+const char* MixAll::SPAN_ATTRIBUTE_KEY_MESSAGING_PROTOCOL = "messaging.protocol";
+const char* MixAll::SPAN_ATTRIBUTE_KEY_MESSAGING_PROTOCOL_VERSION = "messaging.protocol_version";
+const char* MixAll::SPAN_ATTRIBUTE_KEY_MESSAGING_URL = "messaging.url";
+const char* MixAll::SPAN_ATTRIBUTE_KEY_MESSAGING_ID = "messaging.message_id";
+const char* MixAll::SPAN_ATTRIBUTE_KEY_MESSAGING_PAYLOAD_SIZE_BYTES = "messaging.message_payload_size_bytes";
+const char* MixAll::SPAN_ATTRIBUTE_KEY_MESSAGING_OPERATION = "messaging.operation";
+
+const char* MixAll::SPAN_ATTRIBUTE_KEY_HOST_NAME = "host.name";
+const char* MixAll::SPAN_ATTRIBUTE_KEY_TRANSACTION_RESOLUTION = "commitAction";
 
 // Span annotation
 const char* MixAll::SPAN_ANNOTATION_AWAIT_CONSUMPTION = "__await_consumption";
diff --git a/src/main/cpp/base/include/MixAll.h b/src/main/cpp/base/include/MixAll.h
index 1a84c66..9a82b30 100644
--- a/src/main/cpp/base/include/MixAll.h
+++ b/src/main/cpp/base/include/MixAll.h
@@ -68,21 +68,42 @@ public:
   static const char* SPAN_NAME_CONSUME_MESSAGE;
   static const char* SPAN_NAME_PULL_MESSAGE;
 
-  // Tracing attribute name list
-  static const char* SPAN_ATTRIBUTE_ACCESS_KEY;
-  static const char* SPAN_ATTRIBUTE_ARN;
-  static const char* SPAN_ATTRIBUTE_TOPIC;
-  static const char* SPAN_ATTRIBUTE_GROUP;
-  static const char* SPAN_ATTRIBUTE_MESSAGE_ID;
-  static const char* SPAN_ATTRIBUTE_TAG;
-  static const char* SPAN_ATTRIBUTE_KEYS;
-  static const char* SPAN_ATTRIBUTE_HOST;
-  static const char* SPAN_ATTRIBUTE_MESSAGE_TYPE;
-  static const char* SPAN_ATTRIBUTE_ATTEMPT_TIME;
-  static const char* SPAN_ATTRIBUTE_DELIVERY_TIMESTAMP;
-  static const char* SPAN_ATTRIBUTE_TRANSACTION_RESOLUTION;
-  static const char* SPAN_ATTRIBUTE_AVAILABLE_TIMESTAMP;
-  static const char* SPAN_ATTRIBUTE_BATCH_SIZE;
+  // RocketMQ span attribute name list
+  static const char* SPAN_ATTRIBUTE_KEY_ROCKETMQ_NAMESPACE;
+  static const char* SPAN_ATTRIBUTE_KEY_ROCKETMQ_TAG;
+  static const char* SPAN_ATTRIBUTE_KEY_ROCKETMQ_KEYS;
+  static const char* SPAN_ATTRIBUTE_KEY_ROCKETMQ_CLIENT_ID;
+  static const char* SPAN_ATTRIBUTE_KEY_ROCKETMQ_MESSAGE_TYPE;
+  static const char* SPAN_ATTRIBUTE_KEY_ROCKETMQ_CLIENT_GROUP;
+  static const char* SPAN_ATTRIBUTE_KEY_ROCKETMQ_ATTEMPT;
+  static const char* SPAN_ATTRIBUTE_KEY_ROCKETMQ_BATCH_SIZE;
+  static const char* SPAN_ATTRIBUTE_KEY_ROCKETMQ_DELIVERY_TIMESTAMP;
+  static const char* SPAN_ATTRIBUTE_KEY_ROCKETMQ_AVAILABLE_TIMESTAMP;
+  static const char* SPAN_ATTRIBUTE_KEY_ROCKETMQ_ACCESS_KEY;
+
+  static const char* SPAN_ATTRIBUTE_VALUE_ROCKETMQ_MESSAGING_SYSTEM;
+  static const char* SPAN_ATTRIBUTE_VALUE_DESTINATION_KIND;
+  static const char* SPAN_ATTRIBUTE_VALUE_MESSAGING_PROTOCOL;
+  static const char* SPAN_ATTRIBUTE_VALUE_MESSAGING_PROTOCOL_VERSION;
+
+  static const char* SPAN_ATTRIBUTE_VALUE_ROCKETMQ_NORMAL_MESSAGE;
+  static const char* SPAN_ATTRIBUTE_VALUE_ROCKETMQ_FIFO_MESSAGE;
+  static const char* SPAN_ATTRIBUTE_VALUE_ROCKETMQ_DELAY_MESSAGE;
+  static const char* SPAN_ATTRIBUTE_VALUE_ROCKETMQ_TRANSACTION_MESSAGE;
+
+  // Messaging attribute name list
+  static const char* SPAN_ATTRIBUTE_KEY_MESSAGING_SYSTEM;
+  static const char* SPAN_ATTRIBUTE_KEY_MESSAGING_DESTINATION;
+  static const char* SPAN_ATTRIBUTE_KEY_MESSAGING_DESTINATION_KIND;
+  static const char* SPAN_ATTRIBUTE_KEY_MESSAGING_PROTOCOL;
+  static const char* SPAN_ATTRIBUTE_KEY_MESSAGING_PROTOCOL_VERSION;
+  static const char* SPAN_ATTRIBUTE_KEY_MESSAGING_URL;
+  static const char* SPAN_ATTRIBUTE_KEY_MESSAGING_ID;
+  static const char* SPAN_ATTRIBUTE_KEY_MESSAGING_PAYLOAD_SIZE_BYTES;
+  static const char* SPAN_ATTRIBUTE_KEY_MESSAGING_OPERATION;
+
+  static const char* SPAN_ATTRIBUTE_KEY_HOST_NAME;
+  static const char* SPAN_ATTRIBUTE_KEY_TRANSACTION_RESOLUTION;
 
   // Tracing annotation
   static const char* SPAN_ANNOTATION_AWAIT_CONSUMPTION;
diff --git a/src/main/cpp/client/ClientManagerImpl.cpp b/src/main/cpp/client/ClientManagerImpl.cpp
index 227623d..db77dc8 100644
--- a/src/main/cpp/client/ClientManagerImpl.cpp
+++ b/src/main/cpp/client/ClientManagerImpl.cpp
@@ -113,7 +113,7 @@ void ClientManagerImpl::start() {
   state_.store(State::STARTING, std::memory_order_relaxed);
 
   callback_thread_pool_->start();
-  
+
   scheduler_->start();
 
   std::weak_ptr<ClientManagerImpl> client_instance_weak_ptr = shared_from_this();
@@ -169,7 +169,7 @@ void ClientManagerImpl::shutdown() {
   if (stats_task_id_) {
     scheduler_->cancel(stats_task_id_);
   }
-  
+
   scheduler_->shutdown();
 
   {
diff --git a/src/main/cpp/rocketmq/BUILD.bazel b/src/main/cpp/rocketmq/BUILD.bazel
index 20cfe8c..6d6dd72 100644
--- a/src/main/cpp/rocketmq/BUILD.bazel
+++ b/src/main/cpp/rocketmq/BUILD.bazel
@@ -26,6 +26,7 @@ cc_library(
     deps = [
         "//src/main/cpp/client:client_library",
         "//src/main/cpp/tracing/exporters:otlp_exporter",
+        "//src/main/cpp/tracing:tracing_utility",
         "//src/main/cpp/log:log_library",
         "//src/main/cpp/admin:admin_server_library",
         "@com_google_absl//absl/types:optional",
diff --git a/src/main/cpp/rocketmq/ConsumeStandardMessageService.cpp b/src/main/cpp/rocketmq/ConsumeStandardMessageService.cpp
index 4376afd..b13b34a 100644
--- a/src/main/cpp/rocketmq/ConsumeStandardMessageService.cpp
+++ b/src/main/cpp/rocketmq/ConsumeStandardMessageService.cpp
@@ -146,20 +146,20 @@ void ConsumeStandardMessageService::consumeTask(const ProcessQueueWeakPtr& proce
         span = opencensus::trace::Span::StartSpan(MixAll::SPAN_NAME_AWAIT_CONSUMPTION, nullptr, {&Samplers::always()});
       }
 
-      span.AddAttribute(MixAll::SPAN_ATTRIBUTE_ACCESS_KEY,
+      span.AddAttribute(MixAll::SPAN_ATTRIBUTE_KEY_ROCKETMQ_ACCESS_KEY,
                         consumer->credentialsProvider()->getCredentials().accessKey());
-      span.AddAttribute(MixAll::SPAN_ATTRIBUTE_ARN, consumer->resourceNamespace());
-      span.AddAttribute(MixAll::SPAN_ATTRIBUTE_TOPIC, msg.getTopic());
-      span.AddAttribute(MixAll::SPAN_ATTRIBUTE_MESSAGE_ID, msg.getMsgId());
-      span.AddAttribute(MixAll::SPAN_ATTRIBUTE_GROUP, consumer->getGroupName());
-      span.AddAttribute(MixAll::SPAN_ATTRIBUTE_TAG, msg.getTags());
+      span.AddAttribute(MixAll::SPAN_ATTRIBUTE_KEY_ROCKETMQ_NAMESPACE, consumer->resourceNamespace());
+      span.AddAttribute(MixAll::SPAN_ATTRIBUTE_KEY_MESSAGING_DESTINATION, msg.getTopic());
+      span.AddAttribute(MixAll::SPAN_ATTRIBUTE_KEY_MESSAGING_ID, msg.getMsgId());
+      span.AddAttribute(MixAll::SPAN_ATTRIBUTE_KEY_ROCKETMQ_CLIENT_GROUP, consumer->getGroupName());
+      span.AddAttribute(MixAll::SPAN_ATTRIBUTE_KEY_ROCKETMQ_TAG, msg.getTags());
       const auto& keys = msg.getKeys();
-      span.AddAttribute(MixAll::SPAN_ATTRIBUTE_KEYS,
+      span.AddAttribute(MixAll::SPAN_ATTRIBUTE_KEY_ROCKETMQ_KEYS,
                         absl::StrJoin(keys.begin(), keys.end(), MixAll::MESSAGE_KEY_SEPARATOR));
-      span.AddAttribute(MixAll::SPAN_ATTRIBUTE_ATTEMPT_TIME, msg.getDeliveryAttempt());
-      span.AddAttribute(MixAll::SPAN_ATTRIBUTE_AVAILABLE_TIMESTAMP,
+      span.AddAttribute(MixAll::SPAN_ATTRIBUTE_KEY_ROCKETMQ_ATTEMPT, msg.getDeliveryAttempt());
+      span.AddAttribute(MixAll::SPAN_ATTRIBUTE_KEY_ROCKETMQ_AVAILABLE_TIMESTAMP,
                         absl::FormatTime(absl::FromUnixMillis(msg.getStoreTimestamp())));
-      span.AddAttribute(MixAll::SPAN_ATTRIBUTE_HOST, UtilAll::hostname());
+      span.AddAttribute(MixAll::SPAN_ATTRIBUTE_KEY_HOST_NAME, UtilAll::hostname());
       absl::Time decoded_timestamp = MessageAccessor::decodedTimestamp(msg);
       span.AddAnnotation(
           MixAll::SPAN_ANNOTATION_AWAIT_CONSUMPTION,
@@ -180,21 +180,21 @@ void ConsumeStandardMessageService::consumeTask(const ProcessQueueWeakPtr& proce
       } else {
         span = opencensus::trace::Span::StartSpan(MixAll::SPAN_NAME_CONSUME_MESSAGE);
       }
-      span.AddAttribute(MixAll::SPAN_ATTRIBUTE_ACCESS_KEY,
+      span.AddAttribute(MixAll::SPAN_ATTRIBUTE_KEY_ROCKETMQ_ACCESS_KEY,
                         consumer->credentialsProvider()->getCredentials().accessKey());
-      span.AddAttribute(MixAll::SPAN_ATTRIBUTE_ARN, consumer->resourceNamespace());
-      span.AddAttribute(MixAll::SPAN_ATTRIBUTE_TOPIC, msg.getTopic());
-      span.AddAttribute(MixAll::SPAN_ATTRIBUTE_MESSAGE_ID, msg.getMsgId());
-      span.AddAttribute(MixAll::SPAN_ATTRIBUTE_GROUP, consumer->getGroupName());
-      span.AddAttribute(MixAll::SPAN_ATTRIBUTE_TAG, msg.getTags());
+      span.AddAttribute(MixAll::SPAN_ATTRIBUTE_KEY_ROCKETMQ_NAMESPACE, consumer->resourceNamespace());
+      span.AddAttribute(MixAll::SPAN_ATTRIBUTE_KEY_MESSAGING_DESTINATION, msg.getTopic());
+      span.AddAttribute(MixAll::SPAN_ATTRIBUTE_KEY_MESSAGING_ID, msg.getMsgId());
+      span.AddAttribute(MixAll::SPAN_ATTRIBUTE_KEY_ROCKETMQ_CLIENT_GROUP, consumer->getGroupName());
+      span.AddAttribute(MixAll::SPAN_ATTRIBUTE_KEY_ROCKETMQ_TAG, msg.getTags());
       const auto& keys = msg.getKeys();
-      span.AddAttribute(MixAll::SPAN_ATTRIBUTE_KEYS,
+      span.AddAttribute(MixAll::SPAN_ATTRIBUTE_KEY_ROCKETMQ_KEYS,
                         absl::StrJoin(keys.begin(), keys.end(), MixAll::MESSAGE_KEY_SEPARATOR));
-      span.AddAttribute(MixAll::SPAN_ATTRIBUTE_ATTEMPT_TIME, msg.getDeliveryAttempt());
-      span.AddAttribute(MixAll::SPAN_ATTRIBUTE_AVAILABLE_TIMESTAMP,
+      span.AddAttribute(MixAll::SPAN_ATTRIBUTE_KEY_ROCKETMQ_ATTEMPT, msg.getDeliveryAttempt());
+      span.AddAttribute(MixAll::SPAN_ATTRIBUTE_KEY_ROCKETMQ_AVAILABLE_TIMESTAMP,
                         absl::FormatTime(absl::FromUnixMillis(msg.getStoreTimestamp())));
-      span.AddAttribute(MixAll::SPAN_ATTRIBUTE_HOST, UtilAll::hostname());
-      span.AddAttribute(MixAll::SPAN_ATTRIBUTE_BATCH_SIZE, msgs.size());
+      span.AddAttribute(MixAll::SPAN_ATTRIBUTE_KEY_HOST_NAME, UtilAll::hostname());
+      span.AddAttribute(MixAll::SPAN_ATTRIBUTE_KEY_ROCKETMQ_BATCH_SIZE, msgs.size());
       spans.emplace_back(std::move(span));
     }
   }
diff --git a/src/main/cpp/rocketmq/ProducerImpl.cpp b/src/main/cpp/rocketmq/ProducerImpl.cpp
index 034c920..5b51fcd 100644
--- a/src/main/cpp/rocketmq/ProducerImpl.cpp
+++ b/src/main/cpp/rocketmq/ProducerImpl.cpp
@@ -37,6 +37,7 @@
 #include "SendCallbacks.h"
 #include "SendMessageContext.h"
 #include "Signature.h"
+#include "TracingUtility.h"
 #include "TransactionImpl.h"
 #include "UniqueIdGenerator.h"
 #include "UtilAll.h"
@@ -326,24 +327,26 @@ void ProducerImpl::sendImpl(RetrySendCallback* callback) {
     } else {
       span = opencensus::trace::Span::StartSpan(MixAll::SPAN_NAME_SEND_MESSAGE, nullptr, {&Samplers::always()});
     }
-
-    span.AddAttribute(MixAll::SPAN_ATTRIBUTE_ACCESS_KEY, credentialsProvider()->getCredentials().accessKey());
-    span.AddAttribute(MixAll::SPAN_ATTRIBUTE_ARN, resourceNamespace());
-    span.AddAttribute(MixAll::SPAN_ATTRIBUTE_TOPIC, message.getTopic());
-    span.AddAttribute(MixAll::SPAN_ATTRIBUTE_MESSAGE_ID, message.getMsgId());
-    span.AddAttribute(MixAll::SPAN_ATTRIBUTE_GROUP, getGroupName());
-    span.AddAttribute(MixAll::SPAN_ATTRIBUTE_TAG, message.getTags());
+    TracingUtility::addUniversalSpanAttributes(message, *this, span);
+
+    span.AddAttribute(MixAll::SPAN_ATTRIBUTE_KEY_ROCKETMQ_ACCESS_KEY,
+                      credentialsProvider()->getCredentials().accessKey());
+    span.AddAttribute(MixAll::SPAN_ATTRIBUTE_KEY_ROCKETMQ_NAMESPACE, resourceNamespace());
+    span.AddAttribute(MixAll::SPAN_ATTRIBUTE_KEY_MESSAGING_DESTINATION, message.getTopic());
+    span.AddAttribute(MixAll::SPAN_ATTRIBUTE_KEY_MESSAGING_ID, message.getMsgId());
+    span.AddAttribute(MixAll::SPAN_ATTRIBUTE_KEY_ROCKETMQ_CLIENT_GROUP, getGroupName());
+    span.AddAttribute(MixAll::SPAN_ATTRIBUTE_KEY_ROCKETMQ_TAG, message.getTags());
     const auto& keys = callback->message().getKeys();
     if (!keys.empty()) {
-      span.AddAttribute(MixAll::SPAN_ATTRIBUTE_KEYS,
+      span.AddAttribute(MixAll::SPAN_ATTRIBUTE_KEY_ROCKETMQ_KEYS,
                         absl::StrJoin(keys.begin(), keys.end(), MixAll::MESSAGE_KEY_SEPARATOR));
     }
     // Note: attempt-time is 0-based
-    span.AddAttribute(MixAll::SPAN_ATTRIBUTE_ATTEMPT_TIME, 1 + callback->attemptTime());
-    span.AddAttribute(MixAll::SPAN_ATTRIBUTE_HOST, UtilAll::hostname());
+    span.AddAttribute(MixAll::SPAN_ATTRIBUTE_KEY_ROCKETMQ_ATTEMPT, 1 + callback->attemptTime());
+    span.AddAttribute(MixAll::SPAN_ATTRIBUTE_KEY_HOST_NAME, UtilAll::hostname());
 
     if (message.deliveryTimestamp() != absl::ToChronoTime(absl::UnixEpoch())) {
-      span.AddAttribute(MixAll::SPAN_ATTRIBUTE_DELIVERY_TIMESTAMP,
+      span.AddAttribute(MixAll::SPAN_ATTRIBUTE_KEY_ROCKETMQ_DELIVERY_TIMESTAMP,
                         absl::FormatTime(absl::FromChrono(message.deliveryTimestamp())));
     }
     callback->message().traceContext(opencensus::trace::propagation::ToTraceParentHeader(span.context()));
@@ -413,18 +416,18 @@ bool ProducerImpl::endTransaction0(const std::string& target, const std::string&
   } else {
     span = opencensus::trace::Span::StartSpan(MixAll::SPAN_NAME_END_TRANSACTION, nullptr, {&Samplers::always()});
   }
-
-  span.AddAttribute(MixAll::SPAN_ATTRIBUTE_ACCESS_KEY, credentialsProvider()->getCredentials().accessKey());
-  span.AddAttribute(MixAll::SPAN_ATTRIBUTE_ARN, resourceNamespace());
-  span.AddAttribute(MixAll::SPAN_ATTRIBUTE_GROUP, getGroupName());
-  span.AddAttribute(MixAll::SPAN_ATTRIBUTE_MESSAGE_ID, message_id);
-  span.AddAttribute(MixAll::SPAN_ATTRIBUTE_HOST, UtilAll::hostname());
+  span.AddAttribute(MixAll::SPAN_ATTRIBUTE_KEY_ROCKETMQ_ACCESS_KEY,
+                    credentialsProvider()->getCredentials().accessKey());
+  span.AddAttribute(MixAll::SPAN_ATTRIBUTE_KEY_ROCKETMQ_NAMESPACE, resourceNamespace());
+  span.AddAttribute(MixAll::SPAN_ATTRIBUTE_KEY_ROCKETMQ_CLIENT_GROUP, getGroupName());
+  span.AddAttribute(MixAll::SPAN_ATTRIBUTE_KEY_MESSAGING_ID, message_id);
+  span.AddAttribute(MixAll::SPAN_ATTRIBUTE_KEY_HOST_NAME, UtilAll::hostname());
   switch (resolution) {
     case TransactionState::COMMIT:
-      span.AddAttribute(MixAll::SPAN_ATTRIBUTE_TRANSACTION_RESOLUTION, "commit");
+      span.AddAttribute(MixAll::SPAN_ATTRIBUTE_KEY_TRANSACTION_RESOLUTION, "commit");
       break;
     case TransactionState::ROLLBACK:
-      span.AddAttribute(MixAll::SPAN_ATTRIBUTE_TRANSACTION_RESOLUTION, "rollback");
+      span.AddAttribute(MixAll::SPAN_ATTRIBUTE_KEY_TRANSACTION_RESOLUTION, "rollback");
       break;
   }
 
diff --git a/src/main/cpp/scheduler/include/Scheduler.h b/src/main/cpp/scheduler/include/Scheduler.h
index c280ea8..65feabb 100644
--- a/src/main/cpp/scheduler/include/Scheduler.h
+++ b/src/main/cpp/scheduler/include/Scheduler.h
@@ -19,8 +19,8 @@
 #include <chrono>
 #include <cstdint>
 #include <functional>
-#include <string>
 #include <memory>
+#include <string>
 
 #include "rocketmq/RocketMQ.h"
 
diff --git a/src/main/cpp/scheduler/include/SchedulerImpl.h b/src/main/cpp/scheduler/include/SchedulerImpl.h
index bcd63d9..00066eb 100644
--- a/src/main/cpp/scheduler/include/SchedulerImpl.h
+++ b/src/main/cpp/scheduler/include/SchedulerImpl.h
@@ -16,7 +16,6 @@
  */
 #pragma once
 #include <atomic>
-#include <bits/c++config.h>
 #include <chrono>
 #include <cstdint>
 #include <functional>
diff --git a/src/main/cpp/rocketmq/BUILD.bazel b/src/main/cpp/tracing/BUILD.bazel
similarity index 73%
copy from src/main/cpp/rocketmq/BUILD.bazel
copy to src/main/cpp/tracing/BUILD.bazel
index 20cfe8c..0fdb6b0 100644
--- a/src/main/cpp/rocketmq/BUILD.bazel
+++ b/src/main/cpp/tracing/BUILD.bazel
@@ -15,19 +15,15 @@
 # limitations under the License.
 #
 load("@rules_cc//cc:defs.bzl", "cc_library")
-
-package(default_visibility = ["//visibility:public"])
-
 cc_library(
-    name = "rocketmq_library",
+    name = "tracing_utility",
     hdrs = glob(["include/*.h"]),
     srcs = glob(["*.cpp"]),
-    strip_include_prefix = "//src/main/cpp/rocketmq/include",
+    strip_include_prefix = "//src/main/cpp/tracing/include",
     deps = [
+        "//api:rocketmq_interface",
         "//src/main/cpp/client:client_library",
-        "//src/main/cpp/tracing/exporters:otlp_exporter",
-        "//src/main/cpp/log:log_library",
-        "//src/main/cpp/admin:admin_server_library",
-        "@com_google_absl//absl/types:optional",
+        "@io_opencensus_cpp//opencensus/trace",
     ],
+    visibility = ["//visibility:public"],
 )
\ No newline at end of file
diff --git a/src/main/cpp/tracing/TracingUtility.cpp b/src/main/cpp/tracing/TracingUtility.cpp
new file mode 100644
index 0000000..25805db
--- /dev/null
+++ b/src/main/cpp/tracing/TracingUtility.cpp
@@ -0,0 +1,70 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+#include "TracingUtility.h"
+#include "MixAll.h"
+#include "rocketmq/CredentialsProvider.h"
+#include "spdlog/spdlog.h"
+
+ROCKETMQ_NAMESPACE_BEGIN
+
+void TracingUtility::addUniversalSpanAttributes(const MQMessage& message, ClientConfig& client_config,
+                                                opencensus::trace::Span& span) {
+  span.AddAttribute(MixAll::SPAN_ATTRIBUTE_KEY_MESSAGING_ID, message.getMsgId());
+  span.AddAttribute(MixAll::SPAN_ATTRIBUTE_KEY_MESSAGING_PAYLOAD_SIZE_BYTES, message.getBody().length());
+  span.AddAttribute(MixAll::SPAN_ATTRIBUTE_KEY_ROCKETMQ_TAG, message.getTags());
+  switch (message.messageType()) {
+    case MessageType::FIFO:
+      span.AddAttribute(MixAll::SPAN_ATTRIBUTE_KEY_ROCKETMQ_MESSAGE_TYPE,
+                        MixAll::SPAN_ATTRIBUTE_VALUE_ROCKETMQ_FIFO_MESSAGE);
+      break;
+    case MessageType::DELAY:
+      span.AddAttribute(MixAll::SPAN_ATTRIBUTE_KEY_ROCKETMQ_MESSAGE_TYPE,
+                        MixAll::SPAN_ATTRIBUTE_VALUE_ROCKETMQ_DELAY_MESSAGE);
+      break;
+    case MessageType::TRANSACTION:
+      span.AddAttribute(MixAll::SPAN_ATTRIBUTE_KEY_ROCKETMQ_MESSAGE_TYPE,
+                        MixAll::SPAN_ATTRIBUTE_VALUE_ROCKETMQ_TRANSACTION_MESSAGE);
+      break;
+    default:
+      span.AddAttribute(MixAll::SPAN_ATTRIBUTE_KEY_ROCKETMQ_MESSAGE_TYPE,
+                        MixAll::SPAN_ATTRIBUTE_VALUE_ROCKETMQ_NORMAL_MESSAGE);
+      break;
+  }
+  std::chrono::system_clock::time_point timestamp = message.deliveryTimestamp();
+  auto duration = absl::FromChrono(timestamp.time_since_epoch());
+  int64_t timestamp_millis = absl::ToInt64Milliseconds(duration);
+  if (timestamp_millis > 0) {
+    span.AddAttribute(MixAll::SPAN_ATTRIBUTE_KEY_ROCKETMQ_DELIVERY_TIMESTAMP, timestamp_millis);
+  }
+  span.AddAttribute(MixAll::SPAN_ATTRIBUTE_KEY_MESSAGING_SYSTEM,
+                    MixAll::SPAN_ATTRIBUTE_VALUE_ROCKETMQ_MESSAGING_SYSTEM);
+  span.AddAttribute(MixAll::SPAN_ATTRIBUTE_KEY_MESSAGING_DESTINATION, message.getTopic());
+  span.AddAttribute(MixAll::SPAN_ATTRIBUTE_KEY_MESSAGING_DESTINATION_KIND,
+                    MixAll::SPAN_ATTRIBUTE_VALUE_DESTINATION_KIND);
+  span.AddAttribute(MixAll::SPAN_ATTRIBUTE_KEY_MESSAGING_PROTOCOL_VERSION,
+                    MixAll::SPAN_ATTRIBUTE_VALUE_MESSAGING_PROTOCOL_VERSION);
+  //   span.AddAttribute(MixAll::SPAN_ATTRIBUTE_KEY_MESSAGING_URL, "abc")
+  span.AddAttribute(MixAll::SPAN_ATTRIBUTE_KEY_ROCKETMQ_NAMESPACE, client_config.resourceNamespace());
+  span.AddAttribute(MixAll::SPAN_ATTRIBUTE_KEY_ROCKETMQ_CLIENT_ID, client_config.clientId());
+  span.AddAttribute(MixAll::SPAN_ATTRIBUTE_KEY_ROCKETMQ_CLIENT_GROUP, client_config.getGroupName());
+
+  span.AddAttribute(MixAll::SPAN_ATTRIBUTE_KEY_ROCKETMQ_ACCESS_KEY,
+                    client_config.credentialsProvider()->getCredentials().accessKey());
+  span.AddAttribute(MixAll::SPAN_ATTRIBUTE_KEY_ROCKETMQ_NAMESPACE, client_config.resourceNamespace());
+}
+
+ROCKETMQ_NAMESPACE_END
\ No newline at end of file
diff --git a/src/main/cpp/tracing/exporters/OtlpExporter.cpp b/src/main/cpp/tracing/exporters/OtlpExporter.cpp
index 836ceff..78aaa26 100644
--- a/src/main/cpp/tracing/exporters/OtlpExporter.cpp
+++ b/src/main/cpp/tracing/exporters/OtlpExporter.cpp
@@ -153,8 +153,8 @@ void OtlpExporterHandler::Export(const std::vector<::opencensus::trace::exporter
     case TraceMode::Develop: {
       {
         for (const auto& span : spans) {
-          SPDLOG_INFO("{} --> {}: {}", absl::FormatTime(span.start_time()), absl::FormatTime(span.end_time()),
-                      span.name().data());
+          SPDLOG_INFO("trace span {} --> {}: {}", absl::FormatTime(span.start_time()),
+                      absl::FormatTime(span.end_time()), span.name().data());
           for (const auto& event : span.annotations().events()) {
             for (const auto& attr : event.event().attributes()) {
               switch (attr.second.type()) {
@@ -173,6 +173,21 @@ void OtlpExporterHandler::Export(const std::vector<::opencensus::trace::exporter
               }
             }
           }
+          SPDLOG_INFO("Attributes size={}", span.attributes().size());
+          for (const auto& attribute : span.attributes()) {
+            switch (attribute.second.type()) {
+              case opencensus::trace::AttributeValueRef::Type::kString:
+                SPDLOG_INFO("Span attribute: {} --> {}", attribute.first, attribute.second.string_value());
+                break;
+                break;
+              case opencensus::trace::AttributeValueRef::Type::kBool:
+                SPDLOG_INFO("Span attribute: {} --> {}", attribute.first, attribute.second.bool_value());
+                break;
+              case opencensus::trace::AttributeValueRef::Type::kInt:
+                SPDLOG_INFO("Span attribute: {} --> {}", attribute.first, attribute.second.int_value());
+                break;
+            }
+          }
         }
       }
       return;
diff --git a/src/main/cpp/scheduler/include/Scheduler.h b/src/main/cpp/tracing/include/TracingUtility.h
similarity index 57%
copy from src/main/cpp/scheduler/include/Scheduler.h
copy to src/main/cpp/tracing/include/TracingUtility.h
index c280ea8..f974780 100644
--- a/src/main/cpp/scheduler/include/Scheduler.h
+++ b/src/main/cpp/tracing/include/TracingUtility.h
@@ -16,31 +16,16 @@
  */
 #pragma once
 
-#include <chrono>
-#include <cstdint>
-#include <functional>
-#include <string>
-#include <memory>
+#include "opencensus/trace/span.h"
 
-#include "rocketmq/RocketMQ.h"
+#include "ClientConfig.h"
+#include "rocketmq/MQMessage.h"
 
 ROCKETMQ_NAMESPACE_BEGIN
 
-class Scheduler {
+class TracingUtility {
 public:
-  virtual ~Scheduler() = default;
-
-  virtual void start() = 0;
-
-  virtual void shutdown() = 0;
-
-  virtual std::uint32_t schedule(const std::function<void(void)>& functor, const std::string& task_name,
-                                 std::chrono::milliseconds delay, std::chrono::milliseconds interval) = 0;
-
-  virtual void cancel(std::uint32_t task_id) = 0;
+  static void addUniversalSpanAttributes(const MQMessage& message, ClientConfig&, opencensus::trace::Span& span);
 };
 
-using SchedulerPtr = std::weak_ptr<Scheduler>;
-using SchedulerSharedPtr = std::shared_ptr<Scheduler>;
-
-ROCKETMQ_NAMESPACE_END
\ No newline at end of file
+ROCKETMQ_NAMESPACE_END
diff --git a/src/test/cpp/ut/scheduler/SchedulerTest.cpp b/src/test/cpp/ut/scheduler/SchedulerTest.cpp
index 741d831..87b2753 100644
--- a/src/test/cpp/ut/scheduler/SchedulerTest.cpp
+++ b/src/test/cpp/ut/scheduler/SchedulerTest.cpp
@@ -29,7 +29,7 @@ ROCKETMQ_NAMESPACE_BEGIN
 
 class SchedulerTest : public testing::Test {
 public:
-  SchedulerTest() : scheduler(std::make_shared<SchedulerImpl>()) {  
+  SchedulerTest() : scheduler(std::make_shared<SchedulerImpl>()) {
   }
 
   void SetUp() override {
@@ -41,7 +41,7 @@ public:
   }
 
 protected:
-    SchedulerSharedPtr scheduler;
+  SchedulerSharedPtr scheduler;
 };
 
 TEST_F(SchedulerTest, testSingleShot) {
@@ -75,7 +75,8 @@ TEST_F(SchedulerTest, testCancel) {
     callback_fire_count++;
   };
 
-  std::uint32_t task_id = scheduler->schedule(callback, "test-cancel", std::chrono::seconds(1), std::chrono::seconds(1));
+  std::uint32_t task_id =
+      scheduler->schedule(callback, "test-cancel", std::chrono::seconds(1), std::chrono::seconds(1));
   scheduler->cancel(task_id);
   std::this_thread::sleep_for(std::chrono::seconds(2));
   ASSERT_EQ(0, callback_fire_count);