You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@rocketmq.apache.org by di...@apache.org on 2022/12/02 03:50:54 UTC

[rocketmq-client-go] branch master updated: fix: data race when update subversion for consumer (#962)

This is an automated email from the ASF dual-hosted git repository.

dinglei pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/rocketmq-client-go.git


The following commit(s) were added to refs/heads/master by this push:
     new f56a2db  fix: data race when update subversion for consumer (#962)
f56a2db is described below

commit f56a2dba2af8ab0b88869e589b0820f49b728f8c
Author: cserwen <cs...@163.com>
AuthorDate: Fri Dec 2 11:50:48 2022 +0800

    fix: data race when update subversion for consumer (#962)
    
    Co-authored-by: dengzhiwen1 <de...@xiaomi.com>
---
 consumer/consumer.go  |  2 +-
 internal/model.go     | 26 ++++++++++++++++++++++++++
 internal/utils/set.go |  6 +++++-
 3 files changed, 32 insertions(+), 2 deletions(-)

diff --git a/consumer/consumer.go b/consumer/consumer.go
index 15100cd..1511f01 100644
--- a/consumer/consumer.go
+++ b/consumer/consumer.go
@@ -435,7 +435,7 @@ func (dc *defaultConsumer) doBalance() {
 func (dc *defaultConsumer) SubscriptionDataList() []*internal.SubscriptionData {
 	result := make([]*internal.SubscriptionData, 0)
 	dc.subscriptionDataTable.Range(func(key, value interface{}) bool {
-		result = append(result, value.(*internal.SubscriptionData))
+		result = append(result, value.(*internal.SubscriptionData).Clone())
 		return true
 	})
 	return result
diff --git a/internal/model.go b/internal/model.go
index c248bb3..0fd7955 100644
--- a/internal/model.go
+++ b/internal/model.go
@@ -62,6 +62,32 @@ type SubscriptionData struct {
 	ExpType         string    `json:"expressionType"`
 }
 
+func (sd *SubscriptionData) Clone() *SubscriptionData {
+	cloned := &SubscriptionData{
+		ClassFilterMode: sd.ClassFilterMode,
+		Topic:           sd.Topic,
+		SubString:       sd.SubString,
+		SubVersion:      sd.SubVersion,
+		ExpType:         sd.ExpType,
+	}
+
+	if sd.Tags.Items() != nil {
+		cloned.Tags = utils.NewSet()
+		for _, value := range sd.Tags.Items() {
+			cloned.Tags.Add(value)
+		}
+	}
+
+	if sd.Codes.Items() != nil {
+		cloned.Codes = utils.NewSet()
+		for _, value := range sd.Codes.Items() {
+			cloned.Codes.Add(value)
+		}
+	}
+
+	return cloned
+}
+
 type producerData struct {
 	GroupName string `json:"groupName"`
 }
diff --git a/internal/utils/set.go b/internal/utils/set.go
index e90fb36..ed9857b 100644
--- a/internal/utils/set.go
+++ b/internal/utils/set.go
@@ -43,6 +43,10 @@ func NewSet() Set {
 	}
 }
 
+func (s *Set) Items() map[string]UniqueItem {
+	return s.items
+}
+
 func (s *Set) Add(v UniqueItem) {
 	s.items[v.UniqueID()] = v
 }
@@ -98,6 +102,6 @@ func (s *Set) MarshalJSON() ([]byte, error) {
 	return buffer.Bytes(), nil
 }
 
-func (s Set) UnmarshalJSON(data []byte) (err error) {
+func (s *Set) UnmarshalJSON(data []byte) (err error) {
 	return nil
 }