You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@rocketmq.apache.org by we...@apache.org on 2021/12/27 06:08:05 UTC
[rocketmq-client-go] branch pull-consumer updated: [ISSUE #737] Add ManualPullConsumer (#745)
This is an automated email from the ASF dual-hosted git repository.
wenfeng pushed a commit to branch pull-consumer
in repository https://gitbox.apache.org/repos/asf/rocketmq-client-go.git
The following commit(s) were added to refs/heads/pull-consumer by this push:
new 29d0691 [ISSUE #737] Add ManualPullConsumer (#745)
29d0691 is described below
commit 29d06911f8d5a3aa14a0950776122cfd6fb28d48
Author: Gagharv <ww...@gmail.com>
AuthorDate: Mon Dec 27 14:07:56 2021 +0800
[ISSUE #737] Add ManualPullConsumer (#745)
* add manual consumer
* Make the namesrv in `NewManualPullConsumer` can reuse the definition of the upper
Change-Id: I4f2811988bc639554b3029a7fdec76509f3a8907
* add comments and adjust the code
Change-Id: Ie4bcd8cba6a07a3bff3dfb328ce38c157cbfca76
* adjust code
Change-Id: Ie03834b685882f0f40171192719124a4c1ea44ed
* adjust log output, etc.
Change-Id: I9cfda8364c1163f700c2f3e2ef673fd3b117f270
---
consumer/manual_pull_consumer.go | 342 +++++++++++++++++++++++++++++++++
consumer/manual_pull_consumer_test.go | 343 ++++++++++++++++++++++++++++++++++
examples/consumer/manual/main.go | 83 ++++++++
internal/request.go | 12 ++
primitive/interceptor.go | 4 +
5 files changed, 784 insertions(+)
diff --git a/consumer/manual_pull_consumer.go b/consumer/manual_pull_consumer.go
new file mode 100644
index 0000000..0abff56
--- /dev/null
+++ b/consumer/manual_pull_consumer.go
@@ -0,0 +1,342 @@
+/*
+Licensed to the Apache Software Foundation (ASF) under one or more
+contributor license agreements. See the NOTICE file distributed with
+this work for additional information regarding copyright ownership.
+The ASF licenses this file to You under the Apache License, Version 2.0
+(the "License"); you may not use this file except in compliance with
+the License. You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing, software
+distributed under the License is distributed on an "AS IS" BASIS,
+WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+See the License for the specific language governing permissions and
+limitations under the License.
+*/
+
+package consumer
+
+import (
+ "context"
+ "fmt"
+ "strconv"
+ "sync"
+ "time"
+
+ errors2 "github.com/apache/rocketmq-client-go/v2/errors"
+ "github.com/apache/rocketmq-client-go/v2/internal"
+ "github.com/apache/rocketmq-client-go/v2/internal/remote"
+ "github.com/apache/rocketmq-client-go/v2/primitive"
+ "github.com/apache/rocketmq-client-go/v2/rlog"
+ "github.com/pkg/errors"
+)
+
+// ManualPullConsumer is a low-level consumer, which operates based on MessageQueue.
+// Users should maintain information such as offset by themselves
+type ManualPullConsumer interface {
+ // PullFromQueue return messages according to specified queue with offset
+ PullFromQueue(ctx context.Context, mq *primitive.MessageQueue, offset int64, numbers int) (*primitive.PullResult, error)
+
+ // GetMessageQueues return queues of the topic
+ GetMessageQueues(ctx context.Context, topic string) ([]*primitive.MessageQueue, error)
+
+ // CommittedOffset return the offset of mq in groupName, if mq not exist, -1 will be return
+ CommittedOffset(ctx context.Context, groupName string, mq *primitive.MessageQueue) (int64, error)
+
+ // Seek let consume position to the offset, this api can be used to reset offset and commit offset
+ Seek(ctx context.Context, groupName string, mq *primitive.MessageQueue, offset int64) error
+
+ // Lookup return offset according to timestamp(ms), the maximum offset that born time less than timestamp will be return.
+ // If timestamp less than any message's born time, the earliest offset will be returned
+ // If timestamp great than any message's born time, the latest offset will be returned
+ Lookup(ctx context.Context, mq *primitive.MessageQueue, timestamp int64) (int64, error)
+
+ // Shutdown the ManualPullConsumer, clean up internal resources
+ Shutdown() error
+}
+
+type defaultManualPullConsumer struct {
+ namesrv internal.Namesrvs
+ option consumerOptions
+ client internal.RMQClient
+ interceptor primitive.Interceptor
+ pullFromWhichNodeTable sync.Map
+ shutdownOnce sync.Once
+}
+
+// NewManualPullConsumer creates and initializes a new ManualPullConsumer.
+func NewManualPullConsumer(options ...Option) (*defaultManualPullConsumer, error) {
+ defaultOpts := defaultPullConsumerOptions()
+ for _, apply := range options {
+ apply(&defaultOpts)
+ }
+
+ srvs, err := internal.NewNamesrv(defaultOpts.Resolver)
+ if err != nil {
+ return nil, errors.Wrap(err, "new Namesrv failed.")
+ }
+ if !defaultOpts.Credentials.IsEmpty() {
+ srvs.SetCredentials(defaultOpts.Credentials)
+ }
+ defaultOpts.Namesrv = srvs
+
+ actualRMQClient := internal.GetOrNewRocketMQClient(defaultOpts.ClientOptions, nil)
+ actualNameSrv := internal.GetOrSetNamesrv(actualRMQClient.ClientID(), defaultOpts.Namesrv)
+
+ dc := &defaultManualPullConsumer{
+ client: actualRMQClient,
+ option: defaultOpts,
+ namesrv: actualNameSrv,
+ }
+
+ dc.interceptor = primitive.ChainInterceptors(dc.option.Interceptors...)
+ dc.option.ClientOptions.Namesrv = actualNameSrv
+ return dc, nil
+}
+
+func (dc *defaultManualPullConsumer) PullFromQueue(ctx context.Context, groupName string, mq *primitive.MessageQueue, offset int64, numbers int) (*primitive.PullResult, error) {
+ if err := dc.checkPull(ctx, mq, offset, numbers); err != nil {
+ return nil, err
+ }
+ subData := buildSubscriptionData(mq.Topic, MessageSelector{
+ Expression: _SubAll,
+ })
+
+ sysFlag := buildSysFlag(false, true, true, false)
+
+ pullRequest := &internal.PullMessageRequestHeader{
+ ConsumerGroup: groupName,
+ Topic: mq.Topic,
+ QueueId: int32(mq.QueueId),
+ QueueOffset: offset,
+ MaxMsgNums: int32(numbers),
+ SysFlag: sysFlag,
+ CommitOffset: 0,
+ SuspendTimeoutMillis: _BrokerSuspendMaxTime,
+ SubExpression: subData.SubString,
+ ExpressionType: string(subData.ExpType),
+ }
+
+ if subData.ExpType == string(TAG) {
+ pullRequest.SubVersion = 0
+ } else {
+ pullRequest.SubVersion = subData.SubVersion
+ }
+
+ pullResp, err := dc.pullInner(ctx, mq, pullRequest)
+ if err != nil {
+ return pullResp, err
+ }
+ dc.processPullResult(mq, pullResp, subData)
+ if dc.interceptor != nil {
+ msgCtx := &primitive.ConsumeMessageContext{
+ Properties: make(map[string]string),
+ ConsumerGroup: groupName,
+ MQ: mq,
+ Msgs: pullResp.GetMessageExts(),
+ }
+ err = dc.interceptor(ctx, msgCtx, struct{}{}, primitive.NoopInterceptor)
+ }
+ return pullResp, err
+}
+
+func (dc *defaultManualPullConsumer) GetMessageQueues(ctx context.Context, topic string) ([]*primitive.MessageQueue, error) {
+ return dc.namesrv.FetchSubscribeMessageQueues(topic)
+}
+
+func (dc *defaultManualPullConsumer) CommittedOffset(ctx context.Context, groupName string, mq *primitive.MessageQueue) (int64, error) {
+ fn := func(broker string) (*remote.RemotingCommand, error) {
+ request := &internal.QueryConsumerOffsetRequestHeader{
+ ConsumerGroup: groupName,
+ Topic: mq.Topic,
+ QueueId: mq.QueueId,
+ }
+ cmd := remote.NewRemotingCommand(internal.ReqQueryConsumerOffset, request, nil)
+ return dc.client.InvokeSync(ctx, broker, cmd, 3*time.Second)
+ }
+ return dc.processQueryOffset(mq, fn)
+}
+
+func (dc *defaultManualPullConsumer) Seek(ctx context.Context, groupName string, mq *primitive.MessageQueue, offset int64) error {
+ minOffset, err := dc.queryMinOffset(context.Background(), mq)
+ if err != nil {
+ return err
+ }
+ maxOffset, err := dc.queryMaxOffset(context.Background(), mq)
+ if err != nil {
+ return err
+ }
+ if offset < minOffset || offset > maxOffset {
+ return fmt.Errorf("Seek offset illegal, seek offset = %d, min offset = %d, max offset = %d", offset, minOffset, maxOffset)
+ }
+
+ broker, exist := dc.chooseServer(mq)
+ if !exist {
+ rlog.Warning("the broker does not exist", map[string]interface{}{
+ rlog.LogKeyBroker: mq.BrokerName,
+ })
+ return errors2.ErrBrokerNotFound
+ }
+
+ updateOffsetRequest := &internal.UpdateConsumerOffsetRequestHeader{
+ ConsumerGroup: groupName,
+ Topic: mq.Topic,
+ QueueId: mq.QueueId,
+ CommitOffset: offset,
+ }
+ cmd := remote.NewRemotingCommand(internal.ReqUpdateConsumerOffset, updateOffsetRequest, nil)
+ return dc.client.InvokeOneWay(context.Background(), broker, cmd, 5*time.Second)
+}
+
+func (dc *defaultManualPullConsumer) Lookup(ctx context.Context, mq *primitive.MessageQueue, timestamp int64) (int64, error) {
+ fn := func(broker string) (*remote.RemotingCommand, error) {
+ request := &internal.SearchOffsetRequestHeader{
+ Topic: mq.Topic,
+ QueueId: mq.QueueId,
+ Timestamp: timestamp,
+ }
+ cmd := remote.NewRemotingCommand(internal.ReqSearchOffsetByTimestamp, request, nil)
+ return dc.client.InvokeSync(ctx, broker, cmd, 3*time.Second)
+ }
+ return dc.processQueryOffset(mq, fn)
+}
+
+func (dc *defaultManualPullConsumer) Shutdown() error {
+ dc.shutdownOnce.Do(func() {
+ dc.client.Shutdown()
+ })
+ return nil
+}
+
+func (dc *defaultManualPullConsumer) chooseServer(mq *primitive.MessageQueue) (string, bool) {
+ brokerAddr := dc.namesrv.FindBrokerAddrByName(mq.BrokerName)
+ if brokerAddr == "" {
+ dc.namesrv.UpdateTopicRouteInfo(mq.Topic)
+ brokerAddr = dc.namesrv.FindBrokerAddrByName(mq.BrokerName)
+ }
+ return brokerAddr, brokerAddr != ""
+}
+
+func (dc *defaultManualPullConsumer) queryMinOffset(ctx context.Context, mq *primitive.MessageQueue) (int64, error) {
+ fn := func(broker string) (*remote.RemotingCommand, error) {
+ request := &internal.GetMinOffsetRequestHeader{
+ Topic: mq.Topic,
+ QueueId: mq.QueueId,
+ }
+ cmd := remote.NewRemotingCommand(internal.ReqGetMinOffset, request, nil)
+ return dc.client.InvokeSync(ctx, broker, cmd, 3*time.Second)
+ }
+ return dc.processQueryOffset(mq, fn)
+}
+
+func (dc *defaultManualPullConsumer) queryMaxOffset(ctx context.Context, mq *primitive.MessageQueue) (int64, error) {
+ fn := func(broker string) (*remote.RemotingCommand, error) {
+ request := &internal.GetMaxOffsetRequestHeader{
+ Topic: mq.Topic,
+ QueueId: mq.QueueId,
+ }
+ cmd := remote.NewRemotingCommand(internal.ReqGetMaxOffset, request, nil)
+ return dc.client.InvokeSync(ctx, broker, cmd, 3*time.Second)
+ }
+ return dc.processQueryOffset(mq, fn)
+}
+
+func (dc *defaultManualPullConsumer) processQueryOffset(mq *primitive.MessageQueue, fn func(broker string) (*remote.RemotingCommand, error)) (int64, error) {
+ broker, exist := dc.chooseServer(mq)
+ if !exist {
+ rlog.Warning("the broker does not exist", map[string]interface{}{
+ rlog.LogKeyBroker: mq.BrokerName,
+ })
+ return -1, errors2.ErrBrokerNotFound
+ }
+ response, err := fn(broker)
+ if err != nil {
+ return -1, err
+ }
+ if response.Code != internal.ResSuccess {
+ return -2, fmt.Errorf("broker response code: %d, remarks: %s", response.Code, response.Remark)
+ }
+ off, err := strconv.ParseInt(response.ExtFields["offset"], 10, 64)
+ if err != nil {
+ return -1, errors.Wrap(err, "parse offset fail.")
+ }
+ return off, nil
+}
+
+func (dc *defaultManualPullConsumer) pullInner(ctx context.Context, mq *primitive.MessageQueue, pullRequest *internal.PullMessageRequestHeader) (*primitive.PullResult, error) {
+ brokerResult := dc.tryFindBroker(mq)
+ if brokerResult == nil {
+ rlog.Warning("no broker found for mq", map[string]interface{}{
+ rlog.LogKeyMessageQueue: mq,
+ })
+ return nil, errors2.ErrBrokerNotFound
+ }
+
+ if (pullRequest.ExpressionType == string(TAG)) && brokerResult.BrokerVersion < internal.V4_1_0 {
+ return nil, fmt.Errorf("the broker [%s, %v] does not upgrade to support for filter message by %v",
+ mq.BrokerName, brokerResult.BrokerVersion, pullRequest.ExpressionType)
+ }
+ return dc.client.PullMessage(ctx, brokerResult.BrokerAddr, pullRequest)
+}
+
+func (dc *defaultManualPullConsumer) tryFindBroker(mq *primitive.MessageQueue) *internal.FindBrokerResult {
+ result := dc.namesrv.FindBrokerAddressInSubscribe(mq.BrokerName, dc.recalculatePullFromWhichNode(mq), false)
+ if result != nil {
+ return result
+ }
+ dc.namesrv.UpdateTopicRouteInfo(mq.Topic)
+ return dc.namesrv.FindBrokerAddressInSubscribe(mq.BrokerName, dc.recalculatePullFromWhichNode(mq), false)
+}
+func (dc *defaultManualPullConsumer) recalculatePullFromWhichNode(mq *primitive.MessageQueue) int64 {
+ v, exist := dc.pullFromWhichNodeTable.Load(*mq)
+ if exist {
+ return v.(int64)
+ }
+ return internal.MasterId
+}
+
+func (dc *defaultManualPullConsumer) checkPull(ctx context.Context, mq *primitive.MessageQueue, offset int64, numbers int) error {
+ if mq == nil {
+ return errors2.ErrMQEmpty
+ }
+ if offset < 0 {
+ return errors2.ErrOffset
+ }
+ if numbers <= 0 {
+ return errors2.ErrNumbers
+ }
+ return nil
+}
+
+func (dc *defaultManualPullConsumer) processPullResult(mq *primitive.MessageQueue, result *primitive.PullResult, data *internal.SubscriptionData) {
+
+ dc.pullFromWhichNodeTable.Store(*mq, result.SuggestWhichBrokerId)
+
+ switch result.Status {
+ case primitive.PullFound:
+ result.SetMessageExts(primitive.DecodeMessage(result.GetBody()))
+ msgs := result.GetMessageExts()
+ // filter message according to tags
+ msgListFilterAgain := msgs
+ if data.Tags.Len() > 0 && data.ClassFilterMode {
+ msgListFilterAgain = make([]*primitive.MessageExt, 0)
+ for _, msg := range msgs {
+ _, exist := data.Tags.Contains(msg.GetTags())
+ if exist {
+ msgListFilterAgain = append(msgListFilterAgain, msg)
+ }
+ }
+ }
+ // TODO: add filter message hook
+ for _, msg := range msgListFilterAgain {
+ traFlag, _ := strconv.ParseBool(msg.GetProperty(primitive.PropertyTransactionPrepared))
+ if traFlag {
+ msg.TransactionId = msg.GetProperty(primitive.PropertyUniqueClientMessageIdKeyIndex)
+ }
+ msg.WithProperty(primitive.PropertyMinOffset, strconv.FormatInt(result.MinOffset, 10))
+ msg.WithProperty(primitive.PropertyMaxOffset, strconv.FormatInt(result.MaxOffset, 10))
+ }
+ result.SetMessageExts(msgListFilterAgain)
+ }
+}
diff --git a/consumer/manual_pull_consumer_test.go b/consumer/manual_pull_consumer_test.go
new file mode 100644
index 0000000..d04d522
--- /dev/null
+++ b/consumer/manual_pull_consumer_test.go
@@ -0,0 +1,343 @@
+/*
+Licensed to the Apache Software Foundation (ASF) under one or more
+contributor license agreements. See the NOTICE file distributed with
+this work for additional information regarding copyright ownership.
+The ASF licenses this file to You under the Apache License, Version 2.0
+(the "License"); you may not use this file except in compliance with
+the License. You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing, software
+distributed under the License is distributed on an "AS IS" BASIS,
+WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+See the License for the specific language governing permissions and
+limitations under the License.
+*/
+
+package consumer
+
+import (
+ "context"
+ "strconv"
+ "testing"
+ "time"
+
+ "github.com/apache/rocketmq-client-go/v2/internal"
+ "github.com/apache/rocketmq-client-go/v2/internal/remote"
+ "github.com/apache/rocketmq-client-go/v2/primitive"
+ "github.com/golang/mock/gomock"
+ . "github.com/smartystreets/goconvey/convey"
+ "github.com/stretchr/testify/assert"
+)
+
+func TestPullFromQueue(t *testing.T) {
+ Convey("ManualPullConsumer PullFromQueue", t, func() {
+ serverTopic := "foo"
+ tests := map[string]struct {
+ mq primitive.MessageQueue
+ offset int64
+ numbers int
+ expectedErr bool
+ }{
+ "topic exist": {
+ primitive.MessageQueue{
+ Topic: "foo",
+ BrokerName: "",
+ QueueId: 0,
+ },
+ 1,
+ 1,
+ false,
+ },
+ "topic not exist": {
+ primitive.MessageQueue{
+ Topic: "foo2",
+ BrokerName: "",
+ QueueId: 0,
+ },
+ 1,
+ 1,
+ false,
+ },
+ }
+
+ ctrl := gomock.NewController(t)
+ defer ctrl.Finish()
+
+ c, err := NewManualPullConsumer(WithNameServer(primitive.NamesrvAddr{"127.0.0.1"}))
+ if err != nil {
+ assert.Error(t, err)
+ }
+
+ namesrv, client := internal.NewMockNamesrvs(ctrl), internal.NewMockRMQClient(ctrl)
+ c.namesrv, c.client = namesrv, client
+ namesrv.EXPECT().FindBrokerAddressInSubscribe(gomock.Any(), gomock.Any(), gomock.Any()).Return(
+ &internal.FindBrokerResult{
+ BrokerAddr: "foo",
+ Slave: false,
+ BrokerVersion: 1.0,
+ },
+ )
+ client.EXPECT().PullMessage(gomock.Any(), gomock.Any(), gomock.Any()).DoAndReturn(
+ func(ctx context.Context, brokerAddrs string, request *internal.PullMessageRequestHeader) (*primitive.PullResult, error) {
+ pullResult := &primitive.PullResult{
+ SuggestWhichBrokerId: 0,
+ }
+ if request.Topic == serverTopic {
+ pullResult.Status = primitive.PullFound
+ } else {
+ pullResult.Status = primitive.PullNoNewMsg
+ }
+ return pullResult, nil
+ })
+
+ for name, test := range tests {
+ Convey(name, func() {
+ _, err := c.PullFromQueue(context.Background(), "default", &test.mq, test.offset, test.numbers)
+ So(err, ShouldBeNil)
+ })
+ }
+
+ })
+}
+
+func TestGetMessageQueues(t *testing.T) {
+ Convey("ManualPullConsumer GetMessageQueues", t, func() {
+ ctrl := gomock.NewController(t)
+ defer ctrl.Finish()
+
+ c, err := NewManualPullConsumer(WithNameServer(primitive.NamesrvAddr{"127.0.0.1"}))
+ if err != nil {
+ assert.Error(t, err)
+ }
+
+ namesrv := internal.NewMockNamesrvs(ctrl)
+ c.namesrv = namesrv
+ namesrv.EXPECT().FetchSubscribeMessageQueues("foo").Return(
+ []*primitive.MessageQueue{
+ {Topic: "foo", BrokerName: "foo", QueueId: 0},
+ }, nil,
+ )
+
+ queues, err := c.GetMessageQueues(context.TODO(), "foo")
+
+ So(queues, ShouldNotBeEmpty)
+ So(err, ShouldBeNil)
+ })
+}
+
+func TestCommittedOffset(t *testing.T) {
+ Convey("ManualPullConsumer CommittedOffset", t, func() {
+
+ serverTopic, serverOffset := "foo", "1"
+ tests := map[string]struct {
+ mq primitive.MessageQueue
+ except int
+ expectedErr bool
+ }{
+ "topic exist": {
+ primitive.MessageQueue{
+ Topic: "foo",
+ BrokerName: "",
+ QueueId: 0,
+ },
+ 1,
+ false,
+ },
+ "topic not exist": {
+ primitive.MessageQueue{
+ Topic: "foo2",
+ BrokerName: "",
+ QueueId: 0,
+ },
+ -2,
+ true,
+ },
+ }
+
+ ctrl := gomock.NewController(t)
+ defer ctrl.Finish()
+
+ c, err := NewManualPullConsumer(WithNameServer(primitive.NamesrvAddr{"127.0.0.1"}))
+ if err != nil {
+ assert.Error(t, err)
+ }
+ namesrv, client := internal.NewMockNamesrvs(ctrl), internal.NewMockRMQClient(ctrl)
+ c.namesrv, c.client = namesrv, client
+
+ namesrv.EXPECT().FindBrokerAddrByName(gomock.Any()).Return("foo").AnyTimes()
+ client.EXPECT().InvokeSync(gomock.Any(), gomock.Any(), gomock.Any(), gomock.Any()).DoAndReturn(
+ func(ctx context.Context, addr string, request *remote.RemotingCommand, timeoutMillis time.Duration) (*remote.RemotingCommand, error) {
+ if request.ExtFields["topic"] == serverTopic {
+ ret := &remote.RemotingCommand{
+ Code: internal.ResSuccess,
+ ExtFields: map[string]string{
+ "offset": serverOffset,
+ },
+ }
+ return ret, nil
+ }
+ ret := &remote.RemotingCommand{
+ Code: internal.ResTopicNotExist,
+ }
+ return ret, nil
+ }).AnyTimes()
+
+ for name, test := range tests {
+ Convey(name, func() {
+ ret, err := c.CommittedOffset(context.Background(), "foo", &test.mq)
+
+ if test.expectedErr {
+ So(err, ShouldNotBeNil)
+ } else {
+ So(err, ShouldBeNil)
+ }
+ So(ret, ShouldEqual, test.except)
+ })
+ }
+ })
+}
+
+func TestSeek(t *testing.T) {
+ Convey("ManualPullConsumer Seek", t, func() {
+
+ serverMinOffset, serverMaxOffset := "1", "10"
+ tests := map[string]struct {
+ mq primitive.MessageQueue
+ offset int64
+ expectedErr bool
+ }{
+ "normal offset": {
+ primitive.MessageQueue{
+ Topic: "foo",
+ BrokerName: "foo",
+ QueueId: 0,
+ },
+ 3,
+ false,
+ },
+ "illegal offset": {
+ primitive.MessageQueue{
+ Topic: "foo",
+ BrokerName: "foo",
+ QueueId: 0,
+ },
+ 11,
+ true,
+ },
+ }
+
+ ctrl := gomock.NewController(t)
+ defer ctrl.Finish()
+
+ c, err := NewManualPullConsumer(WithNameServer(primitive.NamesrvAddr{"127.0.0.1"}))
+ if err != nil {
+ assert.Error(t, err)
+ }
+ namesrv, client := internal.NewMockNamesrvs(ctrl), internal.NewMockRMQClient(ctrl)
+ c.namesrv, c.client = namesrv, client
+
+ namesrv.EXPECT().FindBrokerAddrByName(gomock.Any()).Return("foo").AnyTimes()
+
+ client.EXPECT().InvokeSync(gomock.Any(), gomock.Any(), gomock.Any(), gomock.Any()).DoAndReturn(
+ func(ctx context.Context, addr string, request *remote.RemotingCommand, timeoutMillis time.Duration) (*remote.RemotingCommand, error) {
+ if request.Code == internal.ReqGetMinOffset {
+ return &remote.RemotingCommand{
+ Code: internal.ResSuccess,
+ ExtFields: map[string]string{
+ "offset": serverMinOffset,
+ },
+ }, nil
+ } else if request.Code == internal.ReqGetMaxOffset {
+ return &remote.RemotingCommand{
+ Code: internal.ResSuccess,
+ ExtFields: map[string]string{
+ "offset": serverMaxOffset,
+ },
+ }, nil
+ }
+ return &remote.RemotingCommand{
+ Code: internal.ResError,
+ }, nil
+ }).AnyTimes()
+
+ client.EXPECT().InvokeOneWay(gomock.Any(), gomock.Any(), gomock.Any(), gomock.Any()).Return(nil).AnyTimes()
+ for name, test := range tests {
+ Convey(name, func() {
+ err := c.Seek(context.Background(), "foo", &test.mq, test.offset)
+ if test.expectedErr {
+ So(err, ShouldNotBeNil)
+ } else {
+ So(err, ShouldBeNil)
+ }
+ })
+ }
+ })
+}
+
+func TestLookup(t *testing.T) {
+ Convey("ManualPullConsumer Lookup", t, func() {
+ test := struct {
+ mq primitive.MessageQueue
+ timestamp int64
+ offset int64
+ expectedErr bool
+ }{
+ primitive.MessageQueue{
+ Topic: "foo",
+ BrokerName: "foo",
+ QueueId: 0,
+ },
+ int64(time.Now().Nanosecond()),
+ 10,
+ true,
+ }
+
+ ctrl := gomock.NewController(t)
+ defer ctrl.Finish()
+
+ c, err := NewManualPullConsumer(WithNameServer(primitive.NamesrvAddr{"127.0.0.1"}))
+ if err != nil {
+ assert.Error(t, err)
+ }
+ namesrv, client := internal.NewMockNamesrvs(ctrl), internal.NewMockRMQClient(ctrl)
+ c.namesrv, c.client = namesrv, client
+
+ namesrv.EXPECT().FindBrokerAddrByName(gomock.Any()).Return("foo").AnyTimes()
+
+ client.EXPECT().InvokeSync(gomock.Any(), gomock.Any(), gomock.Any(), gomock.Any()).DoAndReturn(
+ func(ctx context.Context, addr string, request *remote.RemotingCommand, timeoutMillis time.Duration) (*remote.RemotingCommand, error) {
+
+ return &remote.RemotingCommand{
+ Code: internal.ResSuccess,
+ ExtFields: map[string]string{
+ "offset": strconv.FormatInt(test.offset, 10),
+ },
+ }, nil
+ }).AnyTimes()
+
+ ret, err := c.Lookup(context.Background(), &test.mq, test.timestamp)
+ So(err, ShouldBeNil)
+ So(ret, ShouldEqual, test.offset)
+ })
+}
+
+func TestShutdown(t *testing.T) {
+ Convey("ManualPullConsumer Shutdown", t, func() {
+
+ ctrl := gomock.NewController(t)
+ defer ctrl.Finish()
+
+ c, err := NewManualPullConsumer(WithNameServer(primitive.NamesrvAddr{"127.0.0.1"}))
+ if err != nil {
+ assert.Error(t, err)
+ }
+ client := internal.NewMockRMQClient(ctrl)
+ c.client = client
+
+ client.EXPECT().Shutdown().Times(1)
+ c.Shutdown()
+ c.Shutdown()
+ })
+}
diff --git a/examples/consumer/manual/main.go b/examples/consumer/manual/main.go
new file mode 100644
index 0000000..1bdf479
--- /dev/null
+++ b/examples/consumer/manual/main.go
@@ -0,0 +1,83 @@
+/*
+Licensed to the Apache Software Foundation (ASF) under one or more
+contributor license agreements. See the NOTICE file distributed with
+this work for additional information regarding copyright ownership.
+The ASF licenses this file to You under the Apache License, Version 2.0
+(the "License"); you may not use this file except in compliance with
+the License. You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing, software
+distributed under the License is distributed on an "AS IS" BASIS,
+WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+See the License for the specific language governing permissions and
+limitations under the License.
+*/
+
+package main
+
+import (
+ "context"
+ "fmt"
+ "sync"
+
+ "github.com/apache/rocketmq-client-go/v2/consumer"
+ "github.com/apache/rocketmq-client-go/v2/primitive"
+ "github.com/apache/rocketmq-client-go/v2/rlog"
+)
+
+func main() {
+ groupName := "testGroup"
+ c, err := consumer.NewManualPullConsumer(
+ consumer.WithGroupName(groupName),
+ consumer.WithNameServer(primitive.NamesrvAddr{"127.0.0.1:9876"}),
+ )
+ if err != nil {
+ rlog.Fatal(fmt.Sprintf("init producer error: %v", err), nil)
+ }
+
+ topic := "test"
+ // get all message queue
+ mqs, err := c.GetMessageQueues(context.Background(), topic)
+ if err != nil {
+ rlog.Fatal(fmt.Sprintf("get message queue error: %v", err), nil)
+ }
+ var wg sync.WaitGroup
+
+ fn := func(mq *primitive.MessageQueue) {
+ defer wg.Done()
+ // get latest offset
+ offset, err := c.CommittedOffset(context.Background(), groupName, mq)
+ if err != nil {
+ rlog.Fatal(fmt.Sprintf("search consumer offset error: %v", err), nil)
+ }
+ for {
+ // pull message
+ ret, err := c.PullFromQueue(context.Background(), groupName, mq, offset, 1)
+ if err != nil {
+ rlog.Fatal(fmt.Sprintf("pullFromQueue error: %v", err), nil)
+ }
+ if ret.Status == primitive.PullFound {
+ msgs := ret.GetMessageExts()
+ for _, msg := range msgs {
+ fmt.Printf("subscribe Msg: %v \n", msg)
+ // commit offset
+ if err = c.Seek(context.Background(), groupName, mq, msg.QueueOffset+1); err != nil {
+ rlog.Fatal(fmt.Sprintf("commit offset error: %v", err), nil)
+ }
+ offset++
+ }
+ } else {
+ break
+ }
+ }
+ }
+
+ for _, mq := range mqs {
+ wg.Add(1)
+ go fn(mq)
+ }
+ wg.Wait()
+ c.Shutdown()
+}
diff --git a/internal/request.go b/internal/request.go
index 0e3d8e1..f53d5ff 100644
--- a/internal/request.go
+++ b/internal/request.go
@@ -236,6 +236,18 @@ func (request *GetConsumerListRequestHeader) Encode() map[string]string {
return maps
}
+type GetMinOffsetRequestHeader struct {
+ Topic string
+ QueueId int
+}
+
+func (request *GetMinOffsetRequestHeader) Encode() map[string]string {
+ maps := make(map[string]string)
+ maps["topic"] = request.Topic
+ maps["queueId"] = strconv.Itoa(request.QueueId)
+ return maps
+}
+
type GetMaxOffsetRequestHeader struct {
Topic string
QueueId int
diff --git a/primitive/interceptor.go b/primitive/interceptor.go
index 878aab5..51716aa 100644
--- a/primitive/interceptor.go
+++ b/primitive/interceptor.go
@@ -29,6 +29,10 @@ type Invoker func(ctx context.Context, req, reply interface{}) error
// use type assert to get real type.
type Interceptor func(ctx context.Context, req, reply interface{}, next Invoker) error
+var NoopInterceptor = func(ctx context.Context, req, reply interface{}) error {
+ return nil
+}
+
func ChainInterceptors(interceptors ...Interceptor) Interceptor {
if len(interceptors) == 0 {
return nil