You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@inlong.apache.org by go...@apache.org on 2021/08/09 06:59:59 UTC
[incubator-inlong] branch INLONG-25 updated: [INLONG-1427]Go SDK return maxOffset and updateTime in ConsumerOffset (#1428)
This is an automated email from the ASF dual-hosted git repository.
gosonzhang pushed a commit to branch INLONG-25
in repository https://gitbox.apache.org/repos/asf/incubator-inlong.git
The following commit(s) were added to refs/heads/INLONG-25 by this push:
new 968eae0 [INLONG-1427]Go SDK return maxOffset and updateTime in ConsumerOffset (#1428)
968eae0 is described below
commit 968eae037dbde0252d75db4296fe0c2c89615b00
Author: Zijie Lu <ws...@gmail.com>
AuthorDate: Mon Aug 9 14:59:55 2021 +0800
[INLONG-1427]Go SDK return maxOffset and updateTime in ConsumerOffset (#1428)
Signed-off-by: Zijie Lu <ws...@gmail.com>
---
.../tubemq-client-go/client/consumer.go | 12 +++----
.../tubemq-client-go/client/consumer_impl.go | 24 +++++++------
.../tubemq-client-go/remote/remote.go | 39 ++++++++++++++++++----
3 files changed, 50 insertions(+), 25 deletions(-)
diff --git a/tubemq-client-twins/tubemq-client-go/client/consumer.go b/tubemq-client-twins/tubemq-client-go/client/consumer.go
index bea0223..b805c30 100644
--- a/tubemq-client-twins/tubemq-client-go/client/consumer.go
+++ b/tubemq-client-twins/tubemq-client-go/client/consumer.go
@@ -19,6 +19,10 @@
// which can be exposed to user.
package client
+import (
+ "github.com/apache/incubator-inlong/tubemq-client-twins/tubemq-client-go/remote"
+)
+
// ConsumerResult of a consumption.
type ConsumerResult struct {
TopicName string
@@ -27,12 +31,6 @@ type ConsumerResult struct {
Messages []*Message
}
-// ConsumerOffset of a consumption.
-type ConsumerOffset struct {
- PartitionKey string
- CurrOffset int64
-}
-
var clientID uint64
// Consumer is an interface that abstracts behavior of TubeMQ's consumer
@@ -42,7 +40,7 @@ type Consumer interface {
// Confirm the consumption of a message.
Confirm(confirmContext string, consumed bool) (*ConsumerResult, error)
// GetCurrConsumedInfo returns the consumptions of the consumer.
- GetCurrConsumedInfo() map[string]*ConsumerOffset
+ GetCurrConsumedInfo() map[string]*remote.ConsumerOffset
// Close closes the consumer client and release the resources.
Close() error
// GetClientID returns the clientID of the consumer.
diff --git a/tubemq-client-twins/tubemq-client-go/client/consumer_impl.go b/tubemq-client-twins/tubemq-client-go/client/consumer_impl.go
index a4bc2f2..e42319b 100644
--- a/tubemq-client-twins/tubemq-client-go/client/consumer_impl.go
+++ b/tubemq-client-twins/tubemq-client-go/client/consumer_impl.go
@@ -286,7 +286,7 @@ func (c *consumer) Confirm(confirmContext string, consumed bool) (*ConsumerResul
return cs, errs.New(rsp.GetErrCode(), rsp.GetErrMsg())
}
currOffset := rsp.GetCurrOffset()
- c.rmtDataCache.BookPartitionInfo(partitionKey, currOffset)
+ c.rmtDataCache.BookPartitionInfo(partitionKey, currOffset, util.InvalidValue)
err = c.rmtDataCache.ReleasePartition(true, c.subInfo.IsFiltered(topic), confirmContext, consumed)
return cs, err
}
@@ -324,15 +324,17 @@ func parsePartitionKeyToTopic(partitionKey string) (string, error) {
}
// GetCurrConsumedInfo implementation of TubeMQ consumer.
-func (c *consumer) GetCurrConsumedInfo() map[string]*ConsumerOffset {
+func (c *consumer) GetCurrConsumedInfo() map[string]*remote.ConsumerOffset {
partitionOffset := c.rmtDataCache.GetCurPartitionOffset()
- consumedInfo := make(map[string]*ConsumerOffset, len(partitionOffset))
- for partition, offset := range partitionOffset {
- co := &ConsumerOffset{
- PartitionKey: partition,
- CurrOffset: offset,
+ consumedInfo := make(map[string]*remote.ConsumerOffset, len(partitionOffset))
+ for partitionKey, offset := range partitionOffset {
+ co := &remote.ConsumerOffset{
+ PartitionKey: partitionKey,
+ CurrOffset: offset.CurrOffset,
+ MaxOffset: offset.MaxOffset,
+ UpdateTime: offset.UpdateTime,
}
- consumedInfo[partition] = co
+ consumedInfo[partitionKey] = co
}
return consumedInfo
}
@@ -581,7 +583,7 @@ func (c *consumer) processGetMessageRspB2C(pi *PeerInfo, filtered bool, partitio
maxOffset = rsp.GetMaxOffset()
}
msgSize, msgs := c.convertMessages(filtered, partition.GetTopic(), rsp)
- c.rmtDataCache.BookPartitionInfo(partition.GetPartitionKey(), currOffset)
+ c.rmtDataCache.BookPartitionInfo(partition.GetPartitionKey(), currOffset, maxOffset)
cd := metadata.NewConsumeData(now, 200, escLimit, int32(msgSize), 0, dataDleVal, rsp.GetRequireSlow())
c.rmtDataCache.BookConsumeData(partition.GetPartitionKey(), cd)
pi.CurrOffset = currOffset
@@ -601,7 +603,7 @@ func (c *consumer) processGetMessageRspB2C(pi *PeerInfo, filtered bool, partitio
defDltTime = c.config.Consumer.MsgNotFoundWait.Milliseconds()
}
cd := metadata.NewConsumeData(now, rsp.GetErrCode(), false, 0, limitDlt, defDltTime, rsp.GetRequireSlow())
- c.rmtDataCache.BookPartitionInfo(partition.GetPartitionKey(), util.InvalidValue)
+ c.rmtDataCache.BookPartitionInfo(partition.GetPartitionKey(), util.InvalidValue, util.InvalidValue)
c.rmtDataCache.BookConsumeData(partition.GetPartitionKey(), cd)
return nil, errs.New(rsp.GetErrCode(), rsp.GetErrMsg())
case errs.RetErrNotFound:
@@ -614,7 +616,7 @@ func (c *consumer) processGetMessageRspB2C(pi *PeerInfo, filtered bool, partitio
}
if rsp.GetErrCode() != errs.RetSuccess {
cd := metadata.NewConsumeData(now, rsp.GetErrCode(), false, 0, limitDlt, util.InvalidValue, rsp.GetRequireSlow())
- c.rmtDataCache.BookPartitionInfo(partition.GetPartitionKey(), util.InvalidValue)
+ c.rmtDataCache.BookPartitionInfo(partition.GetPartitionKey(), util.InvalidValue, util.InvalidValue)
c.rmtDataCache.BookConsumeData(partition.GetPartitionKey(), cd)
c.rmtDataCache.ReleasePartition(true, filtered, confirmContext, false)
return nil, errs.New(rsp.GetErrCode(), rsp.GetErrMsg())
diff --git a/tubemq-client-twins/tubemq-client-go/remote/remote.go b/tubemq-client-twins/tubemq-client-go/remote/remote.go
index 5545e0d..6e66af9 100644
--- a/tubemq-client-twins/tubemq-client-go/remote/remote.go
+++ b/tubemq-client-twins/tubemq-client-go/remote/remote.go
@@ -53,13 +53,21 @@ type RmtDataCache struct {
partitionTimeouts map[string]*time.Timer
topicPartitions map[string]map[string]bool
partitionRegBooked map[string]bool
- partitionOffset map[string]int64
+ partitionOffset map[string]*ConsumerOffset
groupHandler *flowctrl.RuleHandler
defHandler *flowctrl.RuleHandler
// EventCh is the channel for consumer to consume
EventCh chan *metadata.ConsumerEvent
}
+// ConsumerOffset of a consumption.
+type ConsumerOffset struct {
+ PartitionKey string
+ CurrOffset int64
+ MaxOffset int64
+ UpdateTime int64
+}
+
// NewRmtDataCache returns a default rmtDataCache.
func NewRmtDataCache() *RmtDataCache {
r := &RmtDataCache{
@@ -73,7 +81,7 @@ func NewRmtDataCache() *RmtDataCache {
partitionTimeouts: make(map[string]*time.Timer),
topicPartitions: make(map[string]map[string]bool),
partitionRegBooked: make(map[string]bool),
- partitionOffset: make(map[string]int64),
+ partitionOffset: make(map[string]*ConsumerOffset),
groupHandler: flowctrl.NewRuleHandler(),
defHandler: flowctrl.NewRuleHandler(),
EventCh: make(chan *metadata.ConsumerEvent, 1),
@@ -461,11 +469,28 @@ func (r *RmtDataCache) removeFromIndexPartitions(partitionKey string) {
r.indexPartitions = append(r.indexPartitions[:pos], r.indexPartitions[pos+1:]...)
}
-func (r *RmtDataCache) BookPartitionInfo(partitionKey string, currOffset int64) {
+func (r *RmtDataCache) BookPartitionInfo(partitionKey string, currOffset int64, maxOffset int64) {
+ r.dataBookMu.Lock()
+ defer r.dataBookMu.Unlock()
+ if _, ok := r.partitionOffset[partitionKey]; !ok {
+ co := &ConsumerOffset{
+ CurrOffset: util.InvalidValue,
+ PartitionKey: partitionKey,
+ MaxOffset: util.InvalidValue,
+ UpdateTime: util.InvalidValue,
+ }
+ r.partitionOffset[partitionKey] = co
+ }
+ updated := false
+ co := r.partitionOffset[partitionKey]
if currOffset >= 0 {
- r.dataBookMu.Lock()
- defer r.dataBookMu.Unlock()
- r.partitionOffset[partitionKey] = currOffset
+ co.CurrOffset = currOffset
+ }
+ if maxOffset >= 0 {
+ co.MaxOffset = maxOffset
+ }
+ if updated {
+ co.UpdateTime = time.Now().UnixNano() / int64(time.Millisecond)
}
}
@@ -519,7 +544,7 @@ func (r *RmtDataCache) GetAllClosedBrokerParts() map[*metadata.Node][]*metadata.
}
// GetCurPartitionOffset returns the partition to offset map.
-func (r *RmtDataCache) GetCurPartitionOffset() map[string]int64 {
+func (r *RmtDataCache) GetCurPartitionOffset() map[string]*ConsumerOffset {
r.dataBookMu.Lock()
defer r.dataBookMu.Unlock()
return r.partitionOffset