You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@rocketmq.apache.org by li...@apache.org on 2021/10/25 12:27:58 UTC

[rocketmq-client-cpp] branch main updated: Add 'message' into the arguments of transaction related method (#384)

This is an automated email from the ASF dual-hosted git repository.

lizhanhui pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/rocketmq-client-cpp.git


The following commit(s) were added to refs/heads/main by this push:
     new 7c5df12  Add 'message' into the arguments of transaction related method (#384)
7c5df12 is described below

commit 7c5df129da587787874916e4f20ee7b58036e34c
Author: aaron ai <ya...@gmail.com>
AuthorDate: Mon Oct 25 20:27:50 2021 +0800

    Add 'message' into the arguments of transaction related method (#384)
---
 src/main/cpp/base/MixAll.cpp                    |  1 -
 src/main/cpp/base/include/MixAll.h              |  1 -
 src/main/cpp/rocketmq/ProducerImpl.cpp          | 30 ++++++++++++-------------
 src/main/cpp/rocketmq/TransactionImpl.cpp       |  6 ++---
 src/main/cpp/rocketmq/include/ProducerImpl.h    | 10 ++++-----
 src/main/cpp/rocketmq/include/TransactionImpl.h |  7 +++---
 6 files changed, 25 insertions(+), 30 deletions(-)

diff --git a/src/main/cpp/base/MixAll.cpp b/src/main/cpp/base/MixAll.cpp
index 4691e69..e919dac 100644
--- a/src/main/cpp/base/MixAll.cpp
+++ b/src/main/cpp/base/MixAll.cpp
@@ -76,7 +76,6 @@ const char* MixAll::TRACE_RESOURCE_ATTRIBUTE_KEY_SERVICE_NAME = "service.name";
 const char* MixAll::TRACE_RESOURCE_ATTRIBUTE_VALUE_SERVICE_NAME = "rocketmq-client";
 
 const char* MixAll::SPAN_NAME_END_TRANSACTION = "EndTransaction";
-const char* MixAll::SPAN_NAME_PULL_MESSAGE = "PullMessage";
 
 // Span attributes follows to the opentelemetry specification, refers to:
 // https://github.com/open-telemetry/opentelemetry-specification
diff --git a/src/main/cpp/base/include/MixAll.h b/src/main/cpp/base/include/MixAll.h
index 30ff613..5a45cd1 100644
--- a/src/main/cpp/base/include/MixAll.h
+++ b/src/main/cpp/base/include/MixAll.h
@@ -71,7 +71,6 @@ public:
   static const char* TRACE_RESOURCE_ATTRIBUTE_VALUE_SERVICE_NAME;
 
   static const char* SPAN_NAME_END_TRANSACTION;
-  static const char* SPAN_NAME_PULL_MESSAGE;
 
   // RocketMQ span attribute name list
   static const char* SPAN_ATTRIBUTE_KEY_ROCKETMQ_OPERATION;
diff --git a/src/main/cpp/rocketmq/ProducerImpl.cpp b/src/main/cpp/rocketmq/ProducerImpl.cpp
index df3fb75..ea54b75 100644
--- a/src/main/cpp/rocketmq/ProducerImpl.cpp
+++ b/src/main/cpp/rocketmq/ProducerImpl.cpp
@@ -375,12 +375,11 @@ void ProducerImpl::send0(const MQMessage& message, SendCallback* callback, std::
   sendImpl(retry_callback);
 }
 
-bool ProducerImpl::endTransaction0(const std::string& target, const std::string& message_id,
-                                   const std::string& transaction_id, TransactionState resolution,
-                                   std::string trace_context) {
+bool ProducerImpl::endTransaction0(const std::string& target, const MQMessage& message,
+                                   const std::string& transaction_id, TransactionState resolution) {
 
   EndTransactionRequest request;
-  request.set_message_id(message_id);
+  request.set_message_id(message.getMsgId());
   request.set_transaction_id(transaction_id);
   request.mutable_group()->set_name(group_name_);
   request.mutable_group()->set_resource_namespace(resource_namespace_);
@@ -401,7 +400,8 @@ bool ProducerImpl::endTransaction0(const std::string& target, const std::string&
   bool completed = false;
   bool success = false;
   // Trace transactional message
-  opencensus::trace::SpanContext span_context = opencensus::trace::propagation::FromTraceParentHeader(trace_context);
+  opencensus::trace::SpanContext span_context =
+      opencensus::trace::propagation::FromTraceParentHeader(message.traceContext());
   auto span = opencensus::trace::Span::BlankSpan();
   if (span_context.IsValid()) {
     span = opencensus::trace::Span::StartSpanWithRemoteParent(MixAll::SPAN_NAME_END_TRANSACTION, span_context,
@@ -413,7 +413,7 @@ bool ProducerImpl::endTransaction0(const std::string& target, const std::string&
                     credentialsProvider()->getCredentials().accessKey());
   span.AddAttribute(MixAll::SPAN_ATTRIBUTE_KEY_ROCKETMQ_NAMESPACE, resourceNamespace());
   span.AddAttribute(MixAll::SPAN_ATTRIBUTE_KEY_ROCKETMQ_CLIENT_GROUP, getGroupName());
-  span.AddAttribute(MixAll::SPAN_ATTRIBUTE_KEY_MESSAGING_ID, message_id);
+  span.AddAttribute(MixAll::SPAN_ATTRIBUTE_KEY_MESSAGING_ID, message.getMsgId());
   switch (resolution) {
     case TransactionState::COMMIT:
       span.AddAttribute(MixAll::SPAN_ATTRIBUTE_KEY_TRANSACTION_RESOLUTION, "commit");
@@ -508,19 +508,17 @@ std::unique_ptr<TransactionImpl> ProducerImpl::prepare(MQMessage& message, std::
     return nullptr;
   }
 
-  return std::unique_ptr<TransactionImpl>(new TransactionImpl(
-      send_result.getMsgId(), send_result.getTransactionId(), send_result.getMessageQueue().serviceAddress(),
-      send_result.traceContext(), ProducerImpl::shared_from_this()));
+  return absl::make_unique<TransactionImpl>(message, send_result.getTransactionId(),
+                                            send_result.getMessageQueue().serviceAddress(), send_result.traceContext(),
+                                            ProducerImpl::shared_from_this());
 }
 
-bool ProducerImpl::commit(const std::string& message_id, const std::string& transaction_id,
-                          const std::string& trace_context, const std::string& target) {
-  return endTransaction0(target, message_id, transaction_id, TransactionState::COMMIT, trace_context);
+bool ProducerImpl::commit(const MQMessage& message, const std::string& transaction_id, const std::string& target) {
+  return endTransaction0(target, message, transaction_id, TransactionState::COMMIT);
 }
 
-bool ProducerImpl::rollback(const std::string& message_id, const std::string& transaction_id,
-                            const std::string& trace_context, const std::string& target) {
-  return endTransaction0(target, message_id, transaction_id, TransactionState::ROLLBACK, trace_context);
+bool ProducerImpl::rollback(const MQMessage& message, const std::string& transaction_id, const std::string& target) {
+  return endTransaction0(target, message, transaction_id, TransactionState::ROLLBACK);
 }
 
 void ProducerImpl::asyncPublishInfo(const std::string& topic,
@@ -624,7 +622,7 @@ void ProducerImpl::resolveOrphanedTransactionalMessage(const std::string& transa
   if (transaction_state_checker_) {
     TransactionState state = transaction_state_checker_->checkLocalTransactionState(message);
     const std::string& target_host = MessageAccessor::targetEndpoint(message);
-    endTransaction0(target_host, message.getMsgId(), transaction_id, state, message.traceContext());
+    endTransaction0(target_host, message, transaction_id, state);
   } else {
     SPDLOG_WARN("LocalTransactionStateChecker is unexpectedly nullptr");
   }
diff --git a/src/main/cpp/rocketmq/TransactionImpl.cpp b/src/main/cpp/rocketmq/TransactionImpl.cpp
index 21610aa..11f9ea3 100644
--- a/src/main/cpp/rocketmq/TransactionImpl.cpp
+++ b/src/main/cpp/rocketmq/TransactionImpl.cpp
@@ -26,7 +26,7 @@ bool TransactionImpl::commit() {
     return false;
   }
 
-  return producer->commit(message_id_, transaction_id_, trace_context_, endpoint_);
+  return producer->commit(message_, transaction_id_, endpoint_);
 }
 
 bool TransactionImpl::rollback() {
@@ -34,11 +34,11 @@ bool TransactionImpl::rollback() {
   if (!producer) {
     return false;
   }
-  return producer->rollback(message_id_, transaction_id_, trace_context_, endpoint_);
+  return producer->rollback(message_, transaction_id_, endpoint_);
 }
 
 std::string TransactionImpl::messageId() const {
-  return message_id_;
+  return message_.getMsgId();
 }
 
 std::string TransactionImpl::transactionId() const {
diff --git a/src/main/cpp/rocketmq/include/ProducerImpl.h b/src/main/cpp/rocketmq/include/ProducerImpl.h
index e28f8a3..a2172ce 100644
--- a/src/main/cpp/rocketmq/include/ProducerImpl.h
+++ b/src/main/cpp/rocketmq/include/ProducerImpl.h
@@ -62,11 +62,9 @@ public:
 
   std::unique_ptr<TransactionImpl> prepare(MQMessage& message, std::error_code& ec);
 
-  bool commit(const std::string& message_id, const std::string& transaction_id, const std::string& trace_context,
-              const std::string& target);
+  bool commit(const MQMessage& message, const std::string& transaction_id, const std::string& target);
 
-  bool rollback(const std::string& message_id, const std::string& transaction_id, const std::string& trace_context,
-                const std::string& target);
+  bool rollback(const MQMessage& message, const std::string& transaction_id, const std::string& target);
 
   /**
    * Check if the RPC client for the target host is isolated or not
@@ -156,8 +154,8 @@ private:
 
   void send0(const MQMessage& message, SendCallback* callback, std::vector<MQMessageQueue> list, int max_attempt_times);
 
-  bool endTransaction0(const std::string& target, const std::string& message_id, const std::string& transaction_id,
-                       TransactionState resolution, std::string trace_context);
+  bool endTransaction0(const std::string& target, const MQMessage& message, const std::string& transaction_id,
+                       TransactionState resolution);
 
   void isolatedEndpoints(absl::flat_hash_set<std::string>& endpoints) LOCKS_EXCLUDED(isolated_endpoints_mtx_);
 
diff --git a/src/main/cpp/rocketmq/include/TransactionImpl.h b/src/main/cpp/rocketmq/include/TransactionImpl.h
index 4b721ed..844c827 100644
--- a/src/main/cpp/rocketmq/include/TransactionImpl.h
+++ b/src/main/cpp/rocketmq/include/TransactionImpl.h
@@ -19,6 +19,7 @@
 #include <memory>
 #include <string>
 
+#include "rocketmq/MQMessage.h"
 #include "rocketmq/Transaction.h"
 
 ROCKETMQ_NAMESPACE_BEGIN
@@ -27,9 +28,9 @@ class ProducerImpl;
 
 class TransactionImpl : public Transaction {
 public:
-  TransactionImpl(std::string message_id, std::string transaction_id, std::string endpoint, std::string trace_context,
+  TransactionImpl(MQMessage message, std::string transaction_id, std::string endpoint, std::string trace_context,
                   const std::shared_ptr<ProducerImpl>& producer)
-      : message_id_(std::move(message_id)), transaction_id_(std::move(transaction_id)), endpoint_(std::move(endpoint)),
+      : message_(std::move(message)), transaction_id_(std::move(transaction_id)), endpoint_(std::move(endpoint)),
         trace_context_(std::move(trace_context)), producer_(producer) {
   }
 
@@ -44,7 +45,7 @@ public:
   std::string transactionId() const override;
 
 private:
-  std::string message_id_;
+  MQMessage message_;
   std::string transaction_id_;
   std::string endpoint_;
   std::string trace_context_;