You are viewing a plain text version of this content. The canonical link to the original was not preserved in this plain text copy.
Posted to commits@rocketmq.apache.org by hu...@apache.org on 2019/08/13 03:10:45 UTC

[rocketmq-client-go] branch native updated: [ISSUE #139]add locker for updating data (#146)

This is an automated email from the ASF dual-hosted git repository.

huzongtang pushed a commit to branch native
in repository https://gitbox.apache.org/repos/asf/rocketmq-client-go.git


The following commit(s) were added to refs/heads/native by this push:
     new 08cd71e  [ISSUE #139]add locker for updating data (#146)
08cd71e is described below

commit 08cd71ef98c146e87ce44c9c7af0bee136d18047
Author: wolftankk <wo...@gmail.com>
AuthorDate: Tue Aug 13 11:10:40 2019 +0800

    [ISSUE #139]add locker for updating data (#146)
---
 consumer/offset_store.go | 4 ++++
 1 file changed, 4 insertions(+)

diff --git a/consumer/offset_store.go b/consumer/offset_store.go
index 8bc5b5a..1eb246c 100644
--- a/consumer/offset_store.go
+++ b/consumer/offset_store.go
@@ -122,6 +122,8 @@ func (local *localFileOffsetStore) read(mq *primitive.MessageQueue, t readType)
 }
 
 func (local *localFileOffsetStore) update(mq *primitive.MessageQueue, offset int64, increaseOnly bool) {
+	local.mutex.Lock()
+	defer local.mutex.Unlock()
 	rlog.Debugf("update offset: %s to %d", mq, offset)
 	localOffset, exist := local.OffsetTable[mq.Topic]
 	if !exist {
@@ -149,6 +151,8 @@ func (local *localFileOffsetStore) persist(mqs []*primitive.MessageQueue) {
 	if len(mqs) == 0 {
 		return
 	}
+	local.mutex.Lock()
+	defer local.mutex.Unlock()
 	table := make(map[string]map[int]*queueOffset)
 	for idx := range mqs {
 		mq := mqs[idx]