You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@rocketmq.apache.org by li...@apache.org on 2021/10/25 12:27:58 UTC
[rocketmq-client-cpp] branch main updated: Add 'message' into the
arguments of transaction related method (#384)
This is an automated email from the ASF dual-hosted git repository.
lizhanhui pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/rocketmq-client-cpp.git
The following commit(s) were added to refs/heads/main by this push:
new 7c5df12 Add 'message' into the arguments of transaction related method (#384)
7c5df12 is described below
commit 7c5df129da587787874916e4f20ee7b58036e34c
Author: aaron ai <ya...@gmail.com>
AuthorDate: Mon Oct 25 20:27:50 2021 +0800
Add 'message' into the arguments of transaction related method (#384)
---
src/main/cpp/base/MixAll.cpp | 1 -
src/main/cpp/base/include/MixAll.h | 1 -
src/main/cpp/rocketmq/ProducerImpl.cpp | 30 ++++++++++++-------------
src/main/cpp/rocketmq/TransactionImpl.cpp | 6 ++---
src/main/cpp/rocketmq/include/ProducerImpl.h | 10 ++++-----
src/main/cpp/rocketmq/include/TransactionImpl.h | 7 +++---
6 files changed, 25 insertions(+), 30 deletions(-)
diff --git a/src/main/cpp/base/MixAll.cpp b/src/main/cpp/base/MixAll.cpp
index 4691e69..e919dac 100644
--- a/src/main/cpp/base/MixAll.cpp
+++ b/src/main/cpp/base/MixAll.cpp
@@ -76,7 +76,6 @@ const char* MixAll::TRACE_RESOURCE_ATTRIBUTE_KEY_SERVICE_NAME = "service.name";
const char* MixAll::TRACE_RESOURCE_ATTRIBUTE_VALUE_SERVICE_NAME = "rocketmq-client";
const char* MixAll::SPAN_NAME_END_TRANSACTION = "EndTransaction";
-const char* MixAll::SPAN_NAME_PULL_MESSAGE = "PullMessage";
// Span attributes follows to the opentelemetry specification, refers to:
// https://github.com/open-telemetry/opentelemetry-specification
diff --git a/src/main/cpp/base/include/MixAll.h b/src/main/cpp/base/include/MixAll.h
index 30ff613..5a45cd1 100644
--- a/src/main/cpp/base/include/MixAll.h
+++ b/src/main/cpp/base/include/MixAll.h
@@ -71,7 +71,6 @@ public:
static const char* TRACE_RESOURCE_ATTRIBUTE_VALUE_SERVICE_NAME;
static const char* SPAN_NAME_END_TRANSACTION;
- static const char* SPAN_NAME_PULL_MESSAGE;
// RocketMQ span attribute name list
static const char* SPAN_ATTRIBUTE_KEY_ROCKETMQ_OPERATION;
diff --git a/src/main/cpp/rocketmq/ProducerImpl.cpp b/src/main/cpp/rocketmq/ProducerImpl.cpp
index df3fb75..ea54b75 100644
--- a/src/main/cpp/rocketmq/ProducerImpl.cpp
+++ b/src/main/cpp/rocketmq/ProducerImpl.cpp
@@ -375,12 +375,11 @@ void ProducerImpl::send0(const MQMessage& message, SendCallback* callback, std::
sendImpl(retry_callback);
}
-bool ProducerImpl::endTransaction0(const std::string& target, const std::string& message_id,
- const std::string& transaction_id, TransactionState resolution,
- std::string trace_context) {
+bool ProducerImpl::endTransaction0(const std::string& target, const MQMessage& message,
+ const std::string& transaction_id, TransactionState resolution) {
EndTransactionRequest request;
- request.set_message_id(message_id);
+ request.set_message_id(message.getMsgId());
request.set_transaction_id(transaction_id);
request.mutable_group()->set_name(group_name_);
request.mutable_group()->set_resource_namespace(resource_namespace_);
@@ -401,7 +400,8 @@ bool ProducerImpl::endTransaction0(const std::string& target, const std::string&
bool completed = false;
bool success = false;
// Trace transactional message
- opencensus::trace::SpanContext span_context = opencensus::trace::propagation::FromTraceParentHeader(trace_context);
+ opencensus::trace::SpanContext span_context =
+ opencensus::trace::propagation::FromTraceParentHeader(message.traceContext());
auto span = opencensus::trace::Span::BlankSpan();
if (span_context.IsValid()) {
span = opencensus::trace::Span::StartSpanWithRemoteParent(MixAll::SPAN_NAME_END_TRANSACTION, span_context,
@@ -413,7 +413,7 @@ bool ProducerImpl::endTransaction0(const std::string& target, const std::string&
credentialsProvider()->getCredentials().accessKey());
span.AddAttribute(MixAll::SPAN_ATTRIBUTE_KEY_ROCKETMQ_NAMESPACE, resourceNamespace());
span.AddAttribute(MixAll::SPAN_ATTRIBUTE_KEY_ROCKETMQ_CLIENT_GROUP, getGroupName());
- span.AddAttribute(MixAll::SPAN_ATTRIBUTE_KEY_MESSAGING_ID, message_id);
+ span.AddAttribute(MixAll::SPAN_ATTRIBUTE_KEY_MESSAGING_ID, message.getMsgId());
switch (resolution) {
case TransactionState::COMMIT:
span.AddAttribute(MixAll::SPAN_ATTRIBUTE_KEY_TRANSACTION_RESOLUTION, "commit");
@@ -508,19 +508,17 @@ std::unique_ptr<TransactionImpl> ProducerImpl::prepare(MQMessage& message, std::
return nullptr;
}
- return std::unique_ptr<TransactionImpl>(new TransactionImpl(
- send_result.getMsgId(), send_result.getTransactionId(), send_result.getMessageQueue().serviceAddress(),
- send_result.traceContext(), ProducerImpl::shared_from_this()));
+ return absl::make_unique<TransactionImpl>(message, send_result.getTransactionId(),
+ send_result.getMessageQueue().serviceAddress(), send_result.traceContext(),
+ ProducerImpl::shared_from_this());
}
-bool ProducerImpl::commit(const std::string& message_id, const std::string& transaction_id,
- const std::string& trace_context, const std::string& target) {
- return endTransaction0(target, message_id, transaction_id, TransactionState::COMMIT, trace_context);
+bool ProducerImpl::commit(const MQMessage& message, const std::string& transaction_id, const std::string& target) {
+ return endTransaction0(target, message, transaction_id, TransactionState::COMMIT);
}
-bool ProducerImpl::rollback(const std::string& message_id, const std::string& transaction_id,
- const std::string& trace_context, const std::string& target) {
- return endTransaction0(target, message_id, transaction_id, TransactionState::ROLLBACK, trace_context);
+bool ProducerImpl::rollback(const MQMessage& message, const std::string& transaction_id, const std::string& target) {
+ return endTransaction0(target, message, transaction_id, TransactionState::ROLLBACK);
}
void ProducerImpl::asyncPublishInfo(const std::string& topic,
@@ -624,7 +622,7 @@ void ProducerImpl::resolveOrphanedTransactionalMessage(const std::string& transa
if (transaction_state_checker_) {
TransactionState state = transaction_state_checker_->checkLocalTransactionState(message);
const std::string& target_host = MessageAccessor::targetEndpoint(message);
- endTransaction0(target_host, message.getMsgId(), transaction_id, state, message.traceContext());
+ endTransaction0(target_host, message, transaction_id, state);
} else {
SPDLOG_WARN("LocalTransactionStateChecker is unexpectedly nullptr");
}
diff --git a/src/main/cpp/rocketmq/TransactionImpl.cpp b/src/main/cpp/rocketmq/TransactionImpl.cpp
index 21610aa..11f9ea3 100644
--- a/src/main/cpp/rocketmq/TransactionImpl.cpp
+++ b/src/main/cpp/rocketmq/TransactionImpl.cpp
@@ -26,7 +26,7 @@ bool TransactionImpl::commit() {
return false;
}
- return producer->commit(message_id_, transaction_id_, trace_context_, endpoint_);
+ return producer->commit(message_, transaction_id_, endpoint_);
}
bool TransactionImpl::rollback() {
@@ -34,11 +34,11 @@ bool TransactionImpl::rollback() {
if (!producer) {
return false;
}
- return producer->rollback(message_id_, transaction_id_, trace_context_, endpoint_);
+ return producer->rollback(message_, transaction_id_, endpoint_);
}
std::string TransactionImpl::messageId() const {
- return message_id_;
+ return message_.getMsgId();
}
std::string TransactionImpl::transactionId() const {
diff --git a/src/main/cpp/rocketmq/include/ProducerImpl.h b/src/main/cpp/rocketmq/include/ProducerImpl.h
index e28f8a3..a2172ce 100644
--- a/src/main/cpp/rocketmq/include/ProducerImpl.h
+++ b/src/main/cpp/rocketmq/include/ProducerImpl.h
@@ -62,11 +62,9 @@ public:
std::unique_ptr<TransactionImpl> prepare(MQMessage& message, std::error_code& ec);
- bool commit(const std::string& message_id, const std::string& transaction_id, const std::string& trace_context,
- const std::string& target);
+ bool commit(const MQMessage& message, const std::string& transaction_id, const std::string& target);
- bool rollback(const std::string& message_id, const std::string& transaction_id, const std::string& trace_context,
- const std::string& target);
+ bool rollback(const MQMessage& message, const std::string& transaction_id, const std::string& target);
/**
* Check if the RPC client for the target host is isolated or not
@@ -156,8 +154,8 @@ private:
void send0(const MQMessage& message, SendCallback* callback, std::vector<MQMessageQueue> list, int max_attempt_times);
- bool endTransaction0(const std::string& target, const std::string& message_id, const std::string& transaction_id,
- TransactionState resolution, std::string trace_context);
+ bool endTransaction0(const std::string& target, const MQMessage& message, const std::string& transaction_id,
+ TransactionState resolution);
void isolatedEndpoints(absl::flat_hash_set<std::string>& endpoints) LOCKS_EXCLUDED(isolated_endpoints_mtx_);
diff --git a/src/main/cpp/rocketmq/include/TransactionImpl.h b/src/main/cpp/rocketmq/include/TransactionImpl.h
index 4b721ed..844c827 100644
--- a/src/main/cpp/rocketmq/include/TransactionImpl.h
+++ b/src/main/cpp/rocketmq/include/TransactionImpl.h
@@ -19,6 +19,7 @@
#include <memory>
#include <string>
+#include "rocketmq/MQMessage.h"
#include "rocketmq/Transaction.h"
ROCKETMQ_NAMESPACE_BEGIN
@@ -27,9 +28,9 @@ class ProducerImpl;
class TransactionImpl : public Transaction {
public:
- TransactionImpl(std::string message_id, std::string transaction_id, std::string endpoint, std::string trace_context,
+ TransactionImpl(MQMessage message, std::string transaction_id, std::string endpoint, std::string trace_context,
const std::shared_ptr<ProducerImpl>& producer)
- : message_id_(std::move(message_id)), transaction_id_(std::move(transaction_id)), endpoint_(std::move(endpoint)),
+ : message_(std::move(message)), transaction_id_(std::move(transaction_id)), endpoint_(std::move(endpoint)),
trace_context_(std::move(trace_context)), producer_(producer) {
}
@@ -44,7 +45,7 @@ public:
std::string transactionId() const override;
private:
- std::string message_id_;
+ MQMessage message_;
std::string transaction_id_;
std::string endpoint_;
std::string trace_context_;