You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@rocketmq.apache.org by li...@apache.org on 2021/10/25 11:43:21 UTC
[rocketmq-client-cpp] branch main updated: Fill 'messaging.rocketmq.message_keys' attribute in message tracing with value of array (#383)
This is an automated email from the ASF dual-hosted git repository.
lizhanhui pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/rocketmq-client-cpp.git
The following commit(s) were added to refs/heads/main by this push:
new b066034 Fill 'messaging.rocketmq.message_keys' attribute in message tracing with value of array (#383)
b066034 is described below
commit b066034388747552091a8f63a951391f5fd34d3e
Author: aaron ai <ya...@gmail.com>
AuthorDate: Mon Oct 25 19:43:14 2021 +0800
Fill 'messaging.rocketmq.message_keys' attribute in message tracing with value of array (#383)
---
src/main/cpp/base/MixAll.cpp | 5 +++--
src/main/cpp/base/include/MixAll.h | 1 +
.../cpp/rocketmq/ConsumeFifoMessageService.cpp | 6 ------
.../cpp/rocketmq/ConsumeStandardMessageService.cpp | 6 ------
src/main/cpp/rocketmq/ProducerImpl.cpp | 5 -----
.../rocketmq/include/ConsumeFifoMessageService.h | 4 ++--
src/main/cpp/tracing/TracingUtility.cpp | 7 +++++++
src/main/cpp/tracing/exporters/OtlpExporter.cpp | 23 ++++++++++++++++++++++
8 files changed, 36 insertions(+), 21 deletions(-)
diff --git a/src/main/cpp/base/MixAll.cpp b/src/main/cpp/base/MixAll.cpp
index ba69ce4..4691e69 100644
--- a/src/main/cpp/base/MixAll.cpp
+++ b/src/main/cpp/base/MixAll.cpp
@@ -84,8 +84,8 @@ const char* MixAll::SPAN_NAME_PULL_MESSAGE = "PullMessage";
// RocketMQ span attribute name list
const char* MixAll::SPAN_ATTRIBUTE_KEY_ROCKETMQ_OPERATION = "messaging.rocketmq.operation";
const char* MixAll::SPAN_ATTRIBUTE_KEY_ROCKETMQ_NAMESPACE = "messaging.rocketmq.namespace";
-const char* MixAll::SPAN_ATTRIBUTE_KEY_ROCKETMQ_TAG = "messaging.rocketmq.tag";
-const char* MixAll::SPAN_ATTRIBUTE_KEY_ROCKETMQ_KEYS = "messaging.rocketmq.keys";
+const char* MixAll::SPAN_ATTRIBUTE_KEY_ROCKETMQ_TAG = "messaging.rocketmq.message_tag";
+const char* MixAll::SPAN_ATTRIBUTE_KEY_ROCKETMQ_KEYS = "messaging.rocketmq.message_keys";
const char* MixAll::SPAN_ATTRIBUTE_KEY_ROCKETMQ_CLIENT_ID = "messaging.rocketmq.client_id";
const char* MixAll::SPAN_ATTRIBUTE_KEY_ROCKETMQ_MESSAGE_TYPE = "messaging.rocketmq.message_type";
const char* MixAll::SPAN_ATTRIBUTE_KEY_ROCKETMQ_CLIENT_GROUP = "messaging.rocketmq.client_group";
@@ -135,6 +135,7 @@ const char* MixAll::SPAN_ATTRIBUTE_KEY_TRANSACTION_RESOLUTION = "commitAction";
// Span annotation
const char* MixAll::SPAN_ANNOTATION_AWAIT_CONSUMPTION = "__await_consumption";
+const char* MixAll::SPAN_ANNOTATION_MESSAGE_KEYS = "__message_keys";
const char* MixAll::SPAN_ANNOTATION_ATTR_START_TIME = "__start_time";
bool MixAll::validate(const MQMessage& message) {
diff --git a/src/main/cpp/base/include/MixAll.h b/src/main/cpp/base/include/MixAll.h
index d25f4b8..30ff613 100644
--- a/src/main/cpp/base/include/MixAll.h
+++ b/src/main/cpp/base/include/MixAll.h
@@ -127,6 +127,7 @@ public:
// Tracing annotation
static const char* SPAN_ANNOTATION_AWAIT_CONSUMPTION;
+ static const char* SPAN_ANNOTATION_MESSAGE_KEYS;
static const char* SPAN_ANNOTATION_ATTR_START_TIME;
template <typename Rep, typename Period>
diff --git a/src/main/cpp/rocketmq/ConsumeFifoMessageService.cpp b/src/main/cpp/rocketmq/ConsumeFifoMessageService.cpp
index 58ee2ec..76db7a5 100644
--- a/src/main/cpp/rocketmq/ConsumeFifoMessageService.cpp
+++ b/src/main/cpp/rocketmq/ConsumeFifoMessageService.cpp
@@ -145,9 +145,6 @@ void ConsumeFifoMessageService::consumeTask(const ProcessQueueWeakPtr& process_q
span.AddAttribute(MixAll::SPAN_ATTRIBUTE_KEY_ROCKETMQ_OPERATION,
MixAll::SPAN_ATTRIBUTE_VALUE_ROCKETMQ_AWAIT_OPERATION);
TracingUtility::addUniversalSpanAttributes(message, *consumer, span);
- const auto& keys = message.getKeys();
- span.AddAttribute(MixAll::SPAN_ATTRIBUTE_KEY_ROCKETMQ_KEYS,
- absl::StrJoin(keys.begin(), keys.end(), MixAll::MESSAGE_KEY_SEPARATOR));
span.AddAttribute(MixAll::SPAN_ATTRIBUTE_KEY_ROCKETMQ_AVAILABLE_TIMESTAMP, message.getStoreTimestamp());
absl::Time decoded_timestamp = MessageAccessor::decodedTimestamp(message);
span.AddAnnotation(
@@ -173,9 +170,6 @@ void ConsumeFifoMessageService::consumeTask(const ProcessQueueWeakPtr& process_q
span.AddAttribute(MixAll::SPAN_ATTRIBUTE_KEY_ROCKETMQ_OPERATION,
MixAll::SPAN_ATTRIBUTE_VALUE_MESSAGING_PROCESS_OPERATION);
TracingUtility::addUniversalSpanAttributes(message, *consumer, span);
- const auto& keys = message.getKeys();
- span.AddAttribute(MixAll::SPAN_ATTRIBUTE_KEY_ROCKETMQ_KEYS,
- absl::StrJoin(keys.begin(), keys.end(), MixAll::MESSAGE_KEY_SEPARATOR));
span.AddAttribute(MixAll::SPAN_ATTRIBUTE_KEY_ROCKETMQ_ATTEMPT, message.getDeliveryAttempt());
span.AddAttribute(MixAll::SPAN_ATTRIBUTE_KEY_ROCKETMQ_AVAILABLE_TIMESTAMP, message.getStoreTimestamp());
MessageAccessor::setTraceContext(const_cast<MQMessageExt&>(message),
diff --git a/src/main/cpp/rocketmq/ConsumeStandardMessageService.cpp b/src/main/cpp/rocketmq/ConsumeStandardMessageService.cpp
index 956a4cf..e31b2ed 100644
--- a/src/main/cpp/rocketmq/ConsumeStandardMessageService.cpp
+++ b/src/main/cpp/rocketmq/ConsumeStandardMessageService.cpp
@@ -153,9 +153,6 @@ void ConsumeStandardMessageService::consumeTask(const ProcessQueueWeakPtr& proce
span.AddAttribute(MixAll::SPAN_ATTRIBUTE_KEY_ROCKETMQ_OPERATION,
MixAll::SPAN_ATTRIBUTE_VALUE_ROCKETMQ_AWAIT_OPERATION);
TracingUtility::addUniversalSpanAttributes(msg, *consumer, span);
- const auto& keys = msg.getKeys();
- span.AddAttribute(MixAll::SPAN_ATTRIBUTE_KEY_ROCKETMQ_KEYS,
- absl::StrJoin(keys.begin(), keys.end(), MixAll::MESSAGE_KEY_SEPARATOR));
span.AddAttribute(MixAll::SPAN_ATTRIBUTE_KEY_ROCKETMQ_AVAILABLE_TIMESTAMP, msg.getStoreTimestamp());
absl::Time decoded_timestamp = MessageAccessor::decodedTimestamp(msg);
span.AddAnnotation(
@@ -186,9 +183,6 @@ void ConsumeStandardMessageService::consumeTask(const ProcessQueueWeakPtr& proce
span.AddAttribute(MixAll::SPAN_ATTRIBUTE_KEY_ROCKETMQ_OPERATION,
MixAll::SPAN_ATTRIBUTE_VALUE_MESSAGING_PROCESS_OPERATION);
TracingUtility::addUniversalSpanAttributes(msg, *consumer, span);
- const auto& keys = msg.getKeys();
- span.AddAttribute(MixAll::SPAN_ATTRIBUTE_KEY_ROCKETMQ_KEYS,
- absl::StrJoin(keys.begin(), keys.end(), MixAll::MESSAGE_KEY_SEPARATOR));
span.AddAttribute(MixAll::SPAN_ATTRIBUTE_KEY_ROCKETMQ_ATTEMPT, msg.getDeliveryAttempt());
span.AddAttribute(MixAll::SPAN_ATTRIBUTE_KEY_ROCKETMQ_AVAILABLE_TIMESTAMP, msg.getStoreTimestamp());
span.AddAttribute(MixAll::SPAN_ATTRIBUTE_KEY_ROCKETMQ_BATCH_SIZE, msgs.size());
diff --git a/src/main/cpp/rocketmq/ProducerImpl.cpp b/src/main/cpp/rocketmq/ProducerImpl.cpp
index d22f6dc..df3fb75 100644
--- a/src/main/cpp/rocketmq/ProducerImpl.cpp
+++ b/src/main/cpp/rocketmq/ProducerImpl.cpp
@@ -329,11 +329,6 @@ void ProducerImpl::sendImpl(RetrySendCallback* callback) {
span.AddAttribute(MixAll::SPAN_ATTRIBUTE_KEY_ROCKETMQ_OPERATION,
MixAll::SPAN_ATTRIBUTE_VALUE_MESSAGING_SEND_OPERATION);
TracingUtility::addUniversalSpanAttributes(message, *this, span);
- const auto& keys = callback->message().getKeys();
- if (!keys.empty()) {
- span.AddAttribute(MixAll::SPAN_ATTRIBUTE_KEY_ROCKETMQ_KEYS,
- absl::StrJoin(keys.begin(), keys.end(), MixAll::MESSAGE_KEY_SEPARATOR));
- }
// Note: attempt-time is 0-based
span.AddAttribute(MixAll::SPAN_ATTRIBUTE_KEY_ROCKETMQ_ATTEMPT, 1 + callback->attemptTime());
diff --git a/src/main/cpp/rocketmq/include/ConsumeFifoMessageService.h b/src/main/cpp/rocketmq/include/ConsumeFifoMessageService.h
index a3712d8..ea85888 100644
--- a/src/main/cpp/rocketmq/include/ConsumeFifoMessageService.h
+++ b/src/main/cpp/rocketmq/include/ConsumeFifoMessageService.h
@@ -28,8 +28,8 @@ public:
/**
* @brief Entry of ConsumeMessageService
- *
- * @param process_queue
+ *
+ * @param process_queue
*/
void submitConsumeTask(const ProcessQueueWeakPtr& process_queue) override;
diff --git a/src/main/cpp/tracing/TracingUtility.cpp b/src/main/cpp/tracing/TracingUtility.cpp
index 6446e3f..d5b7be0 100644
--- a/src/main/cpp/tracing/TracingUtility.cpp
+++ b/src/main/cpp/tracing/TracingUtility.cpp
@@ -16,6 +16,7 @@
*/
#include "TracingUtility.h"
#include "MixAll.h"
+#include "absl/strings/str_join.h"
#include "rocketmq/CredentialsProvider.h"
#include "spdlog/spdlog.h"
@@ -26,6 +27,12 @@ void TracingUtility::addUniversalSpanAttributes(const MQMessage& message, Client
span.AddAttribute(MixAll::SPAN_ATTRIBUTE_KEY_MESSAGING_ID, message.getMsgId());
span.AddAttribute(MixAll::SPAN_ATTRIBUTE_KEY_MESSAGING_PAYLOAD_SIZE_BYTES, message.getBody().length());
span.AddAttribute(MixAll::SPAN_ATTRIBUTE_KEY_ROCKETMQ_TAG, message.getTags());
+ const std::vector<std::string>& keys = message.getKeys();
+ if (!keys.empty()) {
+ span.AddAnnotation(MixAll::SPAN_ANNOTATION_MESSAGE_KEYS,
+ {{MixAll::SPAN_ANNOTATION_MESSAGE_KEYS,
+ absl::StrJoin(keys.begin(), keys.end(), MixAll::MESSAGE_KEY_SEPARATOR)}});
+ }
switch (message.messageType()) {
case MessageType::FIFO:
span.AddAttribute(MixAll::SPAN_ATTRIBUTE_KEY_ROCKETMQ_MESSAGE_TYPE,
diff --git a/src/main/cpp/tracing/exporters/OtlpExporter.cpp b/src/main/cpp/tracing/exporters/OtlpExporter.cpp
index e1c1735..1d24de4 100644
--- a/src/main/cpp/tracing/exporters/OtlpExporter.cpp
+++ b/src/main/cpp/tracing/exporters/OtlpExporter.cpp
@@ -20,6 +20,7 @@
#include "MixAll.h"
#include "Signature.h"
#include "UtilAll.h"
+#include "absl/strings/str_split.h"
#include "fmt/format.h"
#include "opentelemetry/proto/collector/trace/v1/trace_service.pb.h"
#include "opentelemetry/proto/common/v1/common.pb.h"
@@ -263,6 +264,28 @@ void OtlpExporterHandler::Export(const std::vector<::opencensus::trace::exporter
}
continue;
}
+ if (annotation.event().description() == MixAll::SPAN_ANNOTATION_MESSAGE_KEYS) {
+ for (const auto& attr : annotation.event().attributes()) {
+ if (attr.first == MixAll::SPAN_ANNOTATION_MESSAGE_KEYS) {
+ assert(attr.second.type() == opencensus::trace::AttributeValueRef::Type::kString);
+ std::string message_keys = attr.second.string_value();
+ std::vector<std::string> key_list = absl::StrSplit(message_keys, MixAll::MESSAGE_KEY_SEPARATOR);
+ auto key_kv = new common::KeyValue();
+ key_kv->set_key(MixAll::SPAN_ATTRIBUTE_KEY_ROCKETMQ_KEYS);
+ auto key_value = new common::AnyValue();
+ auto key_array_value = new common::ArrayValue();
+ for (const auto& key : key_list) {
+ auto value = new common::AnyValue();
+ value->set_string_value(key);
+ key_array_value->mutable_values()->AddAllocated(value);
+ }
+ key_value->set_allocated_array_value(key_array_value);
+ key_kv->set_allocated_value(key_value);
+ item->mutable_attributes()->AddAllocated(key_kv);
+ }
+ }
+ continue;
+ }
auto ev = new trace::Span::Event();
ev->set_time_unix_nano(absl::ToUnixNanos(annotation.timestamp()));
auto attrs = annotation.event().attributes();