You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@rocketmq.apache.org by we...@apache.org on 2021/12/27 06:08:05 UTC

[rocketmq-client-go] branch pull-consumer updated: [ISSUE #737] Add ManualPullConsumer (#745)

This is an automated email from the ASF dual-hosted git repository.

wenfeng pushed a commit to branch pull-consumer
in repository https://gitbox.apache.org/repos/asf/rocketmq-client-go.git


The following commit(s) were added to refs/heads/pull-consumer by this push:
     new 29d0691  [ISSUE #737] Add ManualPullConsumer (#745)
29d0691 is described below

commit 29d06911f8d5a3aa14a0950776122cfd6fb28d48
Author: Gagharv <ww...@gmail.com>
AuthorDate: Mon Dec 27 14:07:56 2021 +0800

    [ISSUE #737] Add ManualPullConsumer (#745)
    
    * add manual consumer
    
    * Make  the namesrv in `NewManualPullConsumer` can reuse the definition of the upper
    
    Change-Id: I4f2811988bc639554b3029a7fdec76509f3a8907
    
    * add comments and adjust the code
    
    Change-Id: Ie4bcd8cba6a07a3bff3dfb328ce38c157cbfca76
    
    * adjust code
    
    Change-Id: Ie03834b685882f0f40171192719124a4c1ea44ed
    
    * adjust log output, etc.
    
    Change-Id: I9cfda8364c1163f700c2f3e2ef673fd3b117f270
---
 consumer/manual_pull_consumer.go      | 342 +++++++++++++++++++++++++++++++++
 consumer/manual_pull_consumer_test.go | 343 ++++++++++++++++++++++++++++++++++
 examples/consumer/manual/main.go      |  83 ++++++++
 internal/request.go                   |  12 ++
 primitive/interceptor.go              |   4 +
 5 files changed, 784 insertions(+)

diff --git a/consumer/manual_pull_consumer.go b/consumer/manual_pull_consumer.go
new file mode 100644
index 0000000..0abff56
--- /dev/null
+++ b/consumer/manual_pull_consumer.go
@@ -0,0 +1,342 @@
+/*
+Licensed to the Apache Software Foundation (ASF) under one or more
+contributor license agreements.  See the NOTICE file distributed with
+this work for additional information regarding copyright ownership.
+The ASF licenses this file to You under the Apache License, Version 2.0
+(the "License"); you may not use this file except in compliance with
+the License.  You may obtain a copy of the License at
+
+    http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing, software
+distributed under the License is distributed on an "AS IS" BASIS,
+WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+See the License for the specific language governing permissions and
+limitations under the License.
+*/
+
+package consumer
+
+import (
+	"context"
+	"fmt"
+	"strconv"
+	"sync"
+	"time"
+
+	errors2 "github.com/apache/rocketmq-client-go/v2/errors"
+	"github.com/apache/rocketmq-client-go/v2/internal"
+	"github.com/apache/rocketmq-client-go/v2/internal/remote"
+	"github.com/apache/rocketmq-client-go/v2/primitive"
+	"github.com/apache/rocketmq-client-go/v2/rlog"
+	"github.com/pkg/errors"
+)
+
+// ManualPullConsumer is a low-level consumer, which operates based on MessageQueue.
+// Users should maintain information such as offset by themselves
+type ManualPullConsumer interface {
+	// PullFromQueue return messages according to specified queue with offset
+	PullFromQueue(ctx context.Context, mq *primitive.MessageQueue, offset int64, numbers int) (*primitive.PullResult, error)
+
+	// GetMessageQueues return queues of the topic
+	GetMessageQueues(ctx context.Context, topic string) ([]*primitive.MessageQueue, error)
+
+	// CommittedOffset return the offset of mq in groupName, if mq not exist, -1 will be return
+	CommittedOffset(ctx context.Context, groupName string, mq *primitive.MessageQueue) (int64, error)
+
+	// Seek let consume position to the offset, this api can be used to reset offset and commit offset
+	Seek(ctx context.Context, groupName string, mq *primitive.MessageQueue, offset int64) error
+
+	// Lookup return offset according to timestamp(ms), the maximum offset that born time less than timestamp will be return.
+	// If timestamp less than any message's born time, the earliest offset will be returned
+	// If timestamp great than any message's born time, the latest offset will be returned
+	Lookup(ctx context.Context, mq *primitive.MessageQueue, timestamp int64) (int64, error)
+
+	// Shutdown the ManualPullConsumer, clean up internal resources
+	Shutdown() error
+}
+
+type defaultManualPullConsumer struct {
+	namesrv                internal.Namesrvs
+	option                 consumerOptions
+	client                 internal.RMQClient
+	interceptor            primitive.Interceptor
+	pullFromWhichNodeTable sync.Map
+	shutdownOnce           sync.Once
+}
+
+// NewManualPullConsumer creates and initializes a new ManualPullConsumer.
+func NewManualPullConsumer(options ...Option) (*defaultManualPullConsumer, error) {
+	defaultOpts := defaultPullConsumerOptions()
+	for _, apply := range options {
+		apply(&defaultOpts)
+	}
+
+	srvs, err := internal.NewNamesrv(defaultOpts.Resolver)
+	if err != nil {
+		return nil, errors.Wrap(err, "new Namesrv failed.")
+	}
+	if !defaultOpts.Credentials.IsEmpty() {
+		srvs.SetCredentials(defaultOpts.Credentials)
+	}
+	defaultOpts.Namesrv = srvs
+
+	actualRMQClient := internal.GetOrNewRocketMQClient(defaultOpts.ClientOptions, nil)
+	actualNameSrv := internal.GetOrSetNamesrv(actualRMQClient.ClientID(), defaultOpts.Namesrv)
+
+	dc := &defaultManualPullConsumer{
+		client:  actualRMQClient,
+		option:  defaultOpts,
+		namesrv: actualNameSrv,
+	}
+
+	dc.interceptor = primitive.ChainInterceptors(dc.option.Interceptors...)
+	dc.option.ClientOptions.Namesrv = actualNameSrv
+	return dc, nil
+}
+
+func (dc *defaultManualPullConsumer) PullFromQueue(ctx context.Context, groupName string, mq *primitive.MessageQueue, offset int64, numbers int) (*primitive.PullResult, error) {
+	if err := dc.checkPull(ctx, mq, offset, numbers); err != nil {
+		return nil, err
+	}
+	subData := buildSubscriptionData(mq.Topic, MessageSelector{
+		Expression: _SubAll,
+	})
+
+	sysFlag := buildSysFlag(false, true, true, false)
+
+	pullRequest := &internal.PullMessageRequestHeader{
+		ConsumerGroup:        groupName,
+		Topic:                mq.Topic,
+		QueueId:              int32(mq.QueueId),
+		QueueOffset:          offset,
+		MaxMsgNums:           int32(numbers),
+		SysFlag:              sysFlag,
+		CommitOffset:         0,
+		SuspendTimeoutMillis: _BrokerSuspendMaxTime,
+		SubExpression:        subData.SubString,
+		ExpressionType:       string(subData.ExpType),
+	}
+
+	if subData.ExpType == string(TAG) {
+		pullRequest.SubVersion = 0
+	} else {
+		pullRequest.SubVersion = subData.SubVersion
+	}
+
+	pullResp, err := dc.pullInner(ctx, mq, pullRequest)
+	if err != nil {
+		return pullResp, err
+	}
+	dc.processPullResult(mq, pullResp, subData)
+	if dc.interceptor != nil {
+		msgCtx := &primitive.ConsumeMessageContext{
+			Properties:    make(map[string]string),
+			ConsumerGroup: groupName,
+			MQ:            mq,
+			Msgs:          pullResp.GetMessageExts(),
+		}
+		err = dc.interceptor(ctx, msgCtx, struct{}{}, primitive.NoopInterceptor)
+	}
+	return pullResp, err
+}
+
+func (dc *defaultManualPullConsumer) GetMessageQueues(ctx context.Context, topic string) ([]*primitive.MessageQueue, error) {
+	return dc.namesrv.FetchSubscribeMessageQueues(topic)
+}
+
+func (dc *defaultManualPullConsumer) CommittedOffset(ctx context.Context, groupName string, mq *primitive.MessageQueue) (int64, error) {
+	fn := func(broker string) (*remote.RemotingCommand, error) {
+		request := &internal.QueryConsumerOffsetRequestHeader{
+			ConsumerGroup: groupName,
+			Topic:         mq.Topic,
+			QueueId:       mq.QueueId,
+		}
+		cmd := remote.NewRemotingCommand(internal.ReqQueryConsumerOffset, request, nil)
+		return dc.client.InvokeSync(ctx, broker, cmd, 3*time.Second)
+	}
+	return dc.processQueryOffset(mq, fn)
+}
+
+func (dc *defaultManualPullConsumer) Seek(ctx context.Context, groupName string, mq *primitive.MessageQueue, offset int64) error {
+	minOffset, err := dc.queryMinOffset(context.Background(), mq)
+	if err != nil {
+		return err
+	}
+	maxOffset, err := dc.queryMaxOffset(context.Background(), mq)
+	if err != nil {
+		return err
+	}
+	if offset < minOffset || offset > maxOffset {
+		return fmt.Errorf("Seek offset illegal, seek offset = %d, min offset = %d, max offset = %d", offset, minOffset, maxOffset)
+	}
+
+	broker, exist := dc.chooseServer(mq)
+	if !exist {
+		rlog.Warning("the broker does not exist", map[string]interface{}{
+			rlog.LogKeyBroker: mq.BrokerName,
+		})
+		return errors2.ErrBrokerNotFound
+	}
+
+	updateOffsetRequest := &internal.UpdateConsumerOffsetRequestHeader{
+		ConsumerGroup: groupName,
+		Topic:         mq.Topic,
+		QueueId:       mq.QueueId,
+		CommitOffset:  offset,
+	}
+	cmd := remote.NewRemotingCommand(internal.ReqUpdateConsumerOffset, updateOffsetRequest, nil)
+	return dc.client.InvokeOneWay(context.Background(), broker, cmd, 5*time.Second)
+}
+
+func (dc *defaultManualPullConsumer) Lookup(ctx context.Context, mq *primitive.MessageQueue, timestamp int64) (int64, error) {
+	fn := func(broker string) (*remote.RemotingCommand, error) {
+		request := &internal.SearchOffsetRequestHeader{
+			Topic:     mq.Topic,
+			QueueId:   mq.QueueId,
+			Timestamp: timestamp,
+		}
+		cmd := remote.NewRemotingCommand(internal.ReqSearchOffsetByTimestamp, request, nil)
+		return dc.client.InvokeSync(ctx, broker, cmd, 3*time.Second)
+	}
+	return dc.processQueryOffset(mq, fn)
+}
+
+func (dc *defaultManualPullConsumer) Shutdown() error {
+	dc.shutdownOnce.Do(func() {
+		dc.client.Shutdown()
+	})
+	return nil
+}
+
+func (dc *defaultManualPullConsumer) chooseServer(mq *primitive.MessageQueue) (string, bool) {
+	brokerAddr := dc.namesrv.FindBrokerAddrByName(mq.BrokerName)
+	if brokerAddr == "" {
+		dc.namesrv.UpdateTopicRouteInfo(mq.Topic)
+		brokerAddr = dc.namesrv.FindBrokerAddrByName(mq.BrokerName)
+	}
+	return brokerAddr, brokerAddr != ""
+}
+
+func (dc *defaultManualPullConsumer) queryMinOffset(ctx context.Context, mq *primitive.MessageQueue) (int64, error) {
+	fn := func(broker string) (*remote.RemotingCommand, error) {
+		request := &internal.GetMinOffsetRequestHeader{
+			Topic:   mq.Topic,
+			QueueId: mq.QueueId,
+		}
+		cmd := remote.NewRemotingCommand(internal.ReqGetMinOffset, request, nil)
+		return dc.client.InvokeSync(ctx, broker, cmd, 3*time.Second)
+	}
+	return dc.processQueryOffset(mq, fn)
+}
+
+func (dc *defaultManualPullConsumer) queryMaxOffset(ctx context.Context, mq *primitive.MessageQueue) (int64, error) {
+	fn := func(broker string) (*remote.RemotingCommand, error) {
+		request := &internal.GetMaxOffsetRequestHeader{
+			Topic:   mq.Topic,
+			QueueId: mq.QueueId,
+		}
+		cmd := remote.NewRemotingCommand(internal.ReqGetMaxOffset, request, nil)
+		return dc.client.InvokeSync(ctx, broker, cmd, 3*time.Second)
+	}
+	return dc.processQueryOffset(mq, fn)
+}
+
+func (dc *defaultManualPullConsumer) processQueryOffset(mq *primitive.MessageQueue, fn func(broker string) (*remote.RemotingCommand, error)) (int64, error) {
+	broker, exist := dc.chooseServer(mq)
+	if !exist {
+		rlog.Warning("the broker does not exist", map[string]interface{}{
+			rlog.LogKeyBroker: mq.BrokerName,
+		})
+		return -1, errors2.ErrBrokerNotFound
+	}
+	response, err := fn(broker)
+	if err != nil {
+		return -1, err
+	}
+	if response.Code != internal.ResSuccess {
+		return -2, fmt.Errorf("broker response code: %d, remarks: %s", response.Code, response.Remark)
+	}
+	off, err := strconv.ParseInt(response.ExtFields["offset"], 10, 64)
+	if err != nil {
+		return -1, errors.Wrap(err, "parse offset fail.")
+	}
+	return off, nil
+}
+
+func (dc *defaultManualPullConsumer) pullInner(ctx context.Context, mq *primitive.MessageQueue, pullRequest *internal.PullMessageRequestHeader) (*primitive.PullResult, error) {
+	brokerResult := dc.tryFindBroker(mq)
+	if brokerResult == nil {
+		rlog.Warning("no broker found for mq", map[string]interface{}{
+			rlog.LogKeyMessageQueue: mq,
+		})
+		return nil, errors2.ErrBrokerNotFound
+	}
+
+	if (pullRequest.ExpressionType == string(TAG)) && brokerResult.BrokerVersion < internal.V4_1_0 {
+		return nil, fmt.Errorf("the broker [%s, %v] does not upgrade to support for filter message by %v",
+			mq.BrokerName, brokerResult.BrokerVersion, pullRequest.ExpressionType)
+	}
+	return dc.client.PullMessage(ctx, brokerResult.BrokerAddr, pullRequest)
+}
+
+func (dc *defaultManualPullConsumer) tryFindBroker(mq *primitive.MessageQueue) *internal.FindBrokerResult {
+	result := dc.namesrv.FindBrokerAddressInSubscribe(mq.BrokerName, dc.recalculatePullFromWhichNode(mq), false)
+	if result != nil {
+		return result
+	}
+	dc.namesrv.UpdateTopicRouteInfo(mq.Topic)
+	return dc.namesrv.FindBrokerAddressInSubscribe(mq.BrokerName, dc.recalculatePullFromWhichNode(mq), false)
+}
+func (dc *defaultManualPullConsumer) recalculatePullFromWhichNode(mq *primitive.MessageQueue) int64 {
+	v, exist := dc.pullFromWhichNodeTable.Load(*mq)
+	if exist {
+		return v.(int64)
+	}
+	return internal.MasterId
+}
+
+func (dc *defaultManualPullConsumer) checkPull(ctx context.Context, mq *primitive.MessageQueue, offset int64, numbers int) error {
+	if mq == nil {
+		return errors2.ErrMQEmpty
+	}
+	if offset < 0 {
+		return errors2.ErrOffset
+	}
+	if numbers <= 0 {
+		return errors2.ErrNumbers
+	}
+	return nil
+}
+
+func (dc *defaultManualPullConsumer) processPullResult(mq *primitive.MessageQueue, result *primitive.PullResult, data *internal.SubscriptionData) {
+
+	dc.pullFromWhichNodeTable.Store(*mq, result.SuggestWhichBrokerId)
+
+	switch result.Status {
+	case primitive.PullFound:
+		result.SetMessageExts(primitive.DecodeMessage(result.GetBody()))
+		msgs := result.GetMessageExts()
+		// filter message according to tags
+		msgListFilterAgain := msgs
+		if data.Tags.Len() > 0 && data.ClassFilterMode {
+			msgListFilterAgain = make([]*primitive.MessageExt, 0)
+			for _, msg := range msgs {
+				_, exist := data.Tags.Contains(msg.GetTags())
+				if exist {
+					msgListFilterAgain = append(msgListFilterAgain, msg)
+				}
+			}
+		}
+		// TODO: add filter message hook
+		for _, msg := range msgListFilterAgain {
+			traFlag, _ := strconv.ParseBool(msg.GetProperty(primitive.PropertyTransactionPrepared))
+			if traFlag {
+				msg.TransactionId = msg.GetProperty(primitive.PropertyUniqueClientMessageIdKeyIndex)
+			}
+			msg.WithProperty(primitive.PropertyMinOffset, strconv.FormatInt(result.MinOffset, 10))
+			msg.WithProperty(primitive.PropertyMaxOffset, strconv.FormatInt(result.MaxOffset, 10))
+		}
+		result.SetMessageExts(msgListFilterAgain)
+	}
+}
diff --git a/consumer/manual_pull_consumer_test.go b/consumer/manual_pull_consumer_test.go
new file mode 100644
index 0000000..d04d522
--- /dev/null
+++ b/consumer/manual_pull_consumer_test.go
@@ -0,0 +1,343 @@
+/*
+Licensed to the Apache Software Foundation (ASF) under one or more
+contributor license agreements.  See the NOTICE file distributed with
+this work for additional information regarding copyright ownership.
+The ASF licenses this file to You under the Apache License, Version 2.0
+(the "License"); you may not use this file except in compliance with
+the License.  You may obtain a copy of the License at
+
+    http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing, software
+distributed under the License is distributed on an "AS IS" BASIS,
+WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+See the License for the specific language governing permissions and
+limitations under the License.
+*/
+
+package consumer
+
+import (
+	"context"
+	"strconv"
+	"testing"
+	"time"
+
+	"github.com/apache/rocketmq-client-go/v2/internal"
+	"github.com/apache/rocketmq-client-go/v2/internal/remote"
+	"github.com/apache/rocketmq-client-go/v2/primitive"
+	"github.com/golang/mock/gomock"
+	. "github.com/smartystreets/goconvey/convey"
+	"github.com/stretchr/testify/assert"
+)
+
+func TestPullFromQueue(t *testing.T) {
+	Convey("ManualPullConsumer PullFromQueue", t, func() {
+		serverTopic := "foo"
+		tests := map[string]struct {
+			mq          primitive.MessageQueue
+			offset      int64
+			numbers     int
+			expectedErr bool
+		}{
+			"topic exist": {
+				primitive.MessageQueue{
+					Topic:      "foo",
+					BrokerName: "",
+					QueueId:    0,
+				},
+				1,
+				1,
+				false,
+			},
+			"topic not exist": {
+				primitive.MessageQueue{
+					Topic:      "foo2",
+					BrokerName: "",
+					QueueId:    0,
+				},
+				1,
+				1,
+				false,
+			},
+		}
+
+		ctrl := gomock.NewController(t)
+		defer ctrl.Finish()
+
+		c, err := NewManualPullConsumer(WithNameServer(primitive.NamesrvAddr{"127.0.0.1"}))
+		if err != nil {
+			assert.Error(t, err)
+		}
+
+		namesrv, client := internal.NewMockNamesrvs(ctrl), internal.NewMockRMQClient(ctrl)
+		c.namesrv, c.client = namesrv, client
+		namesrv.EXPECT().FindBrokerAddressInSubscribe(gomock.Any(), gomock.Any(), gomock.Any()).Return(
+			&internal.FindBrokerResult{
+				BrokerAddr:    "foo",
+				Slave:         false,
+				BrokerVersion: 1.0,
+			},
+		)
+		client.EXPECT().PullMessage(gomock.Any(), gomock.Any(), gomock.Any()).DoAndReturn(
+			func(ctx context.Context, brokerAddrs string, request *internal.PullMessageRequestHeader) (*primitive.PullResult, error) {
+				pullResult := &primitive.PullResult{
+					SuggestWhichBrokerId: 0,
+				}
+				if request.Topic == serverTopic {
+					pullResult.Status = primitive.PullFound
+				} else {
+					pullResult.Status = primitive.PullNoNewMsg
+				}
+				return pullResult, nil
+			})
+
+		for name, test := range tests {
+			Convey(name, func() {
+				_, err := c.PullFromQueue(context.Background(), "default", &test.mq, test.offset, test.numbers)
+				So(err, ShouldBeNil)
+			})
+		}
+
+	})
+}
+
+func TestGetMessageQueues(t *testing.T) {
+	Convey("ManualPullConsumer GetMessageQueues", t, func() {
+		ctrl := gomock.NewController(t)
+		defer ctrl.Finish()
+
+		c, err := NewManualPullConsumer(WithNameServer(primitive.NamesrvAddr{"127.0.0.1"}))
+		if err != nil {
+			assert.Error(t, err)
+		}
+
+		namesrv := internal.NewMockNamesrvs(ctrl)
+		c.namesrv = namesrv
+		namesrv.EXPECT().FetchSubscribeMessageQueues("foo").Return(
+			[]*primitive.MessageQueue{
+				{Topic: "foo", BrokerName: "foo", QueueId: 0},
+			}, nil,
+		)
+
+		queues, err := c.GetMessageQueues(context.TODO(), "foo")
+
+		So(queues, ShouldNotBeEmpty)
+		So(err, ShouldBeNil)
+	})
+}
+
+func TestCommittedOffset(t *testing.T) {
+	Convey("ManualPullConsumer CommittedOffset", t, func() {
+
+		serverTopic, serverOffset := "foo", "1"
+		tests := map[string]struct {
+			mq          primitive.MessageQueue
+			except      int
+			expectedErr bool
+		}{
+			"topic exist": {
+				primitive.MessageQueue{
+					Topic:      "foo",
+					BrokerName: "",
+					QueueId:    0,
+				},
+				1,
+				false,
+			},
+			"topic not exist": {
+				primitive.MessageQueue{
+					Topic:      "foo2",
+					BrokerName: "",
+					QueueId:    0,
+				},
+				-2,
+				true,
+			},
+		}
+
+		ctrl := gomock.NewController(t)
+		defer ctrl.Finish()
+
+		c, err := NewManualPullConsumer(WithNameServer(primitive.NamesrvAddr{"127.0.0.1"}))
+		if err != nil {
+			assert.Error(t, err)
+		}
+		namesrv, client := internal.NewMockNamesrvs(ctrl), internal.NewMockRMQClient(ctrl)
+		c.namesrv, c.client = namesrv, client
+
+		namesrv.EXPECT().FindBrokerAddrByName(gomock.Any()).Return("foo").AnyTimes()
+		client.EXPECT().InvokeSync(gomock.Any(), gomock.Any(), gomock.Any(), gomock.Any()).DoAndReturn(
+			func(ctx context.Context, addr string, request *remote.RemotingCommand, timeoutMillis time.Duration) (*remote.RemotingCommand, error) {
+				if request.ExtFields["topic"] == serverTopic {
+					ret := &remote.RemotingCommand{
+						Code: internal.ResSuccess,
+						ExtFields: map[string]string{
+							"offset": serverOffset,
+						},
+					}
+					return ret, nil
+				}
+				ret := &remote.RemotingCommand{
+					Code: internal.ResTopicNotExist,
+				}
+				return ret, nil
+			}).AnyTimes()
+
+		for name, test := range tests {
+			Convey(name, func() {
+				ret, err := c.CommittedOffset(context.Background(), "foo", &test.mq)
+
+				if test.expectedErr {
+					So(err, ShouldNotBeNil)
+				} else {
+					So(err, ShouldBeNil)
+				}
+				So(ret, ShouldEqual, test.except)
+			})
+		}
+	})
+}
+
+func TestSeek(t *testing.T) {
+	Convey("ManualPullConsumer Seek", t, func() {
+
+		serverMinOffset, serverMaxOffset := "1", "10"
+		tests := map[string]struct {
+			mq          primitive.MessageQueue
+			offset      int64
+			expectedErr bool
+		}{
+			"normal offset": {
+				primitive.MessageQueue{
+					Topic:      "foo",
+					BrokerName: "foo",
+					QueueId:    0,
+				},
+				3,
+				false,
+			},
+			"illegal offset": {
+				primitive.MessageQueue{
+					Topic:      "foo",
+					BrokerName: "foo",
+					QueueId:    0,
+				},
+				11,
+				true,
+			},
+		}
+
+		ctrl := gomock.NewController(t)
+		defer ctrl.Finish()
+
+		c, err := NewManualPullConsumer(WithNameServer(primitive.NamesrvAddr{"127.0.0.1"}))
+		if err != nil {
+			assert.Error(t, err)
+		}
+		namesrv, client := internal.NewMockNamesrvs(ctrl), internal.NewMockRMQClient(ctrl)
+		c.namesrv, c.client = namesrv, client
+
+		namesrv.EXPECT().FindBrokerAddrByName(gomock.Any()).Return("foo").AnyTimes()
+
+		client.EXPECT().InvokeSync(gomock.Any(), gomock.Any(), gomock.Any(), gomock.Any()).DoAndReturn(
+			func(ctx context.Context, addr string, request *remote.RemotingCommand, timeoutMillis time.Duration) (*remote.RemotingCommand, error) {
+				if request.Code == internal.ReqGetMinOffset {
+					return &remote.RemotingCommand{
+						Code: internal.ResSuccess,
+						ExtFields: map[string]string{
+							"offset": serverMinOffset,
+						},
+					}, nil
+				} else if request.Code == internal.ReqGetMaxOffset {
+					return &remote.RemotingCommand{
+						Code: internal.ResSuccess,
+						ExtFields: map[string]string{
+							"offset": serverMaxOffset,
+						},
+					}, nil
+				}
+				return &remote.RemotingCommand{
+					Code: internal.ResError,
+				}, nil
+			}).AnyTimes()
+
+		client.EXPECT().InvokeOneWay(gomock.Any(), gomock.Any(), gomock.Any(), gomock.Any()).Return(nil).AnyTimes()
+		for name, test := range tests {
+			Convey(name, func() {
+				err := c.Seek(context.Background(), "foo", &test.mq, test.offset)
+				if test.expectedErr {
+					So(err, ShouldNotBeNil)
+				} else {
+					So(err, ShouldBeNil)
+				}
+			})
+		}
+	})
+}
+
+func TestLookup(t *testing.T) {
+	Convey("ManualPullConsumer Lookup", t, func() {
+		test := struct {
+			mq          primitive.MessageQueue
+			timestamp   int64
+			offset      int64
+			expectedErr bool
+		}{
+			primitive.MessageQueue{
+				Topic:      "foo",
+				BrokerName: "foo",
+				QueueId:    0,
+			},
+			int64(time.Now().Nanosecond()),
+			10,
+			true,
+		}
+
+		ctrl := gomock.NewController(t)
+		defer ctrl.Finish()
+
+		c, err := NewManualPullConsumer(WithNameServer(primitive.NamesrvAddr{"127.0.0.1"}))
+		if err != nil {
+			assert.Error(t, err)
+		}
+		namesrv, client := internal.NewMockNamesrvs(ctrl), internal.NewMockRMQClient(ctrl)
+		c.namesrv, c.client = namesrv, client
+
+		namesrv.EXPECT().FindBrokerAddrByName(gomock.Any()).Return("foo").AnyTimes()
+
+		client.EXPECT().InvokeSync(gomock.Any(), gomock.Any(), gomock.Any(), gomock.Any()).DoAndReturn(
+			func(ctx context.Context, addr string, request *remote.RemotingCommand, timeoutMillis time.Duration) (*remote.RemotingCommand, error) {
+
+				return &remote.RemotingCommand{
+					Code: internal.ResSuccess,
+					ExtFields: map[string]string{
+						"offset": strconv.FormatInt(test.offset, 10),
+					},
+				}, nil
+			}).AnyTimes()
+
+		ret, err := c.Lookup(context.Background(), &test.mq, test.timestamp)
+		So(err, ShouldBeNil)
+		So(ret, ShouldEqual, test.offset)
+	})
+}
+
+func TestShutdown(t *testing.T) {
+	Convey("ManualPullConsumer Shutdown", t, func() {
+
+		ctrl := gomock.NewController(t)
+		defer ctrl.Finish()
+
+		c, err := NewManualPullConsumer(WithNameServer(primitive.NamesrvAddr{"127.0.0.1"}))
+		if err != nil {
+			assert.Error(t, err)
+		}
+		client := internal.NewMockRMQClient(ctrl)
+		c.client = client
+
+		client.EXPECT().Shutdown().Times(1)
+		c.Shutdown()
+		c.Shutdown()
+	})
+}
diff --git a/examples/consumer/manual/main.go b/examples/consumer/manual/main.go
new file mode 100644
index 0000000..1bdf479
--- /dev/null
+++ b/examples/consumer/manual/main.go
@@ -0,0 +1,83 @@
+/*
+Licensed to the Apache Software Foundation (ASF) under one or more
+contributor license agreements.  See the NOTICE file distributed with
+this work for additional information regarding copyright ownership.
+The ASF licenses this file to You under the Apache License, Version 2.0
+(the "License"); you may not use this file except in compliance with
+the License.  You may obtain a copy of the License at
+
+    http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing, software
+distributed under the License is distributed on an "AS IS" BASIS,
+WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+See the License for the specific language governing permissions and
+limitations under the License.
+*/
+
+package main
+
+import (
+	"context"
+	"fmt"
+	"sync"
+
+	"github.com/apache/rocketmq-client-go/v2/consumer"
+	"github.com/apache/rocketmq-client-go/v2/primitive"
+	"github.com/apache/rocketmq-client-go/v2/rlog"
+)
+
+func main() {
+	groupName := "testGroup"
+	c, err := consumer.NewManualPullConsumer(
+		consumer.WithGroupName(groupName),
+		consumer.WithNameServer(primitive.NamesrvAddr{"127.0.0.1:9876"}),
+	)
+	if err != nil {
+		rlog.Fatal(fmt.Sprintf("init producer error: %v", err), nil)
+	}
+
+	topic := "test"
+	// get all message queue
+	mqs, err := c.GetMessageQueues(context.Background(), topic)
+	if err != nil {
+		rlog.Fatal(fmt.Sprintf("get message queue error: %v", err), nil)
+	}
+	var wg sync.WaitGroup
+
+	fn := func(mq *primitive.MessageQueue) {
+		defer wg.Done()
+		// get latest offset
+		offset, err := c.CommittedOffset(context.Background(), groupName, mq)
+		if err != nil {
+			rlog.Fatal(fmt.Sprintf("search consumer offset error: %v", err), nil)
+		}
+		for {
+			// pull message
+			ret, err := c.PullFromQueue(context.Background(), groupName, mq, offset, 1)
+			if err != nil {
+				rlog.Fatal(fmt.Sprintf("pullFromQueue error: %v", err), nil)
+			}
+			if ret.Status == primitive.PullFound {
+				msgs := ret.GetMessageExts()
+				for _, msg := range msgs {
+					fmt.Printf("subscribe Msg: %v \n", msg)
+					// commit offset
+					if err = c.Seek(context.Background(), groupName, mq, msg.QueueOffset+1); err != nil {
+						rlog.Fatal(fmt.Sprintf("commit offset error: %v", err), nil)
+					}
+					offset++
+				}
+			} else {
+				break
+			}
+		}
+	}
+
+	for _, mq := range mqs {
+		wg.Add(1)
+		go fn(mq)
+	}
+	wg.Wait()
+	c.Shutdown()
+}
diff --git a/internal/request.go b/internal/request.go
index 0e3d8e1..f53d5ff 100644
--- a/internal/request.go
+++ b/internal/request.go
@@ -236,6 +236,18 @@ func (request *GetConsumerListRequestHeader) Encode() map[string]string {
 	return maps
 }
 
+type GetMinOffsetRequestHeader struct {
+	Topic   string
+	QueueId int
+}
+
+func (request *GetMinOffsetRequestHeader) Encode() map[string]string {
+	maps := make(map[string]string)
+	maps["topic"] = request.Topic
+	maps["queueId"] = strconv.Itoa(request.QueueId)
+	return maps
+}
+
 type GetMaxOffsetRequestHeader struct {
 	Topic   string
 	QueueId int
diff --git a/primitive/interceptor.go b/primitive/interceptor.go
index 878aab5..51716aa 100644
--- a/primitive/interceptor.go
+++ b/primitive/interceptor.go
@@ -29,6 +29,10 @@ type Invoker func(ctx context.Context, req, reply interface{}) error
 // use type assert to get real type.
 type Interceptor func(ctx context.Context, req, reply interface{}, next Invoker) error
 
+var NoopInterceptor = func(ctx context.Context, req, reply interface{}) error {
+	return nil
+}
+
 func ChainInterceptors(interceptors ...Interceptor) Interceptor {
 	if len(interceptors) == 0 {
 		return nil