You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@rocketmq.apache.org by di...@apache.org on 2022/06/14 09:20:00 UTC
[rocketmq-client-go] branch master updated: [ISSUE #832] Client may submit wrong offset when network instability
This is an automated email from the ASF dual-hosted git repository.
dinglei pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/rocketmq-client-go.git
The following commit(s) were added to refs/heads/master by this push:
new 16443e2 [ISSUE #832] Client may submit wrong offset when network instability
16443e2 is described below
commit 16443e264556653ee821ce9a5391c7a13275fa0e
Author: AaronWang <ts...@126.com>
AuthorDate: Tue Jun 14 17:19:54 2022 +0800
[ISSUE #832] Client may submit wrong offset when network instability
Co-authored-by: chenhui <wa...@xiaohongshu.com>
---
consumer/consumer.go | 28 ++++++++++++++-----
consumer/consumer_test.go | 24 +++++++++++------
consumer/mock_offset_store.go | 63 +++++++++++++++++++++++++++++--------------
consumer/offset_store.go | 29 ++++++++++++++------
consumer/offset_store_test.go | 24 ++++++++---------
consumer/pull_consumer.go | 6 +++--
consumer/push_consumer.go | 17 +++++++++---
7 files changed, 130 insertions(+), 61 deletions(-)
diff --git a/consumer/consumer.go b/consumer/consumer.go
index ed843cf..6e53eaf 100644
--- a/consumer/consumer.go
+++ b/consumer/consumer.go
@@ -28,6 +28,7 @@ import (
"time"
"github.com/apache/rocketmq-client-go/v2/errors"
+
jsoniter "github.com/json-iterator/go"
"github.com/tidwall/gjson"
@@ -699,8 +700,9 @@ func (dc *defaultConsumer) updateProcessQueueTable(topic string, mqs []*primitiv
continue
}
dc.storage.remove(&mq)
- nextOffset := dc.computePullFromWhere(&mq)
- if nextOffset >= 0 {
+ nextOffset, err := dc.computePullFromWhereWithException(&mq)
+
+ if nextOffset >= 0 && err == nil {
_, exist := dc.processQueueTable.Load(mq)
if exist {
rlog.Debug("do defaultConsumer, mq already exist", map[string]interface{}{
@@ -741,12 +743,23 @@ func (dc *defaultConsumer) removeUnnecessaryMessageQueue(mq *primitive.MessageQu
return true
}
+// Deprecated: Use computePullFromWhereWithException instead.
func (dc *defaultConsumer) computePullFromWhere(mq *primitive.MessageQueue) int64 {
+ result, _ := dc.computePullFromWhereWithException(mq)
+ return result
+}
+
+func (dc *defaultConsumer) computePullFromWhereWithException(mq *primitive.MessageQueue) (int64, error) {
if dc.cType == _PullConsume {
- return 0
+ return 0, nil
}
- var result = int64(-1)
- lastOffset := dc.storage.read(mq, _ReadFromStore)
+ result := int64(-1)
+ lastOffset, err := dc.storage.readWithException(mq, _ReadFromStore)
+ if err != nil {
+ // lastOffset is -1 here
+ return lastOffset, err
+ }
+
if lastOffset >= 0 {
result = lastOffset
} else {
@@ -803,7 +816,7 @@ func (dc *defaultConsumer) computePullFromWhere(mq *primitive.MessageQueue) int6
default:
}
}
- return result
+ return result, nil
}
func (dc *defaultConsumer) pullInner(ctx context.Context, queue *primitive.MessageQueue, data *internal.SubscriptionData,
@@ -950,7 +963,8 @@ func (dc *defaultConsumer) queryMaxOffset(mq *primitive.MessageQueue) (int64, er
}
func (dc *defaultConsumer) queryOffset(mq *primitive.MessageQueue) int64 {
- return dc.storage.read(mq, _ReadMemoryThenStore)
+ result, _ := dc.storage.readWithException(mq, _ReadMemoryThenStore)
+ return result
}
// SearchOffsetByTimestamp with specific queueId and topic
diff --git a/consumer/consumer_test.go b/consumer/consumer_test.go
index 12ccd18..e441290 100644
--- a/consumer/consumer_test.go
+++ b/consumer/consumer_test.go
@@ -26,6 +26,7 @@ import (
. "github.com/smartystreets/goconvey/convey"
"github.com/stretchr/testify/assert"
+ "github.com/apache/rocketmq-client-go/v2/errors"
"github.com/apache/rocketmq-client-go/v2/internal"
"github.com/apache/rocketmq-client-go/v2/internal/remote"
"github.com/apache/rocketmq-client-go/v2/primitive"
@@ -116,13 +117,20 @@ func TestComputePullFromWhere(t *testing.T) {
rmqCli.SetNameSrv(namesrvCli)
Convey("get effective offset", func() {
- offsetStore.EXPECT().read(gomock.Any(), gomock.Any()).Return(int64(10))
- res := dc.computePullFromWhere(mq)
+ offsetStore.EXPECT().readWithException(gomock.Any(), gomock.Any()).Return(int64(10), nil)
+ res, _ := dc.computePullFromWhereWithException(mq)
assert.Equal(t, int64(10), res)
})
+ Convey("get offset error", func() {
+ offsetStore.EXPECT().readWithException(gomock.Any(), gomock.Any()).Return(int64(-1), errors.ErrRequestTimeout)
+
+ _, err := dc.computePullFromWhereWithException(mq)
+ assert.Equal(t, err, errors.ErrRequestTimeout)
+ })
+
Convey("ConsumeFromLastOffset for normal topic", func() {
- offsetStore.EXPECT().read(gomock.Any(), gomock.Any()).Return(int64(-1))
+ offsetStore.EXPECT().readWithException(gomock.Any(), gomock.Any()).Return(int64(-1), nil)
dc.option.FromWhere = ConsumeFromLastOffset
broker := "a"
@@ -135,20 +143,20 @@ func TestComputePullFromWhere(t *testing.T) {
},
}, nil)
- res := dc.computePullFromWhere(mq)
+ res, _ := dc.computePullFromWhereWithException(mq)
assert.Equal(t, int64(20), res)
})
Convey("ConsumeFromFirstOffset for normal topic", func() {
- offsetStore.EXPECT().read(gomock.Any(), gomock.Any()).Return(int64(-1))
+ offsetStore.EXPECT().readWithException(gomock.Any(), gomock.Any()).Return(int64(-1), nil)
dc.option.FromWhere = ConsumeFromFirstOffset
- res := dc.computePullFromWhere(mq)
+ res, _ := dc.computePullFromWhereWithException(mq)
assert.Equal(t, int64(0), res)
})
Convey("ConsumeFromTimestamp for normal topic", func() {
- offsetStore.EXPECT().read(gomock.Any(), gomock.Any()).Return(int64(-1))
+ offsetStore.EXPECT().readWithException(gomock.Any(), gomock.Any()).Return(int64(-1), nil)
dc.option.FromWhere = ConsumeFromTimestamp
dc.option.ConsumeTimestamp = "20060102150405"
@@ -163,7 +171,7 @@ func TestComputePullFromWhere(t *testing.T) {
},
}, nil)
- res := dc.computePullFromWhere(mq)
+ res, _ := dc.computePullFromWhereWithException(mq)
assert.Equal(t, int64(30), res)
})
diff --git a/consumer/mock_offset_store.go b/consumer/mock_offset_store.go
index bac1cdb..145ec3d 100644
--- a/consumer/mock_offset_store.go
+++ b/consumer/mock_offset_store.go
@@ -28,67 +28,90 @@ import (
gomock "github.com/golang/mock/gomock"
)
-// MockOffsetStore is a mock of OffsetStore interface
+// MockOffsetStore is a mock of OffsetStore interface.
type MockOffsetStore struct {
ctrl *gomock.Controller
recorder *MockOffsetStoreMockRecorder
}
-// MockOffsetStoreMockRecorder is the mock recorder for MockOffsetStore
+// MockOffsetStoreMockRecorder is the mock recorder for MockOffsetStore.
type MockOffsetStoreMockRecorder struct {
mock *MockOffsetStore
}
-// NewMockOffsetStore creates a new mock instance
+// NewMockOffsetStore creates a new mock instance.
func NewMockOffsetStore(ctrl *gomock.Controller) *MockOffsetStore {
mock := &MockOffsetStore{ctrl: ctrl}
mock.recorder = &MockOffsetStoreMockRecorder{mock}
return mock
}
-// EXPECT returns an object that allows the caller to indicate expected use
+// EXPECT returns an object that allows the caller to indicate expected use.
func (m *MockOffsetStore) EXPECT() *MockOffsetStoreMockRecorder {
return m.recorder
}
-// persist mocks base method
+// persist mocks base method.
func (m *MockOffsetStore) persist(mqs []*primitive.MessageQueue) {
+ m.ctrl.T.Helper()
m.ctrl.Call(m, "persist", mqs)
}
-// persist indicates an expected call of persist
+// persist indicates an expected call of persist.
func (mr *MockOffsetStoreMockRecorder) persist(mqs interface{}) *gomock.Call {
+ mr.mock.ctrl.T.Helper()
return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "persist", reflect.TypeOf((*MockOffsetStore)(nil).persist), mqs)
}
-// remove mocks base method
-func (m *MockOffsetStore) remove(mq *primitive.MessageQueue) {
- m.ctrl.Call(m, "remove", mq)
-}
-
-// remove indicates an expected call of remove
-func (mr *MockOffsetStoreMockRecorder) remove(mq interface{}) *gomock.Call {
- return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "remove", reflect.TypeOf((*MockOffsetStore)(nil).remove), mq)
-}
-
-// read mocks base method
+// read mocks base method.
func (m *MockOffsetStore) read(mq *primitive.MessageQueue, t readType) int64 {
+ m.ctrl.T.Helper()
ret := m.ctrl.Call(m, "read", mq, t)
ret0, _ := ret[0].(int64)
return ret0
}
-// read indicates an expected call of read
+// read indicates an expected call of read.
func (mr *MockOffsetStoreMockRecorder) read(mq, t interface{}) *gomock.Call {
+ mr.mock.ctrl.T.Helper()
return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "read", reflect.TypeOf((*MockOffsetStore)(nil).read), mq, t)
}
-// update mocks base method
+// readWithException mocks base method.
+func (m *MockOffsetStore) readWithException(mq *primitive.MessageQueue, t readType) (int64, error) {
+ m.ctrl.T.Helper()
+ ret := m.ctrl.Call(m, "readWithException", mq, t)
+ ret0, _ := ret[0].(int64)
+ ret1, _ := ret[1].(error)
+ return ret0, ret1
+}
+
+// readWithException indicates an expected call of readWithException.
+func (mr *MockOffsetStoreMockRecorder) readWithException(mq, t interface{}) *gomock.Call {
+ mr.mock.ctrl.T.Helper()
+ return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "readWithException", reflect.TypeOf((*MockOffsetStore)(nil).readWithException), mq, t)
+}
+
+// remove mocks base method.
+func (m *MockOffsetStore) remove(mq *primitive.MessageQueue) {
+ m.ctrl.T.Helper()
+ m.ctrl.Call(m, "remove", mq)
+}
+
+// remove indicates an expected call of remove.
+func (mr *MockOffsetStoreMockRecorder) remove(mq interface{}) *gomock.Call {
+ mr.mock.ctrl.T.Helper()
+ return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "remove", reflect.TypeOf((*MockOffsetStore)(nil).remove), mq)
+}
+
+// update mocks base method.
func (m *MockOffsetStore) update(mq *primitive.MessageQueue, offset int64, increaseOnly bool) {
+ m.ctrl.T.Helper()
m.ctrl.Call(m, "update", mq, offset, increaseOnly)
}
-// update indicates an expected call of update
+// update indicates an expected call of update.
func (mr *MockOffsetStoreMockRecorder) update(mq, offset, increaseOnly interface{}) *gomock.Call {
+ mr.mock.ctrl.T.Helper()
return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "update", reflect.TypeOf((*MockOffsetStore)(nil).update), mq, offset, increaseOnly)
}
diff --git a/consumer/offset_store.go b/consumer/offset_store.go
index 5ecfd14..86ecd18 100644
--- a/consumer/offset_store.go
+++ b/consumer/offset_store.go
@@ -57,6 +57,7 @@ type OffsetStore interface {
persist(mqs []*primitive.MessageQueue)
remove(mq *primitive.MessageQueue)
read(mq *primitive.MessageQueue, t readType) int64
+ readWithException(mq *primitive.MessageQueue, t readType) (int64, error)
update(mq *primitive.MessageQueue, offset int64, increaseOnly bool)
}
@@ -156,21 +157,27 @@ func (local *localFileOffsetStore) load() {
}
}
+// Deprecated: Use readWithException instead.
func (local *localFileOffsetStore) read(mq *primitive.MessageQueue, t readType) int64 {
+ result, _ := local.readWithException(mq, t)
+ return result
+}
+
+func (local *localFileOffsetStore) readWithException(mq *primitive.MessageQueue, t readType) (int64, error) {
switch t {
case _ReadFromMemory, _ReadMemoryThenStore:
off := readFromMemory(local.OffsetTable, mq)
if off >= 0 || (off == -1 && t == _ReadFromMemory) {
- return off
+ return off, nil
}
fallthrough
case _ReadFromStore:
local.load()
- return readFromMemory(local.OffsetTable, mq)
+ return readFromMemory(local.OffsetTable, mq), nil
default:
}
- return -1
+ return -1, nil
}
func (local *localFileOffsetStore) update(mq *primitive.MessageQueue, offset int64, increaseOnly bool) {
@@ -284,18 +291,24 @@ func (r *remoteBrokerOffsetStore) remove(mq *primitive.MessageQueue) {
})
}
+// Deprecated: Use readWithException instead.
func (r *remoteBrokerOffsetStore) read(mq *primitive.MessageQueue, t readType) int64 {
+ result, _ := r.readWithException(mq, t)
+ return result
+}
+
+func (r *remoteBrokerOffsetStore) readWithException(mq *primitive.MessageQueue, t readType) (int64, error) {
r.mutex.RLock()
switch t {
case _ReadFromMemory, _ReadMemoryThenStore:
off, exist := r.OffsetTable[*mq]
if exist {
r.mutex.RUnlock()
- return off
+ return off, nil
}
if t == _ReadFromMemory {
r.mutex.RUnlock()
- return -1
+ return -1, nil
}
fallthrough
case _ReadFromStore:
@@ -307,7 +320,7 @@ func (r *remoteBrokerOffsetStore) read(mq *primitive.MessageQueue, t readType) i
rlog.LogKeyUnderlayError: err,
})
r.mutex.RUnlock()
- return -1
+ return -1, err
}
rlog.Warning("fetch offset of mq from broker success", map[string]interface{}{
rlog.LogKeyConsumerGroup: r.group,
@@ -316,11 +329,11 @@ func (r *remoteBrokerOffsetStore) read(mq *primitive.MessageQueue, t readType) i
})
r.mutex.RUnlock()
r.update(mq, off, true)
- return off
+ return off, nil
default:
}
- return -1
+ return -1, nil
}
func (r *remoteBrokerOffsetStore) update(mq *primitive.MessageQueue, offset int64, increaseOnly bool) {
diff --git a/consumer/offset_store_test.go b/consumer/offset_store_test.go
index cfa0eaa..d833f36 100644
--- a/consumer/offset_store_test.go
+++ b/consumer/offset_store_test.go
@@ -98,7 +98,7 @@ func TestLocalFileOffsetStore(t *testing.T) {
}
for _, value := range cases {
localStore.update(value.queue, value.setOffset, false)
- offset := localStore.read(value.queue, _ReadFromMemory)
+ offset, _ := localStore.readWithException(value.queue, _ReadFromMemory)
So(offset, ShouldEqual, value.expectedOffset)
}
})
@@ -119,7 +119,7 @@ func TestLocalFileOffsetStore(t *testing.T) {
}
for _, value := range cases {
localStore.update(value.queue, value.setOffset, true)
- offset := localStore.read(value.queue, _ReadFromMemory)
+ offset, _ := localStore.readWithException(value.queue, _ReadFromMemory)
So(offset, ShouldEqual, value.expectedOffset)
}
})
@@ -127,16 +127,16 @@ func TestLocalFileOffsetStore(t *testing.T) {
Convey("test persist", func() {
localStore.update(mq, 1, false)
- offset := localStore.read(mq, _ReadFromMemory)
+ offset, _ := localStore.readWithException(mq, _ReadFromMemory)
So(offset, ShouldEqual, 1)
queues := []*primitive.MessageQueue{mq}
localStore.persist(queues)
- offset = localStore.read(mq, _ReadFromStore)
+ offset, _ = localStore.readWithException(mq, _ReadFromStore)
So(offset, ShouldEqual, 1)
localStore.(*localFileOffsetStore).OffsetTable.Delete(MessageQueueKey(*mq))
- offset = localStore.read(mq, _ReadMemoryThenStore)
+ offset, _ = localStore.readWithException(mq, _ReadMemoryThenStore)
So(offset, ShouldEqual, 1)
})
})
@@ -178,7 +178,7 @@ func TestRemoteBrokerOffsetStore(t *testing.T) {
}
for _, value := range cases {
remoteStore.update(value.queue, value.setOffset, false)
- offset := remoteStore.read(value.queue, _ReadFromMemory)
+ offset, _ := remoteStore.readWithException(value.queue, _ReadFromMemory)
So(offset, ShouldEqual, value.expectedOffset)
}
})
@@ -199,7 +199,7 @@ func TestRemoteBrokerOffsetStore(t *testing.T) {
}
for _, value := range cases {
remoteStore.update(value.queue, value.setOffset, true)
- offset := remoteStore.read(value.queue, _ReadFromMemory)
+ offset, _ := remoteStore.readWithException(value.queue, _ReadFromMemory)
So(offset, ShouldEqual, value.expectedOffset)
}
})
@@ -219,24 +219,24 @@ func TestRemoteBrokerOffsetStore(t *testing.T) {
rmqClient.EXPECT().InvokeSync(gomock.Any(), gomock.Any(), gomock.Any(), gomock.Any()).Return(ret, nil).MaxTimes(2)
remoteStore.persist(queues)
- offset := remoteStore.read(mq, _ReadFromStore)
+ offset, _ := remoteStore.readWithException(mq, _ReadFromStore)
So(offset, ShouldEqual, 1)
remoteStore.remove(mq)
- offset = remoteStore.read(mq, _ReadFromMemory)
+ offset, _ = remoteStore.readWithException(mq, _ReadFromMemory)
So(offset, ShouldEqual, -1)
- offset = remoteStore.read(mq, _ReadMemoryThenStore)
+ offset, _ = remoteStore.readWithException(mq, _ReadMemoryThenStore)
So(offset, ShouldEqual, 1)
})
Convey("test remove", func() {
remoteStore.update(mq, 1, false)
- offset := remoteStore.read(mq, _ReadFromMemory)
+ offset, _ := remoteStore.readWithException(mq, _ReadFromMemory)
So(offset, ShouldEqual, 1)
remoteStore.remove(mq)
- offset = remoteStore.read(mq, _ReadFromMemory)
+ offset, _ = remoteStore.readWithException(mq, _ReadFromMemory)
So(offset, ShouldEqual, -1)
})
})
diff --git a/consumer/pull_consumer.go b/consumer/pull_consumer.go
index ff6f7d7..4699601 100644
--- a/consumer/pull_consumer.go
+++ b/consumer/pull_consumer.go
@@ -20,10 +20,11 @@ package consumer
import (
"context"
"fmt"
- errors2 "github.com/apache/rocketmq-client-go/v2/errors"
"sync"
"sync/atomic"
+ errors2 "github.com/apache/rocketmq-client-go/v2/errors"
+
"github.com/pkg/errors"
"github.com/apache/rocketmq-client-go/v2/internal"
@@ -219,7 +220,8 @@ func (c *defaultPullConsumer) makeSureStateOK() error {
}
func (c *defaultPullConsumer) nextOffsetOf(queue *primitive.MessageQueue) int64 {
- return c.computePullFromWhere(queue)
+ result, _ := c.computePullFromWhereWithException(queue)
+ return result
}
// PullFrom pull messages of queue from the offset to offset + numbers
diff --git a/consumer/push_consumer.go b/consumer/push_consumer.go
index 801f412..4ad5ee3 100644
--- a/consumer/push_consumer.go
+++ b/consumer/push_consumer.go
@@ -20,7 +20,6 @@ package consumer
import (
"context"
"fmt"
- errors2 "github.com/apache/rocketmq-client-go/v2/errors"
"math"
"runtime/pprof"
"strconv"
@@ -29,6 +28,8 @@ import (
"sync/atomic"
"time"
+ errors2 "github.com/apache/rocketmq-client-go/v2/errors"
+
"github.com/pkg/errors"
"github.com/apache/rocketmq-client-go/v2/internal"
@@ -375,7 +376,7 @@ func (pc *pushConsumer) GetConsumerRunningInfo(stack bool) *internal.ConsumerRun
mq := key.(primitive.MessageQueue)
pq := value.(*processQueue)
pInfo := pq.currentInfo()
- pInfo.CommitOffset = pc.storage.read(&mq, _ReadMemoryThenStore)
+ pInfo.CommitOffset, _ = pc.storage.readWithException(&mq, _ReadMemoryThenStore)
info.MQTable[mq] = pInfo
return true
})
@@ -644,7 +645,15 @@ func (pc *pushConsumer) pullMessage(request *PullRequest) {
} else {
if pq.IsLock() {
if !request.lockedFirst {
- offset := pc.computePullFromWhere(request.mq)
+ offset, err := pc.computePullFromWhereWithException(request.mq)
+ if err != nil {
+ rlog.Warning("computePullFromWhere from broker error", map[string]interface{}{
+ rlog.LogKeyUnderlayError: err.Error(),
+ })
+ sleepTime = _PullDelayTimeWhenError
+ goto NEXT
+ }
+
brokerBusy := offset < request.nextOffset
rlog.Info("the first time to pull message, so fix offset from broker, offset maybe changed", map[string]interface{}{
rlog.LogKeyPullRequest: request.String(),
@@ -684,7 +693,7 @@ func (pc *pushConsumer) pullMessage(request *PullRequest) {
)
if pc.model == Clustering {
- commitOffsetValue = pc.storage.read(request.mq, _ReadFromMemory)
+ commitOffsetValue, _ = pc.storage.readWithException(request.mq, _ReadFromMemory)
if commitOffsetValue > 0 {
commitOffsetEnable = true
}