You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@rocketmq.apache.org by li...@apache.org on 2021/10/21 10:42:00 UTC
[rocketmq-client-cpp] branch main updated: Update rocketmq trace
attributes key name (#376)
This is an automated email from the ASF dual-hosted git repository.
lizhanhui pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/rocketmq-client-cpp.git
The following commit(s) were added to refs/heads/main by this push:
new 9555dae Update rocketmq trace attributes key name (#376)
9555dae is described below
commit 9555dae37c08d9cf1139da27cb6c1aba9f3c6e8f
Author: aaron ai <ya...@gmail.com>
AuthorDate: Thu Oct 21 18:41:13 2021 +0800
Update rocketmq trace attributes key name (#376)
---
src/main/cpp/base/MixAll.cpp | 54 ++++++++++++-----
src/main/cpp/base/include/MixAll.h | 51 +++++++++++-----
src/main/cpp/client/ClientManagerImpl.cpp | 4 +-
src/main/cpp/rocketmq/BUILD.bazel | 1 +
.../cpp/rocketmq/ConsumeStandardMessageService.cpp | 42 ++++++-------
src/main/cpp/rocketmq/ProducerImpl.cpp | 41 +++++++------
src/main/cpp/scheduler/include/Scheduler.h | 2 +-
src/main/cpp/scheduler/include/SchedulerImpl.h | 1 -
src/main/cpp/{rocketmq => tracing}/BUILD.bazel | 14 ++---
src/main/cpp/tracing/TracingUtility.cpp | 70 ++++++++++++++++++++++
src/main/cpp/tracing/exporters/OtlpExporter.cpp | 19 +++++-
.../include/TracingUtility.h} | 27 ++-------
src/test/cpp/ut/scheduler/SchedulerTest.cpp | 7 ++-
13 files changed, 224 insertions(+), 109 deletions(-)
diff --git a/src/main/cpp/base/MixAll.cpp b/src/main/cpp/base/MixAll.cpp
index 9a86e33..7006410 100644
--- a/src/main/cpp/base/MixAll.cpp
+++ b/src/main/cpp/base/MixAll.cpp
@@ -73,21 +73,45 @@ const char* MixAll::SPAN_NAME_AWAIT_CONSUMPTION = "AwaitingConsumption";
const char* MixAll::SPAN_NAME_CONSUME_MESSAGE = "ConsumeMessage";
const char* MixAll::SPAN_NAME_PULL_MESSAGE = "PullMessage";
-// Span attribute name list
-const char* MixAll::SPAN_ATTRIBUTE_ACCESS_KEY = "ak";
-const char* MixAll::SPAN_ATTRIBUTE_ARN = "setResourceNamespace";
-const char* MixAll::SPAN_ATTRIBUTE_KEYS = "keys";
-const char* MixAll::SPAN_ATTRIBUTE_MESSAGE_TYPE = "msgType";
-const char* MixAll::SPAN_ATTRIBUTE_DELIVERY_TIMESTAMP = "deliveryTimestamp";
-const char* MixAll::SPAN_ATTRIBUTE_TOPIC = "topic";
-const char* MixAll::SPAN_ATTRIBUTE_GROUP = "group";
-const char* MixAll::SPAN_ATTRIBUTE_MESSAGE_ID = "msgId";
-const char* MixAll::SPAN_ATTRIBUTE_TAG = "tags";
-const char* MixAll::SPAN_ATTRIBUTE_HOST = "host";
-const char* MixAll::SPAN_ATTRIBUTE_ATTEMPT_TIME = "attempt";
-const char* MixAll::SPAN_ATTRIBUTE_TRANSACTION_RESOLUTION = "commitAction";
-const char* MixAll::SPAN_ATTRIBUTE_AVAILABLE_TIMESTAMP = "availableTimestamp";
-const char* MixAll::SPAN_ATTRIBUTE_BATCH_SIZE = "batchSize";
+// Span attributes follows to the opentelemetry specification, refers to:
+// https://github.com/open-telemetry/opentelemetry-specification
+
+// RocketMQ span attribute name list
+const char* MixAll::SPAN_ATTRIBUTE_KEY_ROCKETMQ_NAMESPACE = "messaging.rocketmq.namespace";
+const char* MixAll::SPAN_ATTRIBUTE_KEY_ROCKETMQ_TAG = "messaging.rocketmq.tag";
+const char* MixAll::SPAN_ATTRIBUTE_KEY_ROCKETMQ_KEYS = "messaging.rocketmq.keys";
+const char* MixAll::SPAN_ATTRIBUTE_KEY_ROCKETMQ_CLIENT_ID = "messaging.rocketmq.client_id";
+const char* MixAll::SPAN_ATTRIBUTE_KEY_ROCKETMQ_MESSAGE_TYPE = "messaging.rocketmq.message_type";
+const char* MixAll::SPAN_ATTRIBUTE_KEY_ROCKETMQ_CLIENT_GROUP = "messaging.rocketmq.client_group";
+const char* MixAll::SPAN_ATTRIBUTE_KEY_ROCKETMQ_ATTEMPT = "messaging.rocketmq.attempt";
+const char* MixAll::SPAN_ATTRIBUTE_KEY_ROCKETMQ_BATCH_SIZE = "messaging.rocketmq.batch_size";
+const char* MixAll::SPAN_ATTRIBUTE_KEY_ROCKETMQ_DELIVERY_TIMESTAMP = "messaging.rocketmq.delivery_timestamp";
+const char* MixAll::SPAN_ATTRIBUTE_KEY_ROCKETMQ_AVAILABLE_TIMESTAMP = "messaging.rocketmq.available_timestamp";
+const char* MixAll::SPAN_ATTRIBUTE_KEY_ROCKETMQ_ACCESS_KEY = "messaging.rocketmq.access_key";
+
+const char* MixAll::SPAN_ATTRIBUTE_VALUE_ROCKETMQ_MESSAGING_SYSTEM = "rocketmq";
+const char* MixAll::SPAN_ATTRIBUTE_VALUE_DESTINATION_KIND = "topic";
+const char* MixAll::SPAN_ATTRIBUTE_VALUE_MESSAGING_PROTOCOL = "RMQ-gRPC";
+const char* MixAll::SPAN_ATTRIBUTE_VALUE_MESSAGING_PROTOCOL_VERSION = "v1";
+
+const char* MixAll::SPAN_ATTRIBUTE_VALUE_ROCKETMQ_NORMAL_MESSAGE = "normal";
+const char* MixAll::SPAN_ATTRIBUTE_VALUE_ROCKETMQ_FIFO_MESSAGE = "fifo";
+const char* MixAll::SPAN_ATTRIBUTE_VALUE_ROCKETMQ_DELAY_MESSAGE = "delay";
+const char* MixAll::SPAN_ATTRIBUTE_VALUE_ROCKETMQ_TRANSACTION_MESSAGE = "transaction";
+
+// Messaging span attribute name list
+const char* MixAll::SPAN_ATTRIBUTE_KEY_MESSAGING_SYSTEM = "messaging.system";
+const char* MixAll::SPAN_ATTRIBUTE_KEY_MESSAGING_DESTINATION = "messaging.destination";
+const char* MixAll::SPAN_ATTRIBUTE_KEY_MESSAGING_DESTINATION_KIND = "messaging.destination_kind";
+const char* MixAll::SPAN_ATTRIBUTE_KEY_MESSAGING_PROTOCOL = "messaging.protocol";
+const char* MixAll::SPAN_ATTRIBUTE_KEY_MESSAGING_PROTOCOL_VERSION = "messaging.protocol_version";
+const char* MixAll::SPAN_ATTRIBUTE_KEY_MESSAGING_URL = "messaging.url";
+const char* MixAll::SPAN_ATTRIBUTE_KEY_MESSAGING_ID = "messaging.message_id";
+const char* MixAll::SPAN_ATTRIBUTE_KEY_MESSAGING_PAYLOAD_SIZE_BYTES = "messaging.message_payload_size_bytes";
+const char* MixAll::SPAN_ATTRIBUTE_KEY_MESSAGING_OPERATION = "messaging.operation";
+
+const char* MixAll::SPAN_ATTRIBUTE_KEY_HOST_NAME = "host.name";
+const char* MixAll::SPAN_ATTRIBUTE_KEY_TRANSACTION_RESOLUTION = "commitAction";
// Span annotation
const char* MixAll::SPAN_ANNOTATION_AWAIT_CONSUMPTION = "__await_consumption";
diff --git a/src/main/cpp/base/include/MixAll.h b/src/main/cpp/base/include/MixAll.h
index 1a84c66..9a82b30 100644
--- a/src/main/cpp/base/include/MixAll.h
+++ b/src/main/cpp/base/include/MixAll.h
@@ -68,21 +68,42 @@ public:
static const char* SPAN_NAME_CONSUME_MESSAGE;
static const char* SPAN_NAME_PULL_MESSAGE;
- // Tracing attribute name list
- static const char* SPAN_ATTRIBUTE_ACCESS_KEY;
- static const char* SPAN_ATTRIBUTE_ARN;
- static const char* SPAN_ATTRIBUTE_TOPIC;
- static const char* SPAN_ATTRIBUTE_GROUP;
- static const char* SPAN_ATTRIBUTE_MESSAGE_ID;
- static const char* SPAN_ATTRIBUTE_TAG;
- static const char* SPAN_ATTRIBUTE_KEYS;
- static const char* SPAN_ATTRIBUTE_HOST;
- static const char* SPAN_ATTRIBUTE_MESSAGE_TYPE;
- static const char* SPAN_ATTRIBUTE_ATTEMPT_TIME;
- static const char* SPAN_ATTRIBUTE_DELIVERY_TIMESTAMP;
- static const char* SPAN_ATTRIBUTE_TRANSACTION_RESOLUTION;
- static const char* SPAN_ATTRIBUTE_AVAILABLE_TIMESTAMP;
- static const char* SPAN_ATTRIBUTE_BATCH_SIZE;
+ // RocketMQ span attribute name list
+ static const char* SPAN_ATTRIBUTE_KEY_ROCKETMQ_NAMESPACE;
+ static const char* SPAN_ATTRIBUTE_KEY_ROCKETMQ_TAG;
+ static const char* SPAN_ATTRIBUTE_KEY_ROCKETMQ_KEYS;
+ static const char* SPAN_ATTRIBUTE_KEY_ROCKETMQ_CLIENT_ID;
+ static const char* SPAN_ATTRIBUTE_KEY_ROCKETMQ_MESSAGE_TYPE;
+ static const char* SPAN_ATTRIBUTE_KEY_ROCKETMQ_CLIENT_GROUP;
+ static const char* SPAN_ATTRIBUTE_KEY_ROCKETMQ_ATTEMPT;
+ static const char* SPAN_ATTRIBUTE_KEY_ROCKETMQ_BATCH_SIZE;
+ static const char* SPAN_ATTRIBUTE_KEY_ROCKETMQ_DELIVERY_TIMESTAMP;
+ static const char* SPAN_ATTRIBUTE_KEY_ROCKETMQ_AVAILABLE_TIMESTAMP;
+ static const char* SPAN_ATTRIBUTE_KEY_ROCKETMQ_ACCESS_KEY;
+
+ static const char* SPAN_ATTRIBUTE_VALUE_ROCKETMQ_MESSAGING_SYSTEM;
+ static const char* SPAN_ATTRIBUTE_VALUE_DESTINATION_KIND;
+ static const char* SPAN_ATTRIBUTE_VALUE_MESSAGING_PROTOCOL;
+ static const char* SPAN_ATTRIBUTE_VALUE_MESSAGING_PROTOCOL_VERSION;
+
+ static const char* SPAN_ATTRIBUTE_VALUE_ROCKETMQ_NORMAL_MESSAGE;
+ static const char* SPAN_ATTRIBUTE_VALUE_ROCKETMQ_FIFO_MESSAGE;
+ static const char* SPAN_ATTRIBUTE_VALUE_ROCKETMQ_DELAY_MESSAGE;
+ static const char* SPAN_ATTRIBUTE_VALUE_ROCKETMQ_TRANSACTION_MESSAGE;
+
+ // Messaging attribute name list
+ static const char* SPAN_ATTRIBUTE_KEY_MESSAGING_SYSTEM;
+ static const char* SPAN_ATTRIBUTE_KEY_MESSAGING_DESTINATION;
+ static const char* SPAN_ATTRIBUTE_KEY_MESSAGING_DESTINATION_KIND;
+ static const char* SPAN_ATTRIBUTE_KEY_MESSAGING_PROTOCOL;
+ static const char* SPAN_ATTRIBUTE_KEY_MESSAGING_PROTOCOL_VERSION;
+ static const char* SPAN_ATTRIBUTE_KEY_MESSAGING_URL;
+ static const char* SPAN_ATTRIBUTE_KEY_MESSAGING_ID;
+ static const char* SPAN_ATTRIBUTE_KEY_MESSAGING_PAYLOAD_SIZE_BYTES;
+ static const char* SPAN_ATTRIBUTE_KEY_MESSAGING_OPERATION;
+
+ static const char* SPAN_ATTRIBUTE_KEY_HOST_NAME;
+ static const char* SPAN_ATTRIBUTE_KEY_TRANSACTION_RESOLUTION;
// Tracing annotation
static const char* SPAN_ANNOTATION_AWAIT_CONSUMPTION;
diff --git a/src/main/cpp/client/ClientManagerImpl.cpp b/src/main/cpp/client/ClientManagerImpl.cpp
index 227623d..db77dc8 100644
--- a/src/main/cpp/client/ClientManagerImpl.cpp
+++ b/src/main/cpp/client/ClientManagerImpl.cpp
@@ -113,7 +113,7 @@ void ClientManagerImpl::start() {
state_.store(State::STARTING, std::memory_order_relaxed);
callback_thread_pool_->start();
-
+
scheduler_->start();
std::weak_ptr<ClientManagerImpl> client_instance_weak_ptr = shared_from_this();
@@ -169,7 +169,7 @@ void ClientManagerImpl::shutdown() {
if (stats_task_id_) {
scheduler_->cancel(stats_task_id_);
}
-
+
scheduler_->shutdown();
{
diff --git a/src/main/cpp/rocketmq/BUILD.bazel b/src/main/cpp/rocketmq/BUILD.bazel
index 20cfe8c..6d6dd72 100644
--- a/src/main/cpp/rocketmq/BUILD.bazel
+++ b/src/main/cpp/rocketmq/BUILD.bazel
@@ -26,6 +26,7 @@ cc_library(
deps = [
"//src/main/cpp/client:client_library",
"//src/main/cpp/tracing/exporters:otlp_exporter",
+ "//src/main/cpp/tracing:tracing_utility",
"//src/main/cpp/log:log_library",
"//src/main/cpp/admin:admin_server_library",
"@com_google_absl//absl/types:optional",
diff --git a/src/main/cpp/rocketmq/ConsumeStandardMessageService.cpp b/src/main/cpp/rocketmq/ConsumeStandardMessageService.cpp
index 4376afd..b13b34a 100644
--- a/src/main/cpp/rocketmq/ConsumeStandardMessageService.cpp
+++ b/src/main/cpp/rocketmq/ConsumeStandardMessageService.cpp
@@ -146,20 +146,20 @@ void ConsumeStandardMessageService::consumeTask(const ProcessQueueWeakPtr& proce
span = opencensus::trace::Span::StartSpan(MixAll::SPAN_NAME_AWAIT_CONSUMPTION, nullptr, {&Samplers::always()});
}
- span.AddAttribute(MixAll::SPAN_ATTRIBUTE_ACCESS_KEY,
+ span.AddAttribute(MixAll::SPAN_ATTRIBUTE_KEY_ROCKETMQ_ACCESS_KEY,
consumer->credentialsProvider()->getCredentials().accessKey());
- span.AddAttribute(MixAll::SPAN_ATTRIBUTE_ARN, consumer->resourceNamespace());
- span.AddAttribute(MixAll::SPAN_ATTRIBUTE_TOPIC, msg.getTopic());
- span.AddAttribute(MixAll::SPAN_ATTRIBUTE_MESSAGE_ID, msg.getMsgId());
- span.AddAttribute(MixAll::SPAN_ATTRIBUTE_GROUP, consumer->getGroupName());
- span.AddAttribute(MixAll::SPAN_ATTRIBUTE_TAG, msg.getTags());
+ span.AddAttribute(MixAll::SPAN_ATTRIBUTE_KEY_ROCKETMQ_NAMESPACE, consumer->resourceNamespace());
+ span.AddAttribute(MixAll::SPAN_ATTRIBUTE_KEY_MESSAGING_DESTINATION, msg.getTopic());
+ span.AddAttribute(MixAll::SPAN_ATTRIBUTE_KEY_MESSAGING_ID, msg.getMsgId());
+ span.AddAttribute(MixAll::SPAN_ATTRIBUTE_KEY_ROCKETMQ_CLIENT_GROUP, consumer->getGroupName());
+ span.AddAttribute(MixAll::SPAN_ATTRIBUTE_KEY_ROCKETMQ_TAG, msg.getTags());
const auto& keys = msg.getKeys();
- span.AddAttribute(MixAll::SPAN_ATTRIBUTE_KEYS,
+ span.AddAttribute(MixAll::SPAN_ATTRIBUTE_KEY_ROCKETMQ_KEYS,
absl::StrJoin(keys.begin(), keys.end(), MixAll::MESSAGE_KEY_SEPARATOR));
- span.AddAttribute(MixAll::SPAN_ATTRIBUTE_ATTEMPT_TIME, msg.getDeliveryAttempt());
- span.AddAttribute(MixAll::SPAN_ATTRIBUTE_AVAILABLE_TIMESTAMP,
+ span.AddAttribute(MixAll::SPAN_ATTRIBUTE_KEY_ROCKETMQ_ATTEMPT, msg.getDeliveryAttempt());
+ span.AddAttribute(MixAll::SPAN_ATTRIBUTE_KEY_ROCKETMQ_AVAILABLE_TIMESTAMP,
absl::FormatTime(absl::FromUnixMillis(msg.getStoreTimestamp())));
- span.AddAttribute(MixAll::SPAN_ATTRIBUTE_HOST, UtilAll::hostname());
+ span.AddAttribute(MixAll::SPAN_ATTRIBUTE_KEY_HOST_NAME, UtilAll::hostname());
absl::Time decoded_timestamp = MessageAccessor::decodedTimestamp(msg);
span.AddAnnotation(
MixAll::SPAN_ANNOTATION_AWAIT_CONSUMPTION,
@@ -180,21 +180,21 @@ void ConsumeStandardMessageService::consumeTask(const ProcessQueueWeakPtr& proce
} else {
span = opencensus::trace::Span::StartSpan(MixAll::SPAN_NAME_CONSUME_MESSAGE);
}
- span.AddAttribute(MixAll::SPAN_ATTRIBUTE_ACCESS_KEY,
+ span.AddAttribute(MixAll::SPAN_ATTRIBUTE_KEY_ROCKETMQ_ACCESS_KEY,
consumer->credentialsProvider()->getCredentials().accessKey());
- span.AddAttribute(MixAll::SPAN_ATTRIBUTE_ARN, consumer->resourceNamespace());
- span.AddAttribute(MixAll::SPAN_ATTRIBUTE_TOPIC, msg.getTopic());
- span.AddAttribute(MixAll::SPAN_ATTRIBUTE_MESSAGE_ID, msg.getMsgId());
- span.AddAttribute(MixAll::SPAN_ATTRIBUTE_GROUP, consumer->getGroupName());
- span.AddAttribute(MixAll::SPAN_ATTRIBUTE_TAG, msg.getTags());
+ span.AddAttribute(MixAll::SPAN_ATTRIBUTE_KEY_ROCKETMQ_NAMESPACE, consumer->resourceNamespace());
+ span.AddAttribute(MixAll::SPAN_ATTRIBUTE_KEY_MESSAGING_DESTINATION, msg.getTopic());
+ span.AddAttribute(MixAll::SPAN_ATTRIBUTE_KEY_MESSAGING_ID, msg.getMsgId());
+ span.AddAttribute(MixAll::SPAN_ATTRIBUTE_KEY_ROCKETMQ_CLIENT_GROUP, consumer->getGroupName());
+ span.AddAttribute(MixAll::SPAN_ATTRIBUTE_KEY_ROCKETMQ_TAG, msg.getTags());
const auto& keys = msg.getKeys();
- span.AddAttribute(MixAll::SPAN_ATTRIBUTE_KEYS,
+ span.AddAttribute(MixAll::SPAN_ATTRIBUTE_KEY_ROCKETMQ_KEYS,
absl::StrJoin(keys.begin(), keys.end(), MixAll::MESSAGE_KEY_SEPARATOR));
- span.AddAttribute(MixAll::SPAN_ATTRIBUTE_ATTEMPT_TIME, msg.getDeliveryAttempt());
- span.AddAttribute(MixAll::SPAN_ATTRIBUTE_AVAILABLE_TIMESTAMP,
+ span.AddAttribute(MixAll::SPAN_ATTRIBUTE_KEY_ROCKETMQ_ATTEMPT, msg.getDeliveryAttempt());
+ span.AddAttribute(MixAll::SPAN_ATTRIBUTE_KEY_ROCKETMQ_AVAILABLE_TIMESTAMP,
absl::FormatTime(absl::FromUnixMillis(msg.getStoreTimestamp())));
- span.AddAttribute(MixAll::SPAN_ATTRIBUTE_HOST, UtilAll::hostname());
- span.AddAttribute(MixAll::SPAN_ATTRIBUTE_BATCH_SIZE, msgs.size());
+ span.AddAttribute(MixAll::SPAN_ATTRIBUTE_KEY_HOST_NAME, UtilAll::hostname());
+ span.AddAttribute(MixAll::SPAN_ATTRIBUTE_KEY_ROCKETMQ_BATCH_SIZE, msgs.size());
spans.emplace_back(std::move(span));
}
}
diff --git a/src/main/cpp/rocketmq/ProducerImpl.cpp b/src/main/cpp/rocketmq/ProducerImpl.cpp
index 034c920..5b51fcd 100644
--- a/src/main/cpp/rocketmq/ProducerImpl.cpp
+++ b/src/main/cpp/rocketmq/ProducerImpl.cpp
@@ -37,6 +37,7 @@
#include "SendCallbacks.h"
#include "SendMessageContext.h"
#include "Signature.h"
+#include "TracingUtility.h"
#include "TransactionImpl.h"
#include "UniqueIdGenerator.h"
#include "UtilAll.h"
@@ -326,24 +327,26 @@ void ProducerImpl::sendImpl(RetrySendCallback* callback) {
} else {
span = opencensus::trace::Span::StartSpan(MixAll::SPAN_NAME_SEND_MESSAGE, nullptr, {&Samplers::always()});
}
-
- span.AddAttribute(MixAll::SPAN_ATTRIBUTE_ACCESS_KEY, credentialsProvider()->getCredentials().accessKey());
- span.AddAttribute(MixAll::SPAN_ATTRIBUTE_ARN, resourceNamespace());
- span.AddAttribute(MixAll::SPAN_ATTRIBUTE_TOPIC, message.getTopic());
- span.AddAttribute(MixAll::SPAN_ATTRIBUTE_MESSAGE_ID, message.getMsgId());
- span.AddAttribute(MixAll::SPAN_ATTRIBUTE_GROUP, getGroupName());
- span.AddAttribute(MixAll::SPAN_ATTRIBUTE_TAG, message.getTags());
+ TracingUtility::addUniversalSpanAttributes(message, *this, span);
+
+ span.AddAttribute(MixAll::SPAN_ATTRIBUTE_KEY_ROCKETMQ_ACCESS_KEY,
+ credentialsProvider()->getCredentials().accessKey());
+ span.AddAttribute(MixAll::SPAN_ATTRIBUTE_KEY_ROCKETMQ_NAMESPACE, resourceNamespace());
+ span.AddAttribute(MixAll::SPAN_ATTRIBUTE_KEY_MESSAGING_DESTINATION, message.getTopic());
+ span.AddAttribute(MixAll::SPAN_ATTRIBUTE_KEY_MESSAGING_ID, message.getMsgId());
+ span.AddAttribute(MixAll::SPAN_ATTRIBUTE_KEY_ROCKETMQ_CLIENT_GROUP, getGroupName());
+ span.AddAttribute(MixAll::SPAN_ATTRIBUTE_KEY_ROCKETMQ_TAG, message.getTags());
const auto& keys = callback->message().getKeys();
if (!keys.empty()) {
- span.AddAttribute(MixAll::SPAN_ATTRIBUTE_KEYS,
+ span.AddAttribute(MixAll::SPAN_ATTRIBUTE_KEY_ROCKETMQ_KEYS,
absl::StrJoin(keys.begin(), keys.end(), MixAll::MESSAGE_KEY_SEPARATOR));
}
// Note: attempt-time is 0-based
- span.AddAttribute(MixAll::SPAN_ATTRIBUTE_ATTEMPT_TIME, 1 + callback->attemptTime());
- span.AddAttribute(MixAll::SPAN_ATTRIBUTE_HOST, UtilAll::hostname());
+ span.AddAttribute(MixAll::SPAN_ATTRIBUTE_KEY_ROCKETMQ_ATTEMPT, 1 + callback->attemptTime());
+ span.AddAttribute(MixAll::SPAN_ATTRIBUTE_KEY_HOST_NAME, UtilAll::hostname());
if (message.deliveryTimestamp() != absl::ToChronoTime(absl::UnixEpoch())) {
- span.AddAttribute(MixAll::SPAN_ATTRIBUTE_DELIVERY_TIMESTAMP,
+ span.AddAttribute(MixAll::SPAN_ATTRIBUTE_KEY_ROCKETMQ_DELIVERY_TIMESTAMP,
absl::FormatTime(absl::FromChrono(message.deliveryTimestamp())));
}
callback->message().traceContext(opencensus::trace::propagation::ToTraceParentHeader(span.context()));
@@ -413,18 +416,18 @@ bool ProducerImpl::endTransaction0(const std::string& target, const std::string&
} else {
span = opencensus::trace::Span::StartSpan(MixAll::SPAN_NAME_END_TRANSACTION, nullptr, {&Samplers::always()});
}
-
- span.AddAttribute(MixAll::SPAN_ATTRIBUTE_ACCESS_KEY, credentialsProvider()->getCredentials().accessKey());
- span.AddAttribute(MixAll::SPAN_ATTRIBUTE_ARN, resourceNamespace());
- span.AddAttribute(MixAll::SPAN_ATTRIBUTE_GROUP, getGroupName());
- span.AddAttribute(MixAll::SPAN_ATTRIBUTE_MESSAGE_ID, message_id);
- span.AddAttribute(MixAll::SPAN_ATTRIBUTE_HOST, UtilAll::hostname());
+ span.AddAttribute(MixAll::SPAN_ATTRIBUTE_KEY_ROCKETMQ_ACCESS_KEY,
+ credentialsProvider()->getCredentials().accessKey());
+ span.AddAttribute(MixAll::SPAN_ATTRIBUTE_KEY_ROCKETMQ_NAMESPACE, resourceNamespace());
+ span.AddAttribute(MixAll::SPAN_ATTRIBUTE_KEY_ROCKETMQ_CLIENT_GROUP, getGroupName());
+ span.AddAttribute(MixAll::SPAN_ATTRIBUTE_KEY_MESSAGING_ID, message_id);
+ span.AddAttribute(MixAll::SPAN_ATTRIBUTE_KEY_HOST_NAME, UtilAll::hostname());
switch (resolution) {
case TransactionState::COMMIT:
- span.AddAttribute(MixAll::SPAN_ATTRIBUTE_TRANSACTION_RESOLUTION, "commit");
+ span.AddAttribute(MixAll::SPAN_ATTRIBUTE_KEY_TRANSACTION_RESOLUTION, "commit");
break;
case TransactionState::ROLLBACK:
- span.AddAttribute(MixAll::SPAN_ATTRIBUTE_TRANSACTION_RESOLUTION, "rollback");
+ span.AddAttribute(MixAll::SPAN_ATTRIBUTE_KEY_TRANSACTION_RESOLUTION, "rollback");
break;
}
diff --git a/src/main/cpp/scheduler/include/Scheduler.h b/src/main/cpp/scheduler/include/Scheduler.h
index c280ea8..65feabb 100644
--- a/src/main/cpp/scheduler/include/Scheduler.h
+++ b/src/main/cpp/scheduler/include/Scheduler.h
@@ -19,8 +19,8 @@
#include <chrono>
#include <cstdint>
#include <functional>
-#include <string>
#include <memory>
+#include <string>
#include "rocketmq/RocketMQ.h"
diff --git a/src/main/cpp/scheduler/include/SchedulerImpl.h b/src/main/cpp/scheduler/include/SchedulerImpl.h
index bcd63d9..00066eb 100644
--- a/src/main/cpp/scheduler/include/SchedulerImpl.h
+++ b/src/main/cpp/scheduler/include/SchedulerImpl.h
@@ -16,7 +16,6 @@
*/
#pragma once
#include <atomic>
-#include <bits/c++config.h>
#include <chrono>
#include <cstdint>
#include <functional>
diff --git a/src/main/cpp/rocketmq/BUILD.bazel b/src/main/cpp/tracing/BUILD.bazel
similarity index 73%
copy from src/main/cpp/rocketmq/BUILD.bazel
copy to src/main/cpp/tracing/BUILD.bazel
index 20cfe8c..0fdb6b0 100644
--- a/src/main/cpp/rocketmq/BUILD.bazel
+++ b/src/main/cpp/tracing/BUILD.bazel
@@ -15,19 +15,15 @@
# limitations under the License.
#
load("@rules_cc//cc:defs.bzl", "cc_library")
-
-package(default_visibility = ["//visibility:public"])
-
cc_library(
- name = "rocketmq_library",
+ name = "tracing_utility",
hdrs = glob(["include/*.h"]),
srcs = glob(["*.cpp"]),
- strip_include_prefix = "//src/main/cpp/rocketmq/include",
+ strip_include_prefix = "//src/main/cpp/tracing/include",
deps = [
+ "//api:rocketmq_interface",
"//src/main/cpp/client:client_library",
- "//src/main/cpp/tracing/exporters:otlp_exporter",
- "//src/main/cpp/log:log_library",
- "//src/main/cpp/admin:admin_server_library",
- "@com_google_absl//absl/types:optional",
+ "@io_opencensus_cpp//opencensus/trace",
],
+ visibility = ["//visibility:public"],
)
\ No newline at end of file
diff --git a/src/main/cpp/tracing/TracingUtility.cpp b/src/main/cpp/tracing/TracingUtility.cpp
new file mode 100644
index 0000000..25805db
--- /dev/null
+++ b/src/main/cpp/tracing/TracingUtility.cpp
@@ -0,0 +1,70 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+#include "TracingUtility.h"
+#include "MixAll.h"
+#include "rocketmq/CredentialsProvider.h"
+#include "spdlog/spdlog.h"
+
+ROCKETMQ_NAMESPACE_BEGIN
+
+void TracingUtility::addUniversalSpanAttributes(const MQMessage& message, ClientConfig& client_config,
+ opencensus::trace::Span& span) {
+ span.AddAttribute(MixAll::SPAN_ATTRIBUTE_KEY_MESSAGING_ID, message.getMsgId());
+ span.AddAttribute(MixAll::SPAN_ATTRIBUTE_KEY_MESSAGING_PAYLOAD_SIZE_BYTES, message.getBody().length());
+ span.AddAttribute(MixAll::SPAN_ATTRIBUTE_KEY_ROCKETMQ_TAG, message.getTags());
+ switch (message.messageType()) {
+ case MessageType::FIFO:
+ span.AddAttribute(MixAll::SPAN_ATTRIBUTE_KEY_ROCKETMQ_MESSAGE_TYPE,
+ MixAll::SPAN_ATTRIBUTE_VALUE_ROCKETMQ_FIFO_MESSAGE);
+ break;
+ case MessageType::DELAY:
+ span.AddAttribute(MixAll::SPAN_ATTRIBUTE_KEY_ROCKETMQ_MESSAGE_TYPE,
+ MixAll::SPAN_ATTRIBUTE_VALUE_ROCKETMQ_DELAY_MESSAGE);
+ break;
+ case MessageType::TRANSACTION:
+ span.AddAttribute(MixAll::SPAN_ATTRIBUTE_KEY_ROCKETMQ_MESSAGE_TYPE,
+ MixAll::SPAN_ATTRIBUTE_VALUE_ROCKETMQ_TRANSACTION_MESSAGE);
+ break;
+ default:
+ span.AddAttribute(MixAll::SPAN_ATTRIBUTE_KEY_ROCKETMQ_MESSAGE_TYPE,
+ MixAll::SPAN_ATTRIBUTE_VALUE_ROCKETMQ_NORMAL_MESSAGE);
+ break;
+ }
+ std::chrono::system_clock::time_point timestamp = message.deliveryTimestamp();
+ auto duration = absl::FromChrono(timestamp.time_since_epoch());
+ int64_t timestamp_millis = absl::ToInt64Milliseconds(duration);
+ if (timestamp_millis > 0) {
+ span.AddAttribute(MixAll::SPAN_ATTRIBUTE_KEY_ROCKETMQ_DELIVERY_TIMESTAMP, timestamp_millis);
+ }
+ span.AddAttribute(MixAll::SPAN_ATTRIBUTE_KEY_MESSAGING_SYSTEM,
+ MixAll::SPAN_ATTRIBUTE_VALUE_ROCKETMQ_MESSAGING_SYSTEM);
+ span.AddAttribute(MixAll::SPAN_ATTRIBUTE_KEY_MESSAGING_DESTINATION, message.getTopic());
+ span.AddAttribute(MixAll::SPAN_ATTRIBUTE_KEY_MESSAGING_DESTINATION_KIND,
+ MixAll::SPAN_ATTRIBUTE_VALUE_DESTINATION_KIND);
+ span.AddAttribute(MixAll::SPAN_ATTRIBUTE_KEY_MESSAGING_PROTOCOL_VERSION,
+ MixAll::SPAN_ATTRIBUTE_VALUE_MESSAGING_PROTOCOL_VERSION);
+ // span.AddAttribute(MixAll::SPAN_ATTRIBUTE_KEY_MESSAGING_URL, "abc")
+ span.AddAttribute(MixAll::SPAN_ATTRIBUTE_KEY_ROCKETMQ_NAMESPACE, client_config.resourceNamespace());
+ span.AddAttribute(MixAll::SPAN_ATTRIBUTE_KEY_ROCKETMQ_CLIENT_ID, client_config.clientId());
+ span.AddAttribute(MixAll::SPAN_ATTRIBUTE_KEY_ROCKETMQ_CLIENT_GROUP, client_config.getGroupName());
+
+ span.AddAttribute(MixAll::SPAN_ATTRIBUTE_KEY_ROCKETMQ_ACCESS_KEY,
+ client_config.credentialsProvider()->getCredentials().accessKey());
+ span.AddAttribute(MixAll::SPAN_ATTRIBUTE_KEY_ROCKETMQ_NAMESPACE, client_config.resourceNamespace());
+}
+
+ROCKETMQ_NAMESPACE_END
\ No newline at end of file
diff --git a/src/main/cpp/tracing/exporters/OtlpExporter.cpp b/src/main/cpp/tracing/exporters/OtlpExporter.cpp
index 836ceff..78aaa26 100644
--- a/src/main/cpp/tracing/exporters/OtlpExporter.cpp
+++ b/src/main/cpp/tracing/exporters/OtlpExporter.cpp
@@ -153,8 +153,8 @@ void OtlpExporterHandler::Export(const std::vector<::opencensus::trace::exporter
case TraceMode::Develop: {
{
for (const auto& span : spans) {
- SPDLOG_INFO("{} --> {}: {}", absl::FormatTime(span.start_time()), absl::FormatTime(span.end_time()),
- span.name().data());
+ SPDLOG_INFO("trace span {} --> {}: {}", absl::FormatTime(span.start_time()),
+ absl::FormatTime(span.end_time()), span.name().data());
for (const auto& event : span.annotations().events()) {
for (const auto& attr : event.event().attributes()) {
switch (attr.second.type()) {
@@ -173,6 +173,21 @@ void OtlpExporterHandler::Export(const std::vector<::opencensus::trace::exporter
}
}
}
+ SPDLOG_INFO("Attributes size={}", span.attributes().size());
+ for (const auto& attribute : span.attributes()) {
+ switch (attribute.second.type()) {
+ case opencensus::trace::AttributeValueRef::Type::kString:
+ SPDLOG_INFO("Span attribute: {} --> {}", attribute.first, attribute.second.string_value());
+ break;
+ break;
+ case opencensus::trace::AttributeValueRef::Type::kBool:
+ SPDLOG_INFO("Span attribute: {} --> {}", attribute.first, attribute.second.bool_value());
+ break;
+ case opencensus::trace::AttributeValueRef::Type::kInt:
+ SPDLOG_INFO("Span attribute: {} --> {}", attribute.first, attribute.second.int_value());
+ break;
+ }
+ }
}
}
return;
diff --git a/src/main/cpp/scheduler/include/Scheduler.h b/src/main/cpp/tracing/include/TracingUtility.h
similarity index 57%
copy from src/main/cpp/scheduler/include/Scheduler.h
copy to src/main/cpp/tracing/include/TracingUtility.h
index c280ea8..f974780 100644
--- a/src/main/cpp/scheduler/include/Scheduler.h
+++ b/src/main/cpp/tracing/include/TracingUtility.h
@@ -16,31 +16,16 @@
*/
#pragma once
-#include <chrono>
-#include <cstdint>
-#include <functional>
-#include <string>
-#include <memory>
+#include "opencensus/trace/span.h"
-#include "rocketmq/RocketMQ.h"
+#include "ClientConfig.h"
+#include "rocketmq/MQMessage.h"
ROCKETMQ_NAMESPACE_BEGIN
-class Scheduler {
+class TracingUtility {
public:
- virtual ~Scheduler() = default;
-
- virtual void start() = 0;
-
- virtual void shutdown() = 0;
-
- virtual std::uint32_t schedule(const std::function<void(void)>& functor, const std::string& task_name,
- std::chrono::milliseconds delay, std::chrono::milliseconds interval) = 0;
-
- virtual void cancel(std::uint32_t task_id) = 0;
+ static void addUniversalSpanAttributes(const MQMessage& message, ClientConfig&, opencensus::trace::Span& span);
};
-using SchedulerPtr = std::weak_ptr<Scheduler>;
-using SchedulerSharedPtr = std::shared_ptr<Scheduler>;
-
-ROCKETMQ_NAMESPACE_END
\ No newline at end of file
+ROCKETMQ_NAMESPACE_END
diff --git a/src/test/cpp/ut/scheduler/SchedulerTest.cpp b/src/test/cpp/ut/scheduler/SchedulerTest.cpp
index 741d831..87b2753 100644
--- a/src/test/cpp/ut/scheduler/SchedulerTest.cpp
+++ b/src/test/cpp/ut/scheduler/SchedulerTest.cpp
@@ -29,7 +29,7 @@ ROCKETMQ_NAMESPACE_BEGIN
class SchedulerTest : public testing::Test {
public:
- SchedulerTest() : scheduler(std::make_shared<SchedulerImpl>()) {
+ SchedulerTest() : scheduler(std::make_shared<SchedulerImpl>()) {
}
void SetUp() override {
@@ -41,7 +41,7 @@ public:
}
protected:
- SchedulerSharedPtr scheduler;
+ SchedulerSharedPtr scheduler;
};
TEST_F(SchedulerTest, testSingleShot) {
@@ -75,7 +75,8 @@ TEST_F(SchedulerTest, testCancel) {
callback_fire_count++;
};
- std::uint32_t task_id = scheduler->schedule(callback, "test-cancel", std::chrono::seconds(1), std::chrono::seconds(1));
+ std::uint32_t task_id =
+ scheduler->schedule(callback, "test-cancel", std::chrono::seconds(1), std::chrono::seconds(1));
scheduler->cancel(task_id);
std::this_thread::sleep_for(std::chrono::seconds(2));
ASSERT_EQ(0, callback_fire_count);