You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@rocketmq.apache.org by li...@apache.org on 2021/10/25 05:07:00 UTC
[rocketmq-client-cpp] branch main updated: Add more attributes for
message consumption (#380)
This is an automated email from the ASF dual-hosted git repository.
lizhanhui pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/rocketmq-client-cpp.git
The following commit(s) were added to refs/heads/main by this push:
new 8be2aa7 Add more attributes for message consumption (#380)
8be2aa7 is described below
commit 8be2aa7ceda6c3fff2fa2c5591bcca293fdf4ce0
Author: aaron ai <ya...@gmail.com>
AuthorDate: Mon Oct 25 13:06:53 2021 +0800
Add more attributes for message consumption (#380)
---
src/main/cpp/rocketmq/ConsumeStandardMessageService.cpp | 9 ++++-----
1 file changed, 4 insertions(+), 5 deletions(-)
diff --git a/src/main/cpp/rocketmq/ConsumeStandardMessageService.cpp b/src/main/cpp/rocketmq/ConsumeStandardMessageService.cpp
index 46ef575..956a4cf 100644
--- a/src/main/cpp/rocketmq/ConsumeStandardMessageService.cpp
+++ b/src/main/cpp/rocketmq/ConsumeStandardMessageService.cpp
@@ -150,13 +150,13 @@ void ConsumeStandardMessageService::consumeTask(const ProcessQueueWeakPtr& proce
}
span.AddAttribute(MixAll::SPAN_ATTRIBUTE_KEY_MESSAGING_OPERATION,
MixAll::SPAN_ATTRIBUTE_VALUE_ROCKETMQ_AWAIT_OPERATION);
+ span.AddAttribute(MixAll::SPAN_ATTRIBUTE_KEY_ROCKETMQ_OPERATION,
+ MixAll::SPAN_ATTRIBUTE_VALUE_ROCKETMQ_AWAIT_OPERATION);
TracingUtility::addUniversalSpanAttributes(msg, *consumer, span);
const auto& keys = msg.getKeys();
span.AddAttribute(MixAll::SPAN_ATTRIBUTE_KEY_ROCKETMQ_KEYS,
absl::StrJoin(keys.begin(), keys.end(), MixAll::MESSAGE_KEY_SEPARATOR));
- span.AddAttribute(MixAll::SPAN_ATTRIBUTE_KEY_ROCKETMQ_ATTEMPT, msg.getDeliveryAttempt());
- span.AddAttribute(MixAll::SPAN_ATTRIBUTE_KEY_ROCKETMQ_AVAILABLE_TIMESTAMP,
- absl::FormatTime(absl::FromUnixMillis(msg.getStoreTimestamp())));
+ span.AddAttribute(MixAll::SPAN_ATTRIBUTE_KEY_ROCKETMQ_AVAILABLE_TIMESTAMP, msg.getStoreTimestamp());
absl::Time decoded_timestamp = MessageAccessor::decodedTimestamp(msg);
span.AddAnnotation(
MixAll::SPAN_ANNOTATION_AWAIT_CONSUMPTION,
@@ -190,8 +190,7 @@ void ConsumeStandardMessageService::consumeTask(const ProcessQueueWeakPtr& proce
span.AddAttribute(MixAll::SPAN_ATTRIBUTE_KEY_ROCKETMQ_KEYS,
absl::StrJoin(keys.begin(), keys.end(), MixAll::MESSAGE_KEY_SEPARATOR));
span.AddAttribute(MixAll::SPAN_ATTRIBUTE_KEY_ROCKETMQ_ATTEMPT, msg.getDeliveryAttempt());
- span.AddAttribute(MixAll::SPAN_ATTRIBUTE_KEY_ROCKETMQ_AVAILABLE_TIMESTAMP,
- absl::FormatTime(absl::FromUnixMillis(msg.getStoreTimestamp())));
+ span.AddAttribute(MixAll::SPAN_ATTRIBUTE_KEY_ROCKETMQ_AVAILABLE_TIMESTAMP, msg.getStoreTimestamp());
span.AddAttribute(MixAll::SPAN_ATTRIBUTE_KEY_ROCKETMQ_BATCH_SIZE, msgs.size());
spans.emplace_back(std::move(span));
MessageAccessor::setTraceContext(const_cast<MQMessageExt&>(msg),