You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@rocketmq.apache.org by di...@apache.org on 2022/06/14 09:20:00 UTC

[rocketmq-client-go] branch master updated: [ISSUE #832] Client may submit wrong offset when network instability

This is an automated email from the ASF dual-hosted git repository.

dinglei pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/rocketmq-client-go.git


The following commit(s) were added to refs/heads/master by this push:
     new 16443e2  [ISSUE #832] Client may submit wrong offset when network instability
16443e2 is described below

commit 16443e264556653ee821ce9a5391c7a13275fa0e
Author: AaronWang <ts...@126.com>
AuthorDate: Tue Jun 14 17:19:54 2022 +0800

    [ISSUE #832] Client may submit wrong offset when network instability
    
    Co-authored-by: chenhui <wa...@xiaohongshu.com>
---
 consumer/consumer.go          | 28 ++++++++++++++-----
 consumer/consumer_test.go     | 24 +++++++++++------
 consumer/mock_offset_store.go | 63 +++++++++++++++++++++++++++++--------------
 consumer/offset_store.go      | 29 ++++++++++++++------
 consumer/offset_store_test.go | 24 ++++++++---------
 consumer/pull_consumer.go     |  6 +++--
 consumer/push_consumer.go     | 17 +++++++++---
 7 files changed, 130 insertions(+), 61 deletions(-)

diff --git a/consumer/consumer.go b/consumer/consumer.go
index ed843cf..6e53eaf 100644
--- a/consumer/consumer.go
+++ b/consumer/consumer.go
@@ -28,6 +28,7 @@ import (
 	"time"
 
 	"github.com/apache/rocketmq-client-go/v2/errors"
+
 	jsoniter "github.com/json-iterator/go"
 	"github.com/tidwall/gjson"
 
@@ -699,8 +700,9 @@ func (dc *defaultConsumer) updateProcessQueueTable(topic string, mqs []*primitiv
 				continue
 			}
 			dc.storage.remove(&mq)
-			nextOffset := dc.computePullFromWhere(&mq)
-			if nextOffset >= 0 {
+			nextOffset, err := dc.computePullFromWhereWithException(&mq)
+
+			if nextOffset >= 0 && err == nil {
 				_, exist := dc.processQueueTable.Load(mq)
 				if exist {
 					rlog.Debug("do defaultConsumer, mq already exist", map[string]interface{}{
@@ -741,12 +743,23 @@ func (dc *defaultConsumer) removeUnnecessaryMessageQueue(mq *primitive.MessageQu
 	return true
 }
 
+// Deprecated: Use computePullFromWhereWithException instead.
 func (dc *defaultConsumer) computePullFromWhere(mq *primitive.MessageQueue) int64 {
+	result, _ := dc.computePullFromWhereWithException(mq)
+	return result
+}
+
+func (dc *defaultConsumer) computePullFromWhereWithException(mq *primitive.MessageQueue) (int64, error) {
 	if dc.cType == _PullConsume {
-		return 0
+		return 0, nil
 	}
-	var result = int64(-1)
-	lastOffset := dc.storage.read(mq, _ReadFromStore)
+	result := int64(-1)
+	lastOffset, err := dc.storage.readWithException(mq, _ReadFromStore)
+	if err != nil {
+		// 这里 lastOffset = -1
+		return lastOffset, err
+	}
+
 	if lastOffset >= 0 {
 		result = lastOffset
 	} else {
@@ -803,7 +816,7 @@ func (dc *defaultConsumer) computePullFromWhere(mq *primitive.MessageQueue) int6
 		default:
 		}
 	}
-	return result
+	return result, nil
 }
 
 func (dc *defaultConsumer) pullInner(ctx context.Context, queue *primitive.MessageQueue, data *internal.SubscriptionData,
@@ -950,7 +963,8 @@ func (dc *defaultConsumer) queryMaxOffset(mq *primitive.MessageQueue) (int64, er
 }
 
 func (dc *defaultConsumer) queryOffset(mq *primitive.MessageQueue) int64 {
-	return dc.storage.read(mq, _ReadMemoryThenStore)
+	result, _ := dc.storage.readWithException(mq, _ReadMemoryThenStore)
+	return result
 }
 
 // SearchOffsetByTimestamp with specific queueId and topic
diff --git a/consumer/consumer_test.go b/consumer/consumer_test.go
index 12ccd18..e441290 100644
--- a/consumer/consumer_test.go
+++ b/consumer/consumer_test.go
@@ -26,6 +26,7 @@ import (
 	. "github.com/smartystreets/goconvey/convey"
 	"github.com/stretchr/testify/assert"
 
+	"github.com/apache/rocketmq-client-go/v2/errors"
 	"github.com/apache/rocketmq-client-go/v2/internal"
 	"github.com/apache/rocketmq-client-go/v2/internal/remote"
 	"github.com/apache/rocketmq-client-go/v2/primitive"
@@ -116,13 +117,20 @@ func TestComputePullFromWhere(t *testing.T) {
 		rmqCli.SetNameSrv(namesrvCli)
 
 		Convey("get effective offset", func() {
-			offsetStore.EXPECT().read(gomock.Any(), gomock.Any()).Return(int64(10))
-			res := dc.computePullFromWhere(mq)
+			offsetStore.EXPECT().readWithException(gomock.Any(), gomock.Any()).Return(int64(10), nil)
+			res, _ := dc.computePullFromWhereWithException(mq)
 			assert.Equal(t, int64(10), res)
 		})
 
+		Convey("get offset error", func() {
+			offsetStore.EXPECT().readWithException(gomock.Any(), gomock.Any()).Return(int64(-1), errors.ErrRequestTimeout)
+
+			_, err := dc.computePullFromWhereWithException(mq)
+			assert.Equal(t, err, errors.ErrRequestTimeout)
+		})
+
 		Convey("ConsumeFromLastOffset for normal topic", func() {
-			offsetStore.EXPECT().read(gomock.Any(), gomock.Any()).Return(int64(-1))
+			offsetStore.EXPECT().readWithException(gomock.Any(), gomock.Any()).Return(int64(-1), nil)
 			dc.option.FromWhere = ConsumeFromLastOffset
 
 			broker := "a"
@@ -135,20 +143,20 @@ func TestComputePullFromWhere(t *testing.T) {
 					},
 				}, nil)
 
-			res := dc.computePullFromWhere(mq)
+			res, _ := dc.computePullFromWhereWithException(mq)
 			assert.Equal(t, int64(20), res)
 		})
 
 		Convey("ConsumeFromFirstOffset for normal topic", func() {
-			offsetStore.EXPECT().read(gomock.Any(), gomock.Any()).Return(int64(-1))
+			offsetStore.EXPECT().readWithException(gomock.Any(), gomock.Any()).Return(int64(-1), nil)
 			dc.option.FromWhere = ConsumeFromFirstOffset
 
-			res := dc.computePullFromWhere(mq)
+			res, _ := dc.computePullFromWhereWithException(mq)
 			assert.Equal(t, int64(0), res)
 		})
 
 		Convey("ConsumeFromTimestamp for normal topic", func() {
-			offsetStore.EXPECT().read(gomock.Any(), gomock.Any()).Return(int64(-1))
+			offsetStore.EXPECT().readWithException(gomock.Any(), gomock.Any()).Return(int64(-1), nil)
 			dc.option.FromWhere = ConsumeFromTimestamp
 
 			dc.option.ConsumeTimestamp = "20060102150405"
@@ -163,7 +171,7 @@ func TestComputePullFromWhere(t *testing.T) {
 					},
 				}, nil)
 
-			res := dc.computePullFromWhere(mq)
+			res, _ := dc.computePullFromWhereWithException(mq)
 			assert.Equal(t, int64(30), res)
 		})
 
diff --git a/consumer/mock_offset_store.go b/consumer/mock_offset_store.go
index bac1cdb..145ec3d 100644
--- a/consumer/mock_offset_store.go
+++ b/consumer/mock_offset_store.go
@@ -28,67 +28,90 @@ import (
 	gomock "github.com/golang/mock/gomock"
 )
 
-// MockOffsetStore is a mock of OffsetStore interface
+// MockOffsetStore is a mock of OffsetStore interface.
 type MockOffsetStore struct {
 	ctrl     *gomock.Controller
 	recorder *MockOffsetStoreMockRecorder
 }
 
-// MockOffsetStoreMockRecorder is the mock recorder for MockOffsetStore
+// MockOffsetStoreMockRecorder is the mock recorder for MockOffsetStore.
 type MockOffsetStoreMockRecorder struct {
 	mock *MockOffsetStore
 }
 
-// NewMockOffsetStore creates a new mock instance
+// NewMockOffsetStore creates a new mock instance.
 func NewMockOffsetStore(ctrl *gomock.Controller) *MockOffsetStore {
 	mock := &MockOffsetStore{ctrl: ctrl}
 	mock.recorder = &MockOffsetStoreMockRecorder{mock}
 	return mock
 }
 
-// EXPECT returns an object that allows the caller to indicate expected use
+// EXPECT returns an object that allows the caller to indicate expected use.
 func (m *MockOffsetStore) EXPECT() *MockOffsetStoreMockRecorder {
 	return m.recorder
 }
 
-// persist mocks base method
+// persist mocks base method.
 func (m *MockOffsetStore) persist(mqs []*primitive.MessageQueue) {
+	m.ctrl.T.Helper()
 	m.ctrl.Call(m, "persist", mqs)
 }
 
-// persist indicates an expected call of persist
+// persist indicates an expected call of persist.
 func (mr *MockOffsetStoreMockRecorder) persist(mqs interface{}) *gomock.Call {
+	mr.mock.ctrl.T.Helper()
 	return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "persist", reflect.TypeOf((*MockOffsetStore)(nil).persist), mqs)
 }
 
-// remove mocks base method
-func (m *MockOffsetStore) remove(mq *primitive.MessageQueue) {
-	m.ctrl.Call(m, "remove", mq)
-}
-
-// remove indicates an expected call of remove
-func (mr *MockOffsetStoreMockRecorder) remove(mq interface{}) *gomock.Call {
-	return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "remove", reflect.TypeOf((*MockOffsetStore)(nil).remove), mq)
-}
-
-// read mocks base method
+// read mocks base method.
 func (m *MockOffsetStore) read(mq *primitive.MessageQueue, t readType) int64 {
+	m.ctrl.T.Helper()
 	ret := m.ctrl.Call(m, "read", mq, t)
 	ret0, _ := ret[0].(int64)
 	return ret0
 }
 
-// read indicates an expected call of read
+// read indicates an expected call of read.
 func (mr *MockOffsetStoreMockRecorder) read(mq, t interface{}) *gomock.Call {
+	mr.mock.ctrl.T.Helper()
 	return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "read", reflect.TypeOf((*MockOffsetStore)(nil).read), mq, t)
 }
 
-// update mocks base method
+// readWithException mocks base method.
+func (m *MockOffsetStore) readWithException(mq *primitive.MessageQueue, t readType) (int64, error) {
+	m.ctrl.T.Helper()
+	ret := m.ctrl.Call(m, "readWithException", mq, t)
+	ret0, _ := ret[0].(int64)
+	ret1, _ := ret[1].(error)
+	return ret0, ret1
+}
+
+// readWithException indicates an expected call of readWithException.
+func (mr *MockOffsetStoreMockRecorder) readWithException(mq, t interface{}) *gomock.Call {
+	mr.mock.ctrl.T.Helper()
+	return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "readWithException", reflect.TypeOf((*MockOffsetStore)(nil).readWithException), mq, t)
+}
+
+// remove mocks base method.
+func (m *MockOffsetStore) remove(mq *primitive.MessageQueue) {
+	m.ctrl.T.Helper()
+	m.ctrl.Call(m, "remove", mq)
+}
+
+// remove indicates an expected call of remove.
+func (mr *MockOffsetStoreMockRecorder) remove(mq interface{}) *gomock.Call {
+	mr.mock.ctrl.T.Helper()
+	return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "remove", reflect.TypeOf((*MockOffsetStore)(nil).remove), mq)
+}
+
+// update mocks base method.
 func (m *MockOffsetStore) update(mq *primitive.MessageQueue, offset int64, increaseOnly bool) {
+	m.ctrl.T.Helper()
 	m.ctrl.Call(m, "update", mq, offset, increaseOnly)
 }
 
-// update indicates an expected call of update
+// update indicates an expected call of update.
 func (mr *MockOffsetStoreMockRecorder) update(mq, offset, increaseOnly interface{}) *gomock.Call {
+	mr.mock.ctrl.T.Helper()
 	return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "update", reflect.TypeOf((*MockOffsetStore)(nil).update), mq, offset, increaseOnly)
 }
diff --git a/consumer/offset_store.go b/consumer/offset_store.go
index 5ecfd14..86ecd18 100644
--- a/consumer/offset_store.go
+++ b/consumer/offset_store.go
@@ -57,6 +57,7 @@ type OffsetStore interface {
 	persist(mqs []*primitive.MessageQueue)
 	remove(mq *primitive.MessageQueue)
 	read(mq *primitive.MessageQueue, t readType) int64
+	readWithException(mq *primitive.MessageQueue, t readType) (int64, error)
 	update(mq *primitive.MessageQueue, offset int64, increaseOnly bool)
 }
 
@@ -156,21 +157,27 @@ func (local *localFileOffsetStore) load() {
 	}
 }
 
