You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@rocketmq.apache.org by di...@apache.org on 2022/12/02 03:50:54 UTC
[rocketmq-client-go] branch master updated: fix: data race when update subversion for consumer (#962)
This is an automated email from the ASF dual-hosted git repository.
dinglei pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/rocketmq-client-go.git
The following commit(s) were added to refs/heads/master by this push:
new f56a2db fix: data race when update subversion for consumer (#962)
f56a2db is described below
commit f56a2dba2af8ab0b88869e589b0820f49b728f8c
Author: cserwen <cs...@163.com>
AuthorDate: Fri Dec 2 11:50:48 2022 +0800
fix: data race when update subversion for consumer (#962)
Co-authored-by: dengzhiwen1 <de...@xiaomi.com>
---
consumer/consumer.go | 2 +-
internal/model.go | 26 ++++++++++++++++++++++++++
internal/utils/set.go | 6 +++++-
3 files changed, 32 insertions(+), 2 deletions(-)
diff --git a/consumer/consumer.go b/consumer/consumer.go
index 15100cd..1511f01 100644
--- a/consumer/consumer.go
+++ b/consumer/consumer.go
@@ -435,7 +435,7 @@ func (dc *defaultConsumer) doBalance() {
func (dc *defaultConsumer) SubscriptionDataList() []*internal.SubscriptionData {
result := make([]*internal.SubscriptionData, 0)
dc.subscriptionDataTable.Range(func(key, value interface{}) bool {
- result = append(result, value.(*internal.SubscriptionData))
+ result = append(result, value.(*internal.SubscriptionData).Clone())
return true
})
return result
diff --git a/internal/model.go b/internal/model.go
index c248bb3..0fd7955 100644
--- a/internal/model.go
+++ b/internal/model.go
@@ -62,6 +62,32 @@ type SubscriptionData struct {
ExpType string `json:"expressionType"`
}
+func (sd *SubscriptionData) Clone() *SubscriptionData {
+ cloned := &SubscriptionData{
+ ClassFilterMode: sd.ClassFilterMode,
+ Topic: sd.Topic,
+ SubString: sd.SubString,
+ SubVersion: sd.SubVersion,
+ ExpType: sd.ExpType,
+ }
+
+ if sd.Tags.Items() != nil {
+ cloned.Tags = utils.NewSet()
+ for _, value := range sd.Tags.Items() {
+ cloned.Tags.Add(value)
+ }
+ }
+
+ if sd.Codes.Items() != nil {
+ cloned.Codes = utils.NewSet()
+ for _, value := range sd.Codes.Items() {
+ cloned.Codes.Add(value)
+ }
+ }
+
+ return cloned
+}
+
type producerData struct {
GroupName string `json:"groupName"`
}
diff --git a/internal/utils/set.go b/internal/utils/set.go
index e90fb36..ed9857b 100644
--- a/internal/utils/set.go
+++ b/internal/utils/set.go
@@ -43,6 +43,10 @@ func NewSet() Set {
}
}
+func (s *Set) Items() map[string]UniqueItem {
+ return s.items
+}
+
func (s *Set) Add(v UniqueItem) {
s.items[v.UniqueID()] = v
}
@@ -98,6 +102,6 @@ func (s *Set) MarshalJSON() ([]byte, error) {
return buffer.Bytes(), nil
}
-func (s Set) UnmarshalJSON(data []byte) (err error) {
+func (s *Set) UnmarshalJSON(data []byte) (err error) {
return nil
}