You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@rocketmq.apache.org by li...@apache.org on 2021/10/28 02:56:38 UTC
[rocketmq-client-cpp] branch main updated: Implement trace for
transaction message (#385)
This is an automated email from the ASF dual-hosted git repository.
lizhanhui pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/rocketmq-client-cpp.git
The following commit(s) were added to refs/heads/main by this push:
new 9959686 Implement trace for transaction message (#385)
9959686 is described below
commit 995968624540fa46aceb924bcb2d7ca297985853
Author: aaron ai <ya...@gmail.com>
AuthorDate: Thu Oct 28 10:56:32 2021 +0800
Implement trace for transaction message (#385)
---
example/rocketmq/ExampleTransactionProducer.cpp | 2 ++
src/main/cpp/base/MixAll.cpp | 2 --
src/main/cpp/base/include/MixAll.h | 2 --
src/main/cpp/rocketmq/ProducerImpl.cpp | 27 ++++++++++---------------
4 files changed, 13 insertions(+), 20 deletions(-)
diff --git a/example/rocketmq/ExampleTransactionProducer.cpp b/example/rocketmq/ExampleTransactionProducer.cpp
index 1fe5b87..1041b92 100644
--- a/example/rocketmq/ExampleTransactionProducer.cpp
+++ b/example/rocketmq/ExampleTransactionProducer.cpp
@@ -44,6 +44,8 @@ int main(int argc, char* argv[]) {
transaction->commit();
+ std::this_thread::sleep_for(std::chrono::minutes(30));
+
producer.shutdown();
return EXIT_SUCCESS;
diff --git a/src/main/cpp/base/MixAll.cpp b/src/main/cpp/base/MixAll.cpp
index e919dac..0bf273a 100644
--- a/src/main/cpp/base/MixAll.cpp
+++ b/src/main/cpp/base/MixAll.cpp
@@ -75,8 +75,6 @@ const char* MixAll::TRACE_RESOURCE_ATTRIBUTE_KEY_HOST_NAME = "host.name";
const char* MixAll::TRACE_RESOURCE_ATTRIBUTE_KEY_SERVICE_NAME = "service.name";
const char* MixAll::TRACE_RESOURCE_ATTRIBUTE_VALUE_SERVICE_NAME = "rocketmq-client";
-const char* MixAll::SPAN_NAME_END_TRANSACTION = "EndTransaction";
-
// Span attributes follows to the opentelemetry specification, refers to:
// https://github.com/open-telemetry/opentelemetry-specification
diff --git a/src/main/cpp/base/include/MixAll.h b/src/main/cpp/base/include/MixAll.h
index 5a45cd1..ae272dc 100644
--- a/src/main/cpp/base/include/MixAll.h
+++ b/src/main/cpp/base/include/MixAll.h
@@ -70,8 +70,6 @@ public:
static const char* TRACE_RESOURCE_ATTRIBUTE_KEY_SERVICE_NAME;
static const char* TRACE_RESOURCE_ATTRIBUTE_VALUE_SERVICE_NAME;
- static const char* SPAN_NAME_END_TRANSACTION;
-
// RocketMQ span attribute name list
static const char* SPAN_ATTRIBUTE_KEY_ROCKETMQ_OPERATION;
static const char* SPAN_ATTRIBUTE_KEY_ROCKETMQ_NAMESPACE;
diff --git a/src/main/cpp/rocketmq/ProducerImpl.cpp b/src/main/cpp/rocketmq/ProducerImpl.cpp
index ea54b75..38bc99e 100644
--- a/src/main/cpp/rocketmq/ProducerImpl.cpp
+++ b/src/main/cpp/rocketmq/ProducerImpl.cpp
@@ -373,6 +373,8 @@ void ProducerImpl::send0(const MQMessage& message, SendCallback* callback, std::
auto retry_callback =
new RetrySendCallback(shared_from_this(), message, max_attempt_times, callback, std::move(list));
sendImpl(retry_callback);
+ const_cast<MQMessage&>(message).traceContext(
+ opencensus::trace::propagation::ToTraceParentHeader(retry_callback->span().context()));
}
bool ProducerImpl::endTransaction0(const std::string& target, const MQMessage& message,
@@ -403,25 +405,18 @@ bool ProducerImpl::endTransaction0(const std::string& target, const MQMessage& m
opencensus::trace::SpanContext span_context =
opencensus::trace::propagation::FromTraceParentHeader(message.traceContext());
auto span = opencensus::trace::Span::BlankSpan();
+ std::string trace_operation_name = TransactionState::COMMIT == resolution
+ ? MixAll::SPAN_ATTRIBUTE_VALUE_ROCKETMQ_COMMIT_OPERATION
+ : MixAll::SPAN_ATTRIBUTE_VALUE_ROCKETMQ_ROLLBACK_OPERATION;
+ std::string span_name = resourceNamespace() + "/" + message.getTopic() + " " + trace_operation_name;
if (span_context.IsValid()) {
- span = opencensus::trace::Span::StartSpanWithRemoteParent(MixAll::SPAN_NAME_END_TRANSACTION, span_context,
- {&Samplers::always()});
+ span = opencensus::trace::Span::StartSpanWithRemoteParent(span_name, span_context, {&Samplers::always()});
} else {
- span = opencensus::trace::Span::StartSpan(MixAll::SPAN_NAME_END_TRANSACTION, nullptr, {&Samplers::always()});
- }
- span.AddAttribute(MixAll::SPAN_ATTRIBUTE_KEY_ROCKETMQ_ACCESS_KEY,
- credentialsProvider()->getCredentials().accessKey());
- span.AddAttribute(MixAll::SPAN_ATTRIBUTE_KEY_ROCKETMQ_NAMESPACE, resourceNamespace());
- span.AddAttribute(MixAll::SPAN_ATTRIBUTE_KEY_ROCKETMQ_CLIENT_GROUP, getGroupName());
- span.AddAttribute(MixAll::SPAN_ATTRIBUTE_KEY_MESSAGING_ID, message.getMsgId());
- switch (resolution) {
- case TransactionState::COMMIT:
- span.AddAttribute(MixAll::SPAN_ATTRIBUTE_KEY_TRANSACTION_RESOLUTION, "commit");
- break;
- case TransactionState::ROLLBACK:
- span.AddAttribute(MixAll::SPAN_ATTRIBUTE_KEY_TRANSACTION_RESOLUTION, "rollback");
- break;
+ span = opencensus::trace::Span::StartSpan(span_name, nullptr, {&Samplers::always()});
}
+ span.AddAttribute(MixAll::SPAN_ATTRIBUTE_KEY_MESSAGING_OPERATION, trace_operation_name);
+ span.AddAttribute(MixAll::SPAN_ATTRIBUTE_KEY_ROCKETMQ_OPERATION, trace_operation_name);
+ TracingUtility::addUniversalSpanAttributes(message, *this, span);
absl::Mutex mtx;
absl::CondVar cv;