+// Deprecated: Use readWithException instead.
 func (local *localFileOffsetStore) read(mq *primitive.MessageQueue, t readType) int64 {
+	result, _ := local.readWithException(mq, t)
+	return result
+}
+
+func (local *localFileOffsetStore) readWithException(mq *primitive.MessageQueue, t readType) (int64, error) {
 	switch t {
 	case _ReadFromMemory, _ReadMemoryThenStore:
 		off := readFromMemory(local.OffsetTable, mq)
 		if off >= 0 || (off == -1 && t == _ReadFromMemory) {
-			return off
+			return off, nil
 		}
 		fallthrough
 	case _ReadFromStore:
 		local.load()
-		return readFromMemory(local.OffsetTable, mq)
+		return readFromMemory(local.OffsetTable, mq), nil
 	default:
 
 	}
-	return -1
+	return -1, nil
 }
 
 func (local *localFileOffsetStore) update(mq *primitive.MessageQueue, offset int64, increaseOnly bool) {
@@ -284,18 +291,24 @@ func (r *remoteBrokerOffsetStore) remove(mq *primitive.MessageQueue) {
 	})
 }
 
+// Deprecated: Use readWithException instead.
 func (r *remoteBrokerOffsetStore) read(mq *primitive.MessageQueue, t readType) int64 {
+	result, _ := r.readWithException(mq, t)
+	return result
+}
+
+func (r *remoteBrokerOffsetStore) readWithException(mq *primitive.MessageQueue, t readType) (int64, error) {
 	r.mutex.RLock()
 	switch t {
 	case _ReadFromMemory, _ReadMemoryThenStore:
 		off, exist := r.OffsetTable[*mq]
 		if exist {
 			r.mutex.RUnlock()
-			return off
+			return off, nil
 		}
 		if t == _ReadFromMemory {
 			r.mutex.RUnlock()
-			return -1
+			return -1, nil
 		}
 		fallthrough
 	case _ReadFromStore:
@@ -307,7 +320,7 @@ func (r *remoteBrokerOffsetStore) read(mq *primitive.MessageQueue, t readType) i
 				rlog.LogKeyUnderlayError: err,
 			})
 			r.mutex.RUnlock()
