You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@rocketmq.apache.org by hu...@apache.org on 2019/08/26 02:51:17 UTC

[rocketmq-client-go] branch native updated: add ut for defaultConsumer. resolve #178 (#179)

This is an automated email from the ASF dual-hosted git repository.

huzongtang pushed a commit to branch native
in repository https://gitbox.apache.org/repos/asf/rocketmq-client-go.git


The following commit(s) were added to refs/heads/native by this push:
     new 59cd47f  add ut for defaultConsumer. resolve #178 (#179)
59cd47f is described below

commit 59cd47fe743ff7eff4b6f110ea5fbe525988b33c
Author: xujianhai666 <52...@users.noreply.github.com>
AuthorDate: Mon Aug 26 10:51:12 2019 +0800

    add ut for defaultConsumer. resolve #178 (#179)
---
 consumer/consumer_test.go     | 138 ++++++++++++++++++++++++++++++++++++++++++
 consumer/mock_offset_store.go |  76 +++++++++++++++++++++++
 consumer/offset_store.go      |   1 +
 3 files changed, 215 insertions(+)

diff --git a/consumer/consumer_test.go b/consumer/consumer_test.go
index a837b24..665547d 100644
--- a/consumer/consumer_test.go
+++ b/consumer/consumer_test.go
@@ -18,10 +18,17 @@ limitations under the License.
 package consumer
 
 import (
+	"sync"
 	"testing"
 	"time"
 
+	"github.com/golang/mock/gomock"
+	. "github.com/smartystreets/goconvey/convey"
 	"github.com/stretchr/testify/assert"
+
+	"github.com/apache/rocketmq-client-go/internal"
+	"github.com/apache/rocketmq-client-go/internal/remote"
+	"github.com/apache/rocketmq-client-go/primitive"
 )
 
 func TestParseTimestamp(t *testing.T) {
@@ -30,3 +37,134 @@ func TestParseTimestamp(t *testing.T) {
 	assert.Nil(t, err)
 	assert.Equal(t, int64(1556652849), timestamp.Unix())
 }
+
+func TestDoRebalance(t *testing.T) {
+	Convey("Given a defaultConsumer", t, func() {
+		dc := &defaultConsumer{
+			model: Clustering,
+		}
+		// Fixture: two queues of one topic and an empty subscription are stored on dc.
+		topic := "test"
+		broker := "127.0.0.1:8889"
+		clientID := "clientID"
+		mqs := []*primitive.MessageQueue{
+			{
+				Topic:      topic,
+				BrokerName: "",
+				QueueId:    0,
+			},
+			{
+				Topic:      topic,
+				BrokerName: "",
+				QueueId:    1,
+			},
+		}
+		dc.topicSubscribeInfoTable.Store(topic, mqs)
+		sub := &internal.SubscriptionData{}
+		dc.subscriptionDataTable.Store(topic, sub)
+
+		ctrl := gomock.NewController(t)
+		defer ctrl.Finish()
+		namesrvCli := internal.NewMockNamesrvs(ctrl)
+		namesrvCli.EXPECT().FindBrokerAddrByTopic(gomock.Any()).Return(broker)
+		dc.namesrv = namesrvCli
+		// InvokeSync is mocked to reply with three consumer IDs in its body.
+		rmqCli := internal.NewMockRMQClient(ctrl)
+		rmqCli.EXPECT().InvokeSync(gomock.Any(), gomock.Any(), gomock.Any()).
+			Return(&remote.RemotingCommand{
+				Body: []byte("{\"consumerIdList\": [\"a1\", \"a2\", \"a3\"] }"),
+			}, nil)
+		rmqCli.EXPECT().ClientID().Return(clientID)
+		dc.client = rmqCli
+		// The stubbed allocate signals wg, so the test waits until it has run.
+		var wg sync.WaitGroup
+		wg.Add(1)
+		dc.allocate = func(cg string, clientID string, mqAll []*primitive.MessageQueue, cidAll []string) []*primitive.MessageQueue {
+			assert.Equal(t, []string{"a1", "a2", "a3"}, cidAll)
+			wg.Done()
+			return nil
+		}
+
+		dc.doBalance()
+
+		wg.Wait()
+	})
+}
+
+func TestComputePullFromWhere(t *testing.T) {
+	Convey("Given a defaultConsumer", t, func() {
+		ctrl := gomock.NewController(t)
+		defer ctrl.Finish()
+
+		// The offset store, name server and client are all gomock mocks.
+		store := NewMockOffsetStore(ctrl)
+		ns := internal.NewMockNamesrvs(ctrl)
+		cli := internal.NewMockRMQClient(ctrl)
+
+		c := &defaultConsumer{
+			model: Clustering,
+			cType: _PushConsume,
+		}
+		c.storage = store
+		c.namesrv = ns
+		c.client = cli
+
+		// Only the topic of the queue under test is populated.
+		queue := &primitive.MessageQueue{
+			Topic: "test",
+		}
+
+		Convey("get effective offset", func() {
+			store.EXPECT().read(gomock.Any(), gomock.Any()).Return(int64(10))
+			got := c.computePullFromWhere(queue)
+			assert.Equal(t, int64(10), got)
+		})
+
+		Convey("ConsumeFromLastOffset for normal topic", func() {
+			c.option.FromWhere = ConsumeFromLastOffset
+			store.EXPECT().read(gomock.Any(), gomock.Any()).Return(int64(-1))
+
+			addr := "a"
+			ns.EXPECT().FindBrokerAddrByName(gomock.Any()).Return(addr)
+
+			// The mocked sync call replies with offset 20 in its ext fields.
+			reply := &remote.RemotingCommand{
+				ExtFields: map[string]string{"offset": "20"},
+			}
+			cli.EXPECT().InvokeSync(gomock.Any(), gomock.Any(), gomock.Any()).
+				Return(reply, nil)
+
+			got := c.computePullFromWhere(queue)
+			assert.Equal(t, int64(20), got)
+		})
+
+		Convey("ConsumeFromFirstOffset for normal topic", func() {
+			c.option.FromWhere = ConsumeFromFirstOffset
+			store.EXPECT().read(gomock.Any(), gomock.Any()).Return(int64(-1))
+
+			got := c.computePullFromWhere(queue)
+			assert.Equal(t, int64(0), got)
+		})
+
+		Convey("ConsumeFromTimestamp for normal topic", func() {
+			c.option.FromWhere = ConsumeFromTimestamp
+			c.option.ConsumeTimestamp = "20060102150405"
+
+			store.EXPECT().read(gomock.Any(), gomock.Any()).Return(int64(-1))
+
+			addr := "a"
+			ns.EXPECT().FindBrokerAddrByName(gomock.Any()).Return(addr)
+
+			// The mocked sync call replies with offset 30 in its ext fields.
+			reply := &remote.RemotingCommand{
+				ExtFields: map[string]string{"offset": "30"},
+			}
+			cli.EXPECT().InvokeSync(gomock.Any(), gomock.Any(), gomock.Any()).
+				Return(reply, nil)
+
+			got := c.computePullFromWhere(queue)
+			assert.Equal(t, int64(30), got)
+		})
+
+	})
+}
diff --git a/consumer/mock_offset_store.go b/consumer/mock_offset_store.go
new file mode 100644
index 0000000..093c50d
--- /dev/null
+++ b/consumer/mock_offset_store.go
@@ -0,0 +1,76 @@
+// Code generated by MockGen. DO NOT EDIT.
+// Source: offset_store.go
+
+// Package consumer is a generated GoMock package.
+package consumer
+
+import (
+	primitive "github.com/apache/rocketmq-client-go/primitive"
+	gomock "github.com/golang/mock/gomock"
+	reflect "reflect"
+)
+
+// MockOffsetStore is a gomock-based mock of the OffsetStore interface.
+type MockOffsetStore struct {
+	ctrl     *gomock.Controller
+	recorder *MockOffsetStoreMockRecorder
+}
+
+// MockOffsetStoreMockRecorder is the recorder used to declare expected calls on a MockOffsetStore.
+type MockOffsetStoreMockRecorder struct {
+	mock *MockOffsetStore
+}
+
+// NewMockOffsetStore creates a MockOffsetStore bound to ctrl and attaches its recorder.
+func NewMockOffsetStore(ctrl *gomock.Controller) *MockOffsetStore {
+	mock := &MockOffsetStore{ctrl: ctrl}
+	mock.recorder = &MockOffsetStoreMockRecorder{mock}
+	return mock
+}
+
+// EXPECT returns the recorder on which the caller declares the calls it expects.
+func (m *MockOffsetStore) EXPECT() *MockOffsetStoreMockRecorder {
+	return m.recorder
+}
+
+// persist mocks the base method by forwarding the call and mqs to the controller.
+func (m *MockOffsetStore) persist(mqs []*primitive.MessageQueue) {
+	m.ctrl.Call(m, "persist", mqs)
+}
+
+// persist records an expected call of persist with the given argument.
+func (mr *MockOffsetStoreMockRecorder) persist(mqs interface{}) *gomock.Call {
+	return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "persist", reflect.TypeOf((*MockOffsetStore)(nil).persist), mqs)
+}
+
+// remove mocks the base method by forwarding the call and mq to the controller.
+func (m *MockOffsetStore) remove(mq *primitive.MessageQueue) {
+	m.ctrl.Call(m, "remove", mq)
+}
+
+// remove records an expected call of remove with the given argument.
+func (mr *MockOffsetStoreMockRecorder) remove(mq interface{}) *gomock.Call {
+	return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "remove", reflect.TypeOf((*MockOffsetStore)(nil).remove), mq)
+}
+
+// read mocks the base method; it yields the controller's first return value, or 0 if that is not an int64.
+func (m *MockOffsetStore) read(mq *primitive.MessageQueue, t readType) int64 {
+	ret := m.ctrl.Call(m, "read", mq, t)
+	ret0, _ := ret[0].(int64)
+	return ret0
+}
+
+// read records an expected call of read with the given arguments.
+func (mr *MockOffsetStoreMockRecorder) read(mq, t interface{}) *gomock.Call {
+	return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "read", reflect.TypeOf((*MockOffsetStore)(nil).read), mq, t)
+}
+
+// update mocks the base method by forwarding the call and its three arguments to the controller.
+func (m *MockOffsetStore) update(mq *primitive.MessageQueue, offset int64, increaseOnly bool) {
+	m.ctrl.Call(m, "update", mq, offset, increaseOnly)
+}
+
+// update records an expected call of update with the given arguments.
+func (mr *MockOffsetStoreMockRecorder) update(mq, offset, increaseOnly interface{}) *gomock.Call {
+	return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "update", reflect.TypeOf((*MockOffsetStore)(nil).update), mq, offset, increaseOnly)
+}
diff --git a/consumer/offset_store.go b/consumer/offset_store.go
index f799c1a..4d1b7e9 100644
--- a/consumer/offset_store.go
+++ b/consumer/offset_store.go
@@ -51,6 +51,7 @@ func init() {
 	}
 }
 
+//go:generate mockgen -source offset_store.go -destination mock_offset_store.go -self_package github.com/apache/rocketmq-client-go/consumer  --package consumer OffsetStore
 type OffsetStore interface {
 	persist(mqs []*primitive.MessageQueue)
 	remove(mq *primitive.MessageQueue)