You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@rocketmq.apache.org by di...@apache.org on 2021/03/16 12:22:33 UTC

[rocketmq-client-go] branch master updated: [ISSUE #617] retrieve transactionid from property first (#620)

This is an automated email from the ASF dual-hosted git repository.

dinglei pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/rocketmq-client-go.git


The following commit(s) were added to refs/heads/master by this push:
     new 05929da  [ISSUE #617] retrieve transactionid from property first (#620)
05929da is described below

commit 05929dace17200e60a9b1d7157453308956dce94
Author: tenhan <te...@gmail.com>
AuthorDate: Tue Mar 16 20:22:28 2021 +0800

    [ISSUE #617] retrieve transactionid from property first (#620)
---
 primitive/message.go | 1 +
 producer/producer.go | 9 ++++++++-
 2 files changed, 9 insertions(+), 1 deletion(-)

diff --git a/primitive/message.go b/primitive/message.go
index fd7e9c6..b330dc1 100644
--- a/primitive/message.go
+++ b/primitive/message.go
@@ -59,6 +59,7 @@ const (
 	PropertyTranscationCheckTimes          = "TRANSACTION_CHECK_TIMES"
 	PropertyCheckImmunityTimeInSeconds     = "CHECK_IMMUNITY_TIME_IN_SECONDS"
 	PropertyShardingKey                    = "SHARDING_KEY"
+	PropertyTransactionID                  = "__transactionId__"
 )
 
 type Message struct {
diff --git a/producer/producer.go b/producer/producer.go
index 65e39c2..910bb23 100644
--- a/producer/producer.go
+++ b/producer/producer.go
@@ -462,13 +462,20 @@ func (tp *transactionProducer) checkTransactionState() {
 			if uniqueKey == "" {
 				uniqueKey = callback.Msg.MsgId
 			}
+			transactionId := callback.Msg.GetProperty(primitive.PropertyTransactionID)
+			if transactionId == "" {
+				transactionId = callback.Header.TransactionId
+			}
+			if transactionId == "" {
+				transactionId = callback.Msg.TransactionId
+			}
 			header := &internal.EndTransactionRequestHeader{
 				CommitLogOffset:      callback.Header.CommitLogOffset,
 				ProducerGroup:        tp.producer.group,
 				TranStateTableOffset: callback.Header.TranStateTableOffset,
 				FromTransactionCheck: true,
 				MsgID:                uniqueKey,
-				TransactionId:        callback.Header.TransactionId,
+				TransactionId:        transactionId,
 				CommitOrRollback:     tp.transactionState(localTransactionState),
 			}