-			return -1
+			return -1, err
 		}
 		rlog.Warning("fetch offset of mq from broker success", map[string]interface{}{
 			rlog.LogKeyConsumerGroup: r.group,
@@ -316,11 +329,11 @@ func (r *remoteBrokerOffsetStore) read(mq *primitive.MessageQueue, t readType) i
 		})
 		r.mutex.RUnlock()
 		r.update(mq, off, true)
-		return off
+		return off, nil
 	default:
 	}
 
-	return -1
+	return -1, nil
 }
 
 func (r *remoteBrokerOffsetStore) update(mq *primitive.MessageQueue, offset int64, increaseOnly bool) {
diff --git a/consumer/offset_store_test.go b/consumer/offset_store_test.go
index cfa0eaa..d833f36 100644
--- a/consumer/offset_store_test.go
+++ b/consumer/offset_store_test.go
@@ -98,7 +98,7 @@ func TestLocalFileOffsetStore(t *testing.T) {
 				}
 				for _, value := range cases {
 					localStore.update(value.queue, value.setOffset, false)
-					offset := localStore.read(value.queue, _ReadFromMemory)
+					offset, _ := localStore.readWithException(value.queue, _ReadFromMemory)
 					So(offset, ShouldEqual, value.expectedOffset)
 				}
 			})
