You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@rocketmq.apache.org by di...@apache.org on 2021/03/16 12:22:33 UTC
[rocketmq-client-go] branch master updated: [ISSUE #617] retrieve
transactionid from property first (#620)
This is an automated email from the ASF dual-hosted git repository.
dinglei pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/rocketmq-client-go.git
The following commit(s) were added to refs/heads/master by this push:
new 05929da [ISSUE #617] retrieve transactionid from property first (#620)
05929da is described below
commit 05929dace17200e60a9b1d7157453308956dce94
Author: tenhan <te...@gmail.com>
AuthorDate: Tue Mar 16 20:22:28 2021 +0800
[ISSUE #617] retrieve transactionid from property first (#620)
---
primitive/message.go | 1 +
producer/producer.go | 9 ++++++++-
2 files changed, 9 insertions(+), 1 deletion(-)
diff --git a/primitive/message.go b/primitive/message.go
index fd7e9c6..b330dc1 100644
--- a/primitive/message.go
+++ b/primitive/message.go
@@ -59,6 +59,7 @@ const (
PropertyTranscationCheckTimes = "TRANSACTION_CHECK_TIMES"
PropertyCheckImmunityTimeInSeconds = "CHECK_IMMUNITY_TIME_IN_SECONDS"
PropertyShardingKey = "SHARDING_KEY"
+ PropertyTransactionID = "__transactionId__"
)
type Message struct {
diff --git a/producer/producer.go b/producer/producer.go
index 65e39c2..910bb23 100644
--- a/producer/producer.go
+++ b/producer/producer.go
@@ -462,13 +462,20 @@ func (tp *transactionProducer) checkTransactionState() {
if uniqueKey == "" {
uniqueKey = callback.Msg.MsgId
}
+ transactionId := callback.Msg.GetProperty(primitive.PropertyTransactionID)
+ if transactionId == "" {
+ transactionId = callback.Header.TransactionId
+ }
+ if transactionId == "" {
+ transactionId = callback.Msg.TransactionId
+ }
header := &internal.EndTransactionRequestHeader{
CommitLogOffset: callback.Header.CommitLogOffset,
ProducerGroup: tp.producer.group,
TranStateTableOffset: callback.Header.TranStateTableOffset,
FromTransactionCheck: true,
MsgID: uniqueKey,
- TransactionId: callback.Header.TransactionId,
+ TransactionId: transactionId,
CommitOrRollback: tp.transactionState(localTransactionState),
}