Posted to dev@rocketmq.apache.org by GitBox <gi...@apache.org> on 2021/11/29 02:03:31 UTC

[GitHub] [rocketmq-client-go] wangweizZZ opened a new pull request #745: add manual consumer

wangweizZZ opened a new pull request #745:
URL: https://github.com/apache/rocketmq-client-go/pull/745


   ## What is the purpose of the change
   
   #737
   
   ## Brief changelog
   
   Add `ManualPullConsumer`.
   > `internal.RMQClient` is still used; this part could be refactored later.
   
   ## Verifying this change
   
   XXXX
   
   Follow this checklist to help us incorporate your contribution quickly and easily. Note: `it would be helpful if you could complete the following 5 checklist items (the last one is optional) before requesting a community review of your PR`.
   
   - [x] Make sure there is a [Github issue](https://github.com/apache/rocketmq/issues) filed for the change (usually before you start working on it). Trivial changes like typos do not require a Github issue. Your pull request should address just this issue, without pulling in other changes - one PR resolves one issue. 
   - [x] Format the pull request title like `[ISSUE #123] Fix UnknownException when host config not exist`. Each commit in the pull request should have a meaningful subject line and body.
   - [x] Write a pull request description that is detailed enough to understand what the pull request does, how, and why.
   - [x] Write the necessary unit tests (over 80% coverage) to verify your logic changes; prefer mocks when a cross-module dependency exists.
   - [ ] If this contribution is large, please file an [Apache Individual Contributor License Agreement](http://www.apache.org/licenses/#clas).
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: dev-unsubscribe@rocketmq.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [rocketmq-client-go] wangweizZZ commented on pull request #745: [ISSUE #737] Add ManualPullConsumer

Posted by GitBox <gi...@apache.org>.
wangweizZZ commented on pull request #745:
URL: https://github.com/apache/rocketmq-client-go/pull/745#issuecomment-995801405


   > I have another question: `manual_pull_consumer.go` has a lot of code similar to `consumer.go`. Could you tell me a bit about your reasoning here?
   
   There are two ways to answer this question.
   1. At present, `consumer.go` maintains a lot of state, including offsets. It is a complete consumer, so `ManualPullConsumer.go` cannot reuse it directly.
   2. As I said earlier, `consumer.go` is a complete consumer at a higher level, so it can reuse `ManualPullConsumer.go`. At the same time, PullConsumer and PushConsumer should be able to reuse `consumer.go`.
   
   These are just my personal thoughts; I am not sure whether they answer the question.
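
The layering described in the reply above (a stateless pull layer that a stateful, higher-level consumer can reuse) might look roughly like the following sketch. Every name here (`Puller`, `memPuller`, `StatefulConsumer`, `Next`) is hypothetical and invented for illustration; none of it is the rocketmq-client-go API, and an in-memory map stands in for the broker.

```go
package main

import "fmt"

// Puller is a hypothetical stateless layer: the caller supplies the offset
// on every call, much like a manual pull consumer would require.
type Puller interface {
	Pull(queue string, offset int64, n int) []string
}

// memPuller is an in-memory stand-in for a broker, used only in this sketch.
type memPuller struct {
	data map[string][]string
}

// Pull returns up to n messages starting at offset, or nil if out of range.
func (p memPuller) Pull(queue string, offset int64, n int) []string {
	msgs := p.data[queue]
	if n <= 0 || offset < 0 || offset >= int64(len(msgs)) {
		return nil
	}
	end := offset + int64(n)
	if end > int64(len(msgs)) {
		end = int64(len(msgs))
	}
	return msgs[offset:end]
}

// StatefulConsumer is the hypothetical higher layer: it owns the offsets
// and delegates the actual pulling to a Puller.
type StatefulConsumer struct {
	puller  Puller
	offsets map[string]int64
}

// NewStatefulConsumer wraps a Puller with per-queue offset tracking.
func NewStatefulConsumer(p Puller) *StatefulConsumer {
	return &StatefulConsumer{puller: p, offsets: make(map[string]int64)}
}

// Next pulls up to n messages and advances the stored offset past them.
func (c *StatefulConsumer) Next(queue string, n int) []string {
	msgs := c.puller.Pull(queue, c.offsets[queue], n)
	c.offsets[queue] += int64(len(msgs))
	return msgs
}

func main() {
	p := memPuller{data: map[string][]string{"q0": {"a", "b", "c"}}}
	c := NewStatefulConsumer(p)
	fmt.Println(c.Next("q0", 2))
	fmt.Println(c.Next("q0", 2))
}
```

In this arrangement the state lives in exactly one place, so the stateless layer stays reusable by any consumer built on top of it.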





[GitHub] [rocketmq-client-go] wangweizZZ commented on a change in pull request #745: [ISSUE #737] Add ManualPullConsumer

Posted by GitBox <gi...@apache.org>.
wangweizZZ commented on a change in pull request #745:
URL: https://github.com/apache/rocketmq-client-go/pull/745#discussion_r769737600



##########
File path: primitive/interceptor.go
##########
@@ -29,6 +29,10 @@ type Invoker func(ctx context.Context, req, reply interface{}) error
 // use type assert to get real type.
 type Interceptor func(ctx context.Context, req, reply interface{}, next Invoker) error
 
+var NoopInterceptor = func(ctx context.Context, req, reply interface{}) error {

Review comment:
       In my use case, I want to trigger the interceptor as an AfterPullMessageHook ([code link](https://github.com/apache/rocketmq-client-go/pull/745/commits/6e2b133181a6763d6f0aef746187d88aab9ae7e4#diff-3e4be2af9a7961a81e0a80bf3e76646ef3f82fc73ed04f636960e990f4d9260fR116)),
   but I don't need to perform any operation after the interceptor ends.
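
To illustrate the idea in the comment above, here is a minimal sketch of ending an interceptor chain with a no-op invoker. The `Invoker` and `Interceptor` types copy the signatures shown in the diff; `noopInvoker`, `countingHook`, and `runHook` are hypothetical names introduced only for this example, and a plain `[]string` stands in for the real message context.

```go
package main

import (
	"context"
	"fmt"
)

// Invoker and Interceptor mirror the signatures shown in the diff above.
// They are redeclared locally so the sketch compiles without the library.
type Invoker func(ctx context.Context, req, reply interface{}) error
type Interceptor func(ctx context.Context, req, reply interface{}, next Invoker) error

// noopInvoker (hypothetical name) terminates a chain without doing any work.
func noopInvoker(ctx context.Context, req, reply interface{}) error {
	return nil
}

// countingHook is a hypothetical after-pull hook: it counts the pulled
// messages it is handed, then calls next.
func countingHook(seen *int) Interceptor {
	return func(ctx context.Context, req, reply interface{}, next Invoker) error {
		if msgs, ok := req.([]string); ok {
			*seen += len(msgs)
		}
		return next(ctx, req, reply)
	}
}

// runHook triggers the hook after a pull; nothing needs to run once the
// hook finishes, so the chain ends in noopInvoker.
func runHook(hook Interceptor, msgs []string) error {
	return hook(context.Background(), msgs, struct{}{}, noopInvoker)
}

func main() {
	seen := 0
	if err := runHook(countingHook(&seen), []string{"a", "b"}); err != nil {
		fmt.Println("hook error:", err)
		return
	}
	fmt.Println("messages seen:", seen)
}
```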







[GitHub] [rocketmq-client-go] wenfengwang commented on a change in pull request #745: [ISSUE #737] Add ManualPullConsumer

Posted by GitBox <gi...@apache.org>.
wenfengwang commented on a change in pull request #745:
URL: https://github.com/apache/rocketmq-client-go/pull/745#discussion_r775351153



##########
File path: examples/consumer/manual/main.go
##########
@@ -0,0 +1,84 @@
+/*
+Licensed to the Apache Software Foundation (ASF) under one or more
+contributor license agreements.  See the NOTICE file distributed with
+this work for additional information regarding copyright ownership.
+The ASF licenses this file to You under the Apache License, Version 2.0
+(the "License"); you may not use this file except in compliance with
+the License.  You may obtain a copy of the License at
+
+    http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing, software
+distributed under the License is distributed on an "AS IS" BASIS,
+WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+See the License for the specific language governing permissions and
+limitations under the License.
+*/
+
+package main
+
+import (
+	"context"
+	"log"
+	"os"
+	"sync"
+
+	"github.com/apache/rocketmq-client-go/v2/consumer"
+	"github.com/apache/rocketmq-client-go/v2/primitive"
+)
+
+func main() {
+	groupName := "testGroup"
+	c, err := consumer.NewManualPullConsumer(
+		consumer.WithGroupName(groupName),
+		consumer.WithNameServer(primitive.NamesrvAddr{"127.0.0.1:9876"}),
+	)
+	if err != nil {
+		log.Fatalf("init producer error: %v", err)
+	}
+
+	topic := "test"
+	// get all message queue
+	mqs, err := c.GetMessageQueues(context.Background(), topic)
+	if err != nil {
+		log.Fatalf("Get message queue error: %v", err)
+	}
+	var wg sync.WaitGroup
+
+	fn := func(mq *primitive.MessageQueue) {
+		defer wg.Done()
+		// get latest offset
+		offset, err := c.CommittedOffset(context.Background(), groupName, mq)
+		for {
+			if err != nil {
+				log.Fatalf("search latest offset error: %v", err)
+			}
+			// pull message
+			ret, err := c.PullFromQueue(context.Background(), groupName, mq, offset, 1)
+			if err != nil {
+				log.Fatalf("pullFromQueue error: %v", err)
+			}
+			if ret.Status == primitive.PullFound {
+				msgs := ret.GetMessageExts()
+				for _, msg := range msgs {
+					log.Printf("subscribe Msg: %v \n", msg)
+					// commit offset
+					if err = c.Seek(context.Background(), groupName, mq, msg.QueueOffset+1); err != nil {

Review comment:
       confirmation before release







[GitHub] [rocketmq-client-go] wenfengwang commented on a change in pull request #745: [ISSUE #737] Add ManualPullConsumer

Posted by GitBox <gi...@apache.org>.
wenfengwang commented on a change in pull request #745:
URL: https://github.com/apache/rocketmq-client-go/pull/745#discussion_r774704998



##########
File path: examples/consumer/manual/main.go
##########
@@ -0,0 +1,84 @@
+/*
+Licensed to the Apache Software Foundation (ASF) under one or more
+contributor license agreements.  See the NOTICE file distributed with
+this work for additional information regarding copyright ownership.
+The ASF licenses this file to You under the Apache License, Version 2.0
+(the "License"); you may not use this file except in compliance with
+the License.  You may obtain a copy of the License at
+
+    http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing, software
+distributed under the License is distributed on an "AS IS" BASIS,
+WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+See the License for the specific language governing permissions and
+limitations under the License.
+*/
+
+package main
+
+import (
+	"context"
+	"log"
+	"os"
+	"sync"
+
+	"github.com/apache/rocketmq-client-go/v2/consumer"
+	"github.com/apache/rocketmq-client-go/v2/primitive"
+)
+
+func main() {
+	groupName := "testGroup"
+	c, err := consumer.NewManualPullConsumer(
+		consumer.WithGroupName(groupName),
+		consumer.WithNameServer(primitive.NamesrvAddr{"127.0.0.1:9876"}),
+	)
+	if err != nil {
+		log.Fatalf("init producer error: %v", err)
+	}
+
+	topic := "test"
+	// get all message queue
+	mqs, err := c.GetMessageQueues(context.Background(), topic)
+	if err != nil {
+		log.Fatalf("Get message queue error: %v", err)
+	}
+	var wg sync.WaitGroup
+
+	fn := func(mq *primitive.MessageQueue) {
+		defer wg.Done()
+		// get latest offset
+		offset, err := c.CommittedOffset(context.Background(), groupName, mq)
+		for {
+			if err != nil {
+				log.Fatalf("search latest offset error: %v", err)
+			}
+			// pull message
+			ret, err := c.PullFromQueue(context.Background(), groupName, mq, offset, 1)
+			if err != nil {
+				log.Fatalf("pullFromQueue error: %v", err)
+			}
+			if ret.Status == primitive.PullFound {
+				msgs := ret.GetMessageExts()
+				for _, msg := range msgs {
+					log.Printf("subscribe Msg: %v \n", msg)
+					// commit offset
+					if err = c.Seek(context.Background(), groupName, mq, msg.QueueOffset+1); err != nil {
+						log.Fatalf("commit offset error: %v", err)
+					}
+					offset++
+				}
+			} else {
+				break
+			}
+		}
+	}
+
+	for _, mq := range mqs {
+		wg.Add(1)
+		go fn(mq)
+	}
+	wg.Wait()
+	c.Shutdown()
+	os.Exit(0)

Review comment:
       This line is redundant: the exit code is already 0 when the program exits normally.
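
As a small illustration of the point above: a Go program whose `main` returns normally exits with status 0, and `os.Exit` additionally skips deferred functions, so it is usually reserved for non-zero codes. The `run` helper below is a hypothetical name used only for this sketch.

```go
package main

import (
	"fmt"
	"os"
)

// run (hypothetical helper) does the work and reports an exit code.
// Its deferred cleanup runs because run returns normally.
func run(fail bool) int {
	defer fmt.Println("cleanup done")
	if fail {
		return 1
	}
	return 0
}

func main() {
	// Call os.Exit only for a non-zero code; on success, simply returning
	// from main already gives exit status 0.
	if code := run(false); code != 0 {
		os.Exit(code)
	}
}
```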







[GitHub] [rocketmq-client-go] wenfengwang commented on a change in pull request #745: [ISSUE #737] Add ManualPullConsumer

Posted by GitBox <gi...@apache.org>.
wenfengwang commented on a change in pull request #745:
URL: https://github.com/apache/rocketmq-client-go/pull/745#discussion_r769287830



##########
File path: consumer/manual_pull_consumer.go
##########
@@ -0,0 +1,359 @@
+/*
+Licensed to the Apache Software Foundation (ASF) under one or more
+contributor license agreements.  See the NOTICE file distributed with
+this work for additional information regarding copyright ownership.
+The ASF licenses this file to You under the Apache License, Version 2.0
+(the "License"); you may not use this file except in compliance with
+the License.  You may obtain a copy of the License at
+
+    http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing, software
+distributed under the License is distributed on an "AS IS" BASIS,
+WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+See the License for the specific language governing permissions and
+limitations under the License.
+*/
+
+package consumer
+
+import (
+	"context"
+	"fmt"
+	"strconv"
+	"sync"
+	"time"
+
+	errors2 "github.com/apache/rocketmq-client-go/v2/errors"
+	"github.com/apache/rocketmq-client-go/v2/internal"
+	"github.com/apache/rocketmq-client-go/v2/internal/remote"
+	"github.com/apache/rocketmq-client-go/v2/primitive"
+	"github.com/apache/rocketmq-client-go/v2/rlog"
+	"github.com/pkg/errors"
+)
+
+type ManualPullConsumer interface {

Review comment:
       Could you add some comments for this?

##########
File path: consumer/manual_pull_consumer.go
##########
@@ -0,0 +1,359 @@
+/*
+Licensed to the Apache Software Foundation (ASF) under one or more
+contributor license agreements.  See the NOTICE file distributed with
+this work for additional information regarding copyright ownership.
+The ASF licenses this file to You under the Apache License, Version 2.0
+(the "License"); you may not use this file except in compliance with
+the License.  You may obtain a copy of the License at
+
+    http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing, software
+distributed under the License is distributed on an "AS IS" BASIS,
+WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+See the License for the specific language governing permissions and
+limitations under the License.
+*/
+
+package consumer
+
+import (
+	"context"
+	"fmt"
+	"strconv"
+	"sync"
+	"time"
+
+	errors2 "github.com/apache/rocketmq-client-go/v2/errors"
+	"github.com/apache/rocketmq-client-go/v2/internal"
+	"github.com/apache/rocketmq-client-go/v2/internal/remote"
+	"github.com/apache/rocketmq-client-go/v2/primitive"
+	"github.com/apache/rocketmq-client-go/v2/rlog"
+	"github.com/pkg/errors"
+)
+
+type ManualPullConsumer interface {
+	// get n messages from specified queue with offset
+	PullFromQueue(ctx context.Context, mq *primitive.MessageQueue, offset int64, numbers int) (*primitive.PullResult, error)
+
+	// get queues of the topic
+	GetMessageQueues(ctx context.Context, topic string) []*primitive.MessageQueue
+
+	// get the offset of mq in groupName, if mq not exist, -1 will be return
+	CommittedOffset(groupName string, mq *primitive.MessageQueue) (int64, error)
+
+	// seek consume position to the offset, this api can be used to reset offset and commit offset
+	Seek(groupName string, mq *primitive.MessageQueue, offset int64) error
+
+	// query offset according to timestamp(ms), the maximum offset that born time less than timestamp will be return
+	// if timestamp less than any message's born time, the earliest offset will be returned
+	// if timestamp great than any message's born time, the latest offset will be returned
+	Lookup(ctx context.Context, mq *primitive.MessageQueue, timestamp int64) (int64, error)
+}
+
+type defaultManualPullConsumer struct {
+	group                  string
+	namesrv                internal.Namesrvs
+	option                 consumerOptions
+	client                 internal.RMQClient
+	interceptor            primitive.Interceptor
+	pullFromWhichNodeTable sync.Map
+}
+
+func NewManualPullConsumer(options ...Option) (*defaultManualPullConsumer, error) {
+	defaultOpts := defaultPullConsumerOptions()
+	for _, apply := range options {
+		apply(&defaultOpts)
+	}
+
+	srvs, err := internal.NewNamesrv(defaultOpts.Resolver)
+	if err != nil {
+		return nil, errors.Wrap(err, "new Namesrv failed.")
+	}
+	if !defaultOpts.Credentials.IsEmpty() {
+		srvs.SetCredentials(defaultOpts.Credentials)
+	}
+	defaultOpts.Namesrv = srvs
+
+	if defaultOpts.Namespace != "" {
+		defaultOpts.GroupName = defaultOpts.Namespace + "%" + defaultOpts.GroupName
+	}
+
+	actualRMQClient := internal.GetOrNewRocketMQClient(defaultOpts.ClientOptions, nil)
+	actualNameSrv := internal.GetOrSetNamesrv(actualRMQClient.ClientID(), defaultOpts.Namesrv)
+
+	dc := &defaultManualPullConsumer{
+		client:  actualRMQClient,
+		option:  defaultOpts,
+		namesrv: actualNameSrv,
+		group:   defaultOpts.GroupName,
+	}
+
+	dc.interceptor = primitive.ChainInterceptors(dc.option.Interceptors...)
+	dc.option.ClientOptions.Namesrv = actualNameSrv
+	return dc, nil
+}
+
+func (dc *defaultManualPullConsumer) PullFromQueue(ctx context.Context, mq *primitive.MessageQueue, offset int64, numbers int) (*primitive.PullResult, error) {
+	if err := dc.checkPull(ctx, mq, offset, numbers); err != nil {
+		return nil, err
+	}
+	subData := buildSubscriptionData(mq.Topic, MessageSelector{
+		Expression: _SubAll,
+	})
+
+	sysFlag := buildSysFlag(false, true, true, false)
+
+	pullResp, err := dc.pullInner(ctx, mq, subData, offset, numbers, sysFlag, 0)
+	if err != nil {
+		return pullResp, err
+	}
+	dc.processPullResult(mq, pullResp, subData)
+	if dc.interceptor != nil {
+		msgCtx := &primitive.ConsumeMessageContext{
+			Properties:    make(map[string]string),
+			ConsumerGroup: dc.group,
+			MQ:            mq,
+			Msgs:          pullResp.GetMessageExts(),
+		}
+		err = dc.interceptor(ctx, msgCtx, struct{}{}, primitive.NoopInterceptor)
+	}
+	return pullResp, err
+}
+
+func (dc *defaultManualPullConsumer) GetMessageQueues(ctx context.Context, topic string) []*primitive.MessageQueue {
+	queues, err := dc.namesrv.FetchSubscribeMessageQueues(topic)
+	if err != nil {
+		rlog.Error("get message queue error", map[string]interface{}{
+			rlog.LogKeyTopic:         topic,
+			rlog.LogKeyUnderlayError: err.Error(),
+		})
+		return nil
+	}
+	return queues
+}
+
+func (dc *defaultManualPullConsumer) CommittedOffset(group string, mq *primitive.MessageQueue) (int64, error) {

Review comment:
       Could you modify the method signature to pass in a context?

##########
File path: consumer/manual_pull_consumer.go
##########
@@ -0,0 +1,359 @@
+/*
+Licensed to the Apache Software Foundation (ASF) under one or more
+contributor license agreements.  See the NOTICE file distributed with
+this work for additional information regarding copyright ownership.
+The ASF licenses this file to You under the Apache License, Version 2.0
+(the "License"); you may not use this file except in compliance with
+the License.  You may obtain a copy of the License at
+
+    http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing, software
+distributed under the License is distributed on an "AS IS" BASIS,
+WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+See the License for the specific language governing permissions and
+limitations under the License.
+*/
+
+package consumer
+
+import (
+	"context"
+	"fmt"
+	"strconv"
+	"sync"
+	"time"
+
+	errors2 "github.com/apache/rocketmq-client-go/v2/errors"
+	"github.com/apache/rocketmq-client-go/v2/internal"
+	"github.com/apache/rocketmq-client-go/v2/internal/remote"
+	"github.com/apache/rocketmq-client-go/v2/primitive"
+	"github.com/apache/rocketmq-client-go/v2/rlog"
+	"github.com/pkg/errors"
+)
+
+type ManualPullConsumer interface {
+	// get n messages from specified queue with offset
+	PullFromQueue(ctx context.Context, mq *primitive.MessageQueue, offset int64, numbers int) (*primitive.PullResult, error)
+
+	// get queues of the topic
+	GetMessageQueues(ctx context.Context, topic string) []*primitive.MessageQueue
+
+	// get the offset of mq in groupName, if mq not exist, -1 will be return
+	CommittedOffset(groupName string, mq *primitive.MessageQueue) (int64, error)
+
+	// seek consume position to the offset, this api can be used to reset offset and commit offset
+	Seek(groupName string, mq *primitive.MessageQueue, offset int64) error
+
+	// query offset according to timestamp(ms), the maximum offset that born time less than timestamp will be return
+	// if timestamp less than any message's born time, the earliest offset will be returned
+	// if timestamp great than any message's born time, the latest offset will be returned
+	Lookup(ctx context.Context, mq *primitive.MessageQueue, timestamp int64) (int64, error)
+}
+
+type defaultManualPullConsumer struct {
+	group                  string
+	namesrv                internal.Namesrvs
+	option                 consumerOptions
+	client                 internal.RMQClient
+	interceptor            primitive.Interceptor
+	pullFromWhichNodeTable sync.Map
+}
+
+func NewManualPullConsumer(options ...Option) (*defaultManualPullConsumer, error) {
+	defaultOpts := defaultPullConsumerOptions()
+	for _, apply := range options {
+		apply(&defaultOpts)
+	}
+
+	srvs, err := internal.NewNamesrv(defaultOpts.Resolver)
+	if err != nil {
+		return nil, errors.Wrap(err, "new Namesrv failed.")
+	}
+	if !defaultOpts.Credentials.IsEmpty() {
+		srvs.SetCredentials(defaultOpts.Credentials)
+	}
+	defaultOpts.Namesrv = srvs
+
+	if defaultOpts.Namespace != "" {
+		defaultOpts.GroupName = defaultOpts.Namespace + "%" + defaultOpts.GroupName
+	}
+
+	actualRMQClient := internal.GetOrNewRocketMQClient(defaultOpts.ClientOptions, nil)
+	actualNameSrv := internal.GetOrSetNamesrv(actualRMQClient.ClientID(), defaultOpts.Namesrv)
+
+	dc := &defaultManualPullConsumer{
+		client:  actualRMQClient,
+		option:  defaultOpts,
+		namesrv: actualNameSrv,
+		group:   defaultOpts.GroupName,
+	}
+
+	dc.interceptor = primitive.ChainInterceptors(dc.option.Interceptors...)
+	dc.option.ClientOptions.Namesrv = actualNameSrv
+	return dc, nil
+}
+
+func (dc *defaultManualPullConsumer) PullFromQueue(ctx context.Context, mq *primitive.MessageQueue, offset int64, numbers int) (*primitive.PullResult, error) {
+	if err := dc.checkPull(ctx, mq, offset, numbers); err != nil {
+		return nil, err
+	}
+	subData := buildSubscriptionData(mq.Topic, MessageSelector{
+		Expression: _SubAll,
+	})
+
+	sysFlag := buildSysFlag(false, true, true, false)
+
+	pullResp, err := dc.pullInner(ctx, mq, subData, offset, numbers, sysFlag, 0)
+	if err != nil {
+		return pullResp, err
+	}
+	dc.processPullResult(mq, pullResp, subData)
+	if dc.interceptor != nil {
+		msgCtx := &primitive.ConsumeMessageContext{
+			Properties:    make(map[string]string),
+			ConsumerGroup: dc.group,
+			MQ:            mq,
+			Msgs:          pullResp.GetMessageExts(),
+		}
+		err = dc.interceptor(ctx, msgCtx, struct{}{}, primitive.NoopInterceptor)
+	}
+	return pullResp, err
+}
+
+func (dc *defaultManualPullConsumer) GetMessageQueues(ctx context.Context, topic string) []*primitive.MessageQueue {
+	queues, err := dc.namesrv.FetchSubscribeMessageQueues(topic)
+	if err != nil {
+		rlog.Error("get message queue error", map[string]interface{}{
+			rlog.LogKeyTopic:         topic,
+			rlog.LogKeyUnderlayError: err.Error(),
+		})
+		return nil
+	}
+	return queues
+}
+
+func (dc *defaultManualPullConsumer) CommittedOffset(group string, mq *primitive.MessageQueue) (int64, error) {
+	broker, exist := dc.chooseServer(mq)
+	if !exist {
+		return int64(-1), fmt.Errorf("broker: %s address not found", mq.BrokerName)
+	}
+	queryOffsetRequest := &internal.QueryConsumerOffsetRequestHeader{
+		ConsumerGroup: group,
+		Topic:         mq.Topic,
+		QueueId:       mq.QueueId,
+	}
+	cmd := remote.NewRemotingCommand(internal.ReqQueryConsumerOffset, queryOffsetRequest, nil)
+	res, err := dc.client.InvokeSync(context.Background(), broker, cmd, 3*time.Second)
+	if err != nil {
+		return -1, err
+	}
+	if res.Code != internal.ResSuccess {
+		return -2, fmt.Errorf("broker response code: %d, remarks: %s", res.Code, res.Remark)
+	}
+	off, err := strconv.ParseInt(res.ExtFields["offset"], 10, 64)
+	if err != nil {
+		return -1, err
+	}
+	return off, nil
+}
+
+func (dc *defaultManualPullConsumer) Seek(groupName string, mq *primitive.MessageQueue, offset int64) error {
+	minOffset, err := dc.queryMinOffset(context.Background(), mq)
+	if err != nil {
+		return err
+	}
+	maxOffset, err := dc.queryMaxOffset(context.Background(), mq)
+	if err != nil {
+		return err
+	}
+	if offset < minOffset || offset > maxOffset {
+		return fmt.Errorf("Seek offset illegal, seek offset = %d, min offset = %d, max offset = %d", offset, minOffset, maxOffset)
+	}
+
+	broker, exist := dc.chooseServer(mq)
+	if !exist {
+		return fmt.Errorf("broker: %s address not found", mq.BrokerName)
+	}
+
+	updateOffsetRequest := &internal.UpdateConsumerOffsetRequestHeader{
+		ConsumerGroup: groupName,
+		Topic:         mq.Topic,
+		QueueId:       mq.QueueId,
+		CommitOffset:  offset,
+	}
+	cmd := remote.NewRemotingCommand(internal.ReqUpdateConsumerOffset, updateOffsetRequest, nil)
+	return dc.client.InvokeOneWay(context.Background(), broker, cmd, 5*time.Second)
+}
+
+func (dc *defaultManualPullConsumer) Lookup(ctx context.Context, mq *primitive.MessageQueue, timestamp int64) (int64, error) {
+	broker, exist := dc.chooseServer(mq)
+	if !exist {
+		return -1, fmt.Errorf("the broker [%s] does not exist", mq.BrokerName)
+	}
+
+	request := &internal.SearchOffsetRequestHeader{
+		Topic:     mq.Topic,
+		QueueId:   mq.QueueId,
+		Timestamp: timestamp,
+	}
+
+	cmd := remote.NewRemotingCommand(internal.ReqSearchOffsetByTimestamp, request, nil)
+	response, err := dc.client.InvokeSync(context.Background(), broker, cmd, 3*time.Second)
+	if err != nil {
+		return -1, err
+	}
+	return strconv.ParseInt(response.ExtFields["offset"], 10, 64)
+}
+
+func (dc *defaultManualPullConsumer) chooseServer(mq *primitive.MessageQueue) (string, bool) {
+	brokerAddr := dc.namesrv.FindBrokerAddrByName(mq.BrokerName)
+	if brokerAddr == "" {
+		dc.namesrv.UpdateTopicRouteInfo(mq.Topic)
+		brokerAddr = dc.namesrv.FindBrokerAddrByName(mq.BrokerName)
+	}
+	return brokerAddr, brokerAddr != ""
+}
+
+func (dc *defaultManualPullConsumer) queryMinOffset(ctx context.Context, mq *primitive.MessageQueue) (int64, error) {
+	broker, exist := dc.chooseServer(mq)
+	if !exist {
+		return -1, fmt.Errorf("the broker [%s] does not exist", mq.BrokerName)

Review comment:
       The same error is returned in several places; could you use an error variable instead?

##########
File path: consumer/manual_pull_consumer.go
##########
@@ -0,0 +1,359 @@
+/*
+Licensed to the Apache Software Foundation (ASF) under one or more
+contributor license agreements.  See the NOTICE file distributed with
+this work for additional information regarding copyright ownership.
+The ASF licenses this file to You under the Apache License, Version 2.0
+(the "License"); you may not use this file except in compliance with
+the License.  You may obtain a copy of the License at
+
+    http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing, software
+distributed under the License is distributed on an "AS IS" BASIS,
+WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+See the License for the specific language governing permissions and
+limitations under the License.
+*/
+
+package consumer
+
+import (
+	"context"
+	"fmt"
+	"strconv"
+	"sync"
+	"time"
+
+	errors2 "github.com/apache/rocketmq-client-go/v2/errors"
+	"github.com/apache/rocketmq-client-go/v2/internal"
+	"github.com/apache/rocketmq-client-go/v2/internal/remote"
+	"github.com/apache/rocketmq-client-go/v2/primitive"
+	"github.com/apache/rocketmq-client-go/v2/rlog"
+	"github.com/pkg/errors"
+)
+
+type ManualPullConsumer interface {
+	// get n messages from specified queue with offset
+	PullFromQueue(ctx context.Context, mq *primitive.MessageQueue, offset int64, numbers int) (*primitive.PullResult, error)
+
+	// get queues of the topic
+	GetMessageQueues(ctx context.Context, topic string) []*primitive.MessageQueue
+
+	// get the offset of mq in groupName, if mq not exist, -1 will be return
+	CommittedOffset(groupName string, mq *primitive.MessageQueue) (int64, error)
+
+	// seek consume position to the offset, this api can be used to reset offset and commit offset
+	Seek(groupName string, mq *primitive.MessageQueue, offset int64) error
+
+	// query offset according to timestamp(ms), the maximum offset that born time less than timestamp will be return
+	// if timestamp less than any message's born time, the earliest offset will be returned
+	// if timestamp great than any message's born time, the latest offset will be returned
+	Lookup(ctx context.Context, mq *primitive.MessageQueue, timestamp int64) (int64, error)
+}
+
+type defaultManualPullConsumer struct {
+	group                  string
+	namesrv                internal.Namesrvs
+	option                 consumerOptions
+	client                 internal.RMQClient
+	interceptor            primitive.Interceptor
+	pullFromWhichNodeTable sync.Map
+}
+
+func NewManualPullConsumer(options ...Option) (*defaultManualPullConsumer, error) {
+	defaultOpts := defaultPullConsumerOptions()
+	for _, apply := range options {
+		apply(&defaultOpts)
+	}
+
+	srvs, err := internal.NewNamesrv(defaultOpts.Resolver)
+	if err != nil {
+		return nil, errors.Wrap(err, "new Namesrv failed.")
+	}
+	if !defaultOpts.Credentials.IsEmpty() {
+		srvs.SetCredentials(defaultOpts.Credentials)
+	}
+	defaultOpts.Namesrv = srvs
+
+	if defaultOpts.Namespace != "" {
+		defaultOpts.GroupName = defaultOpts.Namespace + "%" + defaultOpts.GroupName
+	}
+
+	actualRMQClient := internal.GetOrNewRocketMQClient(defaultOpts.ClientOptions, nil)
+	actualNameSrv := internal.GetOrSetNamesrv(actualRMQClient.ClientID(), defaultOpts.Namesrv)
+
+	dc := &defaultManualPullConsumer{
+		client:  actualRMQClient,
+		option:  defaultOpts,
+		namesrv: actualNameSrv,
+		group:   defaultOpts.GroupName,
+	}
+
+	dc.interceptor = primitive.ChainInterceptors(dc.option.Interceptors...)
+	dc.option.ClientOptions.Namesrv = actualNameSrv
+	return dc, nil
+}
+
+func (dc *defaultManualPullConsumer) PullFromQueue(ctx context.Context, mq *primitive.MessageQueue, offset int64, numbers int) (*primitive.PullResult, error) {
+	if err := dc.checkPull(ctx, mq, offset, numbers); err != nil {
+		return nil, err
+	}
+	subData := buildSubscriptionData(mq.Topic, MessageSelector{
+		Expression: _SubAll,
+	})
+
+	sysFlag := buildSysFlag(false, true, true, false)
+
+	pullResp, err := dc.pullInner(ctx, mq, subData, offset, numbers, sysFlag, 0)
+	if err != nil {
+		return pullResp, err
+	}
+	dc.processPullResult(mq, pullResp, subData)
+	if dc.interceptor != nil {
+		msgCtx := &primitive.ConsumeMessageContext{
+			Properties:    make(map[string]string),
+			ConsumerGroup: dc.group,
+			MQ:            mq,
+			Msgs:          pullResp.GetMessageExts(),
+		}
+		err = dc.interceptor(ctx, msgCtx, struct{}{}, primitive.NoopInterceptor)
+	}
+	return pullResp, err
+}
+
+func (dc *defaultManualPullConsumer) GetMessageQueues(ctx context.Context, topic string) []*primitive.MessageQueue {
+	queues, err := dc.namesrv.FetchSubscribeMessageQueues(topic)
+	if err != nil {
+		rlog.Error("get message queue error", map[string]interface{}{
+			rlog.LogKeyTopic:         topic,
+			rlog.LogKeyUnderlayError: err.Error(),
+		})
+		return nil
+	}
+	return queues
+}
+
+func (dc *defaultManualPullConsumer) CommittedOffset(group string, mq *primitive.MessageQueue) (int64, error) {
+	broker, exist := dc.chooseServer(mq)
+	if !exist {
+		return int64(-1), fmt.Errorf("broker: %s address not found", mq.BrokerName)
+	}
+	queryOffsetRequest := &internal.QueryConsumerOffsetRequestHeader{
+		ConsumerGroup: group,
+		Topic:         mq.Topic,
+		QueueId:       mq.QueueId,
+	}
+	cmd := remote.NewRemotingCommand(internal.ReqQueryConsumerOffset, queryOffsetRequest, nil)
+	res, err := dc.client.InvokeSync(context.Background(), broker, cmd, 3*time.Second)
+	if err != nil {
+		return -1, err
+	}
+	if res.Code != internal.ResSuccess {
+		return -2, fmt.Errorf("broker response code: %d, remarks: %s", res.Code, res.Remark)
+	}
+	off, err := strconv.ParseInt(res.ExtFields["offset"], 10, 64)
+	if err != nil {
+		return -1, err
+	}
+	return off, nil
+}
+
+func (dc *defaultManualPullConsumer) Seek(groupName string, mq *primitive.MessageQueue, offset int64) error {
+	minOffset, err := dc.queryMinOffset(context.Background(), mq)
+	if err != nil {
+		return err
+	}
+	maxOffset, err := dc.queryMaxOffset(context.Background(), mq)
+	if err != nil {
+		return err
+	}
+	if offset < minOffset || offset > maxOffset {
+		return fmt.Errorf("Seek offset illegal, seek offset = %d, min offset = %d, max offset = %d", offset, minOffset, maxOffset)
+	}
+
+	broker, exist := dc.chooseServer(mq)
+	if !exist {
+		return fmt.Errorf("broker: %s address not found", mq.BrokerName)
+	}
+
+	updateOffsetRequest := &internal.UpdateConsumerOffsetRequestHeader{
+		ConsumerGroup: groupName,
+		Topic:         mq.Topic,
+		QueueId:       mq.QueueId,
+		CommitOffset:  offset,
+	}
+	cmd := remote.NewRemotingCommand(internal.ReqUpdateConsumerOffset, updateOffsetRequest, nil)
+	return dc.client.InvokeOneWay(context.Background(), broker, cmd, 5*time.Second)
+}
+
+func (dc *defaultManualPullConsumer) Lookup(ctx context.Context, mq *primitive.MessageQueue, timestamp int64) (int64, error) {
+	broker, exist := dc.chooseServer(mq)
+	if !exist {
+		return -1, fmt.Errorf("the broker [%s] does not exist", mq.BrokerName)
+	}
+
+	request := &internal.SearchOffsetRequestHeader{
+		Topic:     mq.Topic,
+		QueueId:   mq.QueueId,
+		Timestamp: timestamp,
+	}
+
+	cmd := remote.NewRemotingCommand(internal.ReqSearchOffsetByTimestamp, request, nil)
+	response, err := dc.client.InvokeSync(context.Background(), broker, cmd, 3*time.Second)
+	if err != nil {
+		return -1, err
+	}
+	return strconv.ParseInt(response.ExtFields["offset"], 10, 64)
+}
+
+func (dc *defaultManualPullConsumer) chooseServer(mq *primitive.MessageQueue) (string, bool) {
+	brokerAddr := dc.namesrv.FindBrokerAddrByName(mq.BrokerName)
+	if brokerAddr == "" {
+		dc.namesrv.UpdateTopicRouteInfo(mq.Topic)
+		brokerAddr = dc.namesrv.FindBrokerAddrByName(mq.BrokerName)
+	}
+	return brokerAddr, brokerAddr != ""
+}
+
+func (dc *defaultManualPullConsumer) queryMinOffset(ctx context.Context, mq *primitive.MessageQueue) (int64, error) {
+	broker, exist := dc.chooseServer(mq)
+	if !exist {
+		return -1, fmt.Errorf("the broker [%s] does not exist", mq.BrokerName)
+	}
+
+	request := &internal.GetMinOffsetRequestHeader{
+		Topic:   mq.Topic,
+		QueueId: mq.QueueId,
+	}
+
+	cmd := remote.NewRemotingCommand(internal.ReqGetMinOffset, request, nil)
+	response, err := dc.client.InvokeSync(ctx, broker, cmd, 3*time.Second)
+	if err != nil {
+		return -1, err
+	}
+	return strconv.ParseInt(response.ExtFields["offset"], 10, 64)
+}
+
+func (dc *defaultManualPullConsumer) queryMaxOffset(ctx context.Context, mq *primitive.MessageQueue) (int64, error) {
+	broker, exist := dc.chooseServer(mq)
+	if !exist {
+		return -1, fmt.Errorf("the broker [%s] does not exist", mq.BrokerName)
+	}
+
+	request := &internal.GetMaxOffsetRequestHeader{
+		Topic:   mq.Topic,
+		QueueId: mq.QueueId,
+	}
+
+	cmd := remote.NewRemotingCommand(internal.ReqGetMaxOffset, request, nil)
+	response, err := dc.client.InvokeSync(ctx, broker, cmd, 3*time.Second)
+	if err != nil {
+		return -1, err
+	}
+	return strconv.ParseInt(response.ExtFields["offset"], 10, 64)
+}
+
+func (dc *defaultManualPullConsumer) pullInner(ctx context.Context, queue *primitive.MessageQueue, data *internal.SubscriptionData,
+	offset int64, numbers int, sysFlag int32, commitOffsetValue int64) (*primitive.PullResult, error) {
+	brokerResult := dc.tryFindBroker(queue)
+	if brokerResult == nil {
+		rlog.Warning("no broker found for mq", map[string]interface{}{
+			rlog.LogKeyMessageQueue: queue,
+		})
+		return nil, errors2.ErrBrokerNotFound
+	}
+	if brokerResult.Slave {
+		sysFlag = clearCommitOffsetFlag(sysFlag)
+	}
+
+	if (data.ExpType == string(TAG)) && brokerResult.BrokerVersion < internal.V4_1_0 {
+		return nil, fmt.Errorf("the broker [%s, %v] does not upgrade to support for filter message by %v",
+			queue.BrokerName, brokerResult.BrokerVersion, data.ExpType)
+	}
+
+	pullRequest := &internal.PullMessageRequestHeader{
+		ConsumerGroup:        dc.group,
+		Topic:                queue.Topic,
+		QueueId:              int32(queue.QueueId),
+		QueueOffset:          offset,
+		MaxMsgNums:           int32(numbers),
+		SysFlag:              sysFlag,
+		CommitOffset:         commitOffsetValue,
+		SuspendTimeoutMillis: _BrokerSuspendMaxTime,
+		SubExpression:        data.SubString,
+		ExpressionType:       string(data.ExpType),
+	}
+	if data.ExpType == string(TAG) {
+		pullRequest.SubVersion = 0
+	} else {
+		pullRequest.SubVersion = data.SubVersion
+	}
+	// TODO: add computPullFromWhichFilterServer
+
+	return dc.client.PullMessage(context.Background(), brokerResult.BrokerAddr, pullRequest)
+}
+
+func (dc *defaultManualPullConsumer) tryFindBroker(mq *primitive.MessageQueue) *internal.FindBrokerResult {
+	result := dc.namesrv.FindBrokerAddressInSubscribe(mq.BrokerName, dc.recalculatePullFromWhichNode(mq), false)
+	if result != nil {
+		return result
+	}
+	dc.namesrv.UpdateTopicRouteInfo(mq.Topic)
+	return dc.namesrv.FindBrokerAddressInSubscribe(mq.BrokerName, dc.recalculatePullFromWhichNode(mq), false)
+}
+func (dc *defaultManualPullConsumer) recalculatePullFromWhichNode(mq *primitive.MessageQueue) int64 {
+	v, exist := dc.pullFromWhichNodeTable.Load(*mq)
+	if exist {
+		return v.(int64)
+	}
+	return internal.MasterId
+}
+
+func (dc *defaultManualPullConsumer) checkPull(ctx context.Context, mq *primitive.MessageQueue, offset int64, numbers int) error {
+	if mq == nil {
+		return errors2.ErrMQEmpty
+	}
+	if offset < 0 {
+		return errors2.ErrOffset
+	}
+	if numbers <= 0 {
+		return errors2.ErrNumbers
+	}
+	return nil
+}
+
+func (dc *defaultManualPullConsumer) processPullResult(mq *primitive.MessageQueue, result *primitive.PullResult, data *internal.SubscriptionData) {
+
+	dc.updatePullFromWhichNode(mq, result.SuggestWhichBrokerId)
+
+	switch result.Status {
+	case primitive.PullFound:
+		result.SetMessageExts(primitive.DecodeMessage(result.GetBody()))
+		msgs := result.GetMessageExts()
+		// filter message according to tags
+		msgListFilterAgain := msgs
+		if data.Tags.Len() > 0 && data.ClassFilterMode {
+			msgListFilterAgain = make([]*primitive.MessageExt, 0)
+			for _, msg := range msgs {
+				_, exist := data.Tags.Contains(msg.GetTags())
+				if exist {
+					msgListFilterAgain = append(msgListFilterAgain, msg)
+				}
+			}
+		}
+		// TODO: add filter message hook
+		for _, msg := range msgListFilterAgain {
+			traFlag, _ := strconv.ParseBool(msg.GetProperty(primitive.PropertyTransactionPrepared))
+			if traFlag {
+				msg.TransactionId = msg.GetProperty(primitive.PropertyUniqueClientMessageIdKeyIndex)
+			}
+			msg.WithProperty(primitive.PropertyMinOffset, strconv.FormatInt(result.MinOffset, 10))
+			msg.WithProperty(primitive.PropertyMaxOffset, strconv.FormatInt(result.MaxOffset, 10))
+		}
+		result.SetMessageExts(msgListFilterAgain)
+	}
+}
+
+func (dc *defaultManualPullConsumer) updatePullFromWhichNode(mq *primitive.MessageQueue, brokerId int64) {

Review comment:
       Could this method be deleted?

##########
File path: consumer/manual_pull_consumer.go
##########
@@ -0,0 +1,359 @@
+/*
+Licensed to the Apache Software Foundation (ASF) under one or more
+contributor license agreements.  See the NOTICE file distributed with
+this work for additional information regarding copyright ownership.
+The ASF licenses this file to You under the Apache License, Version 2.0
+(the "License"); you may not use this file except in compliance with
+the License.  You may obtain a copy of the License at
+
+    http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing, software
+distributed under the License is distributed on an "AS IS" BASIS,
+WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+See the License for the specific language governing permissions and
+limitations under the License.
+*/
+
+package consumer
+
+import (
+	"context"
+	"fmt"
+	"strconv"
+	"sync"
+	"time"
+
+	errors2 "github.com/apache/rocketmq-client-go/v2/errors"
+	"github.com/apache/rocketmq-client-go/v2/internal"
+	"github.com/apache/rocketmq-client-go/v2/internal/remote"
+	"github.com/apache/rocketmq-client-go/v2/primitive"
+	"github.com/apache/rocketmq-client-go/v2/rlog"
+	"github.com/pkg/errors"
+)
+
+type ManualPullConsumer interface {
+	// get n messages from specified queue with offset
+	PullFromQueue(ctx context.Context, mq *primitive.MessageQueue, offset int64, numbers int) (*primitive.PullResult, error)
+
+	// get queues of the topic
+	GetMessageQueues(ctx context.Context, topic string) []*primitive.MessageQueue
+
+	// get the offset of mq in groupName, if mq not exist, -1 will be return
+	CommittedOffset(groupName string, mq *primitive.MessageQueue) (int64, error)
+
+	// seek consume position to the offset, this api can be used to reset offset and commit offset
+	Seek(groupName string, mq *primitive.MessageQueue, offset int64) error
+
+	// query offset according to timestamp(ms), the maximum offset that born time less than timestamp will be return
+	// if timestamp less than any message's born time, the earliest offset will be returned
+	// if timestamp great than any message's born time, the latest offset will be returned
+	Lookup(ctx context.Context, mq *primitive.MessageQueue, timestamp int64) (int64, error)
+}
+
+type defaultManualPullConsumer struct {
+	group                  string
+	namesrv                internal.Namesrvs
+	option                 consumerOptions
+	client                 internal.RMQClient
+	interceptor            primitive.Interceptor
+	pullFromWhichNodeTable sync.Map
+}
+
+func NewManualPullConsumer(options ...Option) (*defaultManualPullConsumer, error) {
+	defaultOpts := defaultPullConsumerOptions()
+	for _, apply := range options {
+		apply(&defaultOpts)
+	}
+
+	srvs, err := internal.NewNamesrv(defaultOpts.Resolver)
+	if err != nil {
+		return nil, errors.Wrap(err, "new Namesrv failed.")
+	}
+	if !defaultOpts.Credentials.IsEmpty() {
+		srvs.SetCredentials(defaultOpts.Credentials)
+	}
+	defaultOpts.Namesrv = srvs
+
+	if defaultOpts.Namespace != "" {
+		defaultOpts.GroupName = defaultOpts.Namespace + "%" + defaultOpts.GroupName
+	}
+
+	actualRMQClient := internal.GetOrNewRocketMQClient(defaultOpts.ClientOptions, nil)
+	actualNameSrv := internal.GetOrSetNamesrv(actualRMQClient.ClientID(), defaultOpts.Namesrv)
+
+	dc := &defaultManualPullConsumer{
+		client:  actualRMQClient,
+		option:  defaultOpts,
+		namesrv: actualNameSrv,
+		group:   defaultOpts.GroupName,
+	}
+
+	dc.interceptor = primitive.ChainInterceptors(dc.option.Interceptors...)
+	dc.option.ClientOptions.Namesrv = actualNameSrv
+	return dc, nil
+}
+
+func (dc *defaultManualPullConsumer) PullFromQueue(ctx context.Context, mq *primitive.MessageQueue, offset int64, numbers int) (*primitive.PullResult, error) {
+	if err := dc.checkPull(ctx, mq, offset, numbers); err != nil {
+		return nil, err
+	}
+	subData := buildSubscriptionData(mq.Topic, MessageSelector{
+		Expression: _SubAll,
+	})
+
+	sysFlag := buildSysFlag(false, true, true, false)
+
+	pullResp, err := dc.pullInner(ctx, mq, subData, offset, numbers, sysFlag, 0)
+	if err != nil {
+		return pullResp, err
+	}
+	dc.processPullResult(mq, pullResp, subData)
+	if dc.interceptor != nil {
+		msgCtx := &primitive.ConsumeMessageContext{
+			Properties:    make(map[string]string),
+			ConsumerGroup: dc.group,
+			MQ:            mq,
+			Msgs:          pullResp.GetMessageExts(),
+		}
+		err = dc.interceptor(ctx, msgCtx, struct{}{}, primitive.NoopInterceptor)
+	}
+	return pullResp, err
+}
+
+func (dc *defaultManualPullConsumer) GetMessageQueues(ctx context.Context, topic string) []*primitive.MessageQueue {
+	queues, err := dc.namesrv.FetchSubscribeMessageQueues(topic)
+	if err != nil {
+		rlog.Error("get message queue error", map[string]interface{}{
+			rlog.LogKeyTopic:         topic,
+			rlog.LogKeyUnderlayError: err.Error(),
+		})
+		return nil
+	}
+	return queues
+}
+
+func (dc *defaultManualPullConsumer) CommittedOffset(group string, mq *primitive.MessageQueue) (int64, error) {
+	broker, exist := dc.chooseServer(mq)
+	if !exist {
+		return int64(-1), fmt.Errorf("broker: %s address not found", mq.BrokerName)
+	}
+	queryOffsetRequest := &internal.QueryConsumerOffsetRequestHeader{
+		ConsumerGroup: group,
+		Topic:         mq.Topic,
+		QueueId:       mq.QueueId,
+	}
+	cmd := remote.NewRemotingCommand(internal.ReqQueryConsumerOffset, queryOffsetRequest, nil)
+	res, err := dc.client.InvokeSync(context.Background(), broker, cmd, 3*time.Second)
+	if err != nil {
+		return -1, err
+	}
+	if res.Code != internal.ResSuccess {
+		return -2, fmt.Errorf("broker response code: %d, remarks: %s", res.Code, res.Remark)
+	}
+	off, err := strconv.ParseInt(res.ExtFields["offset"], 10, 64)
+	if err != nil {
+		return -1, err
+	}
+	return off, nil
+}
+
+func (dc *defaultManualPullConsumer) Seek(groupName string, mq *primitive.MessageQueue, offset int64) error {
+	minOffset, err := dc.queryMinOffset(context.Background(), mq)
+	if err != nil {
+		return err
+	}
+	maxOffset, err := dc.queryMaxOffset(context.Background(), mq)
+	if err != nil {
+		return err
+	}
+	if offset < minOffset || offset > maxOffset {
+		return fmt.Errorf("Seek offset illegal, seek offset = %d, min offset = %d, max offset = %d", offset, minOffset, maxOffset)
+	}
+
+	broker, exist := dc.chooseServer(mq)
+	if !exist {
+		return fmt.Errorf("broker: %s address not found", mq.BrokerName)
+	}
+
+	updateOffsetRequest := &internal.UpdateConsumerOffsetRequestHeader{
+		ConsumerGroup: groupName,
+		Topic:         mq.Topic,
+		QueueId:       mq.QueueId,
+		CommitOffset:  offset,
+	}
+	cmd := remote.NewRemotingCommand(internal.ReqUpdateConsumerOffset, updateOffsetRequest, nil)
+	return dc.client.InvokeOneWay(context.Background(), broker, cmd, 5*time.Second)
+}
+
+func (dc *defaultManualPullConsumer) Lookup(ctx context.Context, mq *primitive.MessageQueue, timestamp int64) (int64, error) {
+	broker, exist := dc.chooseServer(mq)
+	if !exist {
+		return -1, fmt.Errorf("the broker [%s] does not exist", mq.BrokerName)
+	}
+
+	request := &internal.SearchOffsetRequestHeader{
+		Topic:     mq.Topic,
+		QueueId:   mq.QueueId,
+		Timestamp: timestamp,
+	}
+
+	cmd := remote.NewRemotingCommand(internal.ReqSearchOffsetByTimestamp, request, nil)
+	response, err := dc.client.InvokeSync(context.Background(), broker, cmd, 3*time.Second)
+	if err != nil {
+		return -1, err
+	}
+	return strconv.ParseInt(response.ExtFields["offset"], 10, 64)
+}
+
+func (dc *defaultManualPullConsumer) chooseServer(mq *primitive.MessageQueue) (string, bool) {
+	brokerAddr := dc.namesrv.FindBrokerAddrByName(mq.BrokerName)
+	if brokerAddr == "" {
+		dc.namesrv.UpdateTopicRouteInfo(mq.Topic)
+		brokerAddr = dc.namesrv.FindBrokerAddrByName(mq.BrokerName)
+	}
+	return brokerAddr, brokerAddr != ""
+}
+
+func (dc *defaultManualPullConsumer) queryMinOffset(ctx context.Context, mq *primitive.MessageQueue) (int64, error) {
+	broker, exist := dc.chooseServer(mq)
+	if !exist {
+		return -1, fmt.Errorf("the broker [%s] does not exist", mq.BrokerName)
+	}
+
+	request := &internal.GetMinOffsetRequestHeader{
+		Topic:   mq.Topic,
+		QueueId: mq.QueueId,
+	}
+
+	cmd := remote.NewRemotingCommand(internal.ReqGetMinOffset, request, nil)
+	response, err := dc.client.InvokeSync(ctx, broker, cmd, 3*time.Second)
+	if err != nil {
+		return -1, err
+	}
+	return strconv.ParseInt(response.ExtFields["offset"], 10, 64)
+}
+
+func (dc *defaultManualPullConsumer) queryMaxOffset(ctx context.Context, mq *primitive.MessageQueue) (int64, error) {
+	broker, exist := dc.chooseServer(mq)
+	if !exist {
+		return -1, fmt.Errorf("the broker [%s] does not exist", mq.BrokerName)
+	}
+
+	request := &internal.GetMaxOffsetRequestHeader{
+		Topic:   mq.Topic,
+		QueueId: mq.QueueId,
+	}
+
+	cmd := remote.NewRemotingCommand(internal.ReqGetMaxOffset, request, nil)
+	response, err := dc.client.InvokeSync(ctx, broker, cmd, 3*time.Second)
+	if err != nil {
+		return -1, err
+	}
+	return strconv.ParseInt(response.ExtFields["offset"], 10, 64)
+}
+
+func (dc *defaultManualPullConsumer) pullInner(ctx context.Context, queue *primitive.MessageQueue, data *internal.SubscriptionData,

Review comment:
       There are too many arguments in this method signature. Could it take a `PullMessageRequestHeader` instead of these separate arguments?
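
A minimal sketch of what this suggestion might look like, assuming a simplified request type. `pullRequest` and the `send` callback are hypothetical stand-ins for `internal.PullMessageRequestHeader` and the broker call; they are not the library's actual API.

```go
package main

import (
	"errors"
	"fmt"
)

// pullRequest is a hypothetical stand-in for internal.PullMessageRequestHeader.
// Only the fields needed for the illustration are included.
type pullRequest struct {
	ConsumerGroup string
	Topic         string
	QueueID       int32
	QueueOffset   int64
	MaxMsgNums    int32
	SysFlag       int32
	CommitOffset  int64
}

// validate performs the same kind of argument checks as checkPull in the diff.
func (r *pullRequest) validate() error {
	if r == nil {
		return errors.New("request is nil")
	}
	if r.QueueOffset < 0 {
		return fmt.Errorf("illegal offset %d", r.QueueOffset)
	}
	if r.MaxMsgNums <= 0 {
		return fmt.Errorf("illegal message count %d", r.MaxMsgNums)
	}
	return nil
}

// pullInner takes a single request value instead of a long positional
// argument list. send is a placeholder for the call to the broker.
func pullInner(req *pullRequest, send func(*pullRequest) (string, error)) (string, error) {
	if err := req.validate(); err != nil {
		return "", err
	}
	return send(req)
}
```

With a request value, callers name each field at the call site, which avoids mixing up adjacent `int64` or `int32` arguments.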

##########
File path: consumer/manual_pull_consumer.go
##########
@@ -0,0 +1,359 @@
+/*
+Licensed to the Apache Software Foundation (ASF) under one or more
+contributor license agreements.  See the NOTICE file distributed with
+this work for additional information regarding copyright ownership.
+The ASF licenses this file to You under the Apache License, Version 2.0
+(the "License"); you may not use this file except in compliance with
+the License.  You may obtain a copy of the License at
+
+    http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing, software
+distributed under the License is distributed on an "AS IS" BASIS,
+WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+See the License for the specific language governing permissions and
+limitations under the License.
+*/
+
+package consumer
+
+import (
+	"context"
+	"fmt"
+	"strconv"
+	"sync"
+	"time"
+
+	errors2 "github.com/apache/rocketmq-client-go/v2/errors"
+	"github.com/apache/rocketmq-client-go/v2/internal"
+	"github.com/apache/rocketmq-client-go/v2/internal/remote"
+	"github.com/apache/rocketmq-client-go/v2/primitive"
+	"github.com/apache/rocketmq-client-go/v2/rlog"
+	"github.com/pkg/errors"
+)
+
+type ManualPullConsumer interface {
+	// get n messages from specified queue with offset

Review comment:
       Could you start this comment with the method name, like `// PullFromQueue xxx`, to follow the Go doc comment convention? The same applies to the comments below.

##########
File path: consumer/manual_pull_consumer.go
##########
@@ -0,0 +1,359 @@
+/*
+Licensed to the Apache Software Foundation (ASF) under one or more
+contributor license agreements.  See the NOTICE file distributed with
+this work for additional information regarding copyright ownership.
+The ASF licenses this file to You under the Apache License, Version 2.0
+(the "License"); you may not use this file except in compliance with
+the License.  You may obtain a copy of the License at
+
+    http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing, software
+distributed under the License is distributed on an "AS IS" BASIS,
+WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+See the License for the specific language governing permissions and
+limitations under the License.
+*/
+
+package consumer
+
+import (
+	"context"
+	"fmt"
+	"strconv"
+	"sync"
+	"time"
+
+	errors2 "github.com/apache/rocketmq-client-go/v2/errors"
+	"github.com/apache/rocketmq-client-go/v2/internal"
+	"github.com/apache/rocketmq-client-go/v2/internal/remote"
+	"github.com/apache/rocketmq-client-go/v2/primitive"
+	"github.com/apache/rocketmq-client-go/v2/rlog"
+	"github.com/pkg/errors"
+)
+
+type ManualPullConsumer interface {
+	// get n messages from specified queue with offset
+	PullFromQueue(ctx context.Context, mq *primitive.MessageQueue, offset int64, numbers int) (*primitive.PullResult, error)
+
+	// get queues of the topic
+	GetMessageQueues(ctx context.Context, topic string) []*primitive.MessageQueue
+
+	// get the offset of mq in groupName, if mq not exist, -1 will be return
+	CommittedOffset(groupName string, mq *primitive.MessageQueue) (int64, error)
+
+	// seek consume position to the offset, this api can be used to reset offset and commit offset
+	Seek(groupName string, mq *primitive.MessageQueue, offset int64) error
+
+	// query offset according to timestamp(ms), the maximum offset that born time less than timestamp will be return
+	// if timestamp less than any message's born time, the earliest offset will be returned
+	// if timestamp great than any message's born time, the latest offset will be returned
+	Lookup(ctx context.Context, mq *primitive.MessageQueue, timestamp int64) (int64, error)
+}
+
+type defaultManualPullConsumer struct {
+	group                  string
+	namesrv                internal.Namesrvs
+	option                 consumerOptions
+	client                 internal.RMQClient
+	interceptor            primitive.Interceptor
+	pullFromWhichNodeTable sync.Map
+}
+
+func NewManualPullConsumer(options ...Option) (*defaultManualPullConsumer, error) {
+	defaultOpts := defaultPullConsumerOptions()
+	for _, apply := range options {
+		apply(&defaultOpts)
+	}
+
+	srvs, err := internal.NewNamesrv(defaultOpts.Resolver)
+	if err != nil {
+		return nil, errors.Wrap(err, "new Namesrv failed.")
+	}
+	if !defaultOpts.Credentials.IsEmpty() {
+		srvs.SetCredentials(defaultOpts.Credentials)
+	}
+	defaultOpts.Namesrv = srvs
+
+	if defaultOpts.Namespace != "" {
+		defaultOpts.GroupName = defaultOpts.Namespace + "%" + defaultOpts.GroupName
+	}
+
+	actualRMQClient := internal.GetOrNewRocketMQClient(defaultOpts.ClientOptions, nil)
+	actualNameSrv := internal.GetOrSetNamesrv(actualRMQClient.ClientID(), defaultOpts.Namesrv)
+
+	dc := &defaultManualPullConsumer{
+		client:  actualRMQClient,
+		option:  defaultOpts,
+		namesrv: actualNameSrv,
+		group:   defaultOpts.GroupName,
+	}
+
+	dc.interceptor = primitive.ChainInterceptors(dc.option.Interceptors...)
+	dc.option.ClientOptions.Namesrv = actualNameSrv
+	return dc, nil
+}
+
+func (dc *defaultManualPullConsumer) PullFromQueue(ctx context.Context, mq *primitive.MessageQueue, offset int64, numbers int) (*primitive.PullResult, error) {
+	if err := dc.checkPull(ctx, mq, offset, numbers); err != nil {
+		return nil, err
+	}
+	subData := buildSubscriptionData(mq.Topic, MessageSelector{
+		Expression: _SubAll,
+	})
+
+	sysFlag := buildSysFlag(false, true, true, false)
+
+	pullResp, err := dc.pullInner(ctx, mq, subData, offset, numbers, sysFlag, 0)
+	if err != nil {
+		return pullResp, err
+	}
+	dc.processPullResult(mq, pullResp, subData)
+	if dc.interceptor != nil {
+		msgCtx := &primitive.ConsumeMessageContext{
+			Properties:    make(map[string]string),
+			ConsumerGroup: dc.group,
+			MQ:            mq,
+			Msgs:          pullResp.GetMessageExts(),
+		}
+		err = dc.interceptor(ctx, msgCtx, struct{}{}, primitive.NoopInterceptor)
+	}
+	return pullResp, err
+}
+
+func (dc *defaultManualPullConsumer) GetMessageQueues(ctx context.Context, topic string) []*primitive.MessageQueue {
+	queues, err := dc.namesrv.FetchSubscribeMessageQueues(topic)
+	if err != nil {
+		rlog.Error("get message queue error", map[string]interface{}{

Review comment:
       Could you modify the method signature to return an error instead of only printing an error log?

##########
File path: consumer/manual_pull_consumer.go
##########
@@ -0,0 +1,359 @@
+/*
+Licensed to the Apache Software Foundation (ASF) under one or more
+contributor license agreements.  See the NOTICE file distributed with
+this work for additional information regarding copyright ownership.
+The ASF licenses this file to You under the Apache License, Version 2.0
+(the "License"); you may not use this file except in compliance with
+the License.  You may obtain a copy of the License at
+
+    http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing, software
+distributed under the License is distributed on an "AS IS" BASIS,
+WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+See the License for the specific language governing permissions and
+limitations under the License.
+*/
+
+package consumer
+
+import (
+	"context"
+	"fmt"
+	"strconv"
+	"sync"
+	"time"
+
+	errors2 "github.com/apache/rocketmq-client-go/v2/errors"
+	"github.com/apache/rocketmq-client-go/v2/internal"
+	"github.com/apache/rocketmq-client-go/v2/internal/remote"
+	"github.com/apache/rocketmq-client-go/v2/primitive"
+	"github.com/apache/rocketmq-client-go/v2/rlog"
+	"github.com/pkg/errors"
+)
+
+type ManualPullConsumer interface {
+	// get n messages from specified queue with offset
+	PullFromQueue(ctx context.Context, mq *primitive.MessageQueue, offset int64, numbers int) (*primitive.PullResult, error)
+
+	// get queues of the topic
+	GetMessageQueues(ctx context.Context, topic string) []*primitive.MessageQueue
+
+	// get the offset of mq in groupName, if mq not exist, -1 will be return
+	CommittedOffset(groupName string, mq *primitive.MessageQueue) (int64, error)
+
+	// seek consume position to the offset, this api can be used to reset offset and commit offset
+	Seek(groupName string, mq *primitive.MessageQueue, offset int64) error
+
+	// query offset according to timestamp(ms), the maximum offset that born time less than timestamp will be return
+	// if timestamp less than any message's born time, the earliest offset will be returned
+	// if timestamp great than any message's born time, the latest offset will be returned
+	Lookup(ctx context.Context, mq *primitive.MessageQueue, timestamp int64) (int64, error)
+}
+
+type defaultManualPullConsumer struct {
+	group                  string
+	namesrv                internal.Namesrvs
+	option                 consumerOptions
+	client                 internal.RMQClient
+	interceptor            primitive.Interceptor
+	pullFromWhichNodeTable sync.Map
+}
+
+func NewManualPullConsumer(options ...Option) (*defaultManualPullConsumer, error) {
+	defaultOpts := defaultPullConsumerOptions()
+	for _, apply := range options {
+		apply(&defaultOpts)
+	}
+
+	srvs, err := internal.NewNamesrv(defaultOpts.Resolver)
+	if err != nil {
+		return nil, errors.Wrap(err, "new Namesrv failed.")
+	}
+	if !defaultOpts.Credentials.IsEmpty() {
+		srvs.SetCredentials(defaultOpts.Credentials)
+	}
+	defaultOpts.Namesrv = srvs
+
+	if defaultOpts.Namespace != "" {
+		defaultOpts.GroupName = defaultOpts.Namespace + "%" + defaultOpts.GroupName
+	}
+
+	actualRMQClient := internal.GetOrNewRocketMQClient(defaultOpts.ClientOptions, nil)
+	actualNameSrv := internal.GetOrSetNamesrv(actualRMQClient.ClientID(), defaultOpts.Namesrv)
+
+	dc := &defaultManualPullConsumer{
+		client:  actualRMQClient,
+		option:  defaultOpts,
+		namesrv: actualNameSrv,
+		group:   defaultOpts.GroupName,
+	}
+
+	dc.interceptor = primitive.ChainInterceptors(dc.option.Interceptors...)
+	dc.option.ClientOptions.Namesrv = actualNameSrv
+	return dc, nil
+}
+
+func (dc *defaultManualPullConsumer) PullFromQueue(ctx context.Context, mq *primitive.MessageQueue, offset int64, numbers int) (*primitive.PullResult, error) {
+	if err := dc.checkPull(ctx, mq, offset, numbers); err != nil {
+		return nil, err
+	}
+	subData := buildSubscriptionData(mq.Topic, MessageSelector{
+		Expression: _SubAll,
+	})
+
+	sysFlag := buildSysFlag(false, true, true, false)
+
+	pullResp, err := dc.pullInner(ctx, mq, subData, offset, numbers, sysFlag, 0)
+	if err != nil {
+		return pullResp, err
+	}
+	dc.processPullResult(mq, pullResp, subData)
+	if dc.interceptor != nil {
+		msgCtx := &primitive.ConsumeMessageContext{
+			Properties:    make(map[string]string),
+			ConsumerGroup: dc.group,
+			MQ:            mq,
+			Msgs:          pullResp.GetMessageExts(),
+		}
+		err = dc.interceptor(ctx, msgCtx, struct{}{}, primitive.NoopInterceptor)
+	}
+	return pullResp, err
+}
+
+func (dc *defaultManualPullConsumer) GetMessageQueues(ctx context.Context, topic string) []*primitive.MessageQueue {
+	queues, err := dc.namesrv.FetchSubscribeMessageQueues(topic)
+	if err != nil {
+		rlog.Error("get message queue error", map[string]interface{}{
+			rlog.LogKeyTopic:         topic,
+			rlog.LogKeyUnderlayError: err.Error(),
+		})
+		return nil
+	}
+	return queues
+}
+
+func (dc *defaultManualPullConsumer) CommittedOffset(group string, mq *primitive.MessageQueue) (int64, error) {
+	broker, exist := dc.chooseServer(mq)
+	if !exist {
+		return int64(-1), fmt.Errorf("broker: %s address not found", mq.BrokerName)
+	}
+	queryOffsetRequest := &internal.QueryConsumerOffsetRequestHeader{
+		ConsumerGroup: group,
+		Topic:         mq.Topic,
+		QueueId:       mq.QueueId,
+	}
+	cmd := remote.NewRemotingCommand(internal.ReqQueryConsumerOffset, queryOffsetRequest, nil)
+	res, err := dc.client.InvokeSync(context.Background(), broker, cmd, 3*time.Second)
+	if err != nil {
+		return -1, err
+	}
+	if res.Code != internal.ResSuccess {
+		return -2, fmt.Errorf("broker response code: %d, remarks: %s", res.Code, res.Remark)
+	}
+	off, err := strconv.ParseInt(res.ExtFields["offset"], 10, 64)
+	if err != nil {
+		return -1, err
+	}
+	return off, nil
+}
+
+func (dc *defaultManualPullConsumer) Seek(groupName string, mq *primitive.MessageQueue, offset int64) error {

Review comment:
       Could you modify the method signature to accept a context?
   
   

##########
File path: consumer/manual_pull_consumer.go
##########
@@ -0,0 +1,359 @@
+/*
+Licensed to the Apache Software Foundation (ASF) under one or more
+contributor license agreements.  See the NOTICE file distributed with
+this work for additional information regarding copyright ownership.
+The ASF licenses this file to You under the Apache License, Version 2.0
+(the "License"); you may not use this file except in compliance with
+the License.  You may obtain a copy of the License at
+
+    http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing, software
+distributed under the License is distributed on an "AS IS" BASIS,
+WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+See the License for the specific language governing permissions and
+limitations under the License.
+*/
+
+package consumer
+
+import (
+	"context"
+	"fmt"
+	"strconv"
+	"sync"
+	"time"
+
+	errors2 "github.com/apache/rocketmq-client-go/v2/errors"
+	"github.com/apache/rocketmq-client-go/v2/internal"
+	"github.com/apache/rocketmq-client-go/v2/internal/remote"
+	"github.com/apache/rocketmq-client-go/v2/primitive"
+	"github.com/apache/rocketmq-client-go/v2/rlog"
+	"github.com/pkg/errors"
+)
+
+type ManualPullConsumer interface {
+	// get n messages from specified queue with offset
+	PullFromQueue(ctx context.Context, mq *primitive.MessageQueue, offset int64, numbers int) (*primitive.PullResult, error)
+
+	// get queues of the topic
+	GetMessageQueues(ctx context.Context, topic string) []*primitive.MessageQueue
+
+	// get the offset of mq in groupName, if mq not exist, -1 will be return
+	CommittedOffset(groupName string, mq *primitive.MessageQueue) (int64, error)
+
+	// seek consume position to the offset, this api can be used to reset offset and commit offset
+	Seek(groupName string, mq *primitive.MessageQueue, offset int64) error
+
+	// query offset according to timestamp(ms), the maximum offset that born time less than timestamp will be return
+	// if timestamp less than any message's born time, the earliest offset will be returned
+	// if timestamp great than any message's born time, the latest offset will be returned
+	Lookup(ctx context.Context, mq *primitive.MessageQueue, timestamp int64) (int64, error)
+}
+
+type defaultManualPullConsumer struct {
+	group                  string
+	namesrv                internal.Namesrvs
+	option                 consumerOptions
+	client                 internal.RMQClient
+	interceptor            primitive.Interceptor
+	pullFromWhichNodeTable sync.Map
+}
+
+func NewManualPullConsumer(options ...Option) (*defaultManualPullConsumer, error) {
+	defaultOpts := defaultPullConsumerOptions()
+	for _, apply := range options {
+		apply(&defaultOpts)
+	}
+
+	srvs, err := internal.NewNamesrv(defaultOpts.Resolver)
+	if err != nil {
+		return nil, errors.Wrap(err, "new Namesrv failed.")
+	}
+	if !defaultOpts.Credentials.IsEmpty() {
+		srvs.SetCredentials(defaultOpts.Credentials)
+	}
+	defaultOpts.Namesrv = srvs
+
+	if defaultOpts.Namespace != "" {
+		defaultOpts.GroupName = defaultOpts.Namespace + "%" + defaultOpts.GroupName
+	}
+
+	actualRMQClient := internal.GetOrNewRocketMQClient(defaultOpts.ClientOptions, nil)
+	actualNameSrv := internal.GetOrSetNamesrv(actualRMQClient.ClientID(), defaultOpts.Namesrv)
+
+	dc := &defaultManualPullConsumer{
+		client:  actualRMQClient,
+		option:  defaultOpts,
+		namesrv: actualNameSrv,
+		group:   defaultOpts.GroupName,
+	}
+
+	dc.interceptor = primitive.ChainInterceptors(dc.option.Interceptors...)
+	dc.option.ClientOptions.Namesrv = actualNameSrv
+	return dc, nil
+}
+
+func (dc *defaultManualPullConsumer) PullFromQueue(ctx context.Context, mq *primitive.MessageQueue, offset int64, numbers int) (*primitive.PullResult, error) {
+	if err := dc.checkPull(ctx, mq, offset, numbers); err != nil {
+		return nil, err
+	}
+	subData := buildSubscriptionData(mq.Topic, MessageSelector{
+		Expression: _SubAll,
+	})
+
+	sysFlag := buildSysFlag(false, true, true, false)
+
+	pullResp, err := dc.pullInner(ctx, mq, subData, offset, numbers, sysFlag, 0)
+	if err != nil {
+		return pullResp, err
+	}
+	dc.processPullResult(mq, pullResp, subData)
+	if dc.interceptor != nil {
+		msgCtx := &primitive.ConsumeMessageContext{
+			Properties:    make(map[string]string),
+			ConsumerGroup: dc.group,
+			MQ:            mq,
+			Msgs:          pullResp.GetMessageExts(),
+		}
+		err = dc.interceptor(ctx, msgCtx, struct{}{}, primitive.NoopInterceptor)
+	}
+	return pullResp, err
+}
+
+func (dc *defaultManualPullConsumer) GetMessageQueues(ctx context.Context, topic string) []*primitive.MessageQueue {
+	queues, err := dc.namesrv.FetchSubscribeMessageQueues(topic)
+	if err != nil {
+		rlog.Error("get message queue error", map[string]interface{}{
+			rlog.LogKeyTopic:         topic,
+			rlog.LogKeyUnderlayError: err.Error(),
+		})
+		return nil
+	}
+	return queues
+}
+
+func (dc *defaultManualPullConsumer) CommittedOffset(group string, mq *primitive.MessageQueue) (int64, error) {
+	broker, exist := dc.chooseServer(mq)
+	if !exist {
+		return int64(-1), fmt.Errorf("broker: %s address not found", mq.BrokerName)
+	}
+	queryOffsetRequest := &internal.QueryConsumerOffsetRequestHeader{
+		ConsumerGroup: group,
+		Topic:         mq.Topic,
+		QueueId:       mq.QueueId,
+	}
+	cmd := remote.NewRemotingCommand(internal.ReqQueryConsumerOffset, queryOffsetRequest, nil)
+	res, err := dc.client.InvokeSync(context.Background(), broker, cmd, 3*time.Second)
+	if err != nil {
+		return -1, err
+	}
+	if res.Code != internal.ResSuccess {
+		return -2, fmt.Errorf("broker response code: %d, remarks: %s", res.Code, res.Remark)
+	}
+	off, err := strconv.ParseInt(res.ExtFields["offset"], 10, 64)
+	if err != nil {
+		return -1, err
+	}
+	return off, nil
+}
+
+func (dc *defaultManualPullConsumer) Seek(groupName string, mq *primitive.MessageQueue, offset int64) error {
+	minOffset, err := dc.queryMinOffset(context.Background(), mq)
+	if err != nil {
+		return err
+	}
+	maxOffset, err := dc.queryMaxOffset(context.Background(), mq)
+	if err != nil {
+		return err
+	}
+	if offset < minOffset || offset > maxOffset {
+		return fmt.Errorf("Seek offset illegal, seek offset = %d, min offset = %d, max offset = %d", offset, minOffset, maxOffset)
+	}
+
+	broker, exist := dc.chooseServer(mq)
+	if !exist {
+		return fmt.Errorf("broker: %s address not found", mq.BrokerName)
+	}
+
+	updateOffsetRequest := &internal.UpdateConsumerOffsetRequestHeader{
+		ConsumerGroup: groupName,
+		Topic:         mq.Topic,
+		QueueId:       mq.QueueId,
+		CommitOffset:  offset,
+	}
+	cmd := remote.NewRemotingCommand(internal.ReqUpdateConsumerOffset, updateOffsetRequest, nil)
+	return dc.client.InvokeOneWay(context.Background(), broker, cmd, 5*time.Second)
+}
+
+func (dc *defaultManualPullConsumer) Lookup(ctx context.Context, mq *primitive.MessageQueue, timestamp int64) (int64, error) {
+	broker, exist := dc.chooseServer(mq)
+	if !exist {
+		return -1, fmt.Errorf("the broker [%s] does not exist", mq.BrokerName)
+	}
+
+	request := &internal.SearchOffsetRequestHeader{
+		Topic:     mq.Topic,
+		QueueId:   mq.QueueId,
+		Timestamp: timestamp,
+	}
+
+	cmd := remote.NewRemotingCommand(internal.ReqSearchOffsetByTimestamp, request, nil)
+	response, err := dc.client.InvokeSync(context.Background(), broker, cmd, 3*time.Second)
+	if err != nil {
+		return -1, err
+	}
+	return strconv.ParseInt(response.ExtFields["offset"], 10, 64)
+}
+
+func (dc *defaultManualPullConsumer) chooseServer(mq *primitive.MessageQueue) (string, bool) {
+	brokerAddr := dc.namesrv.FindBrokerAddrByName(mq.BrokerName)
+	if brokerAddr == "" {
+		dc.namesrv.UpdateTopicRouteInfo(mq.Topic)
+		brokerAddr = dc.namesrv.FindBrokerAddrByName(mq.BrokerName)
+	}
+	return brokerAddr, brokerAddr != ""
+}
+
+func (dc *defaultManualPullConsumer) queryMinOffset(ctx context.Context, mq *primitive.MessageQueue) (int64, error) {
+	broker, exist := dc.chooseServer(mq)
+	if !exist {
+		return -1, fmt.Errorf("the broker [%s] does not exist", mq.BrokerName)
+	}
+
+	request := &internal.GetMinOffsetRequestHeader{
+		Topic:   mq.Topic,
+		QueueId: mq.QueueId,
+	}
+
+	cmd := remote.NewRemotingCommand(internal.ReqGetMinOffset, request, nil)
+	response, err := dc.client.InvokeSync(ctx, broker, cmd, 3*time.Second)
+	if err != nil {
+		return -1, err
+	}
+	return strconv.ParseInt(response.ExtFields["offset"], 10, 64)
+}
+
+func (dc *defaultManualPullConsumer) queryMaxOffset(ctx context.Context, mq *primitive.MessageQueue) (int64, error) {
+	broker, exist := dc.chooseServer(mq)
+	if !exist {
+		return -1, fmt.Errorf("the broker [%s] does not exist", mq.BrokerName)
+	}
+
+	request := &internal.GetMaxOffsetRequestHeader{
+		Topic:   mq.Topic,
+		QueueId: mq.QueueId,
+	}
+
+	cmd := remote.NewRemotingCommand(internal.ReqGetMaxOffset, request, nil)
+	response, err := dc.client.InvokeSync(ctx, broker, cmd, 3*time.Second)
+	if err != nil {
+		return -1, err
+	}
+	return strconv.ParseInt(response.ExtFields["offset"], 10, 64)

Review comment:
       Same as the previous comment on a line like this one.

##########
File path: consumer/manual_pull_consumer.go
##########
@@ -0,0 +1,359 @@
+/*
+Licensed to the Apache Software Foundation (ASF) under one or more
+contributor license agreements.  See the NOTICE file distributed with
+this work for additional information regarding copyright ownership.
+The ASF licenses this file to You under the Apache License, Version 2.0
+(the "License"); you may not use this file except in compliance with
+the License.  You may obtain a copy of the License at
+
+    http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing, software
+distributed under the License is distributed on an "AS IS" BASIS,
+WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+See the License for the specific language governing permissions and
+limitations under the License.
+*/
+
+package consumer
+
+import (
+	"context"
+	"fmt"
+	"strconv"
+	"sync"
+	"time"
+
+	errors2 "github.com/apache/rocketmq-client-go/v2/errors"
+	"github.com/apache/rocketmq-client-go/v2/internal"
+	"github.com/apache/rocketmq-client-go/v2/internal/remote"
+	"github.com/apache/rocketmq-client-go/v2/primitive"
+	"github.com/apache/rocketmq-client-go/v2/rlog"
+	"github.com/pkg/errors"
+)
+
+type ManualPullConsumer interface {
+	// get n messages from specified queue with offset
+	PullFromQueue(ctx context.Context, mq *primitive.MessageQueue, offset int64, numbers int) (*primitive.PullResult, error)
+
+	// get queues of the topic
+	GetMessageQueues(ctx context.Context, topic string) []*primitive.MessageQueue
+
+	// get the offset of mq in groupName, if mq not exist, -1 will be return
+	CommittedOffset(groupName string, mq *primitive.MessageQueue) (int64, error)
+
+	// seek consume position to the offset, this api can be used to reset offset and commit offset
+	Seek(groupName string, mq *primitive.MessageQueue, offset int64) error
+
+	// query offset according to timestamp(ms), the maximum offset that born time less than timestamp will be return
+	// if timestamp less than any message's born time, the earliest offset will be returned
+	// if timestamp great than any message's born time, the latest offset will be returned
+	Lookup(ctx context.Context, mq *primitive.MessageQueue, timestamp int64) (int64, error)
+}
+
+type defaultManualPullConsumer struct {
+	group                  string
+	namesrv                internal.Namesrvs
+	option                 consumerOptions
+	client                 internal.RMQClient
+	interceptor            primitive.Interceptor
+	pullFromWhichNodeTable sync.Map
+}
+
+func NewManualPullConsumer(options ...Option) (*defaultManualPullConsumer, error) {
+	defaultOpts := defaultPullConsumerOptions()
+	for _, apply := range options {
+		apply(&defaultOpts)
+	}
+
+	srvs, err := internal.NewNamesrv(defaultOpts.Resolver)
+	if err != nil {
+		return nil, errors.Wrap(err, "new Namesrv failed.")
+	}
+	if !defaultOpts.Credentials.IsEmpty() {
+		srvs.SetCredentials(defaultOpts.Credentials)
+	}
+	defaultOpts.Namesrv = srvs
+
+	if defaultOpts.Namespace != "" {
+		defaultOpts.GroupName = defaultOpts.Namespace + "%" + defaultOpts.GroupName
+	}
+
+	actualRMQClient := internal.GetOrNewRocketMQClient(defaultOpts.ClientOptions, nil)
+	actualNameSrv := internal.GetOrSetNamesrv(actualRMQClient.ClientID(), defaultOpts.Namesrv)
+
+	dc := &defaultManualPullConsumer{
+		client:  actualRMQClient,
+		option:  defaultOpts,
+		namesrv: actualNameSrv,
+		group:   defaultOpts.GroupName,
+	}
+
+	dc.interceptor = primitive.ChainInterceptors(dc.option.Interceptors...)
+	dc.option.ClientOptions.Namesrv = actualNameSrv
+	return dc, nil
+}
+
+func (dc *defaultManualPullConsumer) PullFromQueue(ctx context.Context, mq *primitive.MessageQueue, offset int64, numbers int) (*primitive.PullResult, error) {
+	if err := dc.checkPull(ctx, mq, offset, numbers); err != nil {
+		return nil, err
+	}
+	subData := buildSubscriptionData(mq.Topic, MessageSelector{
+		Expression: _SubAll,
+	})
+
+	sysFlag := buildSysFlag(false, true, true, false)
+
+	pullResp, err := dc.pullInner(ctx, mq, subData, offset, numbers, sysFlag, 0)
+	if err != nil {
+		return pullResp, err
+	}
+	dc.processPullResult(mq, pullResp, subData)
+	if dc.interceptor != nil {
+		msgCtx := &primitive.ConsumeMessageContext{
+			Properties:    make(map[string]string),
+			ConsumerGroup: dc.group,
+			MQ:            mq,
+			Msgs:          pullResp.GetMessageExts(),
+		}
+		err = dc.interceptor(ctx, msgCtx, struct{}{}, primitive.NoopInterceptor)
+	}
+	return pullResp, err
+}
+
+func (dc *defaultManualPullConsumer) GetMessageQueues(ctx context.Context, topic string) []*primitive.MessageQueue {
+	queues, err := dc.namesrv.FetchSubscribeMessageQueues(topic)
+	if err != nil {
+		rlog.Error("get message queue error", map[string]interface{}{
+			rlog.LogKeyTopic:         topic,
+			rlog.LogKeyUnderlayError: err.Error(),
+		})
+		return nil
+	}
+	return queues
+}
+
+func (dc *defaultManualPullConsumer) CommittedOffset(group string, mq *primitive.MessageQueue) (int64, error) {
+	broker, exist := dc.chooseServer(mq)
+	if !exist {
+		return int64(-1), fmt.Errorf("broker: %s address not found", mq.BrokerName)
+	}
+	queryOffsetRequest := &internal.QueryConsumerOffsetRequestHeader{
+		ConsumerGroup: group,
+		Topic:         mq.Topic,
+		QueueId:       mq.QueueId,
+	}
+	cmd := remote.NewRemotingCommand(internal.ReqQueryConsumerOffset, queryOffsetRequest, nil)
+	res, err := dc.client.InvokeSync(context.Background(), broker, cmd, 3*time.Second)
+	if err != nil {
+		return -1, err
+	}
+	if res.Code != internal.ResSuccess {
+		return -2, fmt.Errorf("broker response code: %d, remarks: %s", res.Code, res.Remark)
+	}
+	off, err := strconv.ParseInt(res.ExtFields["offset"], 10, 64)
+	if err != nil {
+		return -1, err

Review comment:
       Please wrap the error so the caller knows what caused it.

##########
File path: consumer/manual_pull_consumer.go
##########
@@ -0,0 +1,359 @@
+/*
+Licensed to the Apache Software Foundation (ASF) under one or more
+contributor license agreements.  See the NOTICE file distributed with
+this work for additional information regarding copyright ownership.
+The ASF licenses this file to You under the Apache License, Version 2.0
+(the "License"); you may not use this file except in compliance with
+the License.  You may obtain a copy of the License at
+
+    http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing, software
+distributed under the License is distributed on an "AS IS" BASIS,
+WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+See the License for the specific language governing permissions and
+limitations under the License.
+*/
+
+package consumer
+
+import (
+	"context"
+	"fmt"
+	"strconv"
+	"sync"
+	"time"
+
+	errors2 "github.com/apache/rocketmq-client-go/v2/errors"
+	"github.com/apache/rocketmq-client-go/v2/internal"
+	"github.com/apache/rocketmq-client-go/v2/internal/remote"
+	"github.com/apache/rocketmq-client-go/v2/primitive"
+	"github.com/apache/rocketmq-client-go/v2/rlog"
+	"github.com/pkg/errors"
+)
+
+type ManualPullConsumer interface {
+	// get n messages from specified queue with offset
+	PullFromQueue(ctx context.Context, mq *primitive.MessageQueue, offset int64, numbers int) (*primitive.PullResult, error)
+
+	// get queues of the topic
+	GetMessageQueues(ctx context.Context, topic string) []*primitive.MessageQueue
+
+	// get the offset of mq in groupName, if mq not exist, -1 will be return
+	CommittedOffset(groupName string, mq *primitive.MessageQueue) (int64, error)
+
+	// seek consume position to the offset, this api can be used to reset offset and commit offset
+	Seek(groupName string, mq *primitive.MessageQueue, offset int64) error
+
+	// query offset according to timestamp(ms), the maximum offset that born time less than timestamp will be return
+	// if timestamp less than any message's born time, the earliest offset will be returned
+	// if timestamp great than any message's born time, the latest offset will be returned
+	Lookup(ctx context.Context, mq *primitive.MessageQueue, timestamp int64) (int64, error)
+}
+
+type defaultManualPullConsumer struct {
+	group                  string
+	namesrv                internal.Namesrvs
+	option                 consumerOptions
+	client                 internal.RMQClient
+	interceptor            primitive.Interceptor
+	pullFromWhichNodeTable sync.Map
+}
+
+func NewManualPullConsumer(options ...Option) (*defaultManualPullConsumer, error) {
+	defaultOpts := defaultPullConsumerOptions()
+	for _, apply := range options {
+		apply(&defaultOpts)
+	}
+
+	srvs, err := internal.NewNamesrv(defaultOpts.Resolver)
+	if err != nil {
+		return nil, errors.Wrap(err, "new Namesrv failed.")
+	}
+	if !defaultOpts.Credentials.IsEmpty() {
+		srvs.SetCredentials(defaultOpts.Credentials)
+	}
+	defaultOpts.Namesrv = srvs
+
+	if defaultOpts.Namespace != "" {
+		defaultOpts.GroupName = defaultOpts.Namespace + "%" + defaultOpts.GroupName
+	}
+
+	actualRMQClient := internal.GetOrNewRocketMQClient(defaultOpts.ClientOptions, nil)
+	actualNameSrv := internal.GetOrSetNamesrv(actualRMQClient.ClientID(), defaultOpts.Namesrv)
+
+	dc := &defaultManualPullConsumer{
+		client:  actualRMQClient,
+		option:  defaultOpts,
+		namesrv: actualNameSrv,
+		group:   defaultOpts.GroupName,
+	}
+
+	dc.interceptor = primitive.ChainInterceptors(dc.option.Interceptors...)
+	dc.option.ClientOptions.Namesrv = actualNameSrv
+	return dc, nil
+}
+
+func (dc *defaultManualPullConsumer) PullFromQueue(ctx context.Context, mq *primitive.MessageQueue, offset int64, numbers int) (*primitive.PullResult, error) {
+	if err := dc.checkPull(ctx, mq, offset, numbers); err != nil {
+		return nil, err
+	}
+	subData := buildSubscriptionData(mq.Topic, MessageSelector{
+		Expression: _SubAll,
+	})
+
+	sysFlag := buildSysFlag(false, true, true, false)
+
+	pullResp, err := dc.pullInner(ctx, mq, subData, offset, numbers, sysFlag, 0)
+	if err != nil {
+		return pullResp, err
+	}
+	dc.processPullResult(mq, pullResp, subData)
+	if dc.interceptor != nil {
+		msgCtx := &primitive.ConsumeMessageContext{
+			Properties:    make(map[string]string),
+			ConsumerGroup: dc.group,
+			MQ:            mq,
+			Msgs:          pullResp.GetMessageExts(),
+		}
+		err = dc.interceptor(ctx, msgCtx, struct{}{}, primitive.NoopInterceptor)
+	}
+	return pullResp, err
+}
+
+func (dc *defaultManualPullConsumer) GetMessageQueues(ctx context.Context, topic string) []*primitive.MessageQueue {
+	queues, err := dc.namesrv.FetchSubscribeMessageQueues(topic)
+	if err != nil {
+		rlog.Error("get message queue error", map[string]interface{}{
+			rlog.LogKeyTopic:         topic,
+			rlog.LogKeyUnderlayError: err.Error(),
+		})
+		return nil
+	}
+	return queues
+}
+
+func (dc *defaultManualPullConsumer) CommittedOffset(group string, mq *primitive.MessageQueue) (int64, error) {
+	broker, exist := dc.chooseServer(mq)
+	if !exist {
+		return int64(-1), fmt.Errorf("broker: %s address not found", mq.BrokerName)
+	}
+	queryOffsetRequest := &internal.QueryConsumerOffsetRequestHeader{
+		ConsumerGroup: group,
+		Topic:         mq.Topic,
+		QueueId:       mq.QueueId,
+	}
+	cmd := remote.NewRemotingCommand(internal.ReqQueryConsumerOffset, queryOffsetRequest, nil)
+	res, err := dc.client.InvokeSync(context.Background(), broker, cmd, 3*time.Second)
+	if err != nil {
+		return -1, err
+	}
+	if res.Code != internal.ResSuccess {
+		return -2, fmt.Errorf("broker response code: %d, remarks: %s", res.Code, res.Remark)
+	}
+	off, err := strconv.ParseInt(res.ExtFields["offset"], 10, 64)
+	if err != nil {
+		return -1, err
+	}
+	return off, nil
+}
+
+func (dc *defaultManualPullConsumer) Seek(groupName string, mq *primitive.MessageQueue, offset int64) error {
+	minOffset, err := dc.queryMinOffset(context.Background(), mq)
+	if err != nil {
+		return err
+	}
+	maxOffset, err := dc.queryMaxOffset(context.Background(), mq)
+	if err != nil {
+		return err
+	}
+	if offset < minOffset || offset > maxOffset {
+		return fmt.Errorf("Seek offset illegal, seek offset = %d, min offset = %d, max offset = %d", offset, minOffset, maxOffset)
+	}
+
+	broker, exist := dc.chooseServer(mq)
+	if !exist {
+		return fmt.Errorf("broker: %s address not found", mq.BrokerName)
+	}
+
+	updateOffsetRequest := &internal.UpdateConsumerOffsetRequestHeader{
+		ConsumerGroup: groupName,
+		Topic:         mq.Topic,
+		QueueId:       mq.QueueId,
+		CommitOffset:  offset,
+	}
+	cmd := remote.NewRemotingCommand(internal.ReqUpdateConsumerOffset, updateOffsetRequest, nil)
+	return dc.client.InvokeOneWay(context.Background(), broker, cmd, 5*time.Second)
+}
+
+func (dc *defaultManualPullConsumer) Lookup(ctx context.Context, mq *primitive.MessageQueue, timestamp int64) (int64, error) {
+	broker, exist := dc.chooseServer(mq)
+	if !exist {
+		return -1, fmt.Errorf("the broker [%s] does not exist", mq.BrokerName)
+	}
+
+	request := &internal.SearchOffsetRequestHeader{
+		Topic:     mq.Topic,
+		QueueId:   mq.QueueId,
+		Timestamp: timestamp,
+	}
+
+	cmd := remote.NewRemotingCommand(internal.ReqSearchOffsetByTimestamp, request, nil)
+	response, err := dc.client.InvokeSync(context.Background(), broker, cmd, 3*time.Second)
+	if err != nil {
+		return -1, err
+	}
+	return strconv.ParseInt(response.ExtFields["offset"], 10, 64)
+}
+
+func (dc *defaultManualPullConsumer) chooseServer(mq *primitive.MessageQueue) (string, bool) {
+	brokerAddr := dc.namesrv.FindBrokerAddrByName(mq.BrokerName)
+	if brokerAddr == "" {
+		dc.namesrv.UpdateTopicRouteInfo(mq.Topic)
+		brokerAddr = dc.namesrv.FindBrokerAddrByName(mq.BrokerName)
+	}
+	return brokerAddr, brokerAddr != ""
+}
+
+func (dc *defaultManualPullConsumer) queryMinOffset(ctx context.Context, mq *primitive.MessageQueue) (int64, error) {
+	broker, exist := dc.chooseServer(mq)
+	if !exist {
+		return -1, fmt.Errorf("the broker [%s] does not exist", mq.BrokerName)
+	}
+
+	request := &internal.GetMinOffsetRequestHeader{
+		Topic:   mq.Topic,
+		QueueId: mq.QueueId,
+	}
+
+	cmd := remote.NewRemotingCommand(internal.ReqGetMinOffset, request, nil)
+	response, err := dc.client.InvokeSync(ctx, broker, cmd, 3*time.Second)
+	if err != nil {
+		return -1, err
+	}
+	return strconv.ParseInt(response.ExtFields["offset"], 10, 64)
+}
+
+func (dc *defaultManualPullConsumer) queryMaxOffset(ctx context.Context, mq *primitive.MessageQueue) (int64, error) {
+	broker, exist := dc.chooseServer(mq)
+	if !exist {
+		return -1, fmt.Errorf("the broker [%s] does not exist", mq.BrokerName)

Review comment:
       Same as the comment above.

##########
File path: primitive/interceptor.go
##########
@@ -29,6 +29,10 @@ type Invoker func(ctx context.Context, req, reply interface{}) error
 // use type assert to get real type.
 type Interceptor func(ctx context.Context, req, reply interface{}, next Invoker) error
 
+var NoopInterceptor = func(ctx context.Context, req, reply interface{}) error {

Review comment:
       Why add this method?

##########
File path: consumer/manual_pull_consumer.go
##########
@@ -0,0 +1,359 @@
+/*
+Licensed to the Apache Software Foundation (ASF) under one or more
+contributor license agreements.  See the NOTICE file distributed with
+this work for additional information regarding copyright ownership.
+The ASF licenses this file to You under the Apache License, Version 2.0
+(the "License"); you may not use this file except in compliance with
+the License.  You may obtain a copy of the License at
+
+    http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing, software
+distributed under the License is distributed on an "AS IS" BASIS,
+WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+See the License for the specific language governing permissions and
+limitations under the License.
+*/
+
+package consumer
+
+import (
+	"context"
+	"fmt"
+	"strconv"
+	"sync"
+	"time"
+
+	errors2 "github.com/apache/rocketmq-client-go/v2/errors"
+	"github.com/apache/rocketmq-client-go/v2/internal"
+	"github.com/apache/rocketmq-client-go/v2/internal/remote"
+	"github.com/apache/rocketmq-client-go/v2/primitive"
+	"github.com/apache/rocketmq-client-go/v2/rlog"
+	"github.com/pkg/errors"
+)
+
+type ManualPullConsumer interface {
+	// get n messages from specified queue with offset
+	PullFromQueue(ctx context.Context, mq *primitive.MessageQueue, offset int64, numbers int) (*primitive.PullResult, error)
+
+	// get queues of the topic
+	GetMessageQueues(ctx context.Context, topic string) []*primitive.MessageQueue
+
+	// get the offset of mq in groupName, if mq not exist, -1 will be return
+	CommittedOffset(groupName string, mq *primitive.MessageQueue) (int64, error)
+
+	// seek consume position to the offset, this api can be used to reset offset and commit offset
+	Seek(groupName string, mq *primitive.MessageQueue, offset int64) error
+
+	// query offset according to timestamp(ms), the maximum offset that born time less than timestamp will be return
+	// if timestamp less than any message's born time, the earliest offset will be returned
+	// if timestamp great than any message's born time, the latest offset will be returned
+	Lookup(ctx context.Context, mq *primitive.MessageQueue, timestamp int64) (int64, error)
+}
+
+type defaultManualPullConsumer struct {
+	group                  string
+	namesrv                internal.Namesrvs
+	option                 consumerOptions
+	client                 internal.RMQClient
+	interceptor            primitive.Interceptor
+	pullFromWhichNodeTable sync.Map
+}
+
+func NewManualPullConsumer(options ...Option) (*defaultManualPullConsumer, error) {
+	defaultOpts := defaultPullConsumerOptions()
+	for _, apply := range options {
+		apply(&defaultOpts)
+	}
+
+	srvs, err := internal.NewNamesrv(defaultOpts.Resolver)
+	if err != nil {
+		return nil, errors.Wrap(err, "new Namesrv failed.")
+	}
+	if !defaultOpts.Credentials.IsEmpty() {
+		srvs.SetCredentials(defaultOpts.Credentials)
+	}
+	defaultOpts.Namesrv = srvs
+
+	if defaultOpts.Namespace != "" {
+		defaultOpts.GroupName = defaultOpts.Namespace + "%" + defaultOpts.GroupName
+	}
+
+	actualRMQClient := internal.GetOrNewRocketMQClient(defaultOpts.ClientOptions, nil)
+	actualNameSrv := internal.GetOrSetNamesrv(actualRMQClient.ClientID(), defaultOpts.Namesrv)
+
+	dc := &defaultManualPullConsumer{
+		client:  actualRMQClient,
+		option:  defaultOpts,
+		namesrv: actualNameSrv,
+		group:   defaultOpts.GroupName,
+	}
+
+	dc.interceptor = primitive.ChainInterceptors(dc.option.Interceptors...)
+	dc.option.ClientOptions.Namesrv = actualNameSrv
+	return dc, nil
+}
+
+func (dc *defaultManualPullConsumer) PullFromQueue(ctx context.Context, mq *primitive.MessageQueue, offset int64, numbers int) (*primitive.PullResult, error) {
+	if err := dc.checkPull(ctx, mq, offset, numbers); err != nil {
+		return nil, err
+	}
+	subData := buildSubscriptionData(mq.Topic, MessageSelector{
+		Expression: _SubAll,
+	})
+
+	sysFlag := buildSysFlag(false, true, true, false)
+
+	pullResp, err := dc.pullInner(ctx, mq, subData, offset, numbers, sysFlag, 0)
+	if err != nil {
+		return pullResp, err
+	}
+	dc.processPullResult(mq, pullResp, subData)
+	if dc.interceptor != nil {
+		msgCtx := &primitive.ConsumeMessageContext{
+			Properties:    make(map[string]string),
+			ConsumerGroup: dc.group,
+			MQ:            mq,
+			Msgs:          pullResp.GetMessageExts(),
+		}
+		err = dc.interceptor(ctx, msgCtx, struct{}{}, primitive.NoopInterceptor)
+	}
+	return pullResp, err
+}
+
+func (dc *defaultManualPullConsumer) GetMessageQueues(ctx context.Context, topic string) []*primitive.MessageQueue {
+	queues, err := dc.namesrv.FetchSubscribeMessageQueues(topic)
+	if err != nil {
+		rlog.Error("get message queue error", map[string]interface{}{
+			rlog.LogKeyTopic:         topic,
+			rlog.LogKeyUnderlayError: err.Error(),
+		})
+		return nil
+	}
+	return queues
+}
+
+func (dc *defaultManualPullConsumer) CommittedOffset(group string, mq *primitive.MessageQueue) (int64, error) {
+	broker, exist := dc.chooseServer(mq)
+	if !exist {
+		return int64(-1), fmt.Errorf("broker: %s address not found", mq.BrokerName)
+	}
+	queryOffsetRequest := &internal.QueryConsumerOffsetRequestHeader{
+		ConsumerGroup: group,
+		Topic:         mq.Topic,
+		QueueId:       mq.QueueId,
+	}
+	cmd := remote.NewRemotingCommand(internal.ReqQueryConsumerOffset, queryOffsetRequest, nil)
+	res, err := dc.client.InvokeSync(context.Background(), broker, cmd, 3*time.Second)
+	if err != nil {
+		return -1, err
+	}
+	if res.Code != internal.ResSuccess {
+		return -2, fmt.Errorf("broker response code: %d, remarks: %s", res.Code, res.Remark)
+	}
+	off, err := strconv.ParseInt(res.ExtFields["offset"], 10, 64)
+	if err != nil {
+		return -1, err
+	}
+	return off, nil
+}
+
+func (dc *defaultManualPullConsumer) Seek(groupName string, mq *primitive.MessageQueue, offset int64) error {
+	minOffset, err := dc.queryMinOffset(context.Background(), mq)
+	if err != nil {
+		return err
+	}
+	maxOffset, err := dc.queryMaxOffset(context.Background(), mq)
+	if err != nil {
+		return err
+	}
+	if offset < minOffset || offset > maxOffset {
+		return fmt.Errorf("Seek offset illegal, seek offset = %d, min offset = %d, max offset = %d", offset, minOffset, maxOffset)
+	}
+
+	broker, exist := dc.chooseServer(mq)
+	if !exist {
+		return fmt.Errorf("broker: %s address not found", mq.BrokerName)
+	}
+
+	updateOffsetRequest := &internal.UpdateConsumerOffsetRequestHeader{
+		ConsumerGroup: groupName,
+		Topic:         mq.Topic,
+		QueueId:       mq.QueueId,
+		CommitOffset:  offset,
+	}
+	cmd := remote.NewRemotingCommand(internal.ReqUpdateConsumerOffset, updateOffsetRequest, nil)
+	return dc.client.InvokeOneWay(context.Background(), broker, cmd, 5*time.Second)
+}
+
+func (dc *defaultManualPullConsumer) Lookup(ctx context.Context, mq *primitive.MessageQueue, timestamp int64) (int64, error) {
+	broker, exist := dc.chooseServer(mq)
+	if !exist {
+		return -1, fmt.Errorf("the broker [%s] does not exist", mq.BrokerName)
+	}
+
+	request := &internal.SearchOffsetRequestHeader{
+		Topic:     mq.Topic,
+		QueueId:   mq.QueueId,
+		Timestamp: timestamp,
+	}
+
+	cmd := remote.NewRemotingCommand(internal.ReqSearchOffsetByTimestamp, request, nil)
+	response, err := dc.client.InvokeSync(context.Background(), broker, cmd, 3*time.Second)
+	if err != nil {
+		return -1, err
+	}
+	return strconv.ParseInt(response.ExtFields["offset"], 10, 64)

Review comment:
       Don't return the result directly; separate this call and handle the error that may occur.
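
       A sketch of what separating the steps could look like. The response type
       and the resSuccess constant below are hypothetical stand-ins for the
       library's remoting response and success code, so that the example is
       self-contained. The response-code check follows what CommittedOffset
       already does in this diff.

```go
package main

import (
	"fmt"
	"strconv"
)

// response is a hypothetical stand-in for the remoting response used in the
// PR; only the fields this sketch needs are modeled.
type response struct {
	Code      int16
	Remark    string
	ExtFields map[string]string
}

// resSuccess is an assumed success code for this sketch.
const resSuccess int16 = 0

// offsetFromResponse validates the response step by step instead of returning
// the strconv.ParseInt result directly.
func offsetFromResponse(res *response) (int64, error) {
	if res == nil {
		return -1, fmt.Errorf("nil response")
	}
	if res.Code != resSuccess {
		return -1, fmt.Errorf("broker response code: %d, remarks: %s", res.Code, res.Remark)
	}
	raw, ok := res.ExtFields["offset"]
	if !ok {
		return -1, fmt.Errorf("response has no offset field")
	}
	off, err := strconv.ParseInt(raw, 10, 64)
	if err != nil {
		return -1, fmt.Errorf("parse offset %q: %w", raw, err)
	}
	return off, nil
}

func main() {
	off, err := offsetFromResponse(&response{ExtFields: map[string]string{"offset": "128"}})
	fmt.Println(off, err)

	_, err = offsetFromResponse(&response{Code: 1, Remark: "system error"})
	fmt.Println(err)
}
```

       Each failure mode (bad response code, missing field, unparsable value)
       then produces its own error message instead of a bare parse error.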







[GitHub] [rocketmq-client-go] wenfengwang commented on a change in pull request #745: [ISSUE #737] Add ManualPullConsumer

Posted by GitBox <gi...@apache.org>.
wenfengwang commented on a change in pull request #745:
URL: https://github.com/apache/rocketmq-client-go/pull/745#discussion_r770258524



##########
File path: consumer/manual_pull_consumer.go
##########
@@ -0,0 +1,359 @@
+/*
+Licensed to the Apache Software Foundation (ASF) under one or more
+contributor license agreements.  See the NOTICE file distributed with
+this work for additional information regarding copyright ownership.
+The ASF licenses this file to You under the Apache License, Version 2.0
+(the "License"); you may not use this file except in compliance with
+the License.  You may obtain a copy of the License at
+
+    http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing, software
+distributed under the License is distributed on an "AS IS" BASIS,
+WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+See the License for the specific language governing permissions and
+limitations under the License.
+*/
+
+package consumer
+
+import (
+	"context"
+	"fmt"
+	"strconv"
+	"sync"
+	"time"
+
+	errors2 "github.com/apache/rocketmq-client-go/v2/errors"
+	"github.com/apache/rocketmq-client-go/v2/internal"
+	"github.com/apache/rocketmq-client-go/v2/internal/remote"
+	"github.com/apache/rocketmq-client-go/v2/primitive"
+	"github.com/apache/rocketmq-client-go/v2/rlog"
+	"github.com/pkg/errors"
+)
+
+type ManualPullConsumer interface {
+	// get n messages from specified queue with offset
+	PullFromQueue(ctx context.Context, mq *primitive.MessageQueue, offset int64, numbers int) (*primitive.PullResult, error)
+
+	// get queues of the topic
+	GetMessageQueues(ctx context.Context, topic string) []*primitive.MessageQueue
+
+	// get the offset of mq in groupName, if mq not exist, -1 will be return
+	CommittedOffset(groupName string, mq *primitive.MessageQueue) (int64, error)
+
+	// seek consume position to the offset, this api can be used to reset offset and commit offset
+	Seek(groupName string, mq *primitive.MessageQueue, offset int64) error
+
+	// query offset according to timestamp(ms), the maximum offset that born time less than timestamp will be return
+	// if timestamp less than any message's born time, the earliest offset will be returned
+	// if timestamp great than any message's born time, the latest offset will be returned
+	Lookup(ctx context.Context, mq *primitive.MessageQueue, timestamp int64) (int64, error)
+}
+
+type defaultManualPullConsumer struct {
+	group                  string
+	namesrv                internal.Namesrvs
+	option                 consumerOptions
+	client                 internal.RMQClient
+	interceptor            primitive.Interceptor
+	pullFromWhichNodeTable sync.Map
+}
+
+func NewManualPullConsumer(options ...Option) (*defaultManualPullConsumer, error) {
+	defaultOpts := defaultPullConsumerOptions()
+	for _, apply := range options {
+		apply(&defaultOpts)
+	}
+
+	srvs, err := internal.NewNamesrv(defaultOpts.Resolver)
+	if err != nil {
+		return nil, errors.Wrap(err, "new Namesrv failed.")
+	}
+	if !defaultOpts.Credentials.IsEmpty() {
+		srvs.SetCredentials(defaultOpts.Credentials)
+	}
+	defaultOpts.Namesrv = srvs
+
+	if defaultOpts.Namespace != "" {
+		defaultOpts.GroupName = defaultOpts.Namespace + "%" + defaultOpts.GroupName
+	}
+
+	actualRMQClient := internal.GetOrNewRocketMQClient(defaultOpts.ClientOptions, nil)
+	actualNameSrv := internal.GetOrSetNamesrv(actualRMQClient.ClientID(), defaultOpts.Namesrv)
+
+	dc := &defaultManualPullConsumer{
+		client:  actualRMQClient,
+		option:  defaultOpts,
+		namesrv: actualNameSrv,
+		group:   defaultOpts.GroupName,
+	}
+
+	dc.interceptor = primitive.ChainInterceptors(dc.option.Interceptors...)
+	dc.option.ClientOptions.Namesrv = actualNameSrv
+	return dc, nil
+}
+
+func (dc *defaultManualPullConsumer) PullFromQueue(ctx context.Context, mq *primitive.MessageQueue, offset int64, numbers int) (*primitive.PullResult, error) {
+	if err := dc.checkPull(ctx, mq, offset, numbers); err != nil {
+		return nil, err
+	}
+	subData := buildSubscriptionData(mq.Topic, MessageSelector{
+		Expression: _SubAll,
+	})
+
+	sysFlag := buildSysFlag(false, true, true, false)
+
+	pullResp, err := dc.pullInner(ctx, mq, subData, offset, numbers, sysFlag, 0)
+	if err != nil {
+		return pullResp, err
+	}
+	dc.processPullResult(mq, pullResp, subData)
+	if dc.interceptor != nil {
+		msgCtx := &primitive.ConsumeMessageContext{
+			Properties:    make(map[string]string),
+			ConsumerGroup: dc.group,
+			MQ:            mq,
+			Msgs:          pullResp.GetMessageExts(),
+		}
+		err = dc.interceptor(ctx, msgCtx, struct{}{}, primitive.NoopInterceptor)
+	}
+	return pullResp, err
+}
+
+func (dc *defaultManualPullConsumer) GetMessageQueues(ctx context.Context, topic string) []*primitive.MessageQueue {
+	queues, err := dc.namesrv.FetchSubscribeMessageQueues(topic)
+	if err != nil {
+		rlog.Error("get message queue error", map[string]interface{}{
+			rlog.LogKeyTopic:         topic,
+			rlog.LogKeyUnderlayError: err.Error(),
+		})
+		return nil
+	}
+	return queues
+}
+
+func (dc *defaultManualPullConsumer) CommittedOffset(group string, mq *primitive.MessageQueue) (int64, error) {
+	broker, exist := dc.chooseServer(mq)
+	if !exist {
+		return int64(-1), fmt.Errorf("broker: %s address not found", mq.BrokerName)
+	}
+	queryOffsetRequest := &internal.QueryConsumerOffsetRequestHeader{
+		ConsumerGroup: group,
+		Topic:         mq.Topic,
+		QueueId:       mq.QueueId,
+	}
+	cmd := remote.NewRemotingCommand(internal.ReqQueryConsumerOffset, queryOffsetRequest, nil)
+	res, err := dc.client.InvokeSync(context.Background(), broker, cmd, 3*time.Second)
+	if err != nil {
+		return -1, err
+	}
+	if res.Code != internal.ResSuccess {
+		return -2, fmt.Errorf("broker response code: %d, remarks: %s", res.Code, res.Remark)
+	}
+	off, err := strconv.ParseInt(res.ExtFields["offset"], 10, 64)
+	if err != nil {
+		return -1, err
+	}
+	return off, nil
+}
+
+func (dc *defaultManualPullConsumer) Seek(groupName string, mq *primitive.MessageQueue, offset int64) error {
+	minOffset, err := dc.queryMinOffset(context.Background(), mq)
+	if err != nil {
+		return err
+	}
+	maxOffset, err := dc.queryMaxOffset(context.Background(), mq)
+	if err != nil {
+		return err
+	}
+	if offset < minOffset || offset > maxOffset {
+		return fmt.Errorf("Seek offset illegal, seek offset = %d, min offset = %d, max offset = %d", offset, minOffset, maxOffset)
+	}
+
+	broker, exist := dc.chooseServer(mq)
+	if !exist {
+		return fmt.Errorf("broker: %s address not found", mq.BrokerName)
+	}
+
+	updateOffsetRequest := &internal.UpdateConsumerOffsetRequestHeader{
+		ConsumerGroup: groupName,
+		Topic:         mq.Topic,
+		QueueId:       mq.QueueId,
+		CommitOffset:  offset,
+	}
+	cmd := remote.NewRemotingCommand(internal.ReqUpdateConsumerOffset, updateOffsetRequest, nil)
+	return dc.client.InvokeOneWay(context.Background(), broker, cmd, 5*time.Second)
+}
+
+func (dc *defaultManualPullConsumer) Lookup(ctx context.Context, mq *primitive.MessageQueue, timestamp int64) (int64, error) {
+	broker, exist := dc.chooseServer(mq)
+	if !exist {
+		return -1, fmt.Errorf("the broker [%s] does not exist", mq.BrokerName)
+	}
+
+	request := &internal.SearchOffsetRequestHeader{
+		Topic:     mq.Topic,
+		QueueId:   mq.QueueId,
+		Timestamp: timestamp,
+	}
+
+	cmd := remote.NewRemotingCommand(internal.ReqSearchOffsetByTimestamp, request, nil)
+	response, err := dc.client.InvokeSync(context.Background(), broker, cmd, 3*time.Second)
+	if err != nil {
+		return -1, err
+	}
+	return strconv.ParseInt(response.ExtFields["offset"], 10, 64)
+}
+
+func (dc *defaultManualPullConsumer) chooseServer(mq *primitive.MessageQueue) (string, bool) {
+	brokerAddr := dc.namesrv.FindBrokerAddrByName(mq.BrokerName)
+	if brokerAddr == "" {
+		dc.namesrv.UpdateTopicRouteInfo(mq.Topic)
+		brokerAddr = dc.namesrv.FindBrokerAddrByName(mq.BrokerName)
+	}
+	return brokerAddr, brokerAddr != ""
+}
+
+func (dc *defaultManualPullConsumer) queryMinOffset(ctx context.Context, mq *primitive.MessageQueue) (int64, error) {
+	broker, exist := dc.chooseServer(mq)
+	if !exist {
+		return -1, fmt.Errorf("the broker [%s] does not exist", mq.BrokerName)
+	}
+
+	request := &internal.GetMinOffsetRequestHeader{
+		Topic:   mq.Topic,
+		QueueId: mq.QueueId,
+	}
+
+	cmd := remote.NewRemotingCommand(internal.ReqGetMinOffset, request, nil)
+	response, err := dc.client.InvokeSync(ctx, broker, cmd, 3*time.Second)
+	if err != nil {
+		return -1, err
+	}
+	return strconv.ParseInt(response.ExtFields["offset"], 10, 64)
+}
+
+func (dc *defaultManualPullConsumer) queryMaxOffset(ctx context.Context, mq *primitive.MessageQueue) (int64, error) {
+	broker, exist := dc.chooseServer(mq)
+	if !exist {
+		return -1, fmt.Errorf("the broker [%s] does not exist", mq.BrokerName)
+	}
+
+	request := &internal.GetMaxOffsetRequestHeader{
+		Topic:   mq.Topic,
+		QueueId: mq.QueueId,
+	}
+
+	cmd := remote.NewRemotingCommand(internal.ReqGetMaxOffset, request, nil)
+	response, err := dc.client.InvokeSync(ctx, broker, cmd, 3*time.Second)
+	if err != nil {
+		return -1, err
+	}
+	return strconv.ParseInt(response.ExtFields["offset"], 10, 64)
+}
+
+func (dc *defaultManualPullConsumer) pullInner(ctx context.Context, queue *primitive.MessageQueue, data *internal.SubscriptionData,
+	offset int64, numbers int, sysFlag int32, commitOffsetValue int64) (*primitive.PullResult, error) {
+	brokerResult := dc.tryFindBroker(queue)
+	if brokerResult == nil {
+		rlog.Warning("no broker found for mq", map[string]interface{}{
+			rlog.LogKeyMessageQueue: queue,
+		})
+		return nil, errors2.ErrBrokerNotFound
+	}
+	if brokerResult.Slave {
+		sysFlag = clearCommitOffsetFlag(sysFlag)
+	}
+
+	if (data.ExpType == string(TAG)) && brokerResult.BrokerVersion < internal.V4_1_0 {
+		return nil, fmt.Errorf("the broker [%s, %v] does not upgrade to support for filter message by %v",
+			queue.BrokerName, brokerResult.BrokerVersion, data.ExpType)
+	}
+
+	pullRequest := &internal.PullMessageRequestHeader{
+		ConsumerGroup:        dc.group,
+		Topic:                queue.Topic,
+		QueueId:              int32(queue.QueueId),
+		QueueOffset:          offset,
+		MaxMsgNums:           int32(numbers),
+		SysFlag:              sysFlag,
+		CommitOffset:         commitOffsetValue,
+		SuspendTimeoutMillis: _BrokerSuspendMaxTime,
+		SubExpression:        data.SubString,
+		ExpressionType:       string(data.ExpType),
+	}
+	if data.ExpType == string(TAG) {
+		pullRequest.SubVersion = 0
+	} else {
+		pullRequest.SubVersion = data.SubVersion
+	}
+	// TODO: add computPullFromWhichFilterServer
+
+	return dc.client.PullMessage(context.Background(), brokerResult.BrokerAddr, pullRequest)
+}
+
+func (dc *defaultManualPullConsumer) tryFindBroker(mq *primitive.MessageQueue) *internal.FindBrokerResult {
+	result := dc.namesrv.FindBrokerAddressInSubscribe(mq.BrokerName, dc.recalculatePullFromWhichNode(mq), false)
+	if result != nil {
+		return result
+	}
+	dc.namesrv.UpdateTopicRouteInfo(mq.Topic)
+	return dc.namesrv.FindBrokerAddressInSubscribe(mq.BrokerName, dc.recalculatePullFromWhichNode(mq), false)
+}
+func (dc *defaultManualPullConsumer) recalculatePullFromWhichNode(mq *primitive.MessageQueue) int64 {
+	v, exist := dc.pullFromWhichNodeTable.Load(*mq)
+	if exist {
+		return v.(int64)
+	}
+	return internal.MasterId
+}
+
+func (dc *defaultManualPullConsumer) checkPull(ctx context.Context, mq *primitive.MessageQueue, offset int64, numbers int) error {
+	if mq == nil {
+		return errors2.ErrMQEmpty
+	}
+	if offset < 0 {
+		return errors2.ErrOffset
+	}
+	if numbers <= 0 {
+		return errors2.ErrNumbers
+	}
+	return nil
+}
+
+func (dc *defaultManualPullConsumer) processPullResult(mq *primitive.MessageQueue, result *primitive.PullResult, data *internal.SubscriptionData) {
+
+	dc.updatePullFromWhichNode(mq, result.SuggestWhichBrokerId)
+
+	switch result.Status {
+	case primitive.PullFound:
+		result.SetMessageExts(primitive.DecodeMessage(result.GetBody()))
+		msgs := result.GetMessageExts()
+		// filter message according to tags
+		msgListFilterAgain := msgs
+		if data.Tags.Len() > 0 && data.ClassFilterMode {
+			msgListFilterAgain = make([]*primitive.MessageExt, 0)
+			for _, msg := range msgs {
+				_, exist := data.Tags.Contains(msg.GetTags())
+				if exist {
+					msgListFilterAgain = append(msgListFilterAgain, msg)
+				}
+			}
+		}
+		// TODO: add filter message hook
+		for _, msg := range msgListFilterAgain {
+			traFlag, _ := strconv.ParseBool(msg.GetProperty(primitive.PropertyTransactionPrepared))
+			if traFlag {
+				msg.TransactionId = msg.GetProperty(primitive.PropertyUniqueClientMessageIdKeyIndex)
+			}
+			msg.WithProperty(primitive.PropertyMinOffset, strconv.FormatInt(result.MinOffset, 10))
+			msg.WithProperty(primitive.PropertyMaxOffset, strconv.FormatInt(result.MaxOffset, 10))
+		}
+		result.SetMessageExts(msgListFilterAgain)
+	}
+}
+
+func (dc *defaultManualPullConsumer) updatePullFromWhichNode(mq *primitive.MessageQueue, brokerId int64) {

Review comment:
       Yes, please inline this part at the call site unless you have other considerations, because the method has only one line and is called from just one place.
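
A minimal sketch of what inlining could look like. The body of `updatePullFromWhichNode` is not shown in the quoted diff, so this assumes it is a single `sync.Map` store; `messageQueue`, `pullConsumer`, and the master id of 0 are hypothetical stand-ins, not the library's types.

```go
package main

import (
	"fmt"
	"sync"
)

// messageQueue is a hypothetical stand-in for primitive.MessageQueue.
type messageQueue struct {
	Topic      string
	BrokerName string
	QueueId    int
}

// pullConsumer is a hypothetical stand-in for defaultManualPullConsumer.
type pullConsumer struct {
	pullFromWhichNodeTable sync.Map
}

// processPullResult stores the suggested broker id directly, rather than
// delegating to a separate one-line updatePullFromWhichNode method.
func (c *pullConsumer) processPullResult(mq *messageQueue, suggestedBrokerId int64) {
	c.pullFromWhichNodeTable.Store(*mq, suggestedBrokerId)
}

// pullFromWhichNode returns the remembered broker id, or 0 (assumed here to
// be the master id) when nothing has been stored for the queue yet.
func (c *pullConsumer) pullFromWhichNode(mq *messageQueue) int64 {
	if v, ok := c.pullFromWhichNodeTable.Load(*mq); ok {
		return v.(int64)
	}
	return 0
}

func main() {
	c := &pullConsumer{}
	mq := &messageQueue{Topic: "test", BrokerName: "broker-a", QueueId: 0}
	fmt.Println(c.pullFromWhichNode(mq)) // nothing stored yet
	c.processPullResult(mq, 1)
	fmt.Println(c.pullFromWhichNode(mq)) // value stored by the inlined call
}
```

Keeping the store next to its only caller removes one level of indirection; a named helper would still be reasonable if a second call site appeared later.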




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: dev-unsubscribe@rocketmq.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [rocketmq-client-go] wenfengwang commented on a change in pull request #745: [ISSUE #737] Add ManualPullConsumer

Posted by GitBox <gi...@apache.org>.
wenfengwang commented on a change in pull request #745:
URL: https://github.com/apache/rocketmq-client-go/pull/745#discussion_r774712914



##########
File path: examples/consumer/manual/main.go
##########
@@ -0,0 +1,84 @@
+/*
+Licensed to the Apache Software Foundation (ASF) under one or more
+contributor license agreements.  See the NOTICE file distributed with
+this work for additional information regarding copyright ownership.
+The ASF licenses this file to You under the Apache License, Version 2.0
+(the "License"); you may not use this file except in compliance with
+the License.  You may obtain a copy of the License at
+
+    http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing, software
+distributed under the License is distributed on an "AS IS" BASIS,
+WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+See the License for the specific language governing permissions and
+limitations under the License.
+*/
+
+package main
+
+import (
+	"context"
+	"log"
+	"os"
+	"sync"
+
+	"github.com/apache/rocketmq-client-go/v2/consumer"
+	"github.com/apache/rocketmq-client-go/v2/primitive"
+)
+
+func main() {
+	groupName := "testGroup"
+	c, err := consumer.NewManualPullConsumer(
+		consumer.WithGroupName(groupName),
+		consumer.WithNameServer(primitive.NamesrvAddr{"127.0.0.1:9876"}),
+	)
+	if err != nil {
+		log.Fatalf("init producer error: %v", err)
+	}
+
+	topic := "test"
+	// get all message queue
+	mqs, err := c.GetMessageQueues(context.Background(), topic)
+	if err != nil {
+		log.Fatalf("Get message queue error: %v", err)
+	}
+	var wg sync.WaitGroup
+
+	fn := func(mq *primitive.MessageQueue) {
+		defer wg.Done()
+		// get latest offset
+		offset, err := c.CommittedOffset(context.Background(), groupName, mq)
+		for {
+			if err != nil {
+				log.Fatalf("search latest offset error: %v", err)
+			}
+			// pull message
+			ret, err := c.PullFromQueue(context.Background(), groupName, mq, offset, 1)
+			if err != nil {
+				log.Fatalf("pullFromQueue error: %v", err)
+			}
+			if ret.Status == primitive.PullFound {
+				msgs := ret.GetMessageExts()
+				for _, msg := range msgs {
+					log.Printf("subscribe Msg: %v \n", msg)
+					// commit offset
+					if err = c.Seek(context.Background(), groupName, mq, msg.QueueOffset+1); err != nil {

Review comment:
       Well, I'm not sure what we concluded when we talked about this last month. As @vongosling mentioned in https://github.com/apache/rocketmq-client-go/issues/737, we should stay compatible with the Java semantics.







[GitHub] [rocketmq-client-go] wenfengwang commented on a change in pull request #745: [ISSUE #737] Add ManualPullConsumer

Posted by GitBox <gi...@apache.org>.
wenfengwang commented on a change in pull request #745:
URL: https://github.com/apache/rocketmq-client-go/pull/745#discussion_r774716593



##########
File path: examples/consumer/manual/main.go
##########
@@ -0,0 +1,84 @@
+/*
+Licensed to the Apache Software Foundation (ASF) under one or more
+contributor license agreements.  See the NOTICE file distributed with
+this work for additional information regarding copyright ownership.
+The ASF licenses this file to You under the Apache License, Version 2.0
+(the "License"); you may not use this file except in compliance with
+the License.  You may obtain a copy of the License at
+
+    http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing, software
+distributed under the License is distributed on an "AS IS" BASIS,
+WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+See the License for the specific language governing permissions and
+limitations under the License.
+*/
+
+package main
+
+import (
+	"context"
+	"log"
+	"os"
+	"sync"
+
+	"github.com/apache/rocketmq-client-go/v2/consumer"
+	"github.com/apache/rocketmq-client-go/v2/primitive"
+)
+
+func main() {
+	groupName := "testGroup"
+	c, err := consumer.NewManualPullConsumer(
+		consumer.WithGroupName(groupName),
+		consumer.WithNameServer(primitive.NamesrvAddr{"127.0.0.1:9876"}),
+	)
+	if err != nil {
+		log.Fatalf("init producer error: %v", err)
+	}
+
+	topic := "test"
+	// get all message queue
+	mqs, err := c.GetMessageQueues(context.Background(), topic)
+	if err != nil {
+		log.Fatalf("Get message queue error: %v", err)
+	}
+	var wg sync.WaitGroup
+
+	fn := func(mq *primitive.MessageQueue) {
+		defer wg.Done()
+		// get latest offset
+		offset, err := c.CommittedOffset(context.Background(), groupName, mq)
+		for {
+			if err != nil {
+				log.Fatalf("search latest offset error: %v", err)
+			}
+			// pull message
+			ret, err := c.PullFromQueue(context.Background(), groupName, mq, offset, 1)
+			if err != nil {
+				log.Fatalf("pullFromQueue error: %v", err)
+			}
+			if ret.Status == primitive.PullFound {
+				msgs := ret.GetMessageExts()
+				for _, msg := range msgs {
+					log.Printf("subscribe Msg: %v \n", msg)
+					// commit offset
+					if err = c.Seek(context.Background(), groupName, mq, msg.QueueOffset+1); err != nil {

Review comment:
       Well, I just remembered what we talked about last month. As @vongosling mentioned in #737, we should stay compatible with the Java interface semantics. So, until I get confirmation, you can ignore this conversation and focus on the other conversations.







[GitHub] [rocketmq-client-go] wenfengwang merged pull request #745: [ISSUE #737] Add ManualPullConsumer

Posted by GitBox <gi...@apache.org>.
wenfengwang merged pull request #745:
URL: https://github.com/apache/rocketmq-client-go/pull/745


   





[GitHub] [rocketmq-client-go] wenfengwang commented on a change in pull request #745: [ISSUE #737] Add ManualPullConsumer

Posted by GitBox <gi...@apache.org>.
wenfengwang commented on a change in pull request #745:
URL: https://github.com/apache/rocketmq-client-go/pull/745#discussion_r774703591



##########
File path: consumer/manual_pull_consumer.go
##########
@@ -0,0 +1,342 @@
+/*
+Licensed to the Apache Software Foundation (ASF) under one or more
+contributor license agreements.  See the NOTICE file distributed with
+this work for additional information regarding copyright ownership.
+The ASF licenses this file to You under the Apache License, Version 2.0
+(the "License"); you may not use this file except in compliance with
+the License.  You may obtain a copy of the License at
+
+    http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing, software
+distributed under the License is distributed on an "AS IS" BASIS,
+WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+See the License for the specific language governing permissions and
+limitations under the License.
+*/
+
+package consumer
+
+import (
+	"context"
+	"fmt"
+	"strconv"
+	"sync"
+	"time"
+
+	errors2 "github.com/apache/rocketmq-client-go/v2/errors"
+	"github.com/apache/rocketmq-client-go/v2/internal"
+	"github.com/apache/rocketmq-client-go/v2/internal/remote"
+	"github.com/apache/rocketmq-client-go/v2/primitive"
+	"github.com/apache/rocketmq-client-go/v2/rlog"
+	"github.com/pkg/errors"
+)
+
+// ManualPullConsumer is a low-level consumer, which operates based on MessageQueue.
+// Users should maintain information such as offset by themselves
+type ManualPullConsumer interface {
+	// PullFromQueue returns messages from the specified queue, starting at offset
+	PullFromQueue(ctx context.Context, mq *primitive.MessageQueue, offset int64, numbers int) (*primitive.PullResult, error)
+
+	// GetMessageQueues returns the queues of the topic
+	GetMessageQueues(ctx context.Context, topic string) ([]*primitive.MessageQueue, error)
+
+	// CommittedOffset returns the offset of mq in groupName; if mq does not exist, -1 is returned
+	CommittedOffset(ctx context.Context, groupName string, mq *primitive.MessageQueue) (int64, error)
+
+	// Seek moves the consume position to the offset; this API can be used to reset or commit the offset
+	Seek(ctx context.Context, groupName string, mq *primitive.MessageQueue, offset int64) error
+
+	// Lookup returns the offset for a timestamp (ms); the maximum offset whose born time is less than timestamp is returned.
+	// If timestamp is less than every message's born time, the earliest offset is returned.
+	// If timestamp is greater than every message's born time, the latest offset is returned.
+	Lookup(ctx context.Context, mq *primitive.MessageQueue, timestamp int64) (int64, error)
+
+	// Shutdown the ManualPullConsumer, clean up internal resources
+	Shutdown() error
+}
+
+type defaultManualPullConsumer struct {
+	namesrv                internal.Namesrvs
+	option                 consumerOptions
+	client                 internal.RMQClient
+	interceptor            primitive.Interceptor
+	pullFromWhichNodeTable sync.Map
+	shutdownOnce           sync.Once
+}
+
+// NewManualPullConsumer creates and initializes a new ManualPullConsumer.
+func NewManualPullConsumer(options ...Option) (*defaultManualPullConsumer, error) {
+	defaultOpts := defaultPullConsumerOptions()
+	for _, apply := range options {
+		apply(&defaultOpts)
+	}
+
+	srvs, err := internal.NewNamesrv(defaultOpts.Resolver)
+	if err != nil {
+		return nil, errors.Wrap(err, "new Namesrv failed.")
+	}
+	if !defaultOpts.Credentials.IsEmpty() {
+		srvs.SetCredentials(defaultOpts.Credentials)
+	}
+	defaultOpts.Namesrv = srvs
+
+	actualRMQClient := internal.GetOrNewRocketMQClient(defaultOpts.ClientOptions, nil)
+	actualNameSrv := internal.GetOrSetNamesrv(actualRMQClient.ClientID(), defaultOpts.Namesrv)
+
+	dc := &defaultManualPullConsumer{
+		client:  actualRMQClient,
+		option:  defaultOpts,
+		namesrv: actualNameSrv,
+	}
+
+	dc.interceptor = primitive.ChainInterceptors(dc.option.Interceptors...)
+	dc.option.ClientOptions.Namesrv = actualNameSrv
+	return dc, nil
+}
+
+func (dc *defaultManualPullConsumer) PullFromQueue(ctx context.Context, groupName string, mq *primitive.MessageQueue, offset int64, numbers int) (*primitive.PullResult, error) {
+	if err := dc.checkPull(ctx, mq, offset, numbers); err != nil {
+		return nil, err
+	}
+	subData := buildSubscriptionData(mq.Topic, MessageSelector{
+		Expression: _SubAll,
+	})
+
+	sysFlag := buildSysFlag(false, true, true, false)
+
+	pullRequest := &internal.PullMessageRequestHeader{
+		ConsumerGroup:        groupName,
+		Topic:                mq.Topic,
+		QueueId:              int32(mq.QueueId),
+		QueueOffset:          offset,
+		MaxMsgNums:           int32(numbers),
+		SysFlag:              sysFlag,
+		CommitOffset:         0,
+		SuspendTimeoutMillis: _BrokerSuspendMaxTime,
+		SubExpression:        subData.SubString,
+		ExpressionType:       string(subData.ExpType),
+	}
+
+	if subData.ExpType == string(TAG) {
+		pullRequest.SubVersion = 0
+	} else {
+		pullRequest.SubVersion = subData.SubVersion
+	}
+
+	pullResp, err := dc.pullInner(ctx, mq, pullRequest)
+	if err != nil {
+		return pullResp, err
+	}
+	dc.processPullResult(mq, pullResp, subData)
+	if dc.interceptor != nil {
+		msgCtx := &primitive.ConsumeMessageContext{
+			Properties:    make(map[string]string),
+			ConsumerGroup: groupName,
+			MQ:            mq,
+			Msgs:          pullResp.GetMessageExts(),
+		}
+		err = dc.interceptor(ctx, msgCtx, struct{}{}, primitive.NoopInterceptor)
+	}
+	return pullResp, err
+}
+
+func (dc *defaultManualPullConsumer) GetMessageQueues(ctx context.Context, topic string) ([]*primitive.MessageQueue, error) {
+	return dc.namesrv.FetchSubscribeMessageQueues(topic)
+}
+
+func (dc *defaultManualPullConsumer) CommittedOffset(ctx context.Context, groupName string, mq *primitive.MessageQueue) (int64, error) {
+	fn := func(broker string) (*remote.RemotingCommand, error) {
+		request := &internal.QueryConsumerOffsetRequestHeader{
+			ConsumerGroup: groupName,
+			Topic:         mq.Topic,
+			QueueId:       mq.QueueId,
+		}
+		cmd := remote.NewRemotingCommand(internal.ReqGetMaxOffset, request, nil)

Review comment:
       Should use `ReqQueryConsumerOffset`
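
To illustrate why the request code matters here, a minimal sketch with a fake broker that dispatches on the code. The constants, `fakeBroker`, and `handle` are hypothetical and do not reflect the real protocol values or the client's API; the point is only that a query header sent with the max-offset code yields the queue's max offset rather than the group's committed offset.

```go
package main

import "fmt"

// Hypothetical request codes, for illustration only.
const (
	reqQueryConsumerOffset = iota + 1
	reqGetMaxOffset
)

// fakeBroker is a hypothetical in-memory broker for one queue.
type fakeBroker struct {
	maxOffset int64
	committed map[string]int64 // committed offset per consumer group
}

// handle dispatches on the request code, as a broker would.
func (b *fakeBroker) handle(code int, group string) (int64, error) {
	switch code {
	case reqQueryConsumerOffset:
		off, ok := b.committed[group]
		if !ok {
			return -1, nil // no committed offset for this group
		}
		return off, nil
	case reqGetMaxOffset:
		return b.maxOffset, nil
	default:
		return 0, fmt.Errorf("unknown request code %d", code)
	}
}

func main() {
	b := &fakeBroker{maxOffset: 100, committed: map[string]int64{"testGroup": 42}}

	wrong, _ := b.handle(reqGetMaxOffset, "testGroup")
	right, _ := b.handle(reqQueryConsumerOffset, "testGroup")
	fmt.Println("with the max-offset code:", wrong)
	fmt.Println("with the query-consumer-offset code:", right)
}
```

With the wrong code the call still succeeds, so the mistake would likely surface only as a consumer that silently starts from the end of the queue.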

##########
File path: examples/consumer/manual/main.go
##########
@@ -0,0 +1,84 @@
+/*
+Licensed to the Apache Software Foundation (ASF) under one or more
+contributor license agreements.  See the NOTICE file distributed with
+this work for additional information regarding copyright ownership.
+The ASF licenses this file to You under the Apache License, Version 2.0
+(the "License"); you may not use this file except in compliance with
+the License.  You may obtain a copy of the License at
+
+    http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing, software
+distributed under the License is distributed on an "AS IS" BASIS,
+WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+See the License for the specific language governing permissions and
+limitations under the License.
+*/
+
+package main
+
+import (
+	"context"
+	"log"
+	"os"
+	"sync"
+
+	"github.com/apache/rocketmq-client-go/v2/consumer"
+	"github.com/apache/rocketmq-client-go/v2/primitive"
+)
+
+func main() {
+	groupName := "testGroup"
+	c, err := consumer.NewManualPullConsumer(
+		consumer.WithGroupName(groupName),
+		consumer.WithNameServer(primitive.NamesrvAddr{"127.0.0.1:9876"}),
+	)
+	if err != nil {
+		log.Fatalf("init producer error: %v", err)
+	}
+
+	topic := "test"
+	// get all message queue
+	mqs, err := c.GetMessageQueues(context.Background(), topic)
+	if err != nil {
+		log.Fatalf("Get message queue error: %v", err)
+	}
+	var wg sync.WaitGroup
+
+	fn := func(mq *primitive.MessageQueue) {
+		defer wg.Done()
+		// get latest offset
+		offset, err := c.CommittedOffset(context.Background(), groupName, mq)
+		for {
+			if err != nil {
+				log.Fatalf("search latest offset error: %v", err)
+			}
+			// pull message
+			ret, err := c.PullFromQueue(context.Background(), groupName, mq, offset, 1)
+			if err != nil {
+				log.Fatalf("pullFromQueue error: %v", err)
+			}
+			if ret.Status == primitive.PullFound {
+				msgs := ret.GetMessageExts()
+				for _, msg := range msgs {
+					log.Printf("subscribe Msg: %v \n", msg)
+					// commit offset
+					if err = c.Seek(context.Background(), groupName, mq, msg.QueueOffset+1); err != nil {

Review comment:
       Using `Seek` to commit an offset seems unclear, because a `Seek` method commonly expresses resetting a read/write position. Could we rename this method to `CommitOffset` and rename `CommittedOffset` to `QueryCommitedOffset` to reduce ambiguity? I haven't fully thought this through; what do you think?
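
As a rough sketch of how the proposed names might read at a call site, here is a small in-memory version. `offsetCommitter`, `memoryOffsets`, and the flattened `(group, topic, queueId)` parameters are hypothetical simplifications, not the PR's interface; the -1 result for an unknown queue follows the behaviour documented for `CommittedOffset`.

```go
package main

import (
	"fmt"
	"sync"
)

// queueKey identifies one queue for one consumer group (hypothetical).
type queueKey struct {
	group, topic string
	queueId      int
}

// offsetCommitter uses the method names proposed in the comment above.
type offsetCommitter interface {
	CommitOffset(group, topic string, queueId int, offset int64) error
	QueryCommitedOffset(group, topic string, queueId int) (int64, error)
}

// memoryOffsets is an in-memory implementation for illustration.
type memoryOffsets struct {
	mu      sync.Mutex
	offsets map[queueKey]int64
}

var _ offsetCommitter = (*memoryOffsets)(nil)

func newMemoryOffsets() *memoryOffsets {
	return &memoryOffsets{offsets: make(map[queueKey]int64)}
}

// CommitOffset records the next offset to consume for the queue.
func (m *memoryOffsets) CommitOffset(group, topic string, queueId int, offset int64) error {
	if offset < 0 {
		return fmt.Errorf("illegal offset %d", offset)
	}
	m.mu.Lock()
	defer m.mu.Unlock()
	m.offsets[queueKey{group, topic, queueId}] = offset
	return nil
}

// QueryCommitedOffset returns the committed offset, or -1 if none exists.
func (m *memoryOffsets) QueryCommitedOffset(group, topic string, queueId int) (int64, error) {
	m.mu.Lock()
	defer m.mu.Unlock()
	off, ok := m.offsets[queueKey{group, topic, queueId}]
	if !ok {
		return -1, nil
	}
	return off, nil
}

func main() {
	var store offsetCommitter = newMemoryOffsets()
	before, _ := store.QueryCommitedOffset("testGroup", "test", 0)
	_ = store.CommitOffset("testGroup", "test", 0, 8)
	after, _ := store.QueryCommitedOffset("testGroup", "test", 0)
	fmt.Println(before, after)
}
```

Read this way, the verb pair makes the write and the read visibly distinct, which is the ambiguity the rename is meant to remove.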

##########
File path: examples/consumer/manual/main.go
##########
@@ -0,0 +1,84 @@
+/*
+Licensed to the Apache Software Foundation (ASF) under one or more
+contributor license agreements.  See the NOTICE file distributed with
+this work for additional information regarding copyright ownership.
+The ASF licenses this file to You under the Apache License, Version 2.0
+(the "License"); you may not use this file except in compliance with
+the License.  You may obtain a copy of the License at
+
+    http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing, software
+distributed under the License is distributed on an "AS IS" BASIS,
+WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+See the License for the specific language governing permissions and
+limitations under the License.
+*/
+
+package main
+
+import (
+	"context"
+	"log"
+	"os"
+	"sync"
+
+	"github.com/apache/rocketmq-client-go/v2/consumer"
+	"github.com/apache/rocketmq-client-go/v2/primitive"
+)
+
+func main() {
+	groupName := "testGroup"
+	c, err := consumer.NewManualPullConsumer(
+		consumer.WithGroupName(groupName),
+		consumer.WithNameServer(primitive.NamesrvAddr{"127.0.0.1:9876"}),
+	)
+	if err != nil {
+		log.Fatalf("init producer error: %v", err)
+	}
+
+	topic := "test"
+	// get all message queue
+	mqs, err := c.GetMessageQueues(context.Background(), topic)
+	if err != nil {
+		log.Fatalf("Get message queue error: %v", err)
+	}
+	var wg sync.WaitGroup
+
+	fn := func(mq *primitive.MessageQueue) {
+		defer wg.Done()
+		// get latest offset
+		offset, err := c.CommittedOffset(context.Background(), groupName, mq)
+		for {
+			if err != nil {
+				log.Fatalf("search latest offset error: %v", err)
+			}
+			// pull message
+			ret, err := c.PullFromQueue(context.Background(), groupName, mq, offset, 1)
+			if err != nil {
+				log.Fatalf("pullFromQueue error: %v", err)
+			}
+			if ret.Status == primitive.PullFound {
+				msgs := ret.GetMessageExts()
+				for _, msg := range msgs {
+					log.Printf("subscribe Msg: %v \n", msg)
+					// commit offset
+					if err = c.Seek(context.Background(), groupName, mq, msg.QueueOffset+1); err != nil {
+						log.Fatalf("commit offset error: %v", err)
+					}
+					offset++
+				}
+			} else {
+				break
+			}
+		}
+	}
+
+	for _, mq := range mqs {
+		wg.Add(1)
+		go fn(mq)
+	}
+	wg.Wait()
+	c.Shutdown()
+	os.Exit(0)

Review comment:
       This line is redundant, because the exit code is already 0 on a normal exit.
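
A small sketch of the point, assuming the example is restructured around a hypothetical `run` helper: a Go program whose `main` returns normally exits with status 0, so no explicit `os.Exit(0)` is needed. Note also that `os.Exit` terminates immediately without running deferred functions, which is another reason to avoid it after cleanup-sensitive code.

```go
package main

import "fmt"

// run stands in for the example's consume loop (hypothetical helper).
// The deferred function plays the role of c.Shutdown().
func run(steps *[]string) error {
	defer func() { *steps = append(*steps, "shutdown") }()
	*steps = append(*steps, "consume")
	return nil
}

func main() {
	var steps []string
	if err := run(&steps); err != nil {
		fmt.Println("error:", err)
		return
	}
	fmt.Println(steps)
	// Returning from main here ends the process with exit status 0.
}
```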

##########
File path: examples/consumer/manual/main.go
##########
@@ -0,0 +1,84 @@
+/*
+Licensed to the Apache Software Foundation (ASF) under one or more
+contributor license agreements.  See the NOTICE file distributed with
+this work for additional information regarding copyright ownership.
+The ASF licenses this file to You under the Apache License, Version 2.0
+(the "License"); you may not use this file except in compliance with
+the License.  You may obtain a copy of the License at
+
+    http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing, software
+distributed under the License is distributed on an "AS IS" BASIS,
+WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+See the License for the specific language governing permissions and
+limitations under the License.
+*/
+
+package main
+
+import (
+	"context"
+	"log"
+	"os"
+	"sync"
+
+	"github.com/apache/rocketmq-client-go/v2/consumer"
+	"github.com/apache/rocketmq-client-go/v2/primitive"
+)
+
+func main() {
+	groupName := "testGroup"
+	c, err := consumer.NewManualPullConsumer(
+		consumer.WithGroupName(groupName),
+		consumer.WithNameServer(primitive.NamesrvAddr{"127.0.0.1:9876"}),
+	)
+	if err != nil {
+		log.Fatalf("init producer error: %v", err)
+	}
+
+	topic := "test"
+	// get all message queue
+	mqs, err := c.GetMessageQueues(context.Background(), topic)
+	if err != nil {
+		log.Fatalf("Get message queue error: %v", err)

Review comment:
       Please use `rlog` or `fmt.Printf` here instead of `log`.
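
One way the `fmt.Printf` variant of this suggestion could look, as a rough sketch only: `fetchQueues` and `describe` are hypothetical helpers invented for illustration (the real call in the example is `c.GetMessageQueues`), and exiting with status 1 on failure is an assumption about the desired behavior, mirroring what `log.Fatalf` did.

```go
package main

import (
	"errors"
	"fmt"
	"os"
)

// fetchQueues is a hypothetical stand-in for c.GetMessageQueues.
// It fails only when the topic is empty.
func fetchQueues(topic string) ([]string, error) {
	if topic == "" {
		return nil, errors.New("topic must not be empty")
	}
	return []string{topic + "-0", topic + "-1"}, nil
}

// describe formats the message printed for a failed step.
func describe(step string, err error) string {
	return fmt.Sprintf("%s error: %v", step, err)
}

func main() {
	mqs, err := fetchQueues("test")
	if err != nil {
		// Print with fmt and exit explicitly instead of calling log.Fatalf.
		fmt.Printf("%s\n", describe("get message queues", err))
		os.Exit(1)
	}
	fmt.Printf("message queues: %v\n", mqs)
}
```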




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: dev-unsubscribe@rocketmq.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [rocketmq-client-go] wangweizZZ commented on pull request #745: [ISSUE #737] Add ManualPullConsumer

Posted by GitBox <gi...@apache.org>.
wangweizZZ commented on pull request #745:
URL: https://github.com/apache/rocketmq-client-go/pull/745#issuecomment-993081330


   @wenfengwang  please help review this one





[GitHub] [rocketmq-client-go] wenfengwang commented on pull request #745: [ISSUE #737] Add ManualPullConsumer

Posted by GitBox <gi...@apache.org>.
wenfengwang commented on pull request #745:
URL: https://github.com/apache/rocketmq-client-go/pull/745#issuecomment-995622373


   I have another question: `manual_pull_consumer.go` contains a lot of code similar to `consumer.go`. Could you explain your reasoning behind this?





[GitHub] [rocketmq-client-go] wangweizZZ commented on a change in pull request #745: [ISSUE #737] Add ManualPullConsumer

Posted by GitBox <gi...@apache.org>.
wangweizZZ commented on a change in pull request #745:
URL: https://github.com/apache/rocketmq-client-go/pull/745#discussion_r769742481



##########
File path: consumer/manual_pull_consumer.go
##########
@@ -0,0 +1,359 @@
+/*
+Licensed to the Apache Software Foundation (ASF) under one or more
+contributor license agreements.  See the NOTICE file distributed with
+this work for additional information regarding copyright ownership.
+The ASF licenses this file to You under the Apache License, Version 2.0
+(the "License"); you may not use this file except in compliance with
+the License.  You may obtain a copy of the License at
+
+    http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing, software
+distributed under the License is distributed on an "AS IS" BASIS,
+WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+See the License for the specific language governing permissions and
+limitations under the License.
+*/
+
+package consumer
+
+import (
+	"context"
+	"fmt"
+	"strconv"
+	"sync"
+	"time"
+
+	errors2 "github.com/apache/rocketmq-client-go/v2/errors"
+	"github.com/apache/rocketmq-client-go/v2/internal"
+	"github.com/apache/rocketmq-client-go/v2/internal/remote"
+	"github.com/apache/rocketmq-client-go/v2/primitive"
+	"github.com/apache/rocketmq-client-go/v2/rlog"
+	"github.com/pkg/errors"
+)
+
+type ManualPullConsumer interface {
+	// get n messages from specified queue with offset
+	PullFromQueue(ctx context.Context, mq *primitive.MessageQueue, offset int64, numbers int) (*primitive.PullResult, error)
+
+	// get queues of the topic
+	GetMessageQueues(ctx context.Context, topic string) []*primitive.MessageQueue
+
+	// get the offset of mq in groupName; if mq does not exist, -1 is returned
+	CommittedOffset(groupName string, mq *primitive.MessageQueue) (int64, error)
+
+	// seek consume position to the offset, this api can be used to reset offset and commit offset
+	Seek(groupName string, mq *primitive.MessageQueue, offset int64) error
+
+	// query the offset by timestamp (ms): the maximum offset whose born time is less than timestamp is returned
+	// if timestamp is less than every message's born time, the earliest offset is returned
+	// if timestamp is greater than every message's born time, the latest offset is returned
+	Lookup(ctx context.Context, mq *primitive.MessageQueue, timestamp int64) (int64, error)
+}
+
+type defaultManualPullConsumer struct {
+	group                  string
+	namesrv                internal.Namesrvs
+	option                 consumerOptions
+	client                 internal.RMQClient
+	interceptor            primitive.Interceptor
+	pullFromWhichNodeTable sync.Map
+}
+
+func NewManualPullConsumer(options ...Option) (*defaultManualPullConsumer, error) {
+	defaultOpts := defaultPullConsumerOptions()
+	for _, apply := range options {
+		apply(&defaultOpts)
+	}
+
+	srvs, err := internal.NewNamesrv(defaultOpts.Resolver)
+	if err != nil {
+		return nil, errors.Wrap(err, "new Namesrv failed.")
+	}
+	if !defaultOpts.Credentials.IsEmpty() {
+		srvs.SetCredentials(defaultOpts.Credentials)
+	}
+	defaultOpts.Namesrv = srvs
+
+	if defaultOpts.Namespace != "" {
+		defaultOpts.GroupName = defaultOpts.Namespace + "%" + defaultOpts.GroupName
+	}
+
+	actualRMQClient := internal.GetOrNewRocketMQClient(defaultOpts.ClientOptions, nil)
+	actualNameSrv := internal.GetOrSetNamesrv(actualRMQClient.ClientID(), defaultOpts.Namesrv)
+
+	dc := &defaultManualPullConsumer{
+		client:  actualRMQClient,
+		option:  defaultOpts,
+		namesrv: actualNameSrv,
+		group:   defaultOpts.GroupName,
+	}
+
+	dc.interceptor = primitive.ChainInterceptors(dc.option.Interceptors...)
+	dc.option.ClientOptions.Namesrv = actualNameSrv
+	return dc, nil
+}
+
+func (dc *defaultManualPullConsumer) PullFromQueue(ctx context.Context, mq *primitive.MessageQueue, offset int64, numbers int) (*primitive.PullResult, error) {
+	if err := dc.checkPull(ctx, mq, offset, numbers); err != nil {
+		return nil, err
+	}
+	subData := buildSubscriptionData(mq.Topic, MessageSelector{
+		Expression: _SubAll,
+	})
+
+	sysFlag := buildSysFlag(false, true, true, false)
+
+	pullResp, err := dc.pullInner(ctx, mq, subData, offset, numbers, sysFlag, 0)
+	if err != nil {
+		return pullResp, err
+	}
+	dc.processPullResult(mq, pullResp, subData)
+	if dc.interceptor != nil {
+		msgCtx := &primitive.ConsumeMessageContext{
+			Properties:    make(map[string]string),
+			ConsumerGroup: dc.group,
+			MQ:            mq,
+			Msgs:          pullResp.GetMessageExts(),
+		}
+		err = dc.interceptor(ctx, msgCtx, struct{}{}, primitive.NoopInterceptor)
+	}
+	return pullResp, err
+}
+
+func (dc *defaultManualPullConsumer) GetMessageQueues(ctx context.Context, topic string) []*primitive.MessageQueue {
+	queues, err := dc.namesrv.FetchSubscribeMessageQueues(topic)
+	if err != nil {
+		rlog.Error("get message queue error", map[string]interface{}{
+			rlog.LogKeyTopic:         topic,
+			rlog.LogKeyUnderlayError: err.Error(),
+		})
+		return nil
+	}
+	return queues
+}
+
+func (dc *defaultManualPullConsumer) CommittedOffset(group string, mq *primitive.MessageQueue) (int64, error) {
+	broker, exist := dc.chooseServer(mq)
+	if !exist {
+		return int64(-1), fmt.Errorf("broker: %s address not found", mq.BrokerName)
+	}
+	queryOffsetRequest := &internal.QueryConsumerOffsetRequestHeader{
+		ConsumerGroup: group,
+		Topic:         mq.Topic,
+		QueueId:       mq.QueueId,
+	}
+	cmd := remote.NewRemotingCommand(internal.ReqQueryConsumerOffset, queryOffsetRequest, nil)
+	res, err := dc.client.InvokeSync(context.Background(), broker, cmd, 3*time.Second)
+	if err != nil {
+		return -1, err
+	}
+	if res.Code != internal.ResSuccess {
+		return -2, fmt.Errorf("broker response code: %d, remarks: %s", res.Code, res.Remark)
+	}
+	off, err := strconv.ParseInt(res.ExtFields["offset"], 10, 64)
+	if err != nil {
+		return -1, err
+	}
+	return off, nil
+}
+
+func (dc *defaultManualPullConsumer) Seek(groupName string, mq *primitive.MessageQueue, offset int64) error {
+	minOffset, err := dc.queryMinOffset(context.Background(), mq)
+	if err != nil {
+		return err
+	}
+	maxOffset, err := dc.queryMaxOffset(context.Background(), mq)
+	if err != nil {
+		return err
+	}
+	if offset < minOffset || offset > maxOffset {
+		return fmt.Errorf("Seek offset illegal, seek offset = %d, min offset = %d, max offset = %d", offset, minOffset, maxOffset)
+	}
+
+	broker, exist := dc.chooseServer(mq)
+	if !exist {
+		return fmt.Errorf("broker: %s address not found", mq.BrokerName)
+	}
+
+	updateOffsetRequest := &internal.UpdateConsumerOffsetRequestHeader{
+		ConsumerGroup: groupName,
+		Topic:         mq.Topic,
+		QueueId:       mq.QueueId,
+		CommitOffset:  offset,
+	}
+	cmd := remote.NewRemotingCommand(internal.ReqUpdateConsumerOffset, updateOffsetRequest, nil)
+	return dc.client.InvokeOneWay(context.Background(), broker, cmd, 5*time.Second)
+}
+
+func (dc *defaultManualPullConsumer) Lookup(ctx context.Context, mq *primitive.MessageQueue, timestamp int64) (int64, error) {
+	broker, exist := dc.chooseServer(mq)
+	if !exist {
+		return -1, fmt.Errorf("the broker [%s] does not exist", mq.BrokerName)
+	}
+
+	request := &internal.SearchOffsetRequestHeader{
+		Topic:     mq.Topic,
+		QueueId:   mq.QueueId,
+		Timestamp: timestamp,
+	}
+
+	cmd := remote.NewRemotingCommand(internal.ReqSearchOffsetByTimestamp, request, nil)
+	response, err := dc.client.InvokeSync(context.Background(), broker, cmd, 3*time.Second)
+	if err != nil {
+		return -1, err
+	}
+	return strconv.ParseInt(response.ExtFields["offset"], 10, 64)
+}
+
+func (dc *defaultManualPullConsumer) chooseServer(mq *primitive.MessageQueue) (string, bool) {
+	brokerAddr := dc.namesrv.FindBrokerAddrByName(mq.BrokerName)
+	if brokerAddr == "" {
+		dc.namesrv.UpdateTopicRouteInfo(mq.Topic)
+		brokerAddr = dc.namesrv.FindBrokerAddrByName(mq.BrokerName)
+	}
+	return brokerAddr, brokerAddr != ""
+}
+
+func (dc *defaultManualPullConsumer) queryMinOffset(ctx context.Context, mq *primitive.MessageQueue) (int64, error) {
+	broker, exist := dc.chooseServer(mq)
+	if !exist {
+		return -1, fmt.Errorf("the broker [%s] does not exist", mq.BrokerName)
+	}
+
+	request := &internal.GetMinOffsetRequestHeader{
+		Topic:   mq.Topic,
+		QueueId: mq.QueueId,
+	}
+
+	cmd := remote.NewRemotingCommand(internal.ReqGetMinOffset, request, nil)
+	response, err := dc.client.InvokeSync(ctx, broker, cmd, 3*time.Second)
+	if err != nil {
+		return -1, err
+	}
+	return strconv.ParseInt(response.ExtFields["offset"], 10, 64)
+}
+
+func (dc *defaultManualPullConsumer) queryMaxOffset(ctx context.Context, mq *primitive.MessageQueue) (int64, error) {
+	broker, exist := dc.chooseServer(mq)
+	if !exist {
+		return -1, fmt.Errorf("the broker [%s] does not exist", mq.BrokerName)
+	}
+
+	request := &internal.GetMaxOffsetRequestHeader{
+		Topic:   mq.Topic,
+		QueueId: mq.QueueId,
+	}
+
+	cmd := remote.NewRemotingCommand(internal.ReqGetMaxOffset, request, nil)
+	response, err := dc.client.InvokeSync(ctx, broker, cmd, 3*time.Second)
+	if err != nil {
+		return -1, err
+	}
+	return strconv.ParseInt(response.ExtFields["offset"], 10, 64)
+}
+
+func (dc *defaultManualPullConsumer) pullInner(ctx context.Context, queue *primitive.MessageQueue, data *internal.SubscriptionData,
+	offset int64, numbers int, sysFlag int32, commitOffsetValue int64) (*primitive.PullResult, error) {
+	brokerResult := dc.tryFindBroker(queue)
+	if brokerResult == nil {
+		rlog.Warning("no broker found for mq", map[string]interface{}{
+			rlog.LogKeyMessageQueue: queue,
+		})
+		return nil, errors2.ErrBrokerNotFound
+	}
+	if brokerResult.Slave {
+		sysFlag = clearCommitOffsetFlag(sysFlag)
+	}
+
+	if (data.ExpType == string(TAG)) && brokerResult.BrokerVersion < internal.V4_1_0 {
+		return nil, fmt.Errorf("the broker [%s, %v] does not upgrade to support for filter message by %v",
+			queue.BrokerName, brokerResult.BrokerVersion, data.ExpType)
+	}
+
+	pullRequest := &internal.PullMessageRequestHeader{
+		ConsumerGroup:        dc.group,
+		Topic:                queue.Topic,
+		QueueId:              int32(queue.QueueId),
+		QueueOffset:          offset,
+		MaxMsgNums:           int32(numbers),
+		SysFlag:              sysFlag,
+		CommitOffset:         commitOffsetValue,
+		SuspendTimeoutMillis: _BrokerSuspendMaxTime,
+		SubExpression:        data.SubString,
+		ExpressionType:       string(data.ExpType),
+	}
+	if data.ExpType == string(TAG) {
+		pullRequest.SubVersion = 0
+	} else {
+		pullRequest.SubVersion = data.SubVersion
+	}
+	// TODO: add computPullFromWhichFilterServer
+
+	return dc.client.PullMessage(context.Background(), brokerResult.BrokerAddr, pullRequest)
+}
+
+func (dc *defaultManualPullConsumer) tryFindBroker(mq *primitive.MessageQueue) *internal.FindBrokerResult {
+	result := dc.namesrv.FindBrokerAddressInSubscribe(mq.BrokerName, dc.recalculatePullFromWhichNode(mq), false)
+	if result != nil {
+		return result
+	}
+	dc.namesrv.UpdateTopicRouteInfo(mq.Topic)
+	return dc.namesrv.FindBrokerAddressInSubscribe(mq.BrokerName, dc.recalculatePullFromWhichNode(mq), false)
+}
+func (dc *defaultManualPullConsumer) recalculatePullFromWhichNode(mq *primitive.MessageQueue) int64 {
+	v, exist := dc.pullFromWhichNodeTable.Load(*mq)
+	if exist {
+		return v.(int64)
+	}
+	return internal.MasterId
+}
+
+func (dc *defaultManualPullConsumer) checkPull(ctx context.Context, mq *primitive.MessageQueue, offset int64, numbers int) error {
+	if mq == nil {
+		return errors2.ErrMQEmpty
+	}
+	if offset < 0 {
+		return errors2.ErrOffset
+	}
+	if numbers <= 0 {
+		return errors2.ErrNumbers
+	}
+	return nil
+}
+
+func (dc *defaultManualPullConsumer) processPullResult(mq *primitive.MessageQueue, result *primitive.PullResult, data *internal.SubscriptionData) {
+
+	dc.updatePullFromWhichNode(mq, result.SuggestWhichBrokerId)
+
+	switch result.Status {
+	case primitive.PullFound:
+		result.SetMessageExts(primitive.DecodeMessage(result.GetBody()))
+		msgs := result.GetMessageExts()
+		// filter message according to tags
+		msgListFilterAgain := msgs
+		if data.Tags.Len() > 0 && data.ClassFilterMode {
+			msgListFilterAgain = make([]*primitive.MessageExt, 0)
+			for _, msg := range msgs {
+				_, exist := data.Tags.Contains(msg.GetTags())
+				if exist {
+					msgListFilterAgain = append(msgListFilterAgain, msg)
+				}
+			}
+		}
+		// TODO: add filter message hook
+		for _, msg := range msgListFilterAgain {
+			traFlag, _ := strconv.ParseBool(msg.GetProperty(primitive.PropertyTransactionPrepared))
+			if traFlag {
+				msg.TransactionId = msg.GetProperty(primitive.PropertyUniqueClientMessageIdKeyIndex)
+			}
+			msg.WithProperty(primitive.PropertyMinOffset, strconv.FormatInt(result.MinOffset, 10))
+			msg.WithProperty(primitive.PropertyMaxOffset, strconv.FormatInt(result.MaxOffset, 10))
+		}
+		result.SetMessageExts(msgListFilterAgain)
+	}
+}
+
+func (dc *defaultManualPullConsumer) updatePullFromWhichNode(mq *primitive.MessageQueue, brokerId int64) {

Review comment:
       Do you mean just moving this code closer to its call site (`ManualPullConsumer#processPullResult`), or that the `recalculatePullFromWhichNode` logic is not needed at all?