@@ -119,7 +119,7 @@ func TestLocalFileOffsetStore(t *testing.T) {
 				}
 				for _, value := range cases {
 					localStore.update(value.queue, value.setOffset, true)
-					offset := localStore.read(value.queue, _ReadFromMemory)
+					offset, _ := localStore.readWithException(value.queue, _ReadFromMemory)
 					So(offset, ShouldEqual, value.expectedOffset)
 				}
 			})
@@ -127,16 +127,16 @@ func TestLocalFileOffsetStore(t *testing.T) {
 
 		Convey("test persist", func() {
 			localStore.update(mq, 1, false)
-			offset := localStore.read(mq, _ReadFromMemory)
+			offset, _ := localStore.readWithException(mq, _ReadFromMemory)
 			So(offset, ShouldEqual, 1)
 
 			queues := []*primitive.MessageQueue{mq}
 			localStore.persist(queues)
-			offset = localStore.read(mq, _ReadFromStore)
+			offset, _ = localStore.readWithException(mq, _ReadFromStore)
 			So(offset, ShouldEqual, 1)
 
 			localStore.(*localFileOffsetStore).OffsetTable.Delete(MessageQueueKey(*mq))
-			offset = localStore.read(mq, _ReadMemoryThenStore)
+			offset, _ = localStore.readWithException(mq, _ReadMemoryThenStore)
 			So(offset, ShouldEqual, 1)
 		})
 	})
