You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@inlong.apache.org by go...@apache.org on 2021/08/09 06:59:59 UTC

[incubator-inlong] branch INLONG-25 updated: [INLONG-1427]Go SDK return maxOffset and updateTime in ConsumerOffset (#1428)

This is an automated email from the ASF dual-hosted git repository.

gosonzhang pushed a commit to branch INLONG-25
in repository https://gitbox.apache.org/repos/asf/incubator-inlong.git


The following commit(s) were added to refs/heads/INLONG-25 by this push:
     new 968eae0  [INLONG-1427]Go SDK return maxOffset and updateTime in ConsumerOffset (#1428)
968eae0 is described below

commit 968eae037dbde0252d75db4296fe0c2c89615b00
Author: Zijie Lu <ws...@gmail.com>
AuthorDate: Mon Aug 9 14:59:55 2021 +0800

    [INLONG-1427]Go SDK return maxOffset and updateTime in ConsumerOffset (#1428)
    
    Signed-off-by: Zijie Lu <ws...@gmail.com>
---
 .../tubemq-client-go/client/consumer.go            | 12 +++----
 .../tubemq-client-go/client/consumer_impl.go       | 24 +++++++------
 .../tubemq-client-go/remote/remote.go              | 39 ++++++++++++++++++----
 3 files changed, 50 insertions(+), 25 deletions(-)

diff --git a/tubemq-client-twins/tubemq-client-go/client/consumer.go b/tubemq-client-twins/tubemq-client-go/client/consumer.go
index bea0223..b805c30 100644
--- a/tubemq-client-twins/tubemq-client-go/client/consumer.go
+++ b/tubemq-client-twins/tubemq-client-go/client/consumer.go
@@ -19,6 +19,10 @@
 // which can be exposed to user.
 package client
 
+import (
+	"github.com/apache/incubator-inlong/tubemq-client-twins/tubemq-client-go/remote"
+)
+
 // ConsumerResult of a consumption.
 type ConsumerResult struct {
 	TopicName      string
@@ -27,12 +31,6 @@ type ConsumerResult struct {
 	Messages       []*Message
 }
 
-// ConsumerOffset of a consumption.
-type ConsumerOffset struct {
-	PartitionKey string
-	CurrOffset   int64
-}
-
 var clientID uint64
 
 // Consumer is an interface that abstracts behavior of TubeMQ's consumer
@@ -42,7 +40,7 @@ type Consumer interface {
 	// Confirm the consumption of a message.
 	Confirm(confirmContext string, consumed bool) (*ConsumerResult, error)
 	// GetCurrConsumedInfo returns the consumptions of the consumer.
-	GetCurrConsumedInfo() map[string]*ConsumerOffset
+	GetCurrConsumedInfo() map[string]*remote.ConsumerOffset
 	// Close closes the consumer client and release the resources.
 	Close() error
 	// GetClientID returns the clientID of the consumer.
diff --git a/tubemq-client-twins/tubemq-client-go/client/consumer_impl.go b/tubemq-client-twins/tubemq-client-go/client/consumer_impl.go
index a4bc2f2..e42319b 100644
--- a/tubemq-client-twins/tubemq-client-go/client/consumer_impl.go
+++ b/tubemq-client-twins/tubemq-client-go/client/consumer_impl.go
@@ -286,7 +286,7 @@ func (c *consumer) Confirm(confirmContext string, consumed bool) (*ConsumerResul
 		return cs, errs.New(rsp.GetErrCode(), rsp.GetErrMsg())
 	}
 	currOffset := rsp.GetCurrOffset()
-	c.rmtDataCache.BookPartitionInfo(partitionKey, currOffset)
+	c.rmtDataCache.BookPartitionInfo(partitionKey, currOffset, util.InvalidValue)
 	err = c.rmtDataCache.ReleasePartition(true, c.subInfo.IsFiltered(topic), confirmContext, consumed)
 	return cs, err
 }
@@ -324,15 +324,17 @@ func parsePartitionKeyToTopic(partitionKey string) (string, error) {
 }
 
 // GetCurrConsumedInfo implementation of TubeMQ consumer.
-func (c *consumer) GetCurrConsumedInfo() map[string]*ConsumerOffset {
+func (c *consumer) GetCurrConsumedInfo() map[string]*remote.ConsumerOffset {
 	partitionOffset := c.rmtDataCache.GetCurPartitionOffset()
-	consumedInfo := make(map[string]*ConsumerOffset, len(partitionOffset))
-	for partition, offset := range partitionOffset {
-		co := &ConsumerOffset{
-			PartitionKey: partition,
-			CurrOffset:   offset,
+	consumedInfo := make(map[string]*remote.ConsumerOffset, len(partitionOffset))
+	for partitionKey, offset := range partitionOffset {
+		co := &remote.ConsumerOffset{
+			PartitionKey: partitionKey,
+			CurrOffset:   offset.CurrOffset,
+			MaxOffset:    offset.MaxOffset,
+			UpdateTime:   offset.UpdateTime,
 		}
-		consumedInfo[partition] = co
+		consumedInfo[partitionKey] = co
 	}
 	return consumedInfo
 }
@@ -581,7 +583,7 @@ func (c *consumer) processGetMessageRspB2C(pi *PeerInfo, filtered bool, partitio
 			maxOffset = rsp.GetMaxOffset()
 		}
 		msgSize, msgs := c.convertMessages(filtered, partition.GetTopic(), rsp)
-		c.rmtDataCache.BookPartitionInfo(partition.GetPartitionKey(), currOffset)
+		c.rmtDataCache.BookPartitionInfo(partition.GetPartitionKey(), currOffset, maxOffset)
 		cd := metadata.NewConsumeData(now, 200, escLimit, int32(msgSize), 0, dataDleVal, rsp.GetRequireSlow())
 		c.rmtDataCache.BookConsumeData(partition.GetPartitionKey(), cd)
 		pi.CurrOffset = currOffset
@@ -601,7 +603,7 @@ func (c *consumer) processGetMessageRspB2C(pi *PeerInfo, filtered bool, partitio
 			defDltTime = c.config.Consumer.MsgNotFoundWait.Milliseconds()
 		}
 		cd := metadata.NewConsumeData(now, rsp.GetErrCode(), false, 0, limitDlt, defDltTime, rsp.GetRequireSlow())
-		c.rmtDataCache.BookPartitionInfo(partition.GetPartitionKey(), util.InvalidValue)
+		c.rmtDataCache.BookPartitionInfo(partition.GetPartitionKey(), util.InvalidValue, util.InvalidValue)
 		c.rmtDataCache.BookConsumeData(partition.GetPartitionKey(), cd)
 		return nil, errs.New(rsp.GetErrCode(), rsp.GetErrMsg())
 	case errs.RetErrNotFound:
@@ -614,7 +616,7 @@ func (c *consumer) processGetMessageRspB2C(pi *PeerInfo, filtered bool, partitio
 	}
 	if rsp.GetErrCode() != errs.RetSuccess {
 		cd := metadata.NewConsumeData(now, rsp.GetErrCode(), false, 0, limitDlt, util.InvalidValue, rsp.GetRequireSlow())
-		c.rmtDataCache.BookPartitionInfo(partition.GetPartitionKey(), util.InvalidValue)
+		c.rmtDataCache.BookPartitionInfo(partition.GetPartitionKey(), util.InvalidValue, util.InvalidValue)
 		c.rmtDataCache.BookConsumeData(partition.GetPartitionKey(), cd)
 		c.rmtDataCache.ReleasePartition(true, filtered, confirmContext, false)
 		return nil, errs.New(rsp.GetErrCode(), rsp.GetErrMsg())
diff --git a/tubemq-client-twins/tubemq-client-go/remote/remote.go b/tubemq-client-twins/tubemq-client-go/remote/remote.go
index 5545e0d..6e66af9 100644
--- a/tubemq-client-twins/tubemq-client-go/remote/remote.go
+++ b/tubemq-client-twins/tubemq-client-go/remote/remote.go
@@ -53,13 +53,21 @@ type RmtDataCache struct {
 	partitionTimeouts  map[string]*time.Timer
 	topicPartitions    map[string]map[string]bool
 	partitionRegBooked map[string]bool
-	partitionOffset    map[string]int64
+	partitionOffset    map[string]*ConsumerOffset
 	groupHandler       *flowctrl.RuleHandler
 	defHandler         *flowctrl.RuleHandler
 	// EventCh is the channel for consumer to consume
 	EventCh chan *metadata.ConsumerEvent
 }
 
+// ConsumerOffset of a consumption.
+type ConsumerOffset struct {
+	PartitionKey string
+	CurrOffset   int64
+	MaxOffset    int64
+	UpdateTime   int64
+}
+
 // NewRmtDataCache returns a default rmtDataCache.
 func NewRmtDataCache() *RmtDataCache {
 	r := &RmtDataCache{
@@ -73,7 +81,7 @@ func NewRmtDataCache() *RmtDataCache {
 		partitionTimeouts:  make(map[string]*time.Timer),
 		topicPartitions:    make(map[string]map[string]bool),
 		partitionRegBooked: make(map[string]bool),
-		partitionOffset:    make(map[string]int64),
+		partitionOffset:    make(map[string]*ConsumerOffset),
 		groupHandler:       flowctrl.NewRuleHandler(),
 		defHandler:         flowctrl.NewRuleHandler(),
 		EventCh:            make(chan *metadata.ConsumerEvent, 1),
@@ -461,11 +469,28 @@ func (r *RmtDataCache) removeFromIndexPartitions(partitionKey string) {
 	r.indexPartitions = append(r.indexPartitions[:pos], r.indexPartitions[pos+1:]...)
 }
 
-func (r *RmtDataCache) BookPartitionInfo(partitionKey string, currOffset int64) {
+func (r *RmtDataCache) BookPartitionInfo(partitionKey string, currOffset int64, maxOffset int64) {
+	r.dataBookMu.Lock()
+	defer r.dataBookMu.Unlock()
+	if _, ok := r.partitionOffset[partitionKey]; !ok {
+		co := &ConsumerOffset{
+			CurrOffset:   util.InvalidValue,
+			PartitionKey: partitionKey,
+			MaxOffset:    util.InvalidValue,
+			UpdateTime:   util.InvalidValue,
+		}
+		r.partitionOffset[partitionKey] = co
+	}
+	updated := false
+	co := r.partitionOffset[partitionKey]
 	if currOffset >= 0 {
-		r.dataBookMu.Lock()
-		defer r.dataBookMu.Unlock()
-		r.partitionOffset[partitionKey] = currOffset
+		co.CurrOffset = currOffset
+	}
+	if maxOffset >= 0 {
+		co.MaxOffset = maxOffset
+	}
+	if updated {
+		co.UpdateTime = time.Now().UnixNano() / int64(time.Millisecond)
 	}
 }
 
@@ -519,7 +544,7 @@ func (r *RmtDataCache) GetAllClosedBrokerParts() map[*metadata.Node][]*metadata.
 }
 
 // GetCurPartitionOffset returns the partition to offset map.
-func (r *RmtDataCache) GetCurPartitionOffset() map[string]int64 {
+func (r *RmtDataCache) GetCurPartitionOffset() map[string]*ConsumerOffset {
 	r.dataBookMu.Lock()
 	defer r.dataBookMu.Unlock()
 	return r.partitionOffset