You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@rocketmq.apache.org by li...@apache.org on 2021/10/28 02:56:38 UTC

[rocketmq-client-cpp] branch main updated: Implement trace for transaction message (#385)

This is an automated email from the ASF dual-hosted git repository.

lizhanhui pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/rocketmq-client-cpp.git


The following commit(s) were added to refs/heads/main by this push:
     new 9959686  Implement trace for transaction message (#385)
9959686 is described below

commit 995968624540fa46aceb924bcb2d7ca297985853
Author: aaron ai <ya...@gmail.com>
AuthorDate: Thu Oct 28 10:56:32 2021 +0800

    Implement trace for transaction message (#385)
---
 example/rocketmq/ExampleTransactionProducer.cpp |  2 ++
 src/main/cpp/base/MixAll.cpp                    |  2 --
 src/main/cpp/base/include/MixAll.h              |  2 --
 src/main/cpp/rocketmq/ProducerImpl.cpp          | 27 ++++++++++---------------
 4 files changed, 13 insertions(+), 20 deletions(-)

diff --git a/example/rocketmq/ExampleTransactionProducer.cpp b/example/rocketmq/ExampleTransactionProducer.cpp
index 1fe5b87..1041b92 100644
--- a/example/rocketmq/ExampleTransactionProducer.cpp
+++ b/example/rocketmq/ExampleTransactionProducer.cpp
@@ -44,6 +44,8 @@ int main(int argc, char* argv[]) {
 
   transaction->commit();
 
+  std::this_thread::sleep_for(std::chrono::minutes(30));
+
   producer.shutdown();
 
   return EXIT_SUCCESS;
diff --git a/src/main/cpp/base/MixAll.cpp b/src/main/cpp/base/MixAll.cpp
index e919dac..0bf273a 100644
--- a/src/main/cpp/base/MixAll.cpp
+++ b/src/main/cpp/base/MixAll.cpp
@@ -75,8 +75,6 @@ const char* MixAll::TRACE_RESOURCE_ATTRIBUTE_KEY_HOST_NAME = "host.name";
 const char* MixAll::TRACE_RESOURCE_ATTRIBUTE_KEY_SERVICE_NAME = "service.name";
 const char* MixAll::TRACE_RESOURCE_ATTRIBUTE_VALUE_SERVICE_NAME = "rocketmq-client";
 
-const char* MixAll::SPAN_NAME_END_TRANSACTION = "EndTransaction";
-
 // Span attributes follows to the opentelemetry specification, refers to:
 // https://github.com/open-telemetry/opentelemetry-specification
 
diff --git a/src/main/cpp/base/include/MixAll.h b/src/main/cpp/base/include/MixAll.h
index 5a45cd1..ae272dc 100644
--- a/src/main/cpp/base/include/MixAll.h
+++ b/src/main/cpp/base/include/MixAll.h
@@ -70,8 +70,6 @@ public:
   static const char* TRACE_RESOURCE_ATTRIBUTE_KEY_SERVICE_NAME;
   static const char* TRACE_RESOURCE_ATTRIBUTE_VALUE_SERVICE_NAME;
 
-  static const char* SPAN_NAME_END_TRANSACTION;
-
   // RocketMQ span attribute name list
   static const char* SPAN_ATTRIBUTE_KEY_ROCKETMQ_OPERATION;
   static const char* SPAN_ATTRIBUTE_KEY_ROCKETMQ_NAMESPACE;
diff --git a/src/main/cpp/rocketmq/ProducerImpl.cpp b/src/main/cpp/rocketmq/ProducerImpl.cpp
index ea54b75..38bc99e 100644
--- a/src/main/cpp/rocketmq/ProducerImpl.cpp
+++ b/src/main/cpp/rocketmq/ProducerImpl.cpp
@@ -373,6 +373,8 @@ void ProducerImpl::send0(const MQMessage& message, SendCallback* callback, std::
   auto retry_callback =
       new RetrySendCallback(shared_from_this(), message, max_attempt_times, callback, std::move(list));
   sendImpl(retry_callback);
+  const_cast<MQMessage&>(message).traceContext(
+      opencensus::trace::propagation::ToTraceParentHeader(retry_callback->span().context()));
 }
 
 bool ProducerImpl::endTransaction0(const std::string& target, const MQMessage& message,
@@ -403,25 +405,18 @@ bool ProducerImpl::endTransaction0(const std::string& target, const MQMessage& m
   opencensus::trace::SpanContext span_context =
       opencensus::trace::propagation::FromTraceParentHeader(message.traceContext());
   auto span = opencensus::trace::Span::BlankSpan();
+  std::string trace_operation_name = TransactionState::COMMIT == resolution
+                                         ? MixAll::SPAN_ATTRIBUTE_VALUE_ROCKETMQ_COMMIT_OPERATION
+                                         : MixAll::SPAN_ATTRIBUTE_VALUE_ROCKETMQ_ROLLBACK_OPERATION;
+  std::string span_name = resourceNamespace() + "/" + message.getTopic() + " " + trace_operation_name;
   if (span_context.IsValid()) {
-    span = opencensus::trace::Span::StartSpanWithRemoteParent(MixAll::SPAN_NAME_END_TRANSACTION, span_context,
-                                                              {&Samplers::always()});
+    span = opencensus::trace::Span::StartSpanWithRemoteParent(span_name, span_context, {&Samplers::always()});
   } else {
-    span = opencensus::trace::Span::StartSpan(MixAll::SPAN_NAME_END_TRANSACTION, nullptr, {&Samplers::always()});
-  }
-  span.AddAttribute(MixAll::SPAN_ATTRIBUTE_KEY_ROCKETMQ_ACCESS_KEY,
-                    credentialsProvider()->getCredentials().accessKey());
-  span.AddAttribute(MixAll::SPAN_ATTRIBUTE_KEY_ROCKETMQ_NAMESPACE, resourceNamespace());
-  span.AddAttribute(MixAll::SPAN_ATTRIBUTE_KEY_ROCKETMQ_CLIENT_GROUP, getGroupName());
-  span.AddAttribute(MixAll::SPAN_ATTRIBUTE_KEY_MESSAGING_ID, message.getMsgId());
-  switch (resolution) {
-    case TransactionState::COMMIT:
-      span.AddAttribute(MixAll::SPAN_ATTRIBUTE_KEY_TRANSACTION_RESOLUTION, "commit");
-      break;
-    case TransactionState::ROLLBACK:
-      span.AddAttribute(MixAll::SPAN_ATTRIBUTE_KEY_TRANSACTION_RESOLUTION, "rollback");
-      break;
+    span = opencensus::trace::Span::StartSpan(span_name, nullptr, {&Samplers::always()});
   }
+  span.AddAttribute(MixAll::SPAN_ATTRIBUTE_KEY_MESSAGING_OPERATION, trace_operation_name);
+  span.AddAttribute(MixAll::SPAN_ATTRIBUTE_KEY_ROCKETMQ_OPERATION, trace_operation_name);
+  TracingUtility::addUniversalSpanAttributes(message, *this, span);
 
   absl::Mutex mtx;
   absl::CondVar cv;