@@ -178,7 +178,7 @@ func TestRemoteBrokerOffsetStore(t *testing.T) {
 				}
 				for _, value := range cases {
 					remoteStore.update(value.queue, value.setOffset, false)
-					offset := remoteStore.read(value.queue, _ReadFromMemory)
+					offset, _ := remoteStore.readWithException(value.queue, _ReadFromMemory)
 					So(offset, ShouldEqual, value.expectedOffset)
 				}
 			})
@@ -199,7 +199,7 @@ func TestRemoteBrokerOffsetStore(t *testing.T) {
 				}
 				for _, value := range cases {
 					remoteStore.update(value.queue, value.setOffset, true)
-					offset := remoteStore.read(value.queue, _ReadFromMemory)
+					offset, _ := remoteStore.readWithException(value.queue, _ReadFromMemory)
 					So(offset, ShouldEqual, value.expectedOffset)
 				}
 			})
@@ -219,24 +219,24 @@ func TestRemoteBrokerOffsetStore(t *testing.T) {
 			rmqClient.EXPECT().InvokeSync(gomock.Any(), gomock.Any(), gomock.Any(), gomock.Any()).Return(ret, nil).MaxTimes(2)
 
 			remoteStore.persist(queues)
-			offset := remoteStore.read(mq, _ReadFromStore)
+			offset, _ := remoteStore.readWithException(mq, _ReadFromStore)
 			So(offset, ShouldEqual, 1)
 
 			remoteStore.remove(mq)
-			offset = remoteStore.read(mq, _ReadFromMemory)
+			offset, _ = remoteStore.readWithException(mq, _ReadFromMemory)
 			So(offset, ShouldEqual, -1)
-			offset = remoteStore.read(mq, _ReadMemoryThenStore)
+			offset, _ = remoteStore.readWithException(mq, _ReadMemoryThenStore)
 			So(offset, ShouldEqual, 1)
 
 		})
 
 		Convey("test remove", func() {
 			remoteStore.update(mq, 1, false)
-			offset := remoteStore.read(mq, _ReadFromMemory)
+			offset, _ := remoteStore.readWithException(mq, _ReadFromMemory)
 			So(offset, ShouldEqual, 1)
 
 			remoteStore.remove(mq)
-			offset = remoteStore.read(mq, _ReadFromMemory)
+			offset, _ = remoteStore.readWithException(mq, _ReadFromMemory)
 			So(offset, ShouldEqual, -1)
 		})
 	})
