You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@rocketmq.apache.org by hu...@apache.org on 2019/08/26 02:51:17 UTC
[rocketmq-client-go] branch native updated: add ut for
defaultConsumer. resolve #178 (#179)
This is an automated email from the ASF dual-hosted git repository.
huzongtang pushed a commit to branch native
in repository https://gitbox.apache.org/repos/asf/rocketmq-client-go.git
The following commit(s) were added to refs/heads/native by this push:
new 59cd47f add ut for defaultConsumer. resolve #178 (#179)
59cd47f is described below
commit 59cd47fe743ff7eff4b6f110ea5fbe525988b33c
Author: xujianhai666 <52...@users.noreply.github.com>
AuthorDate: Mon Aug 26 10:51:12 2019 +0800
add ut for defaultConsumer. resolve #178 (#179)
---
consumer/consumer_test.go | 138 ++++++++++++++++++++++++++++++++++++++++++
consumer/mock_offset_store.go | 76 +++++++++++++++++++++++
consumer/offset_store.go | 1 +
3 files changed, 215 insertions(+)
diff --git a/consumer/consumer_test.go b/consumer/consumer_test.go
index a837b24..665547d 100644
--- a/consumer/consumer_test.go
+++ b/consumer/consumer_test.go
@@ -18,10 +18,17 @@ limitations under the License.
package consumer
import (
+ "sync"
"testing"
"time"
+ "github.com/golang/mock/gomock"
+ . "github.com/smartystreets/goconvey/convey"
"github.com/stretchr/testify/assert"
+
+ "github.com/apache/rocketmq-client-go/internal"
+ "github.com/apache/rocketmq-client-go/internal/remote"
+ "github.com/apache/rocketmq-client-go/primitive"
)
func TestParseTimestamp(t *testing.T) {
@@ -30,3 +37,134 @@ func TestParseTimestamp(t *testing.T) {
assert.Nil(t, err)
assert.Equal(t, int64(1556652849), timestamp.Unix())
}
+
+// TestDoRebalance checks that doBalance passes the consumer ID list carried in
+// the mocked InvokeSync response body to the consumer's allocate strategy.
+func TestDoRebalance(t *testing.T) {
+	Convey("Given a defaultConsumer", t, func() {
+		dc := &defaultConsumer{
+			model: Clustering,
+		}
+
+		topic := "test"
+		broker := "127.0.0.1:8889"
+		clientID := "clientID"
+		mqs := []*primitive.MessageQueue{
+			{
+				Topic:      topic,
+				BrokerName: "",
+				QueueId:    0,
+			},
+			{
+				Topic:      topic,
+				BrokerName: "",
+				QueueId:    1,
+			},
+		}
+		dc.topicSubscribeInfoTable.Store(topic, mqs)
+		sub := &internal.SubscriptionData{}
+		dc.subscriptionDataTable.Store(topic, sub)
+
+		ctrl := gomock.NewController(t)
+		defer ctrl.Finish()
+		namesrvCli := internal.NewMockNamesrvs(ctrl)
+		namesrvCli.EXPECT().FindBrokerAddrByTopic(gomock.Any()).Return(broker)
+		dc.namesrv = namesrvCli
+
+		rmqCli := internal.NewMockRMQClient(ctrl)
+		rmqCli.EXPECT().InvokeSync(gomock.Any(), gomock.Any(), gomock.Any()).
+			Return(&remote.RemotingCommand{
+				Body: []byte("{\"consumerIdList\": [\"a1\", \"a2\", \"a3\"] }"),
+			}, nil)
+		rmqCli.EXPECT().ClientID().Return(clientID)
+		dc.client = rmqCli
+
+		var wg sync.WaitGroup
+		wg.Add(1)
+		dc.allocate = func(cg string, clientID string, mqAll []*primitive.MessageQueue, cidAll []string) []*primitive.MessageQueue {
+			// testify's signature is Equal(t, expected, actual); keeping that
+			// order makes a failure report label the two values correctly.
+			assert.Equal(t, []string{"a1", "a2", "a3"}, cidAll)
+			wg.Done()
+			return nil
+		}
+
+		dc.doBalance()
+
+		// Bound the wait: if allocate is never invoked, fail quickly with a
+		// clear message instead of blocking until the global test timeout.
+		called := make(chan struct{})
+		go func() {
+			wg.Wait()
+			close(called)
+		}()
+		select {
+		case <-called:
+		case <-time.After(5 * time.Second):
+			t.Error("doBalance did not invoke the allocate strategy within 5s")
+		}
+	})
+}
+
+// TestComputePullFromWhere exercises computePullFromWhere on a clustering push
+// consumer: once with an offset already available from the mocked store, and
+// once per FromWhere policy when the store answers -1.
+func TestComputePullFromWhere(t *testing.T) {
+	Convey("Given a defaultConsumer", t, func() {
+		dc := &defaultConsumer{
+			model: Clustering,
+			cType: _PushConsume,
+		}
+
+		ctrl := gomock.NewController(t)
+		defer ctrl.Finish()
+
+		store := NewMockOffsetStore(ctrl)
+		dc.storage = store
+
+		queue := &primitive.MessageQueue{
+			Topic: "test",
+		}
+
+		ns := internal.NewMockNamesrvs(ctrl)
+		dc.namesrv = ns
+
+		cli := internal.NewMockRMQClient(ctrl)
+		dc.client = cli
+
+		// storedOffset makes the mocked store answer one read call with v.
+		storedOffset := func(v int64) {
+			store.EXPECT().read(gomock.Any(), gomock.Any()).Return(v)
+		}
+
+		// brokerAnswers expects one broker address lookup followed by one
+		// InvokeSync whose response carries the given "offset" ext field.
+		brokerAnswers := func(offset string) {
+			ns.EXPECT().FindBrokerAddrByName(gomock.Any()).Return("a")
+
+			reply := &remote.RemotingCommand{
+				ExtFields: map[string]string{
+					"offset": offset,
+				},
+			}
+			cli.EXPECT().InvokeSync(gomock.Any(), gomock.Any(), gomock.Any()).Return(reply, nil)
+		}
+
+		Convey("get effective offset", func() {
+			storedOffset(10)
+
+			got := dc.computePullFromWhere(queue)
+			assert.Equal(t, int64(10), got)
+		})
+
+		Convey("ConsumeFromLastOffset for normal topic", func() {
+			storedOffset(-1)
+			dc.option.FromWhere = ConsumeFromLastOffset
+			brokerAnswers("20")
+
+			got := dc.computePullFromWhere(queue)
+			assert.Equal(t, int64(20), got)
+		})
+
+		Convey("ConsumeFromFirstOffset for normal topic", func() {
+			storedOffset(-1)
+			dc.option.FromWhere = ConsumeFromFirstOffset
+
+			got := dc.computePullFromWhere(queue)
+			assert.Equal(t, int64(0), got)
+		})
+
+		Convey("ConsumeFromTimestamp for normal topic", func() {
+			storedOffset(-1)
+			dc.option.FromWhere = ConsumeFromTimestamp
+			dc.option.ConsumeTimestamp = "20060102150405"
+			brokerAnswers("30")
+
+			got := dc.computePullFromWhere(queue)
+			assert.Equal(t, int64(30), got)
+		})
+	})
+}
diff --git a/consumer/mock_offset_store.go b/consumer/mock_offset_store.go
new file mode 100644
index 0000000..093c50d
--- /dev/null
+++ b/consumer/mock_offset_store.go
@@ -0,0 +1,76 @@
+// Code generated by MockGen. DO NOT EDIT.
+// Source: offset_store.go
+
+// Package consumer is a generated GoMock package.
+package consumer
+
+import (
+ primitive "github.com/apache/rocketmq-client-go/primitive"
+ gomock "github.com/golang/mock/gomock"
+ reflect "reflect"
+)
+
+// MockOffsetStore is a gomock-based mock of the OffsetStore interface; each mocked method forwards its call to ctrl.
+type MockOffsetStore struct {
+ ctrl *gomock.Controller // controller that receives every mocked call
+ recorder *MockOffsetStoreMockRecorder // handed out by EXPECT to declare expected calls
+}
+
+// MockOffsetStoreMockRecorder is the mock recorder for MockOffsetStore; its methods register expected calls on the mock's controller.
+type MockOffsetStoreMockRecorder struct {
+ mock *MockOffsetStore // the mock whose controller records the expectations
+}
+
+// NewMockOffsetStore creates a new mock instance bound to ctrl and attaches a recorder that points back at that mock.
+func NewMockOffsetStore(ctrl *gomock.Controller) *MockOffsetStore {
+ mock := &MockOffsetStore{ctrl: ctrl}
+ mock.recorder = &MockOffsetStoreMockRecorder{mock}
+ return mock
+}
+
+// EXPECT returns the mock's recorder, which allows the caller to indicate expected use (e.g. m.EXPECT().read(...).Return(...)).
+func (m *MockOffsetStore) EXPECT() *MockOffsetStoreMockRecorder {
+ return m.recorder
+}
+
+// persist mocks the base method: it reports the call and its mqs argument to the controller and returns nothing.
+func (m *MockOffsetStore) persist(mqs []*primitive.MessageQueue) {
+ m.ctrl.Call(m, "persist", mqs)
+}
+
+// persist indicates an expected call of persist; mqs is the argument the call is expected to receive, and the returned *gomock.Call can be configured further.
+func (mr *MockOffsetStoreMockRecorder) persist(mqs interface{}) *gomock.Call {
+ return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "persist", reflect.TypeOf((*MockOffsetStore)(nil).persist), mqs)
+}
+
+// remove mocks the base method: it reports the call and its mq argument to the controller and returns nothing.
+func (m *MockOffsetStore) remove(mq *primitive.MessageQueue) {
+ m.ctrl.Call(m, "remove", mq)
+}
+
+// remove indicates an expected call of remove; mq is the argument the call is expected to receive, and the returned *gomock.Call can be configured further.
+func (mr *MockOffsetStoreMockRecorder) remove(mq interface{}) *gomock.Call {
+ return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "remove", reflect.TypeOf((*MockOffsetStore)(nil).remove), mq)
+}
+
+// read mocks the base method: it forwards mq and t to the controller and returns the first value the controller hands back, as an int64.
+func (m *MockOffsetStore) read(mq *primitive.MessageQueue, t readType) int64 {
+ ret := m.ctrl.Call(m, "read", mq, t)
+ ret0, _ := ret[0].(int64) // a value that is not an int64 yields 0; the failed assertion is ignored
+ return ret0
+}
+
+// read indicates an expected call of read; mq and t are the arguments the call is expected to receive (the tests in this package pass gomock.Any()), and Return on the result sets the offset.
+func (mr *MockOffsetStoreMockRecorder) read(mq, t interface{}) *gomock.Call {
+ return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "read", reflect.TypeOf((*MockOffsetStore)(nil).read), mq, t)
+}
+
+// update mocks the base method: it reports the call with mq, offset and increaseOnly to the controller and returns nothing.
+func (m *MockOffsetStore) update(mq *primitive.MessageQueue, offset int64, increaseOnly bool) {
+ m.ctrl.Call(m, "update", mq, offset, increaseOnly)
+}
+
+// update indicates an expected call of update; mq, offset and increaseOnly are the arguments the call is expected to receive, and the returned *gomock.Call can be configured further.
+func (mr *MockOffsetStoreMockRecorder) update(mq, offset, increaseOnly interface{}) *gomock.Call {
+ return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "update", reflect.TypeOf((*MockOffsetStore)(nil).update), mq, offset, increaseOnly)
+}
diff --git a/consumer/offset_store.go b/consumer/offset_store.go
index f799c1a..4d1b7e9 100644
--- a/consumer/offset_store.go
+++ b/consumer/offset_store.go
@@ -51,6 +51,7 @@ func init() {
}
}
+//go:generate mockgen -source offset_store.go -destination mock_offset_store.go -self_package github.com/apache/rocketmq-client-go/consumer --package consumer OffsetStore
type OffsetStore interface {
persist(mqs []*primitive.MessageQueue)
remove(mq *primitive.MessageQueue)