You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@rocketmq.apache.org by di...@apache.org on 2021/01/15 09:53:13 UTC
[rocketmq-client-go] branch master updated: [ISSUE #544] Fix bug,
concurrent map read and map write local offset file (#550)
This is an automated email from the ASF dual-hosted git repository.
dinglei pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/rocketmq-client-go.git
The following commit(s) were added to refs/heads/master by this push:
new f1e366e [ISSUE #544] Fix bug, concurrent map read and map write local offset file (#550)
f1e366e is described below
commit f1e366ec5f6493c5409266fb9ca9f290329de402
Author: feiquan <22...@qq.com>
AuthorDate: Fri Jan 15 17:53:08 2021 +0800
[ISSUE #544] Fix bug, concurrent map read and map write local offset file (#550)
* FIX: fatal error: concurrent map read and map write (apache#544)
---
.gitignore | 3 ++-
README.md | 2 +-
consumer/offset_store.go | 38 +++++++++++++++++++++++---------------
consumer/offset_store_test.go | 7 +++----
4 files changed, 29 insertions(+), 21 deletions(-)
diff --git a/.gitignore b/.gitignore
index a0c292d..99fd04d 100644
--- a/.gitignore
+++ b/.gitignore
@@ -3,4 +3,5 @@ go.mod
go.sum
vendor/
coverage.txt
-examples/test
\ No newline at end of file
+examples/test
+/.vscode
\ No newline at end of file
diff --git a/README.md b/README.md
index 0f30d5c..08494a6 100644
--- a/README.md
+++ b/README.md
@@ -52,4 +52,4 @@ For 2.0.0 version, it supports:
----------
## License
- [Apache License, Version 2.0](http://www.apache.org/licenses/LICENSE-2.0.html) Copyright (C) Apache Software Foundation
+ [Apache License, Version 2.0](http://www.apache.org/licenses/LICENSE-2.0.html) Copyright (C) Apache Software Foundation
\ No newline at end of file
diff --git a/consumer/offset_store.go b/consumer/offset_store.go
index 17f5d76..4d0b271 100644
--- a/consumer/offset_store.go
+++ b/consumer/offset_store.go
@@ -26,13 +26,12 @@ import (
"sync"
"time"
- jsoniter "github.com/json-iterator/go"
-
"github.com/apache/rocketmq-client-go/v2/internal"
"github.com/apache/rocketmq-client-go/v2/internal/remote"
"github.com/apache/rocketmq-client-go/v2/internal/utils"
"github.com/apache/rocketmq-client-go/v2/primitive"
"github.com/apache/rocketmq-client-go/v2/rlog"
+ jsoniter "github.com/json-iterator/go"
)
type readType int
@@ -101,7 +100,7 @@ func (mq *MessageQueueKey) UnmarshalText(text []byte) error {
type localFileOffsetStore struct {
group string
path string
- OffsetTable map[MessageQueueKey]int64
+ OffsetTable *sync.Map // concurrent safe , map[MessageQueueKey]int64
// mutex for offset file
mutex sync.Mutex
}
@@ -110,7 +109,7 @@ func NewLocalFileOffsetStore(clientID, group string) OffsetStore {
store := &localFileOffsetStore{
group: group,
path: filepath.Join(_LocalOffsetStorePath, clientID, group, "offset.json"),
- OffsetTable: make(map[MessageQueueKey]int64),
+ OffsetTable: new(sync.Map),
}
store.load()
return store
@@ -151,7 +150,9 @@ func (local *localFileOffsetStore) load() {
}
if datas != nil {
- local.OffsetTable = datas
+ for k, v := range datas {
+ local.OffsetTable.Store(k, v)
+ }
}
}
@@ -180,17 +181,17 @@ func (local *localFileOffsetStore) update(mq *primitive.MessageQueue, offset int
"new_offset": offset,
})
key := MessageQueueKey(*mq)
- localOffset, exist := local.OffsetTable[key]
+ localOffset, exist := local.OffsetTable.Load(key)
if !exist {
- local.OffsetTable[key] = offset
+ local.OffsetTable.Store(key, offset)
return
}
if increaseOnly {
- if localOffset < offset {
- local.OffsetTable[key] = offset
+ if localOffset.(int64) < offset {
+ local.OffsetTable.Store(key, offset)
}
} else {
- local.OffsetTable[key] = offset
+ local.OffsetTable.Store(key, offset)
}
}
@@ -201,10 +202,17 @@ func (local *localFileOffsetStore) persist(mqs []*primitive.MessageQueue) {
local.mutex.Lock()
defer local.mutex.Unlock()
+ datas := make(map[MessageQueueKey]int64)
+ local.OffsetTable.Range(func(key, value interface{}) bool {
+ k := key.(MessageQueueKey)
+ v := value.(int64)
+ datas[k] = v
+ return true
+ })
+
wrapper := OffsetSerializeWrapper{
- OffsetTable: local.OffsetTable,
+ OffsetTable: datas,
}
-
data, _ := jsoniter.Marshal(wrapper)
utils.CheckError(fmt.Sprintf("persist offset to %s", local.path), utils.WriteToFile(local.path, data))
}
@@ -384,11 +392,11 @@ func (r *remoteBrokerOffsetStore) updateConsumeOffsetToBroker(group string, mq p
return r.client.InvokeOneWay(context.Background(), broker, cmd, 5*time.Second)
}
-func readFromMemory(table map[MessageQueueKey]int64, mq *primitive.MessageQueue) int64 {
- localOffset, exist := table[MessageQueueKey(*mq)]
+func readFromMemory(table *sync.Map, mq *primitive.MessageQueue) int64 {
+ localOffset, exist := table.Load(MessageQueueKey(*mq))
if !exist {
return -1
}
- return localOffset
+ return localOffset.(int64)
}
diff --git a/consumer/offset_store_test.go b/consumer/offset_store_test.go
index 27b98d9..cfa0eaa 100644
--- a/consumer/offset_store_test.go
+++ b/consumer/offset_store_test.go
@@ -21,12 +21,11 @@ import (
"path/filepath"
"testing"
- "github.com/golang/mock/gomock"
- . "github.com/smartystreets/goconvey/convey"
-
"github.com/apache/rocketmq-client-go/v2/internal"
"github.com/apache/rocketmq-client-go/v2/internal/remote"
"github.com/apache/rocketmq-client-go/v2/primitive"
+ "github.com/golang/mock/gomock"
+ . "github.com/smartystreets/goconvey/convey"
)
func TestNewLocalFileOffsetStore(t *testing.T) {
@@ -136,7 +135,7 @@ func TestLocalFileOffsetStore(t *testing.T) {
offset = localStore.read(mq, _ReadFromStore)
So(offset, ShouldEqual, 1)
- delete(localStore.(*localFileOffsetStore).OffsetTable, MessageQueueKey(*mq))
+ localStore.(*localFileOffsetStore).OffsetTable.Delete(MessageQueueKey(*mq))
offset = localStore.read(mq, _ReadMemoryThenStore)
So(offset, ShouldEqual, 1)
})