diff --git a/consumer/pull_consumer.go b/consumer/pull_consumer.go
index ff6f7d7..4699601 100644
--- a/consumer/pull_consumer.go
+++ b/consumer/pull_consumer.go
@@ -20,10 +20,11 @@ package consumer
 import (
 	"context"
 	"fmt"
-	errors2 "github.com/apache/rocketmq-client-go/v2/errors"
 	"sync"
 	"sync/atomic"
 
+	errors2 "github.com/apache/rocketmq-client-go/v2/errors"
+
 	"github.com/pkg/errors"
 
 	"github.com/apache/rocketmq-client-go/v2/internal"
@@ -219,7 +220,8 @@ func (c *defaultPullConsumer) makeSureStateOK() error {
 }
 
 func (c *defaultPullConsumer) nextOffsetOf(queue *primitive.MessageQueue) int64 {
-	return c.computePullFromWhere(queue)
+	result, _ := c.computePullFromWhereWithException(queue)
+	return result
 }
 
 // PullFrom pull messages of queue from the offset to offset + numbers
diff --git a/consumer/push_consumer.go b/consumer/push_consumer.go
index 801f412..4ad5ee3 100644
--- a/consumer/push_consumer.go
+++ b/consumer/push_consumer.go
@@ -20,7 +20,6 @@ package consumer
 import (
 	"context"
 	"fmt"
-	errors2 "github.com/apache/rocketmq-client-go/v2/errors"
 	"math"
 	"runtime/pprof"
 	"strconv"
@@ -29,6 +28,8 @@ import (
 	"sync/atomic"
 	"time"
 
+	errors2 "github.com/apache/rocketmq-client-go/v2/errors"
+
 	"github.com/pkg/errors"
 
 	"github.com/apache/rocketmq-client-go/v2/internal"
@@ -375,7 +376,7 @@ func (pc *pushConsumer) GetConsumerRunningInfo(stack bool) *internal.ConsumerRun
 		mq := key.(primitive.MessageQueue)
 		pq := value.(*processQueue)
 		pInfo := pq.currentInfo()
-		pInfo.CommitOffset = pc.storage.read(&mq, _ReadMemoryThenStore)
+		pInfo.CommitOffset, _ = pc.storage.readWithException(&mq, _ReadMemoryThenStore)
 		info.MQTable[mq] = pInfo
 		return true
 	})
@@ -644,7 +645,15 @@ func (pc *pushConsumer) pullMessage(request *PullRequest) {
 		} else {
 			if pq.IsLock() {
 				if !request.lockedFirst {
-					offset := pc.computePullFromWhere(request.mq)
+					offset, err := pc.computePullFromWhereWithException(request.mq)
+					if err != nil {
+						rlog.Warning("computePullFromWhere from broker error", map[string]interface{}{
+							rlog.LogKeyUnderlayError: err.Error(),
+						})
+						sleepTime = _PullDelayTimeWhenError
+						goto NEXT
+					}
+
 					brokerBusy := offset < request.nextOffset
 					rlog.Info("the first time to pull message, so fix offset from broker, offset maybe changed", map[string]interface{}{
 						rlog.LogKeyPullRequest:      request.String(),
@@ -684,7 +693,7 @@ func (pc *pushConsumer) pullMessage(request *PullRequest) {
 		)
 
 		if pc.model == Clustering {
-			commitOffsetValue = pc.storage.read(request.mq, _ReadFromMemory)
+			commitOffsetValue, _ = pc.storage.readWithException(request.mq, _ReadFromMemory)
 			if commitOffsetValue > 0 {
 				commitOffsetEnable = true
 			}