You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@rocketmq.apache.org by di...@apache.org on 2021/01/15 09:53:13 UTC

[rocketmq-client-go] branch master updated: [ISSUE #544] Fix bug, concurrent map read and map write local offset file (#550)

This is an automated email from the ASF dual-hosted git repository.

dinglei pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/rocketmq-client-go.git


The following commit(s) were added to refs/heads/master by this push:
     new f1e366e  [ISSUE #544] Fix bug, concurrent map read and map write local offset file (#550)
f1e366e is described below

commit f1e366ec5f6493c5409266fb9ca9f290329de402
Author: feiquan <22...@qq.com>
AuthorDate: Fri Jan 15 17:53:08 2021 +0800

    [ISSUE #544] Fix bug, concurrent map read and map write local offset file (#550)
    
    * FIX: fatal error: concurrent map read and map write (apache#544)
---
 .gitignore                    |  3 ++-
 README.md                     |  2 +-
 consumer/offset_store.go      | 38 +++++++++++++++++++++++---------------
 consumer/offset_store_test.go |  7 +++----
 4 files changed, 29 insertions(+), 21 deletions(-)

diff --git a/.gitignore b/.gitignore
index a0c292d..99fd04d 100644
--- a/.gitignore
+++ b/.gitignore
@@ -3,4 +3,5 @@ go.mod
 go.sum
 vendor/
 coverage.txt
-examples/test
\ No newline at end of file
+examples/test
+/.vscode
\ No newline at end of file
diff --git a/README.md b/README.md
index 0f30d5c..08494a6 100644
--- a/README.md
+++ b/README.md
@@ -52,4 +52,4 @@ For 2.0.0 version, it supports:
    
 ----------
 ## License
-  [Apache License, Version 2.0](http://www.apache.org/licenses/LICENSE-2.0.html) Copyright (C) Apache Software Foundation
+  [Apache License, Version 2.0](http://www.apache.org/licenses/LICENSE-2.0.html) Copyright (C) Apache Software Foundation
\ No newline at end of file
diff --git a/consumer/offset_store.go b/consumer/offset_store.go
index 17f5d76..4d0b271 100644
--- a/consumer/offset_store.go
+++ b/consumer/offset_store.go
@@ -26,13 +26,12 @@ import (
 	"sync"
 	"time"
 
-	jsoniter "github.com/json-iterator/go"
-
 	"github.com/apache/rocketmq-client-go/v2/internal"
 	"github.com/apache/rocketmq-client-go/v2/internal/remote"
 	"github.com/apache/rocketmq-client-go/v2/internal/utils"
 	"github.com/apache/rocketmq-client-go/v2/primitive"
 	"github.com/apache/rocketmq-client-go/v2/rlog"
+	jsoniter "github.com/json-iterator/go"
 )
 
 type readType int
@@ -101,7 +100,7 @@ func (mq *MessageQueueKey) UnmarshalText(text []byte) error {
 type localFileOffsetStore struct {
 	group       string
 	path        string
-	OffsetTable map[MessageQueueKey]int64
+	OffsetTable *sync.Map // concurrent safe , map[MessageQueueKey]int64
 	// mutex for offset file
 	mutex sync.Mutex
 }
@@ -110,7 +109,7 @@ func NewLocalFileOffsetStore(clientID, group string) OffsetStore {
 	store := &localFileOffsetStore{
 		group:       group,
 		path:        filepath.Join(_LocalOffsetStorePath, clientID, group, "offset.json"),
-		OffsetTable: make(map[MessageQueueKey]int64),
+		OffsetTable: new(sync.Map),
 	}
 	store.load()
 	return store
@@ -151,7 +150,9 @@ func (local *localFileOffsetStore) load() {
 	}
 
 	if datas != nil {
-		local.OffsetTable = datas
+		for k, v := range datas {
+			local.OffsetTable.Store(k, v)
+		}
 	}
 }
 
@@ -180,17 +181,17 @@ func (local *localFileOffsetStore) update(mq *primitive.MessageQueue, offset int
 		"new_offset":            offset,
 	})
 	key := MessageQueueKey(*mq)
-	localOffset, exist := local.OffsetTable[key]
+	localOffset, exist := local.OffsetTable.Load(key)
 	if !exist {
-		local.OffsetTable[key] = offset
+		local.OffsetTable.Store(key, offset)
 		return
 	}
 	if increaseOnly {
-		if localOffset < offset {
-			local.OffsetTable[key] = offset
+		if localOffset.(int64) < offset {
+			local.OffsetTable.Store(key, offset)
 		}
 	} else {
-		local.OffsetTable[key] = offset
+		local.OffsetTable.Store(key, offset)
 	}
 }
 
@@ -201,10 +202,17 @@ func (local *localFileOffsetStore) persist(mqs []*primitive.MessageQueue) {
 	local.mutex.Lock()
 	defer local.mutex.Unlock()
 
+	datas := make(map[MessageQueueKey]int64)
+	local.OffsetTable.Range(func(key, value interface{}) bool {
+		k := key.(MessageQueueKey)
+		v := value.(int64)
+		datas[k] = v
+		return true
+	})
+
 	wrapper := OffsetSerializeWrapper{
-		OffsetTable: local.OffsetTable,
+		OffsetTable: datas,
 	}
-
 	data, _ := jsoniter.Marshal(wrapper)
 	utils.CheckError(fmt.Sprintf("persist offset to %s", local.path), utils.WriteToFile(local.path, data))
 }
@@ -384,11 +392,11 @@ func (r *remoteBrokerOffsetStore) updateConsumeOffsetToBroker(group string, mq p
 	return r.client.InvokeOneWay(context.Background(), broker, cmd, 5*time.Second)
 }
 
-func readFromMemory(table map[MessageQueueKey]int64, mq *primitive.MessageQueue) int64 {
-	localOffset, exist := table[MessageQueueKey(*mq)]
+func readFromMemory(table *sync.Map, mq *primitive.MessageQueue) int64 {
+	localOffset, exist := table.Load(MessageQueueKey(*mq))
 	if !exist {
 		return -1
 	}
 
-	return localOffset
+	return localOffset.(int64)
 }
diff --git a/consumer/offset_store_test.go b/consumer/offset_store_test.go
index 27b98d9..cfa0eaa 100644
--- a/consumer/offset_store_test.go
+++ b/consumer/offset_store_test.go
@@ -21,12 +21,11 @@ import (
 	"path/filepath"
 	"testing"
 
-	"github.com/golang/mock/gomock"
-	. "github.com/smartystreets/goconvey/convey"
-
 	"github.com/apache/rocketmq-client-go/v2/internal"
 	"github.com/apache/rocketmq-client-go/v2/internal/remote"
 	"github.com/apache/rocketmq-client-go/v2/primitive"
+	"github.com/golang/mock/gomock"
+	. "github.com/smartystreets/goconvey/convey"
 )
 
 func TestNewLocalFileOffsetStore(t *testing.T) {
@@ -136,7 +135,7 @@ func TestLocalFileOffsetStore(t *testing.T) {
 			offset = localStore.read(mq, _ReadFromStore)
 			So(offset, ShouldEqual, 1)
 
-			delete(localStore.(*localFileOffsetStore).OffsetTable, MessageQueueKey(*mq))
+			localStore.(*localFileOffsetStore).OffsetTable.Delete(MessageQueueKey(*mq))
 			offset = localStore.read(mq, _ReadMemoryThenStore)
 			So(offset, ShouldEqual, 1)
 		})