You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@inlong.apache.org by go...@apache.org on 2021/06/15 01:59:54 UTC
[incubator-inlong] 01/09: [INLONG-624]Go SDK consumer interface
This is an automated email from the ASF dual-hosted git repository.
gosonzhang pushed a commit to branch INLONG-25
in repository https://gitbox.apache.org/repos/asf/incubator-inlong.git
commit 0d4ed41f647eb5155c936f10f0f61113c44161b3
Author: Zijie Lu <ws...@gmail.com>
AuthorDate: Thu Jun 3 16:02:09 2021 +0800
[INLONG-624]Go SDK consumer interface
Signed-off-by: Zijie Lu <ws...@gmail.com>
---
.../{metadata/node.go => client/consumer.go} | 51 ++-
.../tubemq-client-go/client/consumer_impl.go | 359 +++++++++++++++++++++
.../tubemq-client-go/client/heartbeat.go | 229 +++++++++++++
.../tubemq-client-go/client/remote.go | 79 -----
.../tubemq-client-go/config/config.go | 18 +-
.../tubemq-client-go/config/config_test.go | 10 +-
tubemq-client-twins/tubemq-client-go/errs/errs.go | 12 +-
.../tubemq-client-go/metadata/consumer_event.go | 8 +
.../tubemq-client-go/metadata/metadata.go | 20 ++
.../tubemq-client-go/metadata/node.go | 54 ++++
.../tubemq-client-go/metadata/partition.go | 42 ++-
.../tubemq-client-go/metadata/subcribe_info.go | 54 +++-
.../tubemq-client-go/remote/remote.go | 331 +++++++++++++++++++
tubemq-client-twins/tubemq-client-go/rpc/broker.go | 32 +-
tubemq-client-twins/tubemq-client-go/rpc/client.go | 23 +-
tubemq-client-twins/tubemq-client-go/rpc/master.go | 19 +-
.../tubemq-client-go/{client => sub}/info.go | 77 ++++-
.../tubemq-client-go/transport/client.go | 8 +-
tubemq-client-twins/tubemq-client-go/util/util.go | 52 +++
19 files changed, 1299 insertions(+), 179 deletions(-)
diff --git a/tubemq-client-twins/tubemq-client-go/metadata/node.go b/tubemq-client-twins/tubemq-client-go/client/consumer.go
similarity index 52%
copy from tubemq-client-twins/tubemq-client-go/metadata/node.go
copy to tubemq-client-twins/tubemq-client-go/client/consumer.go
index 625f278..27a8536 100644
--- a/tubemq-client-twins/tubemq-client-go/metadata/node.go
+++ b/tubemq-client-twins/tubemq-client-go/client/consumer.go
@@ -15,41 +15,32 @@
* limitations under the License.
*/
-package metadata
+// Package client defines the api and information
+// which can be exposed to user.
+package client
-import (
- "strconv"
+const (
+ tubeMQClientVersion = "0.1.0"
)
-// Node represents the metadata of a node.
-type Node struct {
- id uint32
- host string
- port uint32
- address string
+// ConsumerResult of a consumption.
+type ConsumerResult struct {
}
-// GetID returns the id of a node.
-func (n *Node) GetID() uint32 {
- return n.id
+// ConsumerOffset of a consumption.
+type ConsumerOffset struct {
}
-// GetPort returns the port of a node.
-func (n *Node) GetPort() uint32 {
- return n.port
-}
-
-// GetHost returns the hostname of a node.
-func (n *Node) GetHost() string {
- return n.host
-}
-
-// GetAddress returns the address of a node.
-func (n *Node) GetAddress() string {
- return n.address
-}
-
-// String returns the metadata of a node as a string.
-func (n *Node) String() string {
- return strconv.Itoa(int(n.id)) + ":" + n.host + ":" + strconv.Itoa(int(n.port))
+var clientIndex uint64
+
+// Consumer is an interface that abstracts behavior of TubeMQ's consumer
+type Consumer interface {
+ // Start starts the consumer.
+ Start() error
+ // GetMessage receives a single message.
+ GetMessage() (*ConsumerResult, error)
+ // Confirm the consumption of a message.
+ Confirm(confirmContext string, consumed bool) (*ConsumerResult, error)
+ // GetCurrConsumedInfo returns the consumptions of the consumer.
+ GetCurrConsumedInfo() (map[string]*ConsumerOffset, error)
}
diff --git a/tubemq-client-twins/tubemq-client-go/client/consumer_impl.go b/tubemq-client-twins/tubemq-client-go/client/consumer_impl.go
new file mode 100644
index 0000000..c0f7e9a
--- /dev/null
+++ b/tubemq-client-twins/tubemq-client-go/client/consumer_impl.go
@@ -0,0 +1,359 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package client
+
+import (
+ "context"
+ "os"
+ "strconv"
+ "sync/atomic"
+ "time"
+
+ "github.com/golang/protobuf/proto"
+
+ "github.com/apache/incubator-inlong/tubemq-client-twins/tubemq-client-go/config"
+ "github.com/apache/incubator-inlong/tubemq-client-twins/tubemq-client-go/errs"
+ "github.com/apache/incubator-inlong/tubemq-client-twins/tubemq-client-go/metadata"
+ "github.com/apache/incubator-inlong/tubemq-client-twins/tubemq-client-go/multiplexing"
+ "github.com/apache/incubator-inlong/tubemq-client-twins/tubemq-client-go/protocol"
+ "github.com/apache/incubator-inlong/tubemq-client-twins/tubemq-client-go/remote"
+ "github.com/apache/incubator-inlong/tubemq-client-twins/tubemq-client-go/rpc"
+ "github.com/apache/incubator-inlong/tubemq-client-twins/tubemq-client-go/selector"
+ "github.com/apache/incubator-inlong/tubemq-client-twins/tubemq-client-go/sub"
+ "github.com/apache/incubator-inlong/tubemq-client-twins/tubemq-client-go/transport"
+ "github.com/apache/incubator-inlong/tubemq-client-twins/tubemq-client-go/util"
+)
+
+const (
+ consumeStatusNormal = 0
+ consumeStatusFromMax = 1
+ consumeStatusFromMaxAlways = 2
+)
+
+type consumer struct {
+ clientID string
+ config *config.Config
+ subInfo *sub.SubInfo
+ rmtDataCache *remote.RmtDataCache
+ visitToken int64
+ authorizedInfo string
+ nextAuth2Master int32
+ nextAuth2Broker int32
+ master *selector.Node
+ client rpc.RPCClient
+ selector selector.Selector
+ lastMasterHb int64
+ masterHBRetry int
+ heartbeatManager *heartbeatManager
+ unreportedTimes int
+}
+
+// NewConsumer returns a consumer which is constructed by a given config.
+func NewConsumer(config *config.Config) (Consumer, error) {
+ selector, err := selector.Get("ip")
+ if err != nil {
+ return nil, err
+ }
+
+ clientID := newClientID(config.Consumer.Group)
+ pool := multiplexing.NewPool()
+ opts := &transport.Options{}
+ if config.Net.TLS.Enable {
+ opts.CACertFile = config.Net.TLS.CACertFile
+ opts.TLSCertFile = config.Net.TLS.TLSCertFile
+ opts.TLSKeyFile = config.Net.TLS.TLSKeyFile
+ opts.TLSServerName = config.Net.TLS.TLSServerName
+ }
+ client := rpc.New(pool, opts)
+ r := remote.NewRmtDataCache()
+ r.SetConsumerInfo(clientID, config.Consumer.Group)
+ c := &consumer{
+ config: config,
+ clientID: clientID,
+ subInfo: sub.NewSubInfo(config),
+ rmtDataCache: r,
+ selector: selector,
+ client: client,
+ visitToken: util.InvalidValue,
+ unreportedTimes: 0,
+ }
+ c.subInfo.SetClientID(clientID)
+ hbm := newHBManager(c)
+ c.heartbeatManager = hbm
+ return c, nil
+}
+
+// Start implementation of tubeMQ consumer.
+func (c *consumer) Start() error {
+ err := c.register2Master(false)
+ if err != nil {
+ return err
+ }
+ c.heartbeatManager.registerMaster(c.master.Address)
+ go c.processRebalanceEvent()
+ return nil
+}
+
+// register2Master registers the consumer with a TubeMQ master.
+//
+// A master is selected from config.Consumer.Masters when needChange is true or
+// when no master has been chosen yet. The latter covers the first call from
+// Start, which passes needChange=false while c.master is still nil and would
+// otherwise dereference a nil pointer in the loop condition below.
+//
+// Registration is attempted against the current master while it reports
+// HasNext. An RPC error or a selector error is returned to the caller. A
+// successful response is applied via processRegisterResponseM2C. A response
+// carrying RetConsumeGroupForbidden or RetConsumeContentForbidden ends the
+// attempt and, as before, returns nil. Any other failure moves on to the next
+// master returned by the selector.
+func (c *consumer) register2Master(needChange bool) error {
+	if needChange || c.master == nil {
+		node, err := c.selector.Select(c.config.Consumer.Masters)
+		if err != nil {
+			return err
+		}
+		c.master = node
+	}
+	for c.master.HasNext {
+		m := &metadata.Metadata{}
+		node := &metadata.Node{}
+		node.SetHost(util.GetLocalHost())
+		// NOTE(review): SetAddress's error is still discarded here, as in the original.
+		node.SetAddress(c.master.Address)
+		m.SetNode(node)
+		// Named subscribeInfo so the imported sub package is not shadowed.
+		subscribeInfo := &metadata.SubscribeInfo{}
+		subscribeInfo.SetGroup(c.config.Consumer.Group)
+		m.SetSubscribeInfo(subscribeInfo)
+
+		auth := &protocol.AuthenticateInfo{}
+		c.genMasterAuthenticateToken(auth, true)
+		mci := &protocol.MasterCertificateInfo{
+			AuthInfo: auth,
+		}
+		c.subInfo.SetMasterCertificateInfo(mci)
+
+		// The context only bounds the RPC itself, so release it as soon as the call returns.
+		ctx, cancel := context.WithTimeout(context.Background(), c.config.Net.ReadTimeout)
+		rsp, err := c.client.RegisterRequestC2M(ctx, m, c.subInfo, c.rmtDataCache)
+		cancel()
+		if err != nil {
+			return err
+		}
+		if rsp.GetSuccess() {
+			c.masterHBRetry = 0
+			c.processRegisterResponseM2C(rsp)
+			return nil
+		}
+		if rsp.GetErrCode() == errs.RetConsumeGroupForbidden || rsp.GetErrCode() == errs.RetConsumeContentForbidden {
+			return nil
+		}
+		// Any other rejection: try the next master.
+		c.master, err = c.selector.Select(c.config.Consumer.Masters)
+		if err != nil {
+			return err
+		}
+	}
+	return nil
+}
+
+// processRegisterResponseM2C applies a master registration response to the
+// consumer: the not-allocated flag, flow-control settings, the authorization
+// token, and the time (in milliseconds) of this master contact.
+func (c *consumer) processRegisterResponseM2C(rsp *protocol.RegisterResponseM2C) {
+	if rsp.GetNotAllocated() {
+		c.subInfo.SetIsNotAllocated(rsp.GetNotAllocated())
+	}
+	// Refresh flow control when either the default or the group check ID is set.
+	// The original tested the default ID twice, so a response carrying only a
+	// group flow check ID was ignored.
+	if rsp.GetDefFlowCheckId() != 0 || rsp.GetGroupFlowCheckId() != 0 {
+		if rsp.GetDefFlowCheckId() != 0 {
+			c.rmtDataCache.UpdateDefFlowCtrlInfo(rsp.GetDefFlowCheckId(), rsp.GetDefFlowControlInfo())
+		}
+		// Keep the cached query priority unless the response supplies one.
+		qryPriorityID := c.rmtDataCache.GetQryPriorityID()
+		if rsp.GetQryPriorityId() != 0 {
+			qryPriorityID = rsp.GetQryPriorityId()
+		}
+		c.rmtDataCache.UpdateGroupFlowCtrlInfo(qryPriorityID, rsp.GetGroupFlowCheckId(), rsp.GetGroupFlowControlInfo())
+	}
+	if rsp.GetAuthorizedInfo() != nil {
+		c.processAuthorizedToken(rsp.GetAuthorizedInfo())
+	}
+	c.lastMasterHb = time.Now().UnixNano() / int64(time.Millisecond)
+}
+
+func (c *consumer) processAuthorizedToken(info *protocol.MasterAuthorizedInfo) {
+ atomic.StoreInt64(&c.visitToken, info.GetVisitAuthorizedToken())
+ c.authorizedInfo = info.GetAuthAuthorizedToken()
+}
+
+// GetMessage implementation of TubeMQ consumer.
+func (c *consumer) GetMessage() (*ConsumerResult, error) {
+ panic("implement me")
+}
+
+// Confirm implementation of TubeMQ consumer.
+func (c *consumer) Confirm(confirmContext string, consumed bool) (*ConsumerResult, error) {
+ panic("implement me")
+}
+
+// GetCurrConsumedInfo implementation of TubeMQ consumer.
+func (c *consumer) GetCurrConsumedInfo() (map[string]*ConsumerOffset, error) {
+ panic("implement me")
+}
+
+// processRebalanceEvent takes events from rmtDataCache in a loop and dispatches
+// them by event type until it sees an event whose status and rebalance ID both
+// equal util.InvalidValue. Start runs it in its own goroutine.
+func (c *consumer) processRebalanceEvent() {
+ for {
+ event := c.rmtDataCache.TakeEvent()
+ // NOTE(review): if TakeEvent returns nil without blocking, this loop spins; confirm.
+ if event == nil {
+ continue
+ }
+ // An event with both fields set to InvalidValue ends the loop.
+ if event.GetEventStatus() == int32(util.InvalidValue) && event.GetRebalanceID() == util.InvalidValue {
+ break
+ }
+ c.rmtDataCache.ClearEvent()
+ // Types 2 and 20 lead to disconnect2Broker, types 1 and 10 to connect2Broker;
+ // any other type is dropped. The handled event is offered back to the cache.
+ // NOTE(review): the numeric codes presumably mirror the protocol's rebalance op types; confirm.
+ switch event.GetEventType() {
+ case 2, 20:
+ c.disconnect2Broker(event)
+ c.rmtDataCache.OfferEvent(event)
+ case 1, 10:
+ c.connect2Broker(event)
+ c.rmtDataCache.OfferEvent(event)
+ }
+ }
+}
+
+// disconnect2Broker removes the partitions named by the event from the remote
+// data cache, unregisters whatever was removed from the brokers, and then sets
+// the event status to 2.
+func (c *consumer) disconnect2Broker(event *metadata.ConsumerEvent) {
+	if infos := event.GetSubscribeInfo(); len(infos) > 0 {
+		removed := make(map[*metadata.Node][]*metadata.Partition)
+		c.rmtDataCache.RemoveAndGetPartition(infos, c.config.Consumer.RollbackIfConfirmTimeout, removed)
+		if len(removed) != 0 {
+			c.unregister2Broker(removed)
+		}
+	}
+	event.SetEventStatus(2)
+}
+
+// unregister2Broker sends one unregister request per partition in
+// unRegPartitions. Each request has its own ReadTimeout-bounded context, and
+// the result of the request is not inspected.
+func (c *consumer) unregister2Broker(unRegPartitions map[*metadata.Node][]*metadata.Partition) {
+	// Ranging over an empty or nil map is a no-op, so no explicit length check is needed.
+	for _, parts := range unRegPartitions {
+		for _, p := range parts {
+			ctx, cancel := context.WithTimeout(context.Background(), c.config.Net.ReadTimeout)
+
+			brokerNode := &metadata.Node{}
+			brokerNode.SetHost(util.GetLocalHost())
+			brokerNode.SetAddress(p.GetBroker().GetAddress())
+
+			subscribeInfo := &metadata.SubscribeInfo{}
+			subscribeInfo.SetGroup(c.config.Consumer.Group)
+			subscribeInfo.SetConsumerID(c.clientID)
+			subscribeInfo.SetPartition(p)
+
+			req := &metadata.Metadata{}
+			req.SetNode(brokerNode)
+			req.SetReadStatus(1)
+			req.SetSubscribeInfo(subscribeInfo)
+
+			c.client.UnregisterRequestC2B(ctx, req, c.subInfo)
+			cancel()
+		}
+	}
+}
+
+// connect2Broker registers the consumer with the broker of every partition in
+// the event that rmtDataCache.FilterPartitions reports back. On a successful
+// registration the partition is added to the cache and a broker heartbeat is
+// registered. A failed or rejected registration skips that partition. The
+// subscription is then marked as first-registered and the event status set to 2.
+func (c *consumer) connect2Broker(event *metadata.ConsumerEvent) {
+	if len(event.GetSubscribeInfo()) > 0 {
+		unsubPartitions := c.rmtDataCache.FilterPartitions(event.GetSubscribeInfo())
+		for _, partition := range unsubPartitions {
+			m := &metadata.Metadata{}
+			node := &metadata.Node{}
+			node.SetHost(util.GetLocalHost())
+			node.SetAddress(partition.GetBroker().GetAddress())
+			m.SetNode(node)
+			// Named subscribeInfo so the imported sub package is not shadowed.
+			subscribeInfo := &metadata.SubscribeInfo{}
+			subscribeInfo.SetGroup(c.config.Consumer.Group)
+			subscribeInfo.SetConsumerID(c.clientID)
+			subscribeInfo.SetPartition(partition)
+			m.SetSubscribeInfo(subscribeInfo)
+			isFirstRegister := c.rmtDataCache.IsFirstRegister(partition.GetPartitionKey())
+			m.SetReadStatus(c.getConsumeReadStatus(isFirstRegister))
+			auth := c.genBrokerAuthenticInfo(true)
+			c.subInfo.SetAuthorizedInfo(auth)
+
+			// The context only bounds the RPC, so release it as soon as the call returns.
+			ctx, cancel := context.WithTimeout(context.Background(), c.config.Net.ReadTimeout)
+			rsp, err := c.client.RegisterRequestC2B(ctx, m, c.subInfo, c.rmtDataCache)
+			cancel()
+			if err != nil {
+				// todo add log
+				// Do not inspect rsp after a failed call; move on to the next partition.
+				continue
+			}
+			if rsp.GetSuccess() {
+				c.rmtDataCache.AddNewPartition(partition)
+				c.heartbeatManager.registerBroker(node)
+			}
+		}
+	}
+	c.subInfo.FirstRegistered()
+	event.SetEventStatus(2)
+}
+
+func newClientIndex() uint64 {
+ return atomic.AddUint64(&clientIndex, 1)
+}
+
+// newClientID builds a client identifier of the form
+// group_host_pid_timestampMillis_index_version.
+//
+// The timestamp (int64) and the client index (uint64) are formatted at their
+// native width. Converting them through int, as the original did, truncates
+// the millisecond timestamp on platforms where int is 32 bits.
+func newClientID(group string) string {
+	return group + "_" +
+		util.GetLocalHost() + "_" +
+		strconv.Itoa(os.Getpid()) + "_" +
+		strconv.FormatInt(time.Now().Unix()*1000, 10) + "_" +
+		strconv.FormatUint(newClientIndex(), 10) + "_" +
+		tubeMQClientVersion
+}
+
+func (c *consumer) genBrokerAuthenticInfo(force bool) *protocol.AuthorizedInfo {
+ needAdd := false
+ auth := &protocol.AuthorizedInfo{}
+ if c.config.Net.Auth.Enable {
+ if force {
+ needAdd = true
+ atomic.StoreInt32(&c.nextAuth2Broker, 0)
+ } else if atomic.LoadInt32(&c.nextAuth2Broker) == 1 {
+ if atomic.CompareAndSwapInt32(&c.nextAuth2Broker, 1, 0) {
+ needAdd = true
+ }
+ }
+ if needAdd {
+ authToken := util.GenBrokerAuthenticateToken(c.config.Net.Auth.UserName, c.config.Net.Auth.Password)
+ auth.AuthAuthorizedToken = proto.String(authToken)
+ }
+ }
+ return auth
+}
+
+// genMasterAuthenticateToken decides whether master authentication data should
+// be attached to auth. With authentication enabled, it is needed when force is
+// true (which also clears nextAuth2Master) or when nextAuth2Master is swapped
+// from 1 to 0.
+//
+// NOTE(review): the needAdd branch is empty, so auth is never populated by this
+// function as written; token generation still has to be implemented.
+func (c *consumer) genMasterAuthenticateToken(auth *protocol.AuthenticateInfo, force bool) {
+ needAdd := false
+ if c.config.Net.Auth.Enable {
+ if force {
+ needAdd = true
+ atomic.StoreInt32(&c.nextAuth2Master, 0)
+ } else if atomic.LoadInt32(&c.nextAuth2Master) == 1 {
+ // Only the caller that wins the 1 -> 0 swap attaches the token.
+ if atomic.CompareAndSwapInt32(&c.nextAuth2Master, 1, 0) {
+ needAdd = true
+ }
+ }
+ if needAdd {
+ }
+ }
+}
+
+// getConsumeReadStatus returns the read status to send to a broker. Outside a
+// first registration it is always consumeStatusNormal. On a first registration
+// it follows config.Consumer.ConsumePosition: zero gives consumeStatusFromMax,
+// a positive value gives consumeStatusFromMaxAlways, and a negative value
+// gives consumeStatusNormal.
+func (c *consumer) getConsumeReadStatus(isFirstReg bool) int32 {
+	if !isFirstReg {
+		return consumeStatusNormal
+	}
+	position := c.config.Consumer.ConsumePosition
+	switch {
+	case position == 0:
+		return consumeStatusFromMax
+	case position > 0:
+		return consumeStatusFromMaxAlways
+	default:
+		return consumeStatusNormal
+	}
+}
diff --git a/tubemq-client-twins/tubemq-client-go/client/heartbeat.go b/tubemq-client-twins/tubemq-client-go/client/heartbeat.go
new file mode 100644
index 0000000..9c29854
--- /dev/null
+++ b/tubemq-client-twins/tubemq-client-go/client/heartbeat.go
@@ -0,0 +1,229 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package client
+
+import (
+ "context"
+ "strings"
+ "sync"
+ "sync/atomic"
+ "time"
+
+ "github.com/apache/incubator-inlong/tubemq-client-twins/tubemq-client-go/errs"
+ "github.com/apache/incubator-inlong/tubemq-client-twins/tubemq-client-go/metadata"
+ "github.com/apache/incubator-inlong/tubemq-client-twins/tubemq-client-go/protocol"
+ "github.com/apache/incubator-inlong/tubemq-client-twins/tubemq-client-go/util"
+)
+
+type heartbeatMetadata struct {
+ numConnections int
+ timer *time.Timer
+}
+
+type heartbeatManager struct {
+ consumer *consumer
+ heartbeats map[string]*heartbeatMetadata
+ mu sync.Mutex
+}
+
+func newHBManager(consumer *consumer) *heartbeatManager {
+ return &heartbeatManager{
+ consumer: consumer,
+ heartbeats: make(map[string]*heartbeatMetadata),
+ }
+}
+
+// registerMaster records one more connection to the master at address. The
+// first registration creates the entry with a count of 1 and schedules
+// consumerHB2Master after half the heartbeat interval; later registrations
+// only bump the count.
+//
+// The original incremented the count right after creating the entry, so a
+// single registration was counted as 2 and the matching decrement in
+// consumerHB2Master could never bring it back to zero.
+func (h *heartbeatManager) registerMaster(address string) {
+	h.mu.Lock()
+	defer h.mu.Unlock()
+	if hm, ok := h.heartbeats[address]; ok {
+		hm.numConnections++
+		return
+	}
+	h.heartbeats[address] = &heartbeatMetadata{
+		numConnections: 1,
+		timer:          time.AfterFunc(h.consumer.config.Heartbeat.Interval/2, h.consumerHB2Master),
+	}
+}
+
+// registerBroker records one more connection to broker, keyed by its address.
+// The first registration creates the entry with a count of 1 and schedules
+// consumerHB2Broker after one heartbeat interval; later registrations only
+// bump the count.
+//
+// The original incremented the count right after creating the entry, so a
+// single registration was counted as 2.
+func (h *heartbeatManager) registerBroker(broker *metadata.Node) {
+	h.mu.Lock()
+	defer h.mu.Unlock()
+
+	address := broker.GetAddress()
+	if hm, ok := h.heartbeats[address]; ok {
+		hm.numConnections++
+		return
+	}
+	h.heartbeats[address] = &heartbeatMetadata{
+		numConnections: 1,
+		timer:          time.AfterFunc(h.consumer.config.Heartbeat.Interval, func() { h.consumerHB2Broker(broker) }),
+	}
+}
+
+// consumerHB2Master sends one heartbeat round to the current master and
+// re-arms the master heartbeat timer.
+//
+// Up to config.Heartbeat.MaxRetryTimes attempts are made. A successful
+// response is handed to processHBResponseM2C. A RetErrHBNoNode error code or a
+// "StandbyException" message triggers an asynchronous re-registration and
+// returns without re-arming the timer. In the standby case a new master is
+// selected and the connection count for the old address is dropped.
+//
+// Fixes over the original: the retry counter is now advanced, where the loop
+// previously never terminated while the master was unreachable. The response
+// is not inspected after an RPC error. Every access to h.heartbeats happens
+// under h.mu, and a missing entry no longer causes a nil dereference.
+func (h *heartbeatManager) consumerHB2Master() {
+	// After more than 30s without master contact, let the cache handle expired partitions.
+	if time.Now().UnixNano()/int64(time.Millisecond)-h.consumer.lastMasterHb > 30000 {
+		h.consumer.rmtDataCache.HandleExpiredPartitions(h.consumer.config.Consumer.MaxConfirmWait)
+	}
+	m := &metadata.Metadata{}
+	node := &metadata.Node{}
+	node.SetHost(util.GetLocalHost())
+	node.SetAddress(h.consumer.master.Address)
+	m.SetNode(node)
+	sub := &metadata.SubscribeInfo{}
+	sub.SetGroup(h.consumer.config.Consumer.Group)
+	m.SetSubscribeInfo(sub)
+	// Set the report flag once the unreported count exceeds MaxSubInfoReportInterval.
+	h.consumer.unreportedTimes++
+	if h.consumer.unreportedTimes > h.consumer.config.Consumer.MaxSubInfoReportInterval {
+		m.SetReportTimes(true)
+		h.consumer.unreportedTimes = 0
+	}
+
+	for retry := 0; retry < h.consumer.config.Heartbeat.MaxRetryTimes; retry++ {
+		ctx, cancel := context.WithTimeout(context.Background(), h.consumer.config.Net.ReadTimeout)
+		rsp, err := h.consumer.client.HeartRequestC2M(ctx, m, h.consumer.subInfo, h.consumer.rmtDataCache)
+		cancel()
+		if err != nil {
+			continue
+		}
+		if rsp.GetSuccess() {
+			h.processHBResponseM2C(rsp)
+			break
+		}
+		if rsp.GetErrCode() == errs.RetErrHBNoNode || strings.Contains(rsp.GetErrMsg(), "StandbyException") {
+			h.consumer.masterHBRetry++
+			// Capture the address before re-registration can replace the master.
+			address := h.consumer.master.Address
+			needChange := rsp.GetErrCode() != errs.RetErrHBNoNode
+			go h.consumer.register2Master(needChange)
+			if needChange {
+				h.mu.Lock()
+				if hm, ok := h.heartbeats[address]; ok {
+					hm.numConnections--
+					if hm.numConnections == 0 {
+						delete(h.heartbeats, address)
+					}
+				}
+				h.mu.Unlock()
+			}
+			// NOTE(review): the timer is not re-armed on this path, as in the original;
+			// confirm that re-registration is expected to restart the master heartbeat.
+			return
+		}
+	}
+	h.mu.Lock()
+	defer h.mu.Unlock()
+	if hm, ok := h.heartbeats[h.consumer.master.Address]; ok {
+		hm.timer.Reset(h.nextHeartbeatInterval())
+	}
+}
+
+func (h *heartbeatManager) processHBResponseM2C(rsp *protocol.HeartResponseM2C) {
+ h.consumer.masterHBRetry = 0
+ h.consumer.subInfo.SetIsNotAllocated(rsp.GetNotAllocated())
+ if rsp.GetDefFlowCheckId() != 0 || rsp.GetGroupFlowCheckId() != 0 {
+ if rsp.GetDefFlowCheckId() != 0 {
+ h.consumer.rmtDataCache.UpdateDefFlowCtrlInfo(rsp.GetDefFlowCheckId(), rsp.GetDefFlowControlInfo())
+ }
+ qryPriorityID := h.consumer.rmtDataCache.GetQryPriorityID()
+ if rsp.GetQryPriorityId() != 0 {
+ qryPriorityID = rsp.GetQryPriorityId()
+ }
+ h.consumer.rmtDataCache.UpdateGroupFlowCtrlInfo(qryPriorityID, rsp.GetGroupFlowCheckId(), rsp.GetGroupFlowControlInfo())
+ }
+ if rsp.GetAuthorizedInfo() != nil {
+ h.consumer.processAuthorizedToken(rsp.GetAuthorizedInfo())
+ }
+ if rsp.GetRequireAuth() {
+ atomic.StoreInt32(&h.consumer.nextAuth2Master, 1)
+ }
+ if rsp.GetEvent() != nil {
+ event := rsp.GetEvent()
+ subscribeInfo := make([]*metadata.SubscribeInfo, 0, len(event.GetSubscribeInfo()))
+ for _, sub := range event.GetSubscribeInfo() {
+ s, err := metadata.NewSubscribeInfo(sub)
+ if err != nil {
+ continue
+ }
+ subscribeInfo = append(subscribeInfo, s)
+ }
+ e := metadata.NewEvent(event.GetRebalanceId(), event.GetOpType(), subscribeInfo)
+ h.consumer.rmtDataCache.OfferEvent(e)
+ }
+}
+
+// nextHeartbeatInterval returns the delay before the next master heartbeat:
+// config.Heartbeat.AfterFail once masterHBRetry has reached
+// config.Heartbeat.MaxRetryTimes, and config.Heartbeat.Interval otherwise.
+func (h *heartbeatManager) nextHeartbeatInterval() time.Duration {
+	hb := h.consumer.config.Heartbeat
+	if h.consumer.masterHBRetry < hb.MaxRetryTimes {
+		return hb.Interval
+	}
+	return hb.AfterFail
+}
+
+// consumerHB2Broker sends one heartbeat to broker for the partitions the cache
+// holds for it, then re-arms (or drops) the broker's heartbeat timer through
+// resetBrokerTimer. h.mu is held for the whole call.
+//
+// On a successful response that reports partial failures, the failed
+// partitions named in the failure info are removed from the cache. On an
+// unsuccessful response with RetCertificateFailure, all of the broker's
+// partitions are removed.
+//
+// Fixes over the original: an RPC error used to return without re-arming the
+// timer, which stopped heartbeats to this broker for good. The
+// certificate-failure check was nested inside the success branch, where it was
+// effectively unreachable.
+func (h *heartbeatManager) consumerHB2Broker(broker *metadata.Node) {
+	h.mu.Lock()
+	defer h.mu.Unlock()
+
+	partitions := h.consumer.rmtDataCache.GetPartitionByBroker(broker)
+	if len(partitions) == 0 {
+		h.resetBrokerTimer(broker)
+		return
+	}
+	m := &metadata.Metadata{}
+	m.SetReadStatus(h.consumer.getConsumeReadStatus(false))
+	m.SetNode(broker)
+	ctx, cancel := context.WithTimeout(context.Background(), h.consumer.config.Net.ReadTimeout)
+	defer cancel()
+
+	rsp, err := h.consumer.client.HeartbeatRequestC2B(ctx, m, h.consumer.subInfo, h.consumer.rmtDataCache)
+	if err != nil {
+		// Keep the heartbeat alive so a transient failure can be retried next interval.
+		h.resetBrokerTimer(broker)
+		return
+	}
+	if rsp.GetSuccess() {
+		if rsp.GetHasPartFailure() {
+			partitionKeys := make([]string, 0, len(rsp.GetFailureInfo()))
+			for _, fi := range rsp.GetFailureInfo() {
+				// Each failure entry is "<prefix>:<partition>"; only the part after the first colon is parsed.
+				pos := strings.Index(fi, ":")
+				if pos == -1 {
+					continue
+				}
+				partition, err := metadata.NewPartition(fi[pos+1:])
+				if err != nil {
+					continue
+				}
+				partitionKeys = append(partitionKeys, partition.GetPartitionKey())
+			}
+			h.consumer.rmtDataCache.RemovePartition(partitionKeys)
+		}
+	} else if rsp.GetErrCode() == errs.RetCertificateFailure {
+		partitionKeys := make([]string, 0, len(partitions))
+		for _, partition := range partitions {
+			partitionKeys = append(partitionKeys, partition.GetPartitionKey())
+		}
+		h.consumer.rmtDataCache.RemovePartition(partitionKeys)
+	}
+	h.resetBrokerTimer(broker)
+}
+
+// resetBrokerTimer re-arms the heartbeat timer for broker with the configured
+// heartbeat interval, or drops the broker's entry when the cache holds no
+// partitions for it. It touches h.heartbeats without locking; the visible
+// caller, consumerHB2Broker, holds h.mu when calling it.
+//
+// A missing entry is now tolerated. The original dereferenced the map lookup
+// result unconditionally and panicked if the entry had already been removed.
+func (h *heartbeatManager) resetBrokerTimer(broker *metadata.Node) {
+	partitions := h.consumer.rmtDataCache.GetPartitionByBroker(broker)
+	if len(partitions) == 0 {
+		delete(h.heartbeats, broker.GetAddress())
+		return
+	}
+	if hm, ok := h.heartbeats[broker.GetAddress()]; ok {
+		hm.timer.Reset(h.consumer.config.Heartbeat.Interval)
+	}
+}
diff --git a/tubemq-client-twins/tubemq-client-go/client/remote.go b/tubemq-client-twins/tubemq-client-go/client/remote.go
deleted file mode 100644
index e36b69f..0000000
--- a/tubemq-client-twins/tubemq-client-go/client/remote.go
+++ /dev/null
@@ -1,79 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- * <p>
- * http://www.apache.org/licenses/LICENSE-2.0
- * <p>
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-// Package client defines the api and information
-// which can be exposed to user.
-package client
-
-import (
- "sync"
-
- "github.com/apache/incubator-inlong/tubemq-client-twins/tubemq-client-go/metadata"
-)
-
-// RmtDataCache represents the data returned from TubeMQ.
-type RmtDataCache struct {
- consumerID string
- groupName string
- underGroupCtrl bool
- defFlowCtrlID int64
- groupFlowCtrlID int64
- subscribeInfo []*metadata.SubscribeInfo
- rebalanceResults []*metadata.ConsumerEvent
- mu sync.Mutex
- brokerToPartitions map[*metadata.Node][]*metadata.Partition
-}
-
-// GetUnderGroupCtrl returns the underGroupCtrl.
-func (r *RmtDataCache) GetUnderGroupCtrl() bool {
- return r.underGroupCtrl
-}
-
-// GetDefFlowCtrlID returns the defFlowCtrlID.
-func (r *RmtDataCache) GetDefFlowCtrlID() int64 {
- return r.defFlowCtrlID
-}
-
-// GetGroupFlowCtrlID returns the groupFlowCtrlID.
-func (r *RmtDataCache) GetGroupFlowCtrlID() int64 {
- return r.groupFlowCtrlID
-}
-
-// GetSubscribeInfo returns the subscribeInfo.
-func (r *RmtDataCache) GetSubscribeInfo() []*metadata.SubscribeInfo {
- return r.subscribeInfo
-}
-
-// PollEventResult polls the first event result from the rebalanceResults.
-func (r *RmtDataCache) PollEventResult() *metadata.ConsumerEvent {
- r.mu.Lock()
- defer r.mu.Unlock()
- if len(r.rebalanceResults) > 0 {
- event := r.rebalanceResults[0]
- r.rebalanceResults = r.rebalanceResults[1:]
- return event
- }
- return nil
-}
-
-// GetPartitionByBroker returns the
-func (r *RmtDataCache) GetPartitionByBroker(node *metadata.Node) []*metadata.Partition {
- if partitions, ok := r.brokerToPartitions[node]; ok {
- return partitions
- }
- return nil
-}
diff --git a/tubemq-client-twins/tubemq-client-go/config/config.go b/tubemq-client-twins/tubemq-client-go/config/config.go
index 47a360a..7a93831 100644
--- a/tubemq-client-twins/tubemq-client-go/config/config.go
+++ b/tubemq-client-twins/tubemq-client-go/config/config.go
@@ -29,7 +29,7 @@ import (
// Config defines multiple configuration options.
// Refer to: https://github.com/apache/incubator-inlong/blob/3249de37acf054a9c43677131cfbb09fc6d366d1/tubemq-client/src/main/java/org/apache/tubemq/client/config/ConsumerConfig.java
type Config struct {
- // Net is the namespace for network-level properties used by Broker and Master.
+ // Net is the namespace for network-level properties used by broker and Master.
Net struct {
// ReadTimeout represents how long to wait for a response.
ReadTimeout time.Duration
@@ -61,9 +61,13 @@ type Config struct {
// used by the consumer
Consumer struct {
// Masters is the addresses of master.
- Masters []string
- // Topic of the consumption.
- Topic string
+ Masters string
+ // Topics of the consumption.
+ Topics []string
+ // TopicFilters is the map of topic to filters.
+ TopicFilters map[string][]string
+ // PartitionOffset is the map of partition to its corresponding offset.
+ PartitionOffset map[string]int64
// ConsumerPosition is the initial offset to use if no offset was previously committed.
ConsumePosition int
// Group is the consumer group name.
@@ -152,7 +156,7 @@ func ParseAddress(address string) (config *Config, err error) {
return nil, fmt.Errorf("address format invalid: address: %v, token: %v", address, tokens)
}
- c.Consumer.Masters = strings.Split(tokens[0], ",")
+ c.Consumer.Masters = tokens[0]
tokens = strings.Split(tokens[1], "&")
if len(tokens) == 0 {
@@ -190,8 +194,8 @@ func getConfigFromToken(config *Config, values []string) error {
config.Net.TLS.TLSServerName = values[1]
case "group":
config.Consumer.Group = values[1]
- case "topic":
- config.Consumer.Topic = values[1]
+ case "topics":
+ config.Consumer.Topics = strings.Split(values[1], ",")
case "consumePosition":
config.Consumer.ConsumePosition, err = strconv.Atoi(values[1])
case "boundConsume":
diff --git a/tubemq-client-twins/tubemq-client-go/config/config_test.go b/tubemq-client-twins/tubemq-client-go/config/config_test.go
index 56bc600..a8ac703 100644
--- a/tubemq-client-twins/tubemq-client-go/config/config_test.go
+++ b/tubemq-client-twins/tubemq-client-go/config/config_test.go
@@ -25,11 +25,11 @@ import (
)
func TestParseAddress(t *testing.T) {
- address := "127.0.0.1:9092,127.0.0.1:9093?topic=Topic&group=Group&tlsEnable=false&msgNotFoundWait=10000&heartbeatMaxRetryTimes=6"
+ address := "127.0.0.1:9092,127.0.0.1:9093?topics=Topic1,Topic2&group=Group&tlsEnable=false&msgNotFoundWait=10000&heartbeatMaxRetryTimes=6"
c, err := ParseAddress(address)
assert.Nil(t, err)
- assert.Equal(t, c.Consumer.Masters, []string{"127.0.0.1:9092", "127.0.0.1:9093"})
- assert.Equal(t, c.Consumer.Topic, "Topic")
+ assert.Equal(t, c.Consumer.Masters, "127.0.0.1:9092,127.0.0.1:9093")
+ assert.Equal(t, c.Consumer.Topics, []string{"Topic1", "Topic2"})
assert.Equal(t, c.Consumer.Group, "Group")
assert.Equal(t, c.Consumer.MsgNotFoundWait, 10000*time.Millisecond)
@@ -41,11 +41,11 @@ func TestParseAddress(t *testing.T) {
_, err = ParseAddress(address)
assert.NotNil(t, err)
- address = "127.0.0.1:9092,127.0.0.1:9093?Topic=Topic&ttt"
+ address = "127.0.0.1:9092,127.0.0.1:9093?topics=Topic&ttt"
_, err = ParseAddress(address)
assert.NotNil(t, err)
- address = "127.0.0.1:9092,127.0.0.1:9093?Topic=Topic&ttt=ttt"
+ address = "127.0.0.1:9092,127.0.0.1:9093?topics=Topic&ttt=ttt"
_, err = ParseAddress(address)
assert.NotNil(t, err)
}
diff --git a/tubemq-client-twins/tubemq-client-go/errs/errs.go b/tubemq-client-twins/tubemq-client-go/errs/errs.go
index 00b3c81..eb45179 100644
--- a/tubemq-client-twins/tubemq-client-go/errs/errs.go
+++ b/tubemq-client-twins/tubemq-client-go/errs/errs.go
@@ -33,12 +33,18 @@ const (
RetAssertionFailure = 4
// RetRequestFailure represents the error code of request error.
RetRequestFailure = 5
- // RetSelectorNotExist = 6
- RetSelectorNotExist = 6
+ // RetSelectorNotExist represents the selector not exists.
+ RetSelectorNotExist = 6
+ RetErrHBNoNode = 411
+ RetCertificateFailure = 415
+ RetConsumeGroupForbidden = 450
+ RetConsumeContentForbidden = 455
)
// ErrAssertionFailure represents RetAssertionFailure error.
-var ErrAssertionFailure = New(RetAssertionFailure, "AssertionFailure")
+var (
+ ErrAssertionFailure = New(RetAssertionFailure, "AssertionFailure")
+)
// Error provides a TubeMQ-specific error container
type Error struct {
diff --git a/tubemq-client-twins/tubemq-client-go/metadata/consumer_event.go b/tubemq-client-twins/tubemq-client-go/metadata/consumer_event.go
index 2b4ce49..6ce2915 100644
--- a/tubemq-client-twins/tubemq-client-go/metadata/consumer_event.go
+++ b/tubemq-client-twins/tubemq-client-go/metadata/consumer_event.go
@@ -25,6 +25,14 @@ type ConsumerEvent struct {
subscribeInfo []*SubscribeInfo
}
+// NewEvent returns a ConsumerEvent with the given rebalance ID, event type and
+// subscribe info. The remaining fields are left at their zero values.
+func NewEvent(rebalanceID int64, eventType int32, subscribeInfo []*SubscribeInfo) *ConsumerEvent {
+ return &ConsumerEvent{
+ rebalanceID: rebalanceID,
+ eventType: eventType,
+ subscribeInfo: subscribeInfo,
+ }
+}
+
// GetRebalanceID returns the rebalanceID of a consumer event.
func (c *ConsumerEvent) GetRebalanceID() int64 {
return c.rebalanceID
diff --git a/tubemq-client-twins/tubemq-client-go/metadata/metadata.go b/tubemq-client-twins/tubemq-client-go/metadata/metadata.go
index 861a06e..f64674a 100644
--- a/tubemq-client-twins/tubemq-client-go/metadata/metadata.go
+++ b/tubemq-client-twins/tubemq-client-go/metadata/metadata.go
@@ -45,3 +45,23 @@ func (m *Metadata) GetReadStatus() int32 {
func (m *Metadata) GetReportTimes() bool {
return m.reportTimes
}
+
+// SetNode sets the node.
+func (m *Metadata) SetNode(node *Node) {
+ m.node = node
+}
+
+// SetSubscribeInfo sets the subscribeInfo.
+func (m *Metadata) SetSubscribeInfo(sub *SubscribeInfo) {
+ m.subscribeInfo = sub
+}
+
+// SetReadStatus sets the status.
+func (m *Metadata) SetReadStatus(status int32) {
+ m.readStatus = status
+}
+
+// SetReportTimes sets the reportTimes.
+func (m *Metadata) SetReportTimes(reportTimes bool) {
+ m.reportTimes = reportTimes
+}
diff --git a/tubemq-client-twins/tubemq-client-go/metadata/node.go b/tubemq-client-twins/tubemq-client-go/metadata/node.go
index 625f278..bcfda82 100644
--- a/tubemq-client-twins/tubemq-client-go/metadata/node.go
+++ b/tubemq-client-twins/tubemq-client-go/metadata/node.go
@@ -19,6 +19,7 @@ package metadata
import (
"strconv"
+ "strings"
)
// Node represents the metadata of a node.
@@ -29,6 +30,43 @@ type Node struct {
address string
}
+// nodeFormatError reports a node string that lacks a mandatory field.
+type nodeFormatError string
+
+// Error implements the error interface.
+func (e nodeFormatError) Error() string {
+	return "invalid node string " + strconv.Quote(string(e))
+}
+
+// NewNode constructs a node from a given string.
+//
+// A broker node is parsed as "id:host[:port]", any other node as
+// "host[:port]". When the port field is absent the port defaults to 8123.
+// It returns an error if the id or the port is not a number, or if a broker
+// string has no host field.
+func NewNode(isBroker bool, node string) (*Node, error) {
+	const defaultPort = 8123
+
+	fields := strings.Split(node, ":")
+	nodeID := 0
+	hostIdx := 0
+	if isBroker {
+		id, err := strconv.Atoi(fields[0])
+		if err != nil {
+			return nil, err
+		}
+		// A broker needs at least "id:host"; indexing the host field blindly
+		// would panic on a string without any ":".
+		if len(fields) < 2 {
+			return nil, nodeFormatError(node)
+		}
+		nodeID = id
+		hostIdx = 1
+	}
+	host := fields[hostIdx]
+
+	// The port, when present, is the field right after the host.
+	port := defaultPort
+	if portIdx := hostIdx + 1; len(fields) > portIdx {
+		parsed, err := strconv.Atoi(fields[portIdx])
+		if err != nil {
+			return nil, err
+		}
+		port = parsed
+	}
+	return &Node{
+		id:      uint32(nodeID),
+		host:    host,
+		port:    uint32(port),
+		address: host + ":" + strconv.Itoa(port),
+	}, nil
+}
+
// GetID returns the id of a node.
func (n *Node) GetID() uint32 {
return n.id
@@ -53,3 +91,19 @@ func (n *Node) GetAddress() string {
func (n *Node) String() string {
return strconv.Itoa(int(n.id)) + ":" + n.host + ":" + strconv.Itoa(int(n.port))
}
+
+// SetHost sets the host of the node.
+// Only the host field is written; the address field is left as it is.
+func (n *Node) SetHost(host string) {
+ n.host = host
+}
+
+// SetAddress sets the address of the node and the port parsed from it.
+//
+// The port is taken from the field after the first ":" of address; the host
+// field of the node is not modified. It returns an error, leaving the node
+// unchanged, if the address has no port field or the port is not a number.
+func (n *Node) SetAddress(address string) error {
+	// Treat a missing port as an empty one, so that Atoi reports the problem
+	// instead of the index expression panicking.
+	portField := ""
+	if fields := strings.Split(address, ":"); len(fields) >= 2 {
+		portField = fields[1]
+	}
+	port, err := strconv.Atoi(portField)
+	if err != nil {
+		return err
+	}
+	n.address = address
+	n.port = uint32(port)
+	return nil
+}
diff --git a/tubemq-client-twins/tubemq-client-go/metadata/partition.go b/tubemq-client-twins/tubemq-client-go/metadata/partition.go
index a180214..fbe1086 100644
--- a/tubemq-client-twins/tubemq-client-go/metadata/partition.go
+++ b/tubemq-client-twins/tubemq-client-go/metadata/partition.go
@@ -19,18 +19,48 @@ package metadata
import (
"strconv"
+ "strings"
)
// Partition represents the metadata of a partition.
type Partition struct {
topic string
- Broker *Node
+ broker *Node
partitionID int32
partitionKey string
offset int64
lastConsumed bool
}
+// partitionFormatError reports a partition string that is missing the "#" or
+// the ":" separator.
+type partitionFormatError string
+
+// Error implements the error interface.
+func (e partitionFormatError) Error() string {
+	return "invalid partition string " + strconv.Quote(string(e))
+}
+
+// NewPartition constructs a partition from a string of the form
+// "<broker>#<topic>:<partitionID>", where <broker> is parsed by NewNode as a
+// broker node.
+//
+// It returns an error if a separator is missing, if the broker part is
+// rejected by NewNode, or if the partition ID is not a number.
+func NewPartition(partition string) (*Partition, error) {
+	sep := strings.Index(partition, "#")
+	if sep == -1 {
+		return nil, partitionFormatError(partition)
+	}
+	broker, err := NewNode(true, partition[:sep])
+	if err != nil {
+		return nil, err
+	}
+
+	topicInfo := partition[sep+1:]
+	sep = strings.Index(topicInfo, ":")
+	if sep == -1 {
+		return nil, partitionFormatError(partition)
+	}
+	topic := topicInfo[:sep]
+	partitionID, err := strconv.Atoi(topicInfo[sep+1:])
+	if err != nil {
+		return nil, err
+	}
+
+	// The key must be populated here: no setter for it is visible, and an empty
+	// key would make every partition collide in maps keyed by it.
+	// NOTE(review): the "brokerID:topic:partitionID" layout is assumed from the
+	// other TubeMQ clients; confirm.
+	key := strconv.Itoa(int(broker.GetID())) + ":" + topic + ":" + strconv.Itoa(partitionID)
+	return &Partition{
+		topic:        topic,
+		broker:       broker,
+		partitionID:  int32(partitionID),
+		partitionKey: key,
+	}, nil
+}
+
// GetLastConsumed returns lastConsumed of a partition.
func (p *Partition) GetLastConsumed() bool {
return p.lastConsumed
@@ -51,7 +81,15 @@ func (p *Partition) GetTopic() string {
return p.topic
}
+// GetBroker returns the broker node of a partition.
+func (p *Partition) GetBroker() *Node {
+ return p.broker
+}
+
// String returns the metadata of a Partition as a string.
func (p *Partition) String() string {
- return p.Broker.String() + "#" + p.topic + "@" + strconv.Itoa(int(p.partitionID))
+ return p.broker.String() + "#" + p.topic + "@" + strconv.Itoa(int(p.partitionID))
+}
+
+// SetLastConsumed sets the lastConsumed flag of a partition.
+func (p *Partition) SetLastConsumed(lastConsumed bool) {
+ p.lastConsumed = lastConsumed
}
diff --git a/tubemq-client-twins/tubemq-client-go/metadata/subcribe_info.go b/tubemq-client-twins/tubemq-client-go/metadata/subcribe_info.go
index d76437a..90ae3ef 100644
--- a/tubemq-client-twins/tubemq-client-go/metadata/subcribe_info.go
+++ b/tubemq-client-twins/tubemq-client-go/metadata/subcribe_info.go
@@ -19,14 +19,14 @@ package metadata
import (
"fmt"
+ "strings"
)
// SubscribeInfo represents the metadata of the subscribe info.
type SubscribeInfo struct {
- group string
- consumerID string
- partition *Partition
- qryPriorityID int32
+ group string
+ consumerID string
+ partition *Partition
}
// GetGroup returns the group name.
@@ -44,12 +44,48 @@ func (s *SubscribeInfo) GetPartition() *Partition {
return s.partition
}
-// GetQryPriorityID returns the QryPriorityID.
-func (s *SubscribeInfo) GetQryPriorityID() int32 {
- return s.qryPriorityID
-}
-
// String returns the contents of SubscribeInfo as a string.
func (s *SubscribeInfo) String() string {
return fmt.Sprintf("%s@%s-%s", s.consumerID, s.group, s.partition.String())
}
+
+// NewSubscribeInfo constructs a SubscribeInfo from a string of the form
+// "<consumerID>@<group>#<partition>", where <partition> is parsed by
+// NewPartition.
+//
+// It returns an error if the "#" or "@" separator is missing or if the
+// partition part is rejected by NewPartition.
+func NewSubscribeInfo(subscribeInfo string) (*SubscribeInfo, error) {
+	pos := strings.Index(subscribeInfo, "#")
+	if pos == -1 {
+		return nil, fmt.Errorf("invalid subscribe info %q: missing '#'", subscribeInfo)
+	}
+	consumerInfo := subscribeInfo[:pos]
+	partition, err := NewPartition(subscribeInfo[pos+1:])
+	if err != nil {
+		return nil, err
+	}
+
+	// Slicing with the -1 of a failed Index would panic, so check it first.
+	pos = strings.Index(consumerInfo, "@")
+	if pos == -1 {
+		return nil, fmt.Errorf("invalid subscribe info %q: missing '@'", subscribeInfo)
+	}
+	return &SubscribeInfo{
+		group:      consumerInfo[pos+1:],
+		consumerID: consumerInfo[:pos],
+		partition:  partition,
+	}, nil
+}
+
+// SetPartition replaces the partition of the subscribe info with the given
+// one.
+func (s *SubscribeInfo) SetPartition(partition *Partition) {
+ s.partition = partition
+}
+
+// SetGroup sets the group name of the subscribe info.
+func (s *SubscribeInfo) SetGroup(group string) {
+ s.group = group
+}
+
+// SetConsumerID sets the consumerID of the subscribe info.
+func (s *SubscribeInfo) SetConsumerID(id string) {
+ s.consumerID = id
+}
diff --git a/tubemq-client-twins/tubemq-client-go/remote/remote.go b/tubemq-client-twins/tubemq-client-go/remote/remote.go
new file mode 100644
index 0000000..aa9a2d1
--- /dev/null
+++ b/tubemq-client-twins/tubemq-client-go/remote/remote.go
@@ -0,0 +1,331 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+// Package remote defines the remote data which is returned from TubeMQ.
+package remote
+
+import (
+ "sync"
+ "time"
+
+ "github.com/apache/incubator-inlong/tubemq-client-twins/tubemq-client-go/metadata"
+ "github.com/apache/incubator-inlong/tubemq-client-twins/tubemq-client-go/util"
+)
+
+// RmtDataCache represents the data returned from TubeMQ.
+//
+// The fields are grouped by the mutex that the methods in this file hold
+// while touching them.
+type RmtDataCache struct {
+	// Consumer identity and flow-control state; the accessors in this file
+	// read and write these without taking a lock.
+	consumerID      string
+	groupName       string
+	underGroupCtrl  bool
+	defFlowCtrlID   int64
+	groupFlowCtrlID int64
+	qryPriorityID   int32
+
+	// Queue of consumer events, accessed under eventMu.
+	eventMu          sync.Mutex
+	rebalanceResults []*metadata.ConsumerEvent
+
+	// Partition bookkeeping, accessed under metaMu. All string keys are
+	// partition keys, except the outer key of topicPartitions, which is a topic.
+	metaMu            sync.Mutex
+	partitionSubInfo  map[string]*metadata.SubscribeInfo
+	brokerPartitions  map[*metadata.Node]map[string]bool
+	partitions        map[string]*metadata.Partition
+	usedPartitions    map[string]int64
+	indexPartitions   map[string]bool
+	partitionTimeouts map[string]*time.Timer
+	topicPartitions   map[string]map[string]bool
+
+	// Registration book, accessed under dataBookMu.
+	dataBookMu         sync.Mutex
+	partitionRegBooked map[string]bool
+}
+
+// NewRmtDataCache returns an RmtDataCache whose flow-control and query
+// priority IDs are set to util.InvalidValue and whose maps and event queue
+// are allocated and empty.
+func NewRmtDataCache() *RmtDataCache {
+	cache := &RmtDataCache{
+		defFlowCtrlID:    util.InvalidValue,
+		groupFlowCtrlID:  util.InvalidValue,
+		qryPriorityID:    int32(util.InvalidValue),
+		rebalanceResults: []*metadata.ConsumerEvent{},
+	}
+	cache.partitionSubInfo = make(map[string]*metadata.SubscribeInfo)
+	cache.brokerPartitions = make(map[*metadata.Node]map[string]bool)
+	cache.partitions = make(map[string]*metadata.Partition)
+	cache.usedPartitions = make(map[string]int64)
+	cache.indexPartitions = make(map[string]bool)
+	cache.partitionTimeouts = make(map[string]*time.Timer)
+	cache.topicPartitions = make(map[string]map[string]bool)
+	cache.partitionRegBooked = make(map[string]bool)
+	return cache
+}
+
+// GetUnderGroupCtrl returns the underGroupCtrl flag.
+// The field is read without taking any lock.
+func (r *RmtDataCache) GetUnderGroupCtrl() bool {
+ return r.underGroupCtrl
+}
+
+// GetDefFlowCtrlID returns the defFlowCtrlID.
+// The field is read without taking any lock.
+func (r *RmtDataCache) GetDefFlowCtrlID() int64 {
+ return r.defFlowCtrlID
+}
+
+// GetGroupFlowCtrlID returns the groupFlowCtrlID.
+// The field is read without taking any lock.
+func (r *RmtDataCache) GetGroupFlowCtrlID() int64 {
+ return r.groupFlowCtrlID
+}
+
+// GetGroupName returns the group name.
+// The field is read without taking any lock.
+func (r *RmtDataCache) GetGroupName() string {
+ return r.groupName
+}
+
+// GetSubscribeInfo returns a newly allocated slice holding every
+// SubscribeInfo currently in partitionSubInfo, in map iteration order.
+func (r *RmtDataCache) GetSubscribeInfo() []*metadata.SubscribeInfo {
+	r.metaMu.Lock()
+	defer r.metaMu.Unlock()
+	infos := make([]*metadata.SubscribeInfo, len(r.partitionSubInfo))
+	next := 0
+	for _, info := range r.partitionSubInfo {
+		infos[next] = info
+		next++
+	}
+	return infos
+}
+
+// GetQryPriorityID returns the qryPriorityID.
+// The field is read without taking any lock.
+func (r *RmtDataCache) GetQryPriorityID() int32 {
+ return r.qryPriorityID
+}
+
+// PollEventResult removes and returns the oldest event in rebalanceResults.
+// It returns nil when no event is queued.
+func (r *RmtDataCache) PollEventResult() *metadata.ConsumerEvent {
+	r.eventMu.Lock()
+	defer r.eventMu.Unlock()
+	if len(r.rebalanceResults) == 0 {
+		return nil
+	}
+	head, rest := r.rebalanceResults[0], r.rebalanceResults[1:]
+	r.rebalanceResults = rest
+	return head
+}
+
+// GetPartitionByBroker returns the partitions recorded for the given broker,
+// or nil if the broker has no entry. The lookup is keyed by the
+// *metadata.Node pointer itself, not by the contents of the node.
+func (r *RmtDataCache) GetPartitionByBroker(broker *metadata.Node) []*metadata.Partition {
+	r.metaMu.Lock()
+	defer r.metaMu.Unlock()
+
+	keys, found := r.brokerPartitions[broker]
+	if !found {
+		return nil
+	}
+	result := make([]*metadata.Partition, 0, len(keys))
+	for key := range keys {
+		result = append(result, r.partitions[key])
+	}
+	return result
+}
+
+// SetConsumerInfo sets the consumer information including consumerID and groupName.
+// Both fields are written without taking any lock.
+func (r *RmtDataCache) SetConsumerInfo(consumerID string, group string) {
+ r.consumerID = consumerID
+ r.groupName = group
+}
+
+// UpdateDefFlowCtrlInfo updates the defFlowCtrlInfo.
+// It is currently a stub: the body is empty and both arguments are ignored.
+func (r *RmtDataCache) UpdateDefFlowCtrlInfo(flowCtrlID int64, flowCtrlInfo string) {
+
+}
+
+// UpdateGroupFlowCtrlInfo updates the groupFlowCtrlInfo.
+// It is currently a stub: the body is empty and all arguments are ignored.
+func (r *RmtDataCache) UpdateGroupFlowCtrlInfo(qryPriorityID int32, flowCtrlID int64, flowCtrlInfo string) {
+
+}
+
+// OfferEvent appends a consumer event to the end of rebalanceResults.
+func (r *RmtDataCache) OfferEvent(event *metadata.ConsumerEvent) {
+ r.eventMu.Lock()
+ defer r.eventMu.Unlock()
+ r.rebalanceResults = append(r.rebalanceResults, event)
+}
+
+// TakeEvent removes and returns the first event of rebalanceResults, or
+// returns nil if there is none. It does not block waiting for an event.
+func (r *RmtDataCache) TakeEvent() *metadata.ConsumerEvent {
+	r.eventMu.Lock()
+	defer r.eventMu.Unlock()
+	var first *metadata.ConsumerEvent
+	if len(r.rebalanceResults) > 0 {
+		first = r.rebalanceResults[0]
+		r.rebalanceResults = r.rebalanceResults[1:]
+	}
+	return first
+}
+
+// ClearEvent clears all the events.
+// The slice is truncated to length zero; its backing array is kept.
+func (r *RmtDataCache) ClearEvent() {
+ r.eventMu.Lock()
+ defer r.eventMu.Unlock()
+ r.rebalanceResults = r.rebalanceResults[:0]
+}
+
+// RemoveAndGetPartition removes the partitions named by subscribeInfos from
+// the cache and appends each removed partition to partitions under its
+// broker.
+//
+// For a cached partition that is also in usedPartitions, lastConsumed is set
+// to the negation of processingRollback before removal. The idle state of
+// every listed partition key is reset whether or not it was cached.
+// partitions must be a non-nil map; it is written to in place.
+func (r *RmtDataCache) RemoveAndGetPartition(subscribeInfos []*metadata.SubscribeInfo, processingRollback bool, partitions map[*metadata.Node][]*metadata.Partition) {
+	if len(subscribeInfos) == 0 {
+		return
+	}
+	r.metaMu.Lock()
+	defer r.metaMu.Unlock()
+	for _, info := range subscribeInfos {
+		key := info.GetPartition().GetPartitionKey()
+		if cached, known := r.partitions[key]; known {
+			if _, inUse := r.usedPartitions[key]; inUse {
+				cached.SetLastConsumed(!processingRollback)
+			}
+			// append on a missing key starts from nil, so no existence check is needed.
+			broker := cached.GetBroker()
+			partitions[broker] = append(partitions[broker], cached)
+			r.removeMetaInfo(key)
+		}
+		r.resetIdlePartition(key, false)
+	}
+}
+
+// removeMetaInfo drops the partition stored under partitionKey from
+// topicPartitions, brokerPartitions, partitions and partitionSubInfo, deleting
+// a topic or broker entry once its set becomes empty. It does nothing if the
+// key is not in partitions. It takes no lock itself; the callers in this file
+// invoke it with metaMu held.
+func (r *RmtDataCache) removeMetaInfo(partitionKey string) {
+	partition, known := r.partitions[partitionKey]
+	if !known {
+		return
+	}
+
+	topic := partition.GetTopic()
+	if byTopic, ok := r.topicPartitions[topic]; ok {
+		delete(byTopic, partitionKey)
+		if len(byTopic) == 0 {
+			delete(r.topicPartitions, topic)
+		}
+	}
+
+	broker := partition.GetBroker()
+	if byBroker, ok := r.brokerPartitions[broker]; ok {
+		delete(byBroker, partition.GetPartitionKey())
+		if len(byBroker) == 0 {
+			delete(r.brokerPartitions, broker)
+		}
+	}
+
+	delete(r.partitions, partitionKey)
+	delete(r.partitionSubInfo, partitionKey)
+}
+
+// resetIdlePartition clears the in-use state of the given partition key: it
+// removes the key from usedPartitions, stops and forgets its timeout timer,
+// and removes it from indexPartitions. When reuse is true and the partition is
+// still in partitions, the key is put back into indexPartitions.
+// It takes no lock itself; the callers in this file invoke it with metaMu
+// held.
+func (r *RmtDataCache) resetIdlePartition(partitionKey string, reuse bool) {
+	delete(r.usedPartitions, partitionKey)
+	if timer, ok := r.partitionTimeouts[partitionKey]; ok {
+		if !timer.Stop() {
+			// The timer already fired or was stopped. Drain its channel without
+			// blocking: if the value was already received elsewhere, or the
+			// timer has no channel (time.AfterFunc), a plain receive would hang
+			// forever while the caller holds the lock.
+			select {
+			case <-timer.C:
+			default:
+			}
+		}
+		delete(r.partitionTimeouts, partitionKey)
+	}
+	delete(r.indexPartitions, partitionKey)
+	if reuse {
+		if _, ok := r.partitions[partitionKey]; ok {
+			r.indexPartitions[partitionKey] = true
+		}
+	}
+}
+
+// FilterPartitions returns the partitions of subInfos whose partition key is
+// not yet present in the cache. The result is non-nil even when it is empty.
+func (r *RmtDataCache) FilterPartitions(subInfos []*metadata.SubscribeInfo) []*metadata.Partition {
+	r.metaMu.Lock()
+	defer r.metaMu.Unlock()
+	missing := make([]*metadata.Partition, 0, len(subInfos))
+	// A lookup in an empty map always misses, so an empty cache needs no
+	// separate branch: every partition is reported.
+	for _, info := range subInfos {
+		candidate := info.GetPartition()
+		if _, cached := r.partitions[candidate.GetPartitionKey()]; cached {
+			continue
+		}
+		missing = append(missing, candidate)
+	}
+	return missing
+}
+
+// AddNewPartition adds newPartition to the cache if its partition key is not
+// cached yet: it records the partition by key, topic and broker, and stores a
+// SubscribeInfo built from the cache's consumerID and groupName.
+// Whether or not the partition was already cached, its idle state is reset
+// and the key is marked as available in indexPartitions.
+func (r *RmtDataCache) AddNewPartition(newPartition *metadata.Partition) {
+	sub := &metadata.SubscribeInfo{}
+	sub.SetPartition(newPartition)
+	sub.SetConsumerID(r.consumerID)
+	sub.SetGroup(r.groupName)
+
+	r.metaMu.Lock()
+	defer r.metaMu.Unlock()
+	partitionKey := newPartition.GetPartitionKey()
+	if _, ok := r.partitions[partitionKey]; !ok {
+		// Store the new partition itself; the value yielded by the failed
+		// lookup is nil and must not be stored or dereferenced.
+		r.partitions[partitionKey] = newPartition
+
+		// topicPartitions is keyed by topic, not by partition key.
+		topic := newPartition.GetTopic()
+		if _, ok := r.topicPartitions[topic]; !ok {
+			r.topicPartitions[topic] = make(map[string]bool)
+		}
+		r.topicPartitions[topic][partitionKey] = true
+
+		broker := newPartition.GetBroker()
+		if _, ok := r.brokerPartitions[broker]; !ok {
+			r.brokerPartitions[broker] = make(map[string]bool)
+		}
+		r.brokerPartitions[broker][partitionKey] = true
+
+		r.partitionSubInfo[partitionKey] = sub
+	}
+	r.resetIdlePartition(partitionKey, true)
+}
+
+// HandleExpiredPartitions resets every partition in usedPartitions whose
+// recorded value is more than wait behind the current Unix time in
+// milliseconds. For each such partition, lastConsumed is set to false if it
+// is still cached, and its key is made available for reuse.
+func (r *RmtDataCache) HandleExpiredPartitions(wait time.Duration) {
+	r.metaMu.Lock()
+	defer r.metaMu.Unlock()
+	if len(r.usedPartitions) == 0 {
+		return
+	}
+
+	nowMs := time.Now().UnixNano() / int64(time.Millisecond)
+	limitMs := wait.Milliseconds()
+	// Collect first and reset afterwards, so usedPartitions is not modified
+	// while it is being iterated.
+	var expiredKeys []string
+	for key, usedAtMs := range r.usedPartitions {
+		if nowMs-usedAtMs <= limitMs {
+			continue
+		}
+		expiredKeys = append(expiredKeys, key)
+		if p, ok := r.partitions[key]; ok {
+			p.SetLastConsumed(false)
+		}
+	}
+	for _, key := range expiredKeys {
+		r.resetIdlePartition(key, true)
+	}
+}
+
+// RemovePartition removes the partitions with the given keys from the cache.
+// For each key the idle state is reset first, then the metadata is dropped.
+func (r *RmtDataCache) RemovePartition(partitionKeys []string) {
+	r.metaMu.Lock()
+	defer r.metaMu.Unlock()
+
+	for i := 0; i < len(partitionKeys); i++ {
+		key := partitionKeys[i]
+		r.resetIdlePartition(key, false)
+		r.removeMetaInfo(key)
+	}
+}
+
+// IsFirstRegister returns the flag booked for the given partition key in
+// partitionRegBooked, booking true first when the key has no entry yet.
+// NOTE(review): nothing visible in this file ever stores false, so as written
+// the result is always true; confirm whether the flag is cleared elsewhere.
+func (r *RmtDataCache) IsFirstRegister(partitionKey string) bool {
+	r.dataBookMu.Lock()
+	defer r.dataBookMu.Unlock()
+
+	booked, seen := r.partitionRegBooked[partitionKey]
+	if !seen {
+		booked = true
+		r.partitionRegBooked[partitionKey] = booked
+	}
+	return booked
+}
diff --git a/tubemq-client-twins/tubemq-client-go/rpc/broker.go b/tubemq-client-twins/tubemq-client-go/rpc/broker.go
index 7f9ead2..5105fd5 100644
--- a/tubemq-client-twins/tubemq-client-go/rpc/broker.go
+++ b/tubemq-client-twins/tubemq-client-go/rpc/broker.go
@@ -22,11 +22,13 @@ import (
"github.com/golang/protobuf/proto"
- "github.com/apache/incubator-inlong/tubemq-client-twins/tubemq-client-go/client"
"github.com/apache/incubator-inlong/tubemq-client-twins/tubemq-client-go/codec"
"github.com/apache/incubator-inlong/tubemq-client-twins/tubemq-client-go/errs"
"github.com/apache/incubator-inlong/tubemq-client-twins/tubemq-client-go/metadata"
"github.com/apache/incubator-inlong/tubemq-client-twins/tubemq-client-go/protocol"
+ "github.com/apache/incubator-inlong/tubemq-client-twins/tubemq-client-go/remote"
+ "github.com/apache/incubator-inlong/tubemq-client-twins/tubemq-client-go/sub"
+ "github.com/apache/incubator-inlong/tubemq-client-twins/tubemq-client-go/util"
)
const (
@@ -47,14 +49,14 @@ const (
)
// RegisterRequestC2B implements the RegisterRequestC2B interface according to TubeMQ RPC protocol.
-func (c *rpcClient) RegisterRequestC2B(ctx context.Context, metadata *metadata.Metadata, sub *client.SubInfo) (*protocol.RegisterResponseB2C, error) {
+func (c *rpcClient) RegisterRequestC2B(ctx context.Context, metadata *metadata.Metadata, sub *sub.SubInfo, r *remote.RmtDataCache) (*protocol.RegisterResponseB2C, error) {
reqC2B := &protocol.RegisterRequestC2B{
OpType: proto.Int32(register),
ClientId: proto.String(sub.GetClientID()),
GroupName: proto.String(metadata.GetSubscribeInfo().GetGroup()),
TopicName: proto.String(metadata.GetSubscribeInfo().GetPartition().GetTopic()),
PartitionId: proto.Int32(metadata.GetSubscribeInfo().GetPartition().GetPartitionID()),
- QryPriorityId: proto.Int32(metadata.GetSubscribeInfo().GetQryPriorityID()),
+ QryPriorityId: proto.Int32(r.GetQryPriorityID()),
ReadStatus: proto.Int32(metadata.GetReadStatus()),
AuthInfo: sub.GetAuthorizedInfo(),
}
@@ -66,7 +68,7 @@ func (c *rpcClient) RegisterRequestC2B(ctx context.Context, metadata *metadata.M
}
}
offset := sub.GetAssignedPartOffset(metadata.GetSubscribeInfo().GetPartition().GetPartitionKey())
- if offset != client.InValidOffset {
+ if offset != util.InvalidValue {
reqC2B.CurrOffset = proto.Int64(offset)
}
data, err := proto.Marshal(reqC2B)
@@ -87,7 +89,7 @@ func (c *rpcClient) RegisterRequestC2B(ctx context.Context, metadata *metadata.M
Timeout: proto.Int64(c.config.Net.ReadTimeout.Milliseconds()),
}
- rspBody, err := c.doRequest(ctx, req)
+ rspBody, err := c.doRequest(ctx, metadata.GetNode().GetAddress(), req)
if err != nil {
return nil, err
}
@@ -101,7 +103,7 @@ func (c *rpcClient) RegisterRequestC2B(ctx context.Context, metadata *metadata.M
}
// UnregisterRequestC2B implements the UnregisterRequestC2B interface according to TubeMQ RPC protocol.
-func (c *rpcClient) UnregisterRequestC2B(ctx context.Context, metadata metadata.Metadata, sub *client.SubInfo) (*protocol.RegisterResponseB2C, error) {
+func (c *rpcClient) UnregisterRequestC2B(ctx context.Context, metadata *metadata.Metadata, sub *sub.SubInfo) (*protocol.RegisterResponseB2C, error) {
reqC2B := &protocol.RegisterRequestC2B{
OpType: proto.Int32(unregister),
ClientId: proto.String(sub.GetClientID()),
@@ -129,7 +131,7 @@ func (c *rpcClient) UnregisterRequestC2B(ctx context.Context, metadata metadata.
Timeout: proto.Int64(c.config.Net.ReadTimeout.Milliseconds()),
}
- rspBody, err := c.doRequest(ctx, req)
+ rspBody, err := c.doRequest(ctx, metadata.GetNode().GetAddress(), req)
if err != nil {
return nil, err
}
@@ -143,7 +145,7 @@ func (c *rpcClient) UnregisterRequestC2B(ctx context.Context, metadata metadata.
}
// GetMessageRequestC2B implements the GetMessageRequestC2B interface according to TubeMQ RPC protocol.
-func (c *rpcClient) GetMessageRequestC2B(ctx context.Context, metadata *metadata.Metadata, sub *client.SubInfo, r *client.RmtDataCache) (*protocol.GetMessageResponseB2C, error) {
+func (c *rpcClient) GetMessageRequestC2B(ctx context.Context, metadata *metadata.Metadata, sub *sub.SubInfo, r *remote.RmtDataCache) (*protocol.GetMessageResponseB2C, error) {
reqC2B := &protocol.GetMessageRequestC2B{
ClientId: proto.String(sub.GetClientID()),
PartitionId: proto.Int32(metadata.GetSubscribeInfo().GetPartition().GetPartitionID()),
@@ -171,7 +173,7 @@ func (c *rpcClient) GetMessageRequestC2B(ctx context.Context, metadata *metadata
Timeout: proto.Int64(c.config.Net.ReadTimeout.Milliseconds()),
}
- rspBody, err := c.doRequest(ctx, req)
+ rspBody, err := c.doRequest(ctx, metadata.GetNode().GetAddress(), req)
if err != nil {
return nil, err
}
@@ -185,7 +187,7 @@ func (c *rpcClient) GetMessageRequestC2B(ctx context.Context, metadata *metadata
}
// CommitOffsetRequestC2B implements the CommitOffsetRequestC2B interface according to TubeMQ RPC protocol.
-func (c *rpcClient) CommitOffsetRequestC2B(ctx context.Context, metadata *metadata.Metadata, sub *client.SubInfo) (*protocol.CommitOffsetResponseB2C, error) {
+func (c *rpcClient) CommitOffsetRequestC2B(ctx context.Context, metadata *metadata.Metadata, sub *sub.SubInfo) (*protocol.CommitOffsetResponseB2C, error) {
reqC2B := &protocol.CommitOffsetRequestC2B{
ClientId: proto.String(sub.GetClientID()),
TopicName: proto.String(metadata.GetSubscribeInfo().GetPartition().GetTopic()),
@@ -206,12 +208,12 @@ func (c *rpcClient) CommitOffsetRequestC2B(ctx context.Context, metadata *metada
ProtocolVer: proto.Int32(2),
}
req.RequestBody = &protocol.RequestBody{
- Method: proto.Int32(brokerConsumerHeartbeat),
+ Method: proto.Int32(brokerConsumerCommit),
Request: data,
Timeout: proto.Int64(c.config.Net.ReadTimeout.Milliseconds()),
}
- rspBody, err := c.doRequest(ctx, req)
+ rspBody, err := c.doRequest(ctx, metadata.GetNode().GetAddress(), req)
if err != nil {
return nil, err
}
@@ -225,12 +227,12 @@ func (c *rpcClient) CommitOffsetRequestC2B(ctx context.Context, metadata *metada
}
// HeartbeatRequestC2B implements the HeartbeatRequestC2B interface according to TubeMQ RPC protocol.
-func (c *rpcClient) HeartbeatRequestC2B(ctx context.Context, metadata *metadata.Metadata, sub *client.SubInfo, r *client.RmtDataCache) (*protocol.HeartBeatResponseB2C, error) {
+func (c *rpcClient) HeartbeatRequestC2B(ctx context.Context, metadata *metadata.Metadata, sub *sub.SubInfo, r *remote.RmtDataCache) (*protocol.HeartBeatResponseB2C, error) {
reqC2B := &protocol.HeartBeatRequestC2B{
ClientId: proto.String(sub.GetClientID()),
GroupName: proto.String(metadata.GetSubscribeInfo().GetGroup()),
ReadStatus: proto.Int32(metadata.GetReadStatus()),
- QryPriorityId: proto.Int32(metadata.GetSubscribeInfo().GetQryPriorityID()),
+ QryPriorityId: proto.Int32(r.GetQryPriorityID()),
AuthInfo: sub.GetAuthorizedInfo(),
}
partitions := r.GetPartitionByBroker(metadata.GetNode())
@@ -256,7 +258,7 @@ func (c *rpcClient) HeartbeatRequestC2B(ctx context.Context, metadata *metadata.
Flag: proto.Int32(0),
}
- rspBody, err := c.doRequest(ctx, req)
+ rspBody, err := c.doRequest(ctx, metadata.GetNode().GetAddress(), req)
if err != nil {
return nil, err
}
diff --git a/tubemq-client-twins/tubemq-client-go/rpc/client.go b/tubemq-client-twins/tubemq-client-go/rpc/client.go
index 71123c7..fab5bae 100644
--- a/tubemq-client-twins/tubemq-client-go/rpc/client.go
+++ b/tubemq-client-twins/tubemq-client-go/rpc/client.go
@@ -21,13 +21,14 @@ package rpc
import (
"context"
- "github.com/apache/incubator-inlong/tubemq-client-twins/tubemq-client-go/client"
"github.com/apache/incubator-inlong/tubemq-client-twins/tubemq-client-go/codec"
"github.com/apache/incubator-inlong/tubemq-client-twins/tubemq-client-go/config"
"github.com/apache/incubator-inlong/tubemq-client-twins/tubemq-client-go/errs"
"github.com/apache/incubator-inlong/tubemq-client-twins/tubemq-client-go/metadata"
"github.com/apache/incubator-inlong/tubemq-client-twins/tubemq-client-go/multiplexing"
"github.com/apache/incubator-inlong/tubemq-client-twins/tubemq-client-go/protocol"
+ "github.com/apache/incubator-inlong/tubemq-client-twins/tubemq-client-go/remote"
+ "github.com/apache/incubator-inlong/tubemq-client-twins/tubemq-client-go/sub"
"github.com/apache/incubator-inlong/tubemq-client-twins/tubemq-client-go/transport"
)
@@ -39,21 +40,21 @@ const (
// RPCClient is the rpc level client to interact with TubeMQ.
type RPCClient interface {
// RegisterRequestC2B is the rpc request for a consumer to register to a broker.
- RegisterRequestC2B(ctx context.Context, metadata *metadata.Metadata, sub *client.SubInfo) (*protocol.RegisterResponseB2C, error)
+ RegisterRequestC2B(ctx context.Context, metadata *metadata.Metadata, sub *sub.SubInfo, r *remote.RmtDataCache) (*protocol.RegisterResponseB2C, error)
// UnregisterRequestC2B is the rpc request for a consumer to unregister to a broker.
- UnregisterRequestC2B(ctx context.Context, metadata metadata.Metadata, sub *client.SubInfo) (*protocol.RegisterResponseB2C, error)
+ UnregisterRequestC2B(ctx context.Context, metadata *metadata.Metadata, sub *sub.SubInfo) (*protocol.RegisterResponseB2C, error)
// GetMessageRequestC2B is the rpc request for a consumer to get message from a broker.
- GetMessageRequestC2B(ctx context.Context, metadata *metadata.Metadata, sub *client.SubInfo, r *client.RmtDataCache) (*protocol.GetMessageResponseB2C, error)
+ GetMessageRequestC2B(ctx context.Context, metadata *metadata.Metadata, sub *sub.SubInfo, r *remote.RmtDataCache) (*protocol.GetMessageResponseB2C, error)
// CommitOffsetRequestC2B is the rpc request for a consumer to commit offset to a broker.
- CommitOffsetRequestC2B(ctx context.Context, metadata *metadata.Metadata, sub *client.SubInfo) (*protocol.CommitOffsetResponseB2C, error)
+ CommitOffsetRequestC2B(ctx context.Context, metadata *metadata.Metadata, sub *sub.SubInfo) (*protocol.CommitOffsetResponseB2C, error)
// HeartbeatRequestC2B is the rpc request for a consumer to send heartbeat to a broker.
- HeartbeatRequestC2B(ctx context.Context, metadata *metadata.Metadata, sub *client.SubInfo, r *client.RmtDataCache) (*protocol.HeartBeatResponseB2C, error)
+ HeartbeatRequestC2B(ctx context.Context, metadata *metadata.Metadata, sub *sub.SubInfo, r *remote.RmtDataCache) (*protocol.HeartBeatResponseB2C, error)
// RegisterRequestC2M is the rpc request for a consumer to register request to master.
- RegisterRequestC2M(ctx context.Context, metadata *metadata.Metadata, sub *client.SubInfo, r *client.RmtDataCache) (*protocol.RegisterResponseM2C, error)
+ RegisterRequestC2M(ctx context.Context, metadata *metadata.Metadata, sub *sub.SubInfo, r *remote.RmtDataCache) (*protocol.RegisterResponseM2C, error)
// HeartRequestC2M is the rpc request for a consumer to send heartbeat to master.
- HeartRequestC2M(ctx context.Context, metadata *metadata.Metadata, sub *client.SubInfo, r *client.RmtDataCache) (*protocol.HeartResponseM2C, error)
+ HeartRequestC2M(ctx context.Context, metadata *metadata.Metadata, sub *sub.SubInfo, r *remote.RmtDataCache) (*protocol.HeartResponseM2C, error)
// CloseRequestC2M is the rpc request for a consumer to be closed to master.
- CloseRequestC2M(ctx context.Context, metadata *metadata.Metadata, sub *client.SubInfo) (*protocol.CloseResponseM2C, error)
+ CloseRequestC2M(ctx context.Context, metadata *metadata.Metadata, sub *sub.SubInfo) (*protocol.CloseResponseM2C, error)
}
// New returns a default TubeMQ rpc Client
@@ -70,8 +71,8 @@ type rpcClient struct {
config *config.Config
}
-func (c *rpcClient) doRequest(ctx context.Context, req codec.RPCRequest) (*protocol.RspResponseBody, error) {
- rsp, err := c.client.DoRequest(ctx, req)
+func (c *rpcClient) doRequest(ctx context.Context, address string, req codec.RPCRequest) (*protocol.RspResponseBody, error) {
+ rsp, err := c.client.DoRequest(ctx, address, req)
if err != nil {
return nil, errs.New(errs.RetRequestFailure, err.Error())
}
diff --git a/tubemq-client-twins/tubemq-client-go/rpc/master.go b/tubemq-client-twins/tubemq-client-go/rpc/master.go
index 9eb336a..c0d4bc3 100644
--- a/tubemq-client-twins/tubemq-client-go/rpc/master.go
+++ b/tubemq-client-twins/tubemq-client-go/rpc/master.go
@@ -22,11 +22,12 @@ import (
"github.com/golang/protobuf/proto"
- "github.com/apache/incubator-inlong/tubemq-client-twins/tubemq-client-go/client"
"github.com/apache/incubator-inlong/tubemq-client-twins/tubemq-client-go/codec"
"github.com/apache/incubator-inlong/tubemq-client-twins/tubemq-client-go/errs"
"github.com/apache/incubator-inlong/tubemq-client-twins/tubemq-client-go/metadata"
"github.com/apache/incubator-inlong/tubemq-client-twins/tubemq-client-go/protocol"
+ "github.com/apache/incubator-inlong/tubemq-client-twins/tubemq-client-go/remote"
+ "github.com/apache/incubator-inlong/tubemq-client-twins/tubemq-client-go/sub"
)
const (
@@ -39,7 +40,7 @@ const (
)
// RegisterRequestRequestC2M implements the RegisterRequestRequestC2M interface according to TubeMQ RPC protocol.
-func (c *rpcClient) RegisterRequestC2M(ctx context.Context, metadata *metadata.Metadata, sub *client.SubInfo, r *client.RmtDataCache) (*protocol.RegisterResponseM2C, error) {
+func (c *rpcClient) RegisterRequestC2M(ctx context.Context, metadata *metadata.Metadata, sub *sub.SubInfo, r *remote.RmtDataCache) (*protocol.RegisterResponseM2C, error) {
reqC2M := &protocol.RegisterRequestC2M{
ClientId: proto.String(sub.GetClientID()),
HostName: proto.String(metadata.GetNode().GetHost()),
@@ -48,7 +49,7 @@ func (c *rpcClient) RegisterRequestC2M(ctx context.Context, metadata *metadata.M
SessionTime: proto.Int64(sub.GetSubscribedTime()),
DefFlowCheckId: proto.Int64(r.GetDefFlowCtrlID()),
GroupFlowCheckId: proto.Int64(r.GetGroupFlowCtrlID()),
- QryPriorityId: proto.Int32(metadata.GetSubscribeInfo().GetQryPriorityID()),
+ QryPriorityId: proto.Int32(r.GetQryPriorityID()),
AuthInfo: sub.GetMasterCertificateIInfo(),
}
reqC2M.TopicList = make([]string, 0, len(sub.GetTopics()))
@@ -92,7 +93,7 @@ func (c *rpcClient) RegisterRequestC2M(ctx context.Context, metadata *metadata.M
Timeout: proto.Int64(c.config.Net.ReadTimeout.Milliseconds()),
}
- rspBody, err := c.doRequest(ctx, req)
+ rspBody, err := c.doRequest(ctx, metadata.GetNode().GetAddress(), req)
if err != nil {
return nil, err
}
@@ -106,14 +107,14 @@ func (c *rpcClient) RegisterRequestC2M(ctx context.Context, metadata *metadata.M
}
// HeartRequestC2M implements the HeartRequestC2M interface according to TubeMQ RPC protocol.
-func (c *rpcClient) HeartRequestC2M(ctx context.Context, metadata *metadata.Metadata, sub *client.SubInfo, r *client.RmtDataCache) (*protocol.HeartResponseM2C, error) {
+func (c *rpcClient) HeartRequestC2M(ctx context.Context, metadata *metadata.Metadata, sub *sub.SubInfo, r *remote.RmtDataCache) (*protocol.HeartResponseM2C, error) {
reqC2M := &protocol.HeartRequestC2M{
ClientId: proto.String(sub.GetClientID()),
GroupName: proto.String(metadata.GetSubscribeInfo().GetGroup()),
DefFlowCheckId: proto.Int64(r.GetDefFlowCtrlID()),
GroupFlowCheckId: proto.Int64(r.GetGroupFlowCtrlID()),
ReportSubscribeInfo: proto.Bool(false),
- QryPriorityId: proto.Int32(metadata.GetSubscribeInfo().GetQryPriorityID()),
+ QryPriorityId: proto.Int32(r.GetQryPriorityID()),
}
event := r.PollEventResult()
if event != nil || metadata.GetReportTimes() {
@@ -159,7 +160,7 @@ func (c *rpcClient) HeartRequestC2M(ctx context.Context, metadata *metadata.Meta
Flag: proto.Int32(0),
}
- rspBody, err := c.doRequest(ctx, req)
+ rspBody, err := c.doRequest(ctx, metadata.GetNode().GetAddress(), req)
if err != nil {
return nil, err
}
@@ -173,7 +174,7 @@ func (c *rpcClient) HeartRequestC2M(ctx context.Context, metadata *metadata.Meta
}
// CloseRequestC2M implements the CloseRequestC2M interface according to TubeMQ RPC protocol.
-func (c *rpcClient) CloseRequestC2M(ctx context.Context, metadata *metadata.Metadata, sub *client.SubInfo) (*protocol.CloseResponseM2C, error) {
+func (c *rpcClient) CloseRequestC2M(ctx context.Context, metadata *metadata.Metadata, sub *sub.SubInfo) (*protocol.CloseResponseM2C, error) {
reqC2M := &protocol.CloseRequestC2M{
ClientId: proto.String(sub.GetClientID()),
GroupName: proto.String(metadata.GetSubscribeInfo().GetGroup()),
@@ -197,7 +198,7 @@ func (c *rpcClient) CloseRequestC2M(ctx context.Context, metadata *metadata.Meta
Flag: proto.Int32(0),
}
- rspBody, err := c.doRequest(ctx, req)
+ rspBody, err := c.doRequest(ctx, metadata.GetNode().GetAddress(), req)
if err != nil {
return nil, err
}
diff --git a/tubemq-client-twins/tubemq-client-go/client/info.go b/tubemq-client-twins/tubemq-client-go/sub/info.go
similarity index 62%
rename from tubemq-client-twins/tubemq-client-go/client/info.go
rename to tubemq-client-twins/tubemq-client-go/sub/info.go
index 25a2686..737b08a 100644
--- a/tubemq-client-twins/tubemq-client-go/client/info.go
+++ b/tubemq-client-twins/tubemq-client-go/sub/info.go
@@ -15,16 +15,19 @@
* limitations under the License.
*/
-// Package client defines the api and information
-// which can be exposed to user.
-package client
+// Package sub defines the subscription information of a client.
+package sub
import (
+ "strconv"
+ "time"
+
+ "github.com/apache/incubator-inlong/tubemq-client-twins/tubemq-client-go/config"
"github.com/apache/incubator-inlong/tubemq-client-twins/tubemq-client-go/protocol"
)
// InvalidOffset represents the offset which is invalid.
-const InValidOffset = -1
+const InValidOffset = -2
// SubInfo represents the sub information of the client.
type SubInfo struct {
@@ -46,6 +49,46 @@ type SubInfo struct {
masterCertificateInfo *protocol.MasterCertificateInfo
}
+// NewSubInfo builds a SubInfo from the consumer section of the given config.
+//
+// It copies the bound-consume flag, the topics and the topic filters, records
+// the current wall-clock time in milliseconds as subscribedTime, and renders
+// one "topic#filter1,filter2" condition string per entry of TopicFilters.
+// When bound consumption is enabled it additionally copies the session key,
+// source count and select-big flag, and renders PartitionOffset as a
+// comma-separated "partition=offset" list in boundPartitions.
+//
+// TopicFilters and PartitionOffset are ranged over as maps, so the order of
+// the generated entries is not deterministic.
+func NewSubInfo(config *config.Config) *SubInfo {
+	s := &SubInfo{
+		boundConsume:    config.Consumer.BoundConsume,
+		subscribedTime:  time.Now().UnixNano() / int64(time.Millisecond),
+		firstRegistered: false,
+		topics:          config.Consumer.Topics,
+		topicFilters:    config.Consumer.TopicFilters,
+	}
+	s.topicConds = make([]string, 0, len(config.Consumer.TopicFilters))
+	for topic, filters := range config.Consumer.TopicFilters {
+		cond := topic + "#"
+		// Use the slice index to decide whether a separator is needed; a
+		// separate counter that is never advanced would fuse the filters
+		// together without any comma.
+		for i, filter := range filters {
+			if i > 0 {
+				cond += ","
+			}
+			cond += filter
+		}
+		s.topicConds = append(s.topicConds, cond)
+	}
+	if config.Consumer.BoundConsume {
+		s.sessionKey = config.Consumer.SessionKey
+		s.sourceCount = int32(config.Consumer.SourceCount)
+		s.selectBig = config.Consumer.SelectBig
+		// A map has no index, so track how many entries were written to know
+		// when a separating comma is required.
+		count := 0
+		for partition, offset := range config.Consumer.PartitionOffset {
+			if count > 0 {
+				s.boundPartitions += ","
+			}
+			// Format through int64 so the offset is not truncated on
+			// platforms where int is 32 bits wide.
+			s.boundPartitions += partition + "=" + strconv.FormatInt(int64(offset), 10)
+			count++
+		}
+	}
+	return s
+}
+
// GetClientID returns the client ID.
func (s *SubInfo) GetClientID() string {
return s.clientID
@@ -124,6 +167,32 @@ func (s *SubInfo) GetAuthorizedInfo() *protocol.AuthorizedInfo {
return s.authInfo
}
+// GetMasterCertificateIInfo returns the masterCertificateInfo.
+// NOTE(review): the doubled "I" in the method name looks like a typo, but it
+// is the exported identifier that callers use, so it is left as is.
func (s *SubInfo) GetMasterCertificateIInfo() *protocol.MasterCertificateInfo {
return s.masterCertificateInfo
}
+
+// FirstRegistered sets the firstRegistered flag to true.
+// The write is a plain field assignment; this method does no locking.
+func (s *SubInfo) FirstRegistered() {
+ s.firstRegistered = true
+}
+
+// SetAuthorizedInfo replaces the stored authInfo with auth.
+// The pointer is stored as given (no copy is made), so later changes made
+// through auth are visible through the SubInfo as well.
+func (s *SubInfo) SetAuthorizedInfo(auth *protocol.AuthorizedInfo) {
+ s.authInfo = auth
+}
+
+// SetMasterCertificateInfo replaces the stored masterCertificateInfo with info.
+// The pointer is stored as given (no copy is made).
+func (s *SubInfo) SetMasterCertificateInfo(info *protocol.MasterCertificateInfo) {
+ s.masterCertificateInfo = info
+}
+
+// SetIsNotAllocated sets the notAllocated flag to isNotAllocated.
+// The write is a plain field assignment; this method does no locking.
+func (s *SubInfo) SetIsNotAllocated(isNotAllocated bool) {
+ s.notAllocated = isNotAllocated
+}
+
+// SetClientID sets the clientID to the given value.
+// The write is a plain field assignment; this method does no locking.
+func (s *SubInfo) SetClientID(clientID string) {
+ s.clientID = clientID
+}
diff --git a/tubemq-client-twins/tubemq-client-go/transport/client.go b/tubemq-client-twins/tubemq-client-go/transport/client.go
index d816a7f..91c9692 100644
--- a/tubemq-client-twins/tubemq-client-go/transport/client.go
+++ b/tubemq-client-twins/tubemq-client-go/transport/client.go
@@ -28,8 +28,6 @@ import (
// Options represents the transport options
type Options struct {
- Address string
-
CACertFile string
TLSCertFile string
TLSKeyFile string
@@ -51,9 +49,9 @@ func New(opts *Options, pool *multiplexing.Pool) *Client {
}
// DoRequest sends the request and return the decoded response
-func (c *Client) DoRequest(ctx context.Context, req codec.RPCRequest) (codec.RPCResponse, error) {
+func (c *Client) DoRequest(ctx context.Context, address string, req codec.RPCRequest) (codec.RPCResponse, error) {
opts := &multiplexing.DialOptions{
- Address: c.opts.Address,
+ Address: address,
Network: "tcp",
}
if c.opts.CACertFile != "" {
@@ -63,7 +61,7 @@ func (c *Client) DoRequest(ctx context.Context, req codec.RPCRequest) (codec.RPC
opts.TLSServerName = c.opts.TLSServerName
}
- conn, err := c.pool.Get(ctx, c.opts.Address, req.GetSerialNo(), opts)
+ conn, err := c.pool.Get(ctx, address, req.GetSerialNo(), opts)
if err != nil {
return nil, err
}
diff --git a/tubemq-client-twins/tubemq-client-go/util/util.go b/tubemq-client-twins/tubemq-client-go/util/util.go
new file mode 100644
index 0000000..77422ac
--- /dev/null
+++ b/tubemq-client-twins/tubemq-client-go/util/util.go
@@ -0,0 +1,52 @@
+package util
+
+import (
+ "net"
+)
+
+// InvalidValue is the int64 value -2. It is declared as a variable, not a
+// constant, so it can be reassigned by any importing package.
+// NOTE(review): presumably a sentinel for an invalid or unset value — confirm
+// against callers.
+var InvalidValue = int64(-2)
+
+// GetLocalHost returns the first non-loopback IPv4 address found on a network
+// interface that is up, formatted as a dotted-decimal string.
+//
+// It returns an empty string when the interfaces cannot be listed or when no
+// interface carries a suitable address. An interface whose addresses cannot
+// be read is skipped, so a single failing interface does not hide usable
+// addresses on the remaining ones.
+func GetLocalHost() string {
+	ifaces, err := net.Interfaces()
+	if err != nil {
+		return ""
+	}
+	for _, iface := range ifaces {
+		if iface.Flags&net.FlagUp == 0 {
+			continue // interface down
+		}
+		if iface.Flags&net.FlagLoopback != 0 {
+			continue // loopback interface
+		}
+		addrs, err := iface.Addrs()
+		if err != nil {
+			// Best effort: try the next interface instead of giving up.
+			continue
+		}
+		for _, addr := range addrs {
+			// ip stays nil for any address type other than the two below.
+			var ip net.IP
+			switch v := addr.(type) {
+			case *net.IPNet:
+				ip = v.IP
+			case *net.IPAddr:
+				ip = v.IP
+			}
+			if ip == nil || ip.IsLoopback() {
+				continue
+			}
+			ip = ip.To4()
+			if ip == nil {
+				continue // not an ipv4 address
+			}
+			return ip.String()
+		}
+	}
+	return ""
+}
+
+// GenBrokerAuthenticateToken currently ignores username and password and
+// always returns an empty string.
+// NOTE(review): looks like a placeholder for generating the broker
+// authentication token — confirm before relying on it.
+func GenBrokerAuthenticateToken(username string, password string) string {
+ return ""
+}
+
+// GenMasterAuthenticateToken currently ignores username and password and
+// always returns an empty string.
+// NOTE(review): looks like a placeholder for generating the master
+// authentication token — confirm before relying on it.
+func GenMasterAuthenticateToken(username string, password string) string {
+ return ""
+}