You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@rocketmq.apache.org by di...@apache.org on 2021/07/30 08:58:19 UTC

[rocketmq-client-go] branch master updated: feat(internal): support reset consumer offset (#682)

This is an automated email from the ASF dual-hosted git repository.

dinglei pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/rocketmq-client-go.git


The following commit(s) were added to refs/heads/master by this push:
     new 46701f1  feat(internal): support reset consumer offset (#682)
46701f1 is described below

commit 46701f1d6e4b3fbbbd1e702a0cae9ae947b91da3
Author: 张旭 <ma...@gmail.com>
AuthorDate: Fri Jul 30 16:49:40 2021 +0800

    feat(internal): support reset consumer offset (#682)
    
    Co-authored-by: zhangxu16 <zh...@xiaomi.com>
---
 consumer/process_queue.go |  1 +
 consumer/push_consumer.go | 15 +++++++++------
 internal/client.go        | 27 +++++++++++++++++++++++++++
 internal/model.go         | 47 +++++++++++++++++++++++++++++++++++++++++++++++
 internal/model_test.go    | 16 ++++++++++++++++
 internal/request.go       | 35 ++++++++++++++++++++++++++++++++++-
 rlog/log.go               |  1 +
 7 files changed, 135 insertions(+), 7 deletions(-)

diff --git a/consumer/process_queue.go b/consumer/process_queue.go
index 0e9d8ec..76a9236 100644
--- a/consumer/process_queue.go
+++ b/consumer/process_queue.go
@@ -343,6 +343,7 @@ func (pq *processQueue) MaxOrderlyCache() int64 {
 
 func (pq *processQueue) clear() {
 	pq.mutex.Lock()
+	defer pq.mutex.Unlock()
 	pq.msgCache.Clear()
 	pq.cachedMsgCount = 0
 	pq.cachedMsgSize = 0
diff --git a/consumer/push_consumer.go b/consumer/push_consumer.go
index ec44a1e..c84ce84 100644
--- a/consumer/push_consumer.go
+++ b/consumer/push_consumer.go
@@ -831,7 +831,7 @@ func (pc *pushConsumer) resume() {
 	rlog.Info(fmt.Sprintf("resume consumer: %s", pc.consumerGroup), nil)
 }
 
-func (pc *pushConsumer) resetOffset(topic string, table map[primitive.MessageQueue]int64) {
+func (pc *pushConsumer) ResetOffset(topic string, table map[primitive.MessageQueue]int64) {
 	//topic := cmd.ExtFields["topic"]
 	//group := cmd.ExtFields["group"]
 	//if topic == "" || group == "" {
@@ -857,11 +857,13 @@ func (pc *pushConsumer) resetOffset(topic string, table map[primitive.MessageQue
 	//	rlog.Infof("[reset-offset] consumer dose not exist. group=%s", group)
 	//	return
 	//}
+	pc.suspend()
+	defer pc.resume()
 
 	pc.processQueueTable.Range(func(key, value interface{}) bool {
 		mq := key.(primitive.MessageQueue)
 		pq := value.(*processQueue)
-		if _, ok := table[mq]; !ok {
+		if _, ok := table[mq]; ok && mq.Topic == topic {
 			pq.WithDropped(true)
 			pq.clear()
 		}
@@ -872,16 +874,17 @@ func (pc *pushConsumer) resetOffset(topic string, table map[primitive.MessageQue
 	if !exist {
 		return
 	}
-	queuesOfTopic := v.([]primitive.MessageQueue)
+	queuesOfTopic := v.([]*primitive.MessageQueue)
 	for _, k := range queuesOfTopic {
-		if _, ok := table[k]; ok {
-			pc.storage.update(&k, table[k], false)
+		if _, ok := table[*k]; ok {
+			pc.storage.update(k, table[*k], false)
 			v, exist := pc.processQueueTable.Load(k)
 			if !exist {
 				continue
 			}
 			pq := v.(*processQueue)
-			pc.removeUnnecessaryMessageQueue(&k, pq)
+			pc.removeUnnecessaryMessageQueue(k, pq)
+			pc.processQueueTable.Delete(k)
 		}
 	}
 }
diff --git a/internal/client.go b/internal/client.go
index 6e665ea..3a09ea8 100644
--- a/internal/client.go
+++ b/internal/client.go
@@ -88,6 +88,7 @@ type InnerConsumer interface {
 	GetcType() string
 	GetModel() string
 	GetWhere() string
+	ResetOffset(topic string, table map[primitive.MessageQueue]int64)
 }
 
 func DefaultClientOptions() ClientOptions {
@@ -283,6 +284,23 @@ func GetOrNewRocketMQClient(option ClientOptions, callbackCh chan interface{}) R
 			}
 			return res
 		})
+
+		client.remoteClient.RegisterRequestFunc(ReqResetConsumerOffset, func(req *remote.RemotingCommand, addr net.Addr) *remote.RemotingCommand {
+			rlog.Info("receive reset consumer offset request...", map[string]interface{}{
+				rlog.LogKeyBroker:        addr.String(),
+				rlog.LogKeyTopic:         req.ExtFields["topic"],
+				rlog.LogKeyConsumerGroup: req.ExtFields["group"],
+				rlog.LogKeyTimeStamp:     req.ExtFields["timestamp"],
+			})
+			header := new(ResetOffsetHeader)
+			header.Decode(req.ExtFields)
+
+			body := new(ResetOffsetBody)
+			body.Decode(req.Body)
+
+			client.resetOffset(header.topic, header.group, body.OffsetTable)
+			return nil
+		})
 	}
 	return actual.(*rmqClient)
 }
@@ -777,6 +795,15 @@ func (c *rmqClient) isNeedUpdateSubscribeInfo(topic string) bool {
 	return result
 }
 
+func (c *rmqClient) resetOffset(topic string, group string, offsetTable map[primitive.MessageQueue]int64) {
+	consumer, exist := c.consumerMap.Load(group)
+	if !exist {
+		rlog.Warning("group "+group+" do not exists", nil)
+		return
+	}
+	consumer.(InnerConsumer).ResetOffset(topic, offsetTable)
+}
+
 func (c *rmqClient) getConsumerRunningInfo(group string) *ConsumerRunningInfo {
 	consumer, exist := c.consumerMap.Load(group)
 	if !exist {
diff --git a/internal/model.go b/internal/model.go
index 934c610..d7f2057 100644
--- a/internal/model.go
+++ b/internal/model.go
@@ -21,7 +21,9 @@ import (
 	"bytes"
 	"encoding/json"
 	"fmt"
+	"github.com/tidwall/gjson"
 	"sort"
+	"strconv"
 	"strings"
 
 	"github.com/apache/rocketmq-client-go/v2/internal/utils"
@@ -288,3 +290,48 @@ func (result ConsumeMessageDirectlyResult) Encode() ([]byte, error) {
 	}
 	return data, nil
 }
+
+type ResetOffsetBody struct {
+	OffsetTable map[primitive.MessageQueue]int64 `json:"offsetTable"`
+}
+
+func (resetOffsetBody *ResetOffsetBody) Decode(body []byte) {
+	result := gjson.ParseBytes(body)
+	rlog.Debug("offset table string "+result.Get("offsetTable").String(), nil)
+
+	offsetTable := make(map[primitive.MessageQueue]int64, 0)
+	offsetTableArray := strings.Split(result.Get("offsetTable").String(), "],[")
+	for index, v := range offsetTableArray {
+		kvArray := strings.Split(v, "},")
+
+		var kstr, vstr string
+		if index == len(offsetTableArray)-1 {
+			vstr = kvArray[1][:len(kvArray[1])-2]
+		} else {
+			vstr = kvArray[1]
+		}
+		offset, err := strconv.ParseInt(vstr, 10, 64)
+		if err != nil {
+			rlog.Error("Unmarshal offset error", map[string]interface{}{
+				rlog.LogKeyUnderlayError: err,
+			})
+			return
+		}
+
+		if index == 0 {
+			kstr = kvArray[0][2:len(kvArray[0])] + "}"
+		} else {
+			kstr = kvArray[0] + "}"
+		}
+		kObj := new(primitive.MessageQueue)
+		err = jsoniter.Unmarshal([]byte(kstr), &kObj)
+		if err != nil {
+			rlog.Error("Unmarshal message queue error", map[string]interface{}{
+				rlog.LogKeyUnderlayError: err,
+			})
+			return
+		}
+		offsetTable[*kObj] = offset
+	}
+	resetOffsetBody.OffsetTable = offsetTable
+}
diff --git a/internal/model_test.go b/internal/model_test.go
index 57ff0af..56eeb24 100644
--- a/internal/model_test.go
+++ b/internal/model_test.go
@@ -402,5 +402,21 @@ func TestConsumeMessageDirectlyResult_MarshalJSON(t *testing.T) {
 			fmt.Printf("json consumeMessageDirectlyResult: %s\n", string(data))
 		})
 	})
+}
 
+func TestRestOffsetBody_MarshalJSON(t *testing.T) {
+	Convey("test ResetOffset Body Decode", t, func() {
+		body := "{\"offsetTable\":[[{\"topic\":\"zx_tst\",\"brokerName\":\"tjwqtst-common-rocketmq-raft0\",\"queueId\":5},23354233],[{\"topic\":\"zx_tst\",\"brokerName\":\"tjwqtst-common-rocketmq-raft0\",\"queueId\":4},23354245],[{\"topic\":\"zx_tst\",\"brokerName\":\"tjwqtst-common-rocketmq-raft0\",\"queueId\":7},23354203],[{\"topic\":\"zx_tst\",\"brokerName\":\"tjwqtst-common-rocketmq-raft0\",\"queueId\":6},23354312],[{\"topic\":\"zx_tst\",\"brokerName\":\"tjwqtst-common-rocketmq-raft0\",\"q [...]
+		resetOffsetBody := new(ResetOffsetBody)
+		resetOffsetBody.Decode([]byte(body))
+		offsetTable := resetOffsetBody.OffsetTable
+		So(offsetTable, ShouldNotBeNil)
+		So(len(offsetTable), ShouldEqual, 8)
+		messageQueue := primitive.MessageQueue{
+			Topic:      "zx_tst",
+			BrokerName: "tjwqtst-common-rocketmq-raft0",
+			QueueId:    5,
+		}
+		So(offsetTable[messageQueue], ShouldEqual, 23354233)
+	})
 }
diff --git a/internal/request.go b/internal/request.go
index 237d711..0e3d8e1 100644
--- a/internal/request.go
+++ b/internal/request.go
@@ -48,7 +48,7 @@ const (
 	ReqGetAllTopicListFromNameServer = int16(206)
 	ReqDeleteTopicInBroker           = int16(215)
 	ReqDeleteTopicInNameSrv          = int16(216)
-	ReqResetConsuemrOffset           = int16(220)
+	ReqResetConsumerOffset           = int16(220)
 	ReqGetConsumerRunningInfo        = int16(307)
 	ReqConsumeMessageDirectly        = int16(309)
 )
@@ -408,6 +408,39 @@ func (request *DeleteTopicRequestHeader) Encode() map[string]string {
 	return maps
 }
 
+type ResetOffsetHeader struct {
+	topic     string
+	group     string
+	timestamp int64
+	isForce   bool
+}
+
+func (request *ResetOffsetHeader) Encode() map[string]string {
+	maps := make(map[string]string)
+	maps["topic"] = request.topic
+	maps["group"] = request.group
+	maps["timestamp"] = strconv.FormatInt(request.timestamp, 10)
+	return maps
+}
+
+func (request *ResetOffsetHeader) Decode(properties map[string]string) {
+	if len(properties) == 0 {
+		return
+	}
+
+	if v, existed := properties["topic"]; existed {
+		request.topic = v
+	}
+
+	if v, existed := properties["group"]; existed {
+		request.group = v
+	}
+
+	if v, existed := properties["timestamp"]; existed {
+		request.timestamp, _ = strconv.ParseInt(v, 10, 0)
+	}
+}
+
 type ConsumeMessageDirectlyHeader struct {
 	consumerGroup string
 	clientID      string
diff --git a/rlog/log.go b/rlog/log.go
index 1d850c3..382f5aa 100644
--- a/rlog/log.go
+++ b/rlog/log.go
@@ -33,6 +33,7 @@ const (
 	LogKeyValueChangedFrom = "changedFrom"
 	LogKeyValueChangedTo   = "changeTo"
 	LogKeyPullRequest      = "PullRequest"
+	LogKeyTimeStamp        = "timestamp"
 )
 
 type Logger interface {