You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@inlong.apache.org by go...@apache.org on 2021/06/15 01:59:53 UTC

[incubator-inlong] branch INLONG-25 updated (6fd7644 -> 65cfc8f)

This is an automated email from the ASF dual-hosted git repository.

gosonzhang pushed a change to branch INLONG-25
in repository https://gitbox.apache.org/repos/asf/incubator-inlong.git.


    from 6fd7644  Use errs
     new 0d4ed41  [INLONG-624]Go SDK consumer interface
     new 845de0b  Add license and comments
     new a2af6f0  Implement offerEventResult
     new d5ea125  Add trim and use slice
     new a6fc39d  Add comment.
     new 2d64bb5  Address review comments
     new 67400c7  Add license
     new 07e73ae  init the channel
     new 65cfc8f  [INLONG-624]Go SDK Consumer Start API

The 9 revisions listed above as "new" are entirely new to this
repository and will be described in separate emails.  The revisions
listed as "add" were already present in the repository and have only
been added to this reference.


Summary of changes:
 .../tubemq-client-go/client/consumer.go            |  40 +--
 .../tubemq-client-go/client/consumer_impl.go       | 374 +++++++++++++++++++++
 .../tubemq-client-go/client/heartbeat.go           | 236 +++++++++++++
 .../tubemq-client-go/client/remote.go              |  79 -----
 .../tubemq-client-go/client/version.go             |   7 +-
 .../tubemq-client-go/config/config.go              |  18 +-
 .../tubemq-client-go/config/config_test.go         |  10 +-
 tubemq-client-twins/tubemq-client-go/errs/errs.go  |  12 +-
 .../tubemq-client-go/metadata/consumer_event.go    |  15 +
 .../tubemq-client-go/metadata/metadata.go          |  20 ++
 .../tubemq-client-go/metadata/node.go              |  54 +++
 .../tubemq-client-go/metadata/partition.go         |  45 ++-
 .../tubemq-client-go/metadata/subcribe_info.go     |  54 ++-
 .../tubemq-client-go/remote/remote.go              | 346 +++++++++++++++++++
 tubemq-client-twins/tubemq-client-go/rpc/broker.go |  32 +-
 tubemq-client-twins/tubemq-client-go/rpc/client.go |  23 +-
 tubemq-client-twins/tubemq-client-go/rpc/master.go |  19 +-
 .../tubemq-client-go/{client => sub}/info.go       |  77 ++++-
 .../tubemq-client-go/transport/client.go           |   8 +-
 tubemq-client-twins/tubemq-client-go/util/util.go  |  74 ++++
 20 files changed, 1372 insertions(+), 171 deletions(-)
 copy tubemq-server/src/main/java/org/apache/tubemq/server/common/offsetstorage/zookeeper/Abortable.java => tubemq-client-twins/tubemq-client-go/client/consumer.go (54%)
 create mode 100644 tubemq-client-twins/tubemq-client-go/client/consumer_impl.go
 create mode 100644 tubemq-client-twins/tubemq-client-go/client/heartbeat.go
 delete mode 100644 tubemq-client-twins/tubemq-client-go/client/remote.go
 copy tubemq-core/src/main/java/org/apache/tubemq/corerpc/codec/Protocol.txt => tubemq-client-twins/tubemq-client-go/client/version.go (92%)
 create mode 100644 tubemq-client-twins/tubemq-client-go/remote/remote.go
 rename tubemq-client-twins/tubemq-client-go/{client => sub}/info.go (62%)
 create mode 100644 tubemq-client-twins/tubemq-client-go/util/util.go

[incubator-inlong] 05/09: Add comment.

Posted by go...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

gosonzhang pushed a commit to branch INLONG-25
in repository https://gitbox.apache.org/repos/asf/incubator-inlong.git

commit a6fc39d6f45edbb451c6b15c26933f770dc3e0af
Author: Zijie Lu <ws...@gmail.com>
AuthorDate: Tue Jun 8 10:42:16 2021 +0800

    Add comment.
    
    Signed-off-by: Zijie Lu <ws...@gmail.com>
---
 tubemq-client-twins/tubemq-client-go/metadata/partition.go | 3 +++
 1 file changed, 3 insertions(+)

diff --git a/tubemq-client-twins/tubemq-client-go/metadata/partition.go b/tubemq-client-twins/tubemq-client-go/metadata/partition.go
index 3ce1ca5..d01b570 100644
--- a/tubemq-client-twins/tubemq-client-go/metadata/partition.go
+++ b/tubemq-client-twins/tubemq-client-go/metadata/partition.go
@@ -32,6 +32,7 @@ type Partition struct {
 	lastConsumed bool
 }
 
+// NewPartition parses a partition from the given string.
 func NewPartition(partition string) (*Partition, error) {
 	var b *Node
 	var topic string
@@ -81,6 +82,7 @@ func (p *Partition) GetTopic() string {
 	return p.topic
 }
 
+// GetBroker returns the broker.
 func (p *Partition) GetBroker() *Node {
 	return p.broker
 }
@@ -90,6 +92,7 @@ func (p *Partition) String() string {
 	return p.broker.String() + "#" + p.topic + "@" + strconv.Itoa(int(p.partitionID))
 }
 
+// SetLastConsumed sets the last consumed.
 func (p *Partition) SetLastConsumed(lastConsumed bool) {
 	p.lastConsumed = lastConsumed
 }

[incubator-inlong] 01/09: [INLONG-624]Go SDK consumer interface

Posted by go...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

gosonzhang pushed a commit to branch INLONG-25
in repository https://gitbox.apache.org/repos/asf/incubator-inlong.git

commit 0d4ed41f647eb5155c936f10f0f61113c44161b3
Author: Zijie Lu <ws...@gmail.com>
AuthorDate: Thu Jun 3 16:02:09 2021 +0800

    [INLONG-624]Go SDK consumer interface
    
    Signed-off-by: Zijie Lu <ws...@gmail.com>
---
 .../{metadata/node.go => client/consumer.go}       |  51 ++-
 .../tubemq-client-go/client/consumer_impl.go       | 359 +++++++++++++++++++++
 .../tubemq-client-go/client/heartbeat.go           | 229 +++++++++++++
 .../tubemq-client-go/client/remote.go              |  79 -----
 .../tubemq-client-go/config/config.go              |  18 +-
 .../tubemq-client-go/config/config_test.go         |  10 +-
 tubemq-client-twins/tubemq-client-go/errs/errs.go  |  12 +-
 .../tubemq-client-go/metadata/consumer_event.go    |   8 +
 .../tubemq-client-go/metadata/metadata.go          |  20 ++
 .../tubemq-client-go/metadata/node.go              |  54 ++++
 .../tubemq-client-go/metadata/partition.go         |  42 ++-
 .../tubemq-client-go/metadata/subcribe_info.go     |  54 +++-
 .../tubemq-client-go/remote/remote.go              | 331 +++++++++++++++++++
 tubemq-client-twins/tubemq-client-go/rpc/broker.go |  32 +-
 tubemq-client-twins/tubemq-client-go/rpc/client.go |  23 +-
 tubemq-client-twins/tubemq-client-go/rpc/master.go |  19 +-
 .../tubemq-client-go/{client => sub}/info.go       |  77 ++++-
 .../tubemq-client-go/transport/client.go           |   8 +-
 tubemq-client-twins/tubemq-client-go/util/util.go  |  52 +++
 19 files changed, 1299 insertions(+), 179 deletions(-)

diff --git a/tubemq-client-twins/tubemq-client-go/metadata/node.go b/tubemq-client-twins/tubemq-client-go/client/consumer.go
similarity index 52%
copy from tubemq-client-twins/tubemq-client-go/metadata/node.go
copy to tubemq-client-twins/tubemq-client-go/client/consumer.go
index 625f278..27a8536 100644
--- a/tubemq-client-twins/tubemq-client-go/metadata/node.go
+++ b/tubemq-client-twins/tubemq-client-go/client/consumer.go
@@ -15,41 +15,32 @@
  * limitations under the License.
  */
 
-package metadata
+// Package client defines the api and information
+// which can be exposed to user.
+package client
 
-import (
-	"strconv"
+const (
+	tubeMQClientVersion = "0.1.0"
 )
 
-// Node represents the metadata of a node.
-type Node struct {
-	id      uint32
-	host    string
-	port    uint32
-	address string
+// ConsumerResult of a consumption.
+type ConsumerResult struct {
 }
 
-// GetID returns the id of a node.
-func (n *Node) GetID() uint32 {
-	return n.id
+// ConsumerOffset of a consumption,
+type ConsumerOffset struct {
 }
 
-// GetPort returns the port of a node.
-func (n *Node) GetPort() uint32 {
-	return n.port
-}
-
-// GetHost returns the hostname of a node.
-func (n *Node) GetHost() string {
-	return n.host
-}
-
-// GetAddress returns the address of a node.
-func (n *Node) GetAddress() string {
-	return n.address
-}
-
-// String returns the metadata of a node as a string.
-func (n *Node) String() string {
-	return strconv.Itoa(int(n.id)) + ":" + n.host + ":" + strconv.Itoa(int(n.port))
+var clientIndex uint64
+
+// Consumer is an interface that abstracts behavior of TubeMQ's consumer
+type Consumer interface {
+	// Start starts the consumer.
+	Start() error
+	// GetMessage receive a single message.
+	GetMessage() (*ConsumerResult, error)
+	// Confirm the consumption of a message.
+	Confirm(confirmContext string, consumed bool) (*ConsumerResult, error)
+	// GetCurrConsumedInfo returns the consumptions of the consumer.
+	GetCurrConsumedInfo() (map[string]*ConsumerOffset, error)
 }
diff --git a/tubemq-client-twins/tubemq-client-go/client/consumer_impl.go b/tubemq-client-twins/tubemq-client-go/client/consumer_impl.go
new file mode 100644
index 0000000..c0f7e9a
--- /dev/null
+++ b/tubemq-client-twins/tubemq-client-go/client/consumer_impl.go
@@ -0,0 +1,359 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package client
+
+import (
+	"context"
+	"os"
+	"strconv"
+	"sync/atomic"
+	"time"
+
+	"github.com/golang/protobuf/proto"
+
+	"github.com/apache/incubator-inlong/tubemq-client-twins/tubemq-client-go/config"
+	"github.com/apache/incubator-inlong/tubemq-client-twins/tubemq-client-go/errs"
+	"github.com/apache/incubator-inlong/tubemq-client-twins/tubemq-client-go/metadata"
+	"github.com/apache/incubator-inlong/tubemq-client-twins/tubemq-client-go/multiplexing"
+	"github.com/apache/incubator-inlong/tubemq-client-twins/tubemq-client-go/protocol"
+	"github.com/apache/incubator-inlong/tubemq-client-twins/tubemq-client-go/remote"
+	"github.com/apache/incubator-inlong/tubemq-client-twins/tubemq-client-go/rpc"
+	"github.com/apache/incubator-inlong/tubemq-client-twins/tubemq-client-go/selector"
+	"github.com/apache/incubator-inlong/tubemq-client-twins/tubemq-client-go/sub"
+	"github.com/apache/incubator-inlong/tubemq-client-twins/tubemq-client-go/transport"
+	"github.com/apache/incubator-inlong/tubemq-client-twins/tubemq-client-go/util"
+)
+
+const (
+	consumeStatusNormal        = 0
+	consumeStatusFromMax       = 1
+	consumeStatusFromMaxAlways = 2
+)
+
+type consumer struct {
+	clientID         string
+	config           *config.Config
+	subInfo          *sub.SubInfo
+	rmtDataCache     *remote.RmtDataCache
+	visitToken       int64
+	authorizedInfo   string
+	nextAuth2Master  int32
+	nextAuth2Broker  int32
+	master           *selector.Node
+	client           rpc.RPCClient
+	selector         selector.Selector
+	lastMasterHb     int64
+	masterHBRetry    int
+	heartbeatManager *heartbeatManager
+	unreportedTimes  int
+}
+
+// NewConsumer returns a consumer which is constructed by a given config.
+func NewConsumer(config *config.Config) (Consumer, error) {
+	selector, err := selector.Get("ip")
+	if err != nil {
+		return nil, err
+	}
+
+	clientID := newClientID(config.Consumer.Group)
+	pool := multiplexing.NewPool()
+	opts := &transport.Options{}
+	if config.Net.TLS.Enable {
+		opts.CACertFile = config.Net.TLS.CACertFile
+		opts.TLSCertFile = config.Net.TLS.TLSCertFile
+		opts.TLSKeyFile = config.Net.TLS.TLSKeyFile
+		opts.TLSServerName = config.Net.TLS.TLSServerName
+	}
+	client := rpc.New(pool, opts)
+	r := remote.NewRmtDataCache()
+	r.SetConsumerInfo(clientID, config.Consumer.Group)
+	c := &consumer{
+		config:          config,
+		clientID:        clientID,
+		subInfo:         sub.NewSubInfo(config),
+		rmtDataCache:    r,
+		selector:        selector,
+		client:          client,
+		visitToken:      util.InvalidValue,
+		unreportedTimes: 0,
+	}
+	c.subInfo.SetClientID(clientID)
+	hbm := newHBManager(c)
+	c.heartbeatManager = hbm
+	return c, nil
+}
+
+// Start implementation of tubeMQ consumer.
+func (c *consumer) Start() error {
+	err := c.register2Master(false)
+	if err != nil {
+		return err
+	}
+	c.heartbeatManager.registerMaster(c.master.Address)
+	go c.processRebalanceEvent()
+	return nil
+}
+
+func (c *consumer) register2Master(needChange bool) error {
+	if needChange {
+		node, err := c.selector.Select(c.config.Consumer.Masters)
+		if err != nil {
+			return err
+		}
+		c.master = node
+	}
+	for c.master.HasNext {
+		ctx, cancel := context.WithTimeout(context.Background(), c.config.Net.ReadTimeout)
+
+		m := &metadata.Metadata{}
+		node := &metadata.Node{}
+		node.SetHost(util.GetLocalHost())
+		node.SetAddress(c.master.Address)
+		m.SetNode(node)
+		sub := &metadata.SubscribeInfo{}
+		sub.SetGroup(c.config.Consumer.Group)
+		m.SetSubscribeInfo(sub)
+
+		auth := &protocol.AuthenticateInfo{}
+		c.genMasterAuthenticateToken(auth, true)
+		mci := &protocol.MasterCertificateInfo{
+			AuthInfo: auth,
+		}
+		c.subInfo.SetMasterCertificateInfo(mci)
+
+		rsp, err := c.client.RegisterRequestC2M(ctx, m, c.subInfo, c.rmtDataCache)
+		if err != nil {
+			cancel()
+			return err
+		}
+		if rsp.GetSuccess() {
+			c.masterHBRetry = 0
+			c.processRegisterResponseM2C(rsp)
+			cancel()
+			return nil
+		} else if rsp.GetErrCode() == errs.RetConsumeGroupForbidden || rsp.GetErrCode() == errs.RetConsumeContentForbidden {
+			cancel()
+			return nil
+		} else {
+			c.master, err = c.selector.Select(c.config.Consumer.Masters)
+			cancel()
+			if err != nil {
+				return err
+			}
+		}
+	}
+	return nil
+}
+
+func (c *consumer) processRegisterResponseM2C(rsp *protocol.RegisterResponseM2C) {
+	if rsp.GetNotAllocated() {
+		c.subInfo.SetIsNotAllocated(rsp.GetNotAllocated())
+	}
+	if rsp.GetDefFlowCheckId() != 0 || rsp.GetDefFlowCheckId() != 0 {
+		if rsp.GetDefFlowCheckId() != 0 {
+			c.rmtDataCache.UpdateDefFlowCtrlInfo(rsp.GetDefFlowCheckId(), rsp.GetDefFlowControlInfo())
+		}
+		qryPriorityID := c.rmtDataCache.GetQryPriorityID()
+		if rsp.GetQryPriorityId() != 0 {
+			qryPriorityID = rsp.GetQryPriorityId()
+		}
+		c.rmtDataCache.UpdateGroupFlowCtrlInfo(qryPriorityID, rsp.GetGroupFlowCheckId(), rsp.GetGroupFlowControlInfo())
+	}
+	if rsp.GetAuthorizedInfo() != nil {
+		c.processAuthorizedToken(rsp.GetAuthorizedInfo())
+	}
+	c.lastMasterHb = time.Now().UnixNano() / int64(time.Millisecond)
+}
+
+func (c *consumer) processAuthorizedToken(info *protocol.MasterAuthorizedInfo) {
+	atomic.StoreInt64(&c.visitToken, info.GetVisitAuthorizedToken())
+	c.authorizedInfo = info.GetAuthAuthorizedToken()
+}
+
+// GetMessage implementation of TubeMQ consumer.
+func (c *consumer) GetMessage() (*ConsumerResult, error) {
+	panic("implement me")
+}
+
+// Confirm implementation of TubeMQ consumer.
+func (c *consumer) Confirm(confirmContext string, consumed bool) (*ConsumerResult, error) {
+	panic("implement me")
+}
+
+// GetCurrConsumedInfo implementation of TubeMQ consumer.
+func (c *consumer) GetCurrConsumedInfo() (map[string]*ConsumerOffset, error) {
+	panic("implement me")
+}
+
+func (c *consumer) processRebalanceEvent() {
+	for {
+		event := c.rmtDataCache.TakeEvent()
+		if event == nil {
+			continue
+		}
+		if event.GetEventStatus() == int32(util.InvalidValue) && event.GetRebalanceID() == util.InvalidValue {
+			break
+		}
+		c.rmtDataCache.ClearEvent()
+		switch event.GetEventType() {
+		case 2, 20:
+			c.disconnect2Broker(event)
+			c.rmtDataCache.OfferEvent(event)
+		case 1, 10:
+			c.connect2Broker(event)
+			c.rmtDataCache.OfferEvent(event)
+		}
+	}
+}
+
+func (c *consumer) disconnect2Broker(event *metadata.ConsumerEvent) {
+	subscribeInfo := event.GetSubscribeInfo()
+	if len(subscribeInfo) > 0 {
+		removedPartitions := make(map[*metadata.Node][]*metadata.Partition)
+		c.rmtDataCache.RemoveAndGetPartition(subscribeInfo, c.config.Consumer.RollbackIfConfirmTimeout, removedPartitions)
+		if len(removedPartitions) > 0 {
+			c.unregister2Broker(removedPartitions)
+		}
+	}
+	event.SetEventStatus(2)
+}
+
+func (c *consumer) unregister2Broker(unRegPartitions map[*metadata.Node][]*metadata.Partition) {
+	if len(unRegPartitions) == 0 {
+		return
+	}
+
+	for _, partitions := range unRegPartitions {
+		for _, partition := range partitions {
+			ctx, cancel := context.WithTimeout(context.Background(), c.config.Net.ReadTimeout)
+
+			m := &metadata.Metadata{}
+			node := &metadata.Node{}
+			node.SetHost(util.GetLocalHost())
+			node.SetAddress(partition.GetBroker().GetAddress())
+			m.SetNode(node)
+			m.SetReadStatus(1)
+			sub := &metadata.SubscribeInfo{}
+			sub.SetGroup(c.config.Consumer.Group)
+			sub.SetConsumerID(c.clientID)
+			sub.SetPartition(partition)
+			m.SetSubscribeInfo(sub)
+
+			c.client.UnregisterRequestC2B(ctx, m, c.subInfo)
+			cancel()
+		}
+	}
+}
+
+func (c *consumer) connect2Broker(event *metadata.ConsumerEvent) {
+	if len(event.GetSubscribeInfo()) > 0 {
+		unsubPartitions := c.rmtDataCache.FilterPartitions(event.GetSubscribeInfo())
+		if len(unsubPartitions) > 0 {
+			for _, partition := range unsubPartitions {
+				m := &metadata.Metadata{}
+				node := &metadata.Node{}
+				node.SetHost(util.GetLocalHost())
+				node.SetAddress(partition.GetBroker().GetAddress())
+				m.SetNode(node)
+				sub := &metadata.SubscribeInfo{}
+				sub.SetGroup(c.config.Consumer.Group)
+				sub.SetConsumerID(c.clientID)
+				sub.SetPartition(partition)
+				m.SetSubscribeInfo(sub)
+				isFirstRegister := c.rmtDataCache.IsFirstRegister(partition.GetPartitionKey())
+				m.SetReadStatus(c.getConsumeReadStatus(isFirstRegister))
+				auth := c.genBrokerAuthenticInfo(true)
+				c.subInfo.SetAuthorizedInfo(auth)
+
+				ctx, cancel := context.WithTimeout(context.Background(), c.config.Net.ReadTimeout)
+				rsp, err := c.client.RegisterRequestC2B(ctx, m, c.subInfo, c.rmtDataCache)
+				if err != nil {
+					//todo add log
+				}
+				if rsp.GetSuccess() {
+					c.rmtDataCache.AddNewPartition(partition)
+					c.heartbeatManager.registerBroker(node)
+				}
+				cancel()
+			}
+		}
+	}
+	c.subInfo.FirstRegistered()
+	event.SetEventStatus(2)
+}
+
+func newClientIndex() uint64 {
+	return atomic.AddUint64(&clientIndex, 1)
+}
+
+func newClientID(group string) string {
+	return group + "_" +
+		util.GetLocalHost() + "_" +
+		strconv.Itoa(os.Getpid()) + "_" +
+		strconv.Itoa(int(time.Now().Unix()*1000)) + "_" +
+		strconv.Itoa(int(newClientIndex())) + "_" +
+		tubeMQClientVersion
+}
+
+func (c *consumer) genBrokerAuthenticInfo(force bool) *protocol.AuthorizedInfo {
+	needAdd := false
+	auth := &protocol.AuthorizedInfo{}
+	if c.config.Net.Auth.Enable {
+		if force {
+			needAdd = true
+			atomic.StoreInt32(&c.nextAuth2Broker, 0)
+		} else if atomic.LoadInt32(&c.nextAuth2Broker) == 1 {
+			if atomic.CompareAndSwapInt32(&c.nextAuth2Broker, 1, 0) {
+				needAdd = true
+			}
+		}
+		if needAdd {
+			authToken := util.GenBrokerAuthenticateToken(c.config.Net.Auth.UserName, c.config.Net.Auth.Password)
+			auth.AuthAuthorizedToken = proto.String(authToken)
+		}
+	}
+	return auth
+}
+
+func (c *consumer) genMasterAuthenticateToken(auth *protocol.AuthenticateInfo, force bool) {
+	needAdd := false
+	if c.config.Net.Auth.Enable {
+		if force {
+			needAdd = true
+			atomic.StoreInt32(&c.nextAuth2Master, 0)
+		} else if atomic.LoadInt32(&c.nextAuth2Master) == 1 {
+			if atomic.CompareAndSwapInt32(&c.nextAuth2Master, 1, 0) {
+				needAdd = true
+			}
+		}
+		if needAdd {
+		}
+	}
+}
+
+func (c *consumer) getConsumeReadStatus(isFirstReg bool) int32 {
+	readStatus := consumeStatusNormal
+	if isFirstReg {
+		if c.config.Consumer.ConsumePosition == 0 {
+			readStatus = consumeStatusFromMax
+		} else if c.config.Consumer.ConsumePosition > 0 {
+			readStatus = consumeStatusFromMaxAlways
+		}
+	}
+	return int32(readStatus)
+}
diff --git a/tubemq-client-twins/tubemq-client-go/client/heartbeat.go b/tubemq-client-twins/tubemq-client-go/client/heartbeat.go
new file mode 100644
index 0000000..9c29854
--- /dev/null
+++ b/tubemq-client-twins/tubemq-client-go/client/heartbeat.go
@@ -0,0 +1,229 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package client
+
+import (
+	"context"
+	"strings"
+	"sync"
+	"sync/atomic"
+	"time"
+
+	"github.com/apache/incubator-inlong/tubemq-client-twins/tubemq-client-go/errs"
+	"github.com/apache/incubator-inlong/tubemq-client-twins/tubemq-client-go/metadata"
+	"github.com/apache/incubator-inlong/tubemq-client-twins/tubemq-client-go/protocol"
+	"github.com/apache/incubator-inlong/tubemq-client-twins/tubemq-client-go/util"
+)
+
+type heartbeatMetadata struct {
+	numConnections int
+	timer          *time.Timer
+}
+
+type heartbeatManager struct {
+	consumer   *consumer
+	heartbeats map[string]*heartbeatMetadata
+	mu         sync.Mutex
+}
+
+func newHBManager(consumer *consumer) *heartbeatManager {
+	return &heartbeatManager{
+		consumer:   consumer,
+		heartbeats: make(map[string]*heartbeatMetadata),
+	}
+}
+
+func (h *heartbeatManager) registerMaster(address string) {
+	h.mu.Lock()
+	defer h.mu.Unlock()
+	if _, ok := h.heartbeats[address]; !ok {
+		h.heartbeats[address] = &heartbeatMetadata{
+			numConnections: 1,
+			timer:          time.AfterFunc(h.consumer.config.Heartbeat.Interval/2, h.consumerHB2Master),
+		}
+	}
+	hm := h.heartbeats[address]
+	hm.numConnections++
+}
+
+func (h *heartbeatManager) registerBroker(broker *metadata.Node) {
+	h.mu.Lock()
+	defer h.mu.Unlock()
+
+	if _, ok := h.heartbeats[broker.GetAddress()]; !ok {
+		h.heartbeats[broker.GetAddress()] = &heartbeatMetadata{
+			numConnections: 1,
+			timer:          time.AfterFunc(h.consumer.config.Heartbeat.Interval, func() { h.consumerHB2Broker(broker) }),
+		}
+	}
+	hm := h.heartbeats[broker.GetAddress()]
+	hm.numConnections++
+}
+
+func (h *heartbeatManager) consumerHB2Master() {
+	if time.Now().UnixNano()/int64(time.Millisecond)-h.consumer.lastMasterHb > 30000 {
+		h.consumer.rmtDataCache.HandleExpiredPartitions(h.consumer.config.Consumer.MaxConfirmWait)
+	}
+	m := &metadata.Metadata{}
+	node := &metadata.Node{}
+	node.SetHost(util.GetLocalHost())
+	node.SetAddress(h.consumer.master.Address)
+	m.SetNode(node)
+	sub := &metadata.SubscribeInfo{}
+	sub.SetGroup(h.consumer.config.Consumer.Group)
+	m.SetSubscribeInfo(sub)
+	h.consumer.unreportedTimes++
+	if h.consumer.unreportedTimes > h.consumer.config.Consumer.MaxSubInfoReportInterval {
+		m.SetReportTimes(true)
+		h.consumer.unreportedTimes = 0
+	}
+
+	retry := 0
+	for retry < h.consumer.config.Heartbeat.MaxRetryTimes {
+		ctx, cancel := context.WithTimeout(context.Background(), h.consumer.config.Net.ReadTimeout)
+		rsp, err := h.consumer.client.HeartRequestC2M(ctx, m, h.consumer.subInfo, h.consumer.rmtDataCache)
+		if err != nil {
+			cancel()
+		}
+		if rsp.GetSuccess() {
+			cancel()
+			h.processHBResponseM2C(rsp)
+			break
+		} else if rsp.GetErrCode() == errs.RetErrHBNoNode || strings.Index(rsp.GetErrMsg(), "StandbyException") != -1 {
+			cancel()
+			h.consumer.masterHBRetry++
+			address := h.consumer.master.Address
+			go h.consumer.register2Master(rsp.GetErrCode() != errs.RetErrHBNoNode)
+			if rsp.GetErrCode() != errs.RetErrHBNoNode {
+				hm := h.heartbeats[address]
+				hm.numConnections--
+				if hm.numConnections == 0 {
+					h.mu.Lock()
+					delete(h.heartbeats, address)
+					h.mu.Unlock()
+				}
+			}
+			return
+		}
+		cancel()
+	}
+	h.mu.Lock()
+	defer h.mu.Unlock()
+	hm := h.heartbeats[h.consumer.master.Address]
+	hm.timer.Reset(h.nextHeartbeatInterval())
+}
+
+func (h *heartbeatManager) processHBResponseM2C(rsp *protocol.HeartResponseM2C) {
+	h.consumer.masterHBRetry = 0
+	h.consumer.subInfo.SetIsNotAllocated(rsp.GetNotAllocated())
+	if rsp.GetDefFlowCheckId() != 0 || rsp.GetGroupFlowCheckId() != 0 {
+		if rsp.GetDefFlowCheckId() != 0 {
+			h.consumer.rmtDataCache.UpdateDefFlowCtrlInfo(rsp.GetDefFlowCheckId(), rsp.GetDefFlowControlInfo())
+		}
+		qryPriorityID := h.consumer.rmtDataCache.GetQryPriorityID()
+		if rsp.GetQryPriorityId() != 0 {
+			qryPriorityID = rsp.GetQryPriorityId()
+		}
+		h.consumer.rmtDataCache.UpdateGroupFlowCtrlInfo(qryPriorityID, rsp.GetGroupFlowCheckId(), rsp.GetGroupFlowControlInfo())
+	}
+	if rsp.GetAuthorizedInfo() != nil {
+		h.consumer.processAuthorizedToken(rsp.GetAuthorizedInfo())
+	}
+	if rsp.GetRequireAuth() {
+		atomic.StoreInt32(&h.consumer.nextAuth2Master, 1)
+	}
+	if rsp.GetEvent() != nil {
+		event := rsp.GetEvent()
+		subscribeInfo := make([]*metadata.SubscribeInfo, 0, len(event.GetSubscribeInfo()))
+		for _, sub := range event.GetSubscribeInfo() {
+			s, err := metadata.NewSubscribeInfo(sub)
+			if err != nil {
+				continue
+			}
+			subscribeInfo = append(subscribeInfo, s)
+		}
+		e := metadata.NewEvent(event.GetRebalanceId(), event.GetOpType(), subscribeInfo)
+		h.consumer.rmtDataCache.OfferEvent(e)
+	}
+}
+
+func (h *heartbeatManager) nextHeartbeatInterval() time.Duration {
+	interval := h.consumer.config.Heartbeat.Interval
+	if h.consumer.masterHBRetry >= h.consumer.config.Heartbeat.MaxRetryTimes {
+		interval = h.consumer.config.Heartbeat.AfterFail
+	}
+	return interval
+}
+
+func (h *heartbeatManager) consumerHB2Broker(broker *metadata.Node) {
+	h.mu.Lock()
+	defer h.mu.Unlock()
+
+	partitions := h.consumer.rmtDataCache.GetPartitionByBroker(broker)
+	if len(partitions) == 0 {
+		h.resetBrokerTimer(broker)
+		return
+	}
+	m := &metadata.Metadata{}
+	m.SetReadStatus(h.consumer.getConsumeReadStatus(false))
+	m.SetNode(broker)
+	ctx, cancel := context.WithTimeout(context.Background(), h.consumer.config.Net.ReadTimeout)
+	defer cancel()
+
+	rsp, err := h.consumer.client.HeartbeatRequestC2B(ctx, m, h.consumer.subInfo, h.consumer.rmtDataCache)
+	if err != nil {
+		return
+	}
+	if rsp.GetSuccess() {
+		if rsp.GetHasPartFailure() {
+			partitionKeys := make([]string, 0, len(rsp.GetFailureInfo()))
+			for _, fi := range rsp.GetFailureInfo() {
+				pos := strings.Index(fi, ":")
+				if pos == -1 {
+					continue
+				}
+				partition, err := metadata.NewPartition(fi[pos+1:])
+				if err != nil {
+					continue
+				}
+				partitionKeys = append(partitionKeys, partition.GetPartitionKey())
+			}
+			h.consumer.rmtDataCache.RemovePartition(partitionKeys)
+		} else {
+			if rsp.GetErrCode() == errs.RetCertificateFailure {
+				partitionKeys := make([]string, 0, len(partitions))
+				for _, partition := range partitions {
+					partitionKeys = append(partitionKeys, partition.GetPartitionKey())
+				}
+				h.consumer.rmtDataCache.RemovePartition(partitionKeys)
+			}
+		}
+	}
+	h.resetBrokerTimer(broker)
+}
+
+func (h *heartbeatManager) resetBrokerTimer(broker *metadata.Node) {
+	interval := h.consumer.config.Heartbeat.Interval
+	partitions := h.consumer.rmtDataCache.GetPartitionByBroker(broker)
+	if len(partitions) == 0 {
+		delete(h.heartbeats, broker.GetAddress())
+	} else {
+		hm := h.heartbeats[broker.GetAddress()]
+		hm.timer.Reset(interval)
+	}
+}
diff --git a/tubemq-client-twins/tubemq-client-go/client/remote.go b/tubemq-client-twins/tubemq-client-go/client/remote.go
deleted file mode 100644
index e36b69f..0000000
--- a/tubemq-client-twins/tubemq-client-go/client/remote.go
+++ /dev/null
@@ -1,79 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- * <p>
- * http://www.apache.org/licenses/LICENSE-2.0
- * <p>
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-// Package client defines the api and information
-// which can be exposed to user.
-package client
-
-import (
-	"sync"
-
-	"github.com/apache/incubator-inlong/tubemq-client-twins/tubemq-client-go/metadata"
-)
-
-// RmtDataCache represents the data returned from TubeMQ.
-type RmtDataCache struct {
-	consumerID         string
-	groupName          string
-	underGroupCtrl     bool
-	defFlowCtrlID      int64
-	groupFlowCtrlID    int64
-	subscribeInfo      []*metadata.SubscribeInfo
-	rebalanceResults   []*metadata.ConsumerEvent
-	mu                 sync.Mutex
-	brokerToPartitions map[*metadata.Node][]*metadata.Partition
-}
-
-// GetUnderGroupCtrl returns the underGroupCtrl.
-func (r *RmtDataCache) GetUnderGroupCtrl() bool {
-	return r.underGroupCtrl
-}
-
-// GetDefFlowCtrlID returns the defFlowCtrlID.
-func (r *RmtDataCache) GetDefFlowCtrlID() int64 {
-	return r.defFlowCtrlID
-}
-
-// GetGroupFlowCtrlID returns the groupFlowCtrlID.
-func (r *RmtDataCache) GetGroupFlowCtrlID() int64 {
-	return r.groupFlowCtrlID
-}
-
-// GetSubscribeInfo returns the subscribeInfo.
-func (r *RmtDataCache) GetSubscribeInfo() []*metadata.SubscribeInfo {
-	return r.subscribeInfo
-}
-
-// PollEventResult polls the first event result from the rebalanceResults.
-func (r *RmtDataCache) PollEventResult() *metadata.ConsumerEvent {
-	r.mu.Lock()
-	defer r.mu.Unlock()
-	if len(r.rebalanceResults) > 0 {
-		event := r.rebalanceResults[0]
-		r.rebalanceResults = r.rebalanceResults[1:]
-		return event
-	}
-	return nil
-}
-
-// GetPartitionByBroker returns the
-func (r *RmtDataCache) GetPartitionByBroker(node *metadata.Node) []*metadata.Partition {
-	if partitions, ok := r.brokerToPartitions[node]; ok {
-		return partitions
-	}
-	return nil
-}
diff --git a/tubemq-client-twins/tubemq-client-go/config/config.go b/tubemq-client-twins/tubemq-client-go/config/config.go
index 47a360a..7a93831 100644
--- a/tubemq-client-twins/tubemq-client-go/config/config.go
+++ b/tubemq-client-twins/tubemq-client-go/config/config.go
@@ -29,7 +29,7 @@ import (
 // Config defines multiple configuration options.
 // Refer to: https://github.com/apache/incubator-inlong/blob/3249de37acf054a9c43677131cfbb09fc6d366d1/tubemq-client/src/main/java/org/apache/tubemq/client/config/ConsumerConfig.java
 type Config struct {
-	// Net is the namespace for network-level properties used by Broker and Master.
+	// Net is the namespace for network-level properties used by broker and Master.
 	Net struct {
 		// ReadTimeout represents how long to wait for a response.
 		ReadTimeout time.Duration
@@ -61,9 +61,13 @@ type Config struct {
 	// used by the consumer
 	Consumer struct {
 		// Masters is the addresses of master.
-		Masters []string
-		// Topic of the consumption.
-		Topic string
+		Masters string
+		// Topics of the consumption.
+		Topics []string
+		// TopicFilters is the map of topic to filters.
+		TopicFilters map[string][]string
+		// PartitionOffset is the map of partition to its corresponding offset.
+		PartitionOffset map[string]int64
 		// ConsumerPosition is the initial offset to use if no offset was previously committed.
 		ConsumePosition int
 		// Group is the consumer group name.
@@ -152,7 +156,7 @@ func ParseAddress(address string) (config *Config, err error) {
 		return nil, fmt.Errorf("address format invalid: address: %v, token: %v", address, tokens)
 	}
 
-	c.Consumer.Masters = strings.Split(tokens[0], ",")
+	c.Consumer.Masters = tokens[0]
 
 	tokens = strings.Split(tokens[1], "&")
 	if len(tokens) == 0 {
@@ -190,8 +194,8 @@ func getConfigFromToken(config *Config, values []string) error {
 		config.Net.TLS.TLSServerName = values[1]
 	case "group":
 		config.Consumer.Group = values[1]
-	case "topic":
-		config.Consumer.Topic = values[1]
+	case "topics":
+		config.Consumer.Topics = strings.Split(values[1], ",")
 	case "consumePosition":
 		config.Consumer.ConsumePosition, err = strconv.Atoi(values[1])
 	case "boundConsume":
diff --git a/tubemq-client-twins/tubemq-client-go/config/config_test.go b/tubemq-client-twins/tubemq-client-go/config/config_test.go
index 56bc600..a8ac703 100644
--- a/tubemq-client-twins/tubemq-client-go/config/config_test.go
+++ b/tubemq-client-twins/tubemq-client-go/config/config_test.go
@@ -25,11 +25,11 @@ import (
 )
 
 func TestParseAddress(t *testing.T) {
-	address := "127.0.0.1:9092,127.0.0.1:9093?topic=Topic&group=Group&tlsEnable=false&msgNotFoundWait=10000&heartbeatMaxRetryTimes=6"
+	address := "127.0.0.1:9092,127.0.0.1:9093?topics=Topic1,Topic2&group=Group&tlsEnable=false&msgNotFoundWait=10000&heartbeatMaxRetryTimes=6"
 	c, err := ParseAddress(address)
 	assert.Nil(t, err)
-	assert.Equal(t, c.Consumer.Masters, []string{"127.0.0.1:9092", "127.0.0.1:9093"})
-	assert.Equal(t, c.Consumer.Topic, "Topic")
+	assert.Equal(t, c.Consumer.Masters, "127.0.0.1:9092,127.0.0.1:9093")
+	assert.Equal(t, c.Consumer.Topics, []string{"Topic1", "Topic2"})
 	assert.Equal(t, c.Consumer.Group, "Group")
 	assert.Equal(t, c.Consumer.MsgNotFoundWait, 10000*time.Millisecond)
 
@@ -41,11 +41,11 @@ func TestParseAddress(t *testing.T) {
 	_, err = ParseAddress(address)
 	assert.NotNil(t, err)
 
-	address = "127.0.0.1:9092,127.0.0.1:9093?Topic=Topic&ttt"
+	address = "127.0.0.1:9092,127.0.0.1:9093?topics=Topic&ttt"
 	_, err = ParseAddress(address)
 	assert.NotNil(t, err)
 
-	address = "127.0.0.1:9092,127.0.0.1:9093?Topic=Topic&ttt=ttt"
+	address = "127.0.0.1:9092,127.0.0.1:9093?topics=Topic&ttt=ttt"
 	_, err = ParseAddress(address)
 	assert.NotNil(t, err)
 }
diff --git a/tubemq-client-twins/tubemq-client-go/errs/errs.go b/tubemq-client-twins/tubemq-client-go/errs/errs.go
index 00b3c81..eb45179 100644
--- a/tubemq-client-twins/tubemq-client-go/errs/errs.go
+++ b/tubemq-client-twins/tubemq-client-go/errs/errs.go
@@ -33,12 +33,18 @@ const (
 	RetAssertionFailure = 4
 	// RetRequestFailure represents the error code of request error.
 	RetRequestFailure = 5
-	// RetSelectorNotExist = 6
-	RetSelectorNotExist = 6
+	// RetSelectorNotExist represents the selector not exists.
+	RetSelectorNotExist        = 6
+	RetErrHBNoNode             = 411
+	RetCertificateFailure      = 415
+	RetConsumeGroupForbidden   = 450
+	RetConsumeContentForbidden = 455
 )
 
 // ErrAssertionFailure represents RetAssertionFailure error.
-var ErrAssertionFailure = New(RetAssertionFailure, "AssertionFailure")
+var (
+	ErrAssertionFailure = New(RetAssertionFailure, "AssertionFailure")
+)
 
 // Error provides a TubeMQ-specific error container
 type Error struct {
diff --git a/tubemq-client-twins/tubemq-client-go/metadata/consumer_event.go b/tubemq-client-twins/tubemq-client-go/metadata/consumer_event.go
index 2b4ce49..6ce2915 100644
--- a/tubemq-client-twins/tubemq-client-go/metadata/consumer_event.go
+++ b/tubemq-client-twins/tubemq-client-go/metadata/consumer_event.go
@@ -25,6 +25,14 @@ type ConsumerEvent struct {
 	subscribeInfo []*SubscribeInfo
 }
 
+func NewEvent(rebalanceID int64, eventType int32, subscribeInfo []*SubscribeInfo) *ConsumerEvent {
+	return &ConsumerEvent{
+		rebalanceID:   rebalanceID,
+		eventType:     eventType,
+		subscribeInfo: subscribeInfo,
+	}
+}
+
 // GetRebalanceID returns the rebalanceID of a consumer event.
 func (c *ConsumerEvent) GetRebalanceID() int64 {
 	return c.rebalanceID
diff --git a/tubemq-client-twins/tubemq-client-go/metadata/metadata.go b/tubemq-client-twins/tubemq-client-go/metadata/metadata.go
index 861a06e..f64674a 100644
--- a/tubemq-client-twins/tubemq-client-go/metadata/metadata.go
+++ b/tubemq-client-twins/tubemq-client-go/metadata/metadata.go
@@ -45,3 +45,23 @@ func (m *Metadata) GetReadStatus() int32 {
 func (m *Metadata) GetReportTimes() bool {
 	return m.reportTimes
 }
+
+// SetNode sets the node.
+func (m *Metadata) SetNode(node *Node) {
+	m.node = node
+}
+
+// SetSubscribeInfo sets the subscribeInfo.
+func (m *Metadata) SetSubscribeInfo(sub *SubscribeInfo) {
+	m.subscribeInfo = sub
+}
+
+// ReadStatus sets the status.
+func (m *Metadata) SetReadStatus(status int32) {
+	m.readStatus = status
+}
+
+// SetReportTimes sets the reportTimes.
+func (m *Metadata) SetReportTimes(reportTimes bool) {
+	m.reportTimes = reportTimes
+}
diff --git a/tubemq-client-twins/tubemq-client-go/metadata/node.go b/tubemq-client-twins/tubemq-client-go/metadata/node.go
index 625f278..bcfda82 100644
--- a/tubemq-client-twins/tubemq-client-go/metadata/node.go
+++ b/tubemq-client-twins/tubemq-client-go/metadata/node.go
@@ -19,6 +19,7 @@ package metadata
 
 import (
 	"strconv"
+	"strings"
 )
 
 // Node represents the metadata of a node.
@@ -29,6 +30,43 @@ type Node struct {
 	address string
 }
 
+// NewNode constructs a node from a given string.
+// If the given string is invalid, it will return error.
+func NewNode(isBroker bool, node string) (*Node, error) {
+	res := strings.Split(node, ":")
+	nodeID := 0
+	host := ""
+	port := 8123
+	var err error
+	if isBroker {
+		nodeID, err = strconv.Atoi(res[0])
+		if err != nil {
+			return nil, err
+		}
+		host = res[1]
+		if len(res) >= 3 {
+			port, err = strconv.Atoi(res[2])
+			if err != nil {
+				return nil, err
+			}
+		}
+	} else {
+		host = res[0]
+		if len(res) >= 2 {
+			port, err = strconv.Atoi(res[1])
+			if err != nil {
+				return nil, err
+			}
+		}
+	}
+	return &Node{
+		id:      uint32(nodeID),
+		host:    host,
+		port:    uint32(port),
+		address: host + ":" + strconv.Itoa(port),
+	}, nil
+}
+
 // GetID returns the id of a node.
 func (n *Node) GetID() uint32 {
 	return n.id
@@ -53,3 +91,19 @@ func (n *Node) GetAddress() string {
 func (n *Node) String() string {
 	return strconv.Itoa(int(n.id)) + ":" + n.host + ":" + strconv.Itoa(int(n.port))
 }
+
+// SetHost sets the host.
+func (n *Node) SetHost(host string) {
+	n.host = host
+}
+
+// SetAddress sets the address.
+func (n *Node) SetAddress(address string) error {
+	port, err := strconv.Atoi(strings.Split(address, ":")[1])
+	if err != nil {
+		return err
+	}
+	n.address = address
+	n.port = uint32(port)
+	return nil
+}
diff --git a/tubemq-client-twins/tubemq-client-go/metadata/partition.go b/tubemq-client-twins/tubemq-client-go/metadata/partition.go
index a180214..fbe1086 100644
--- a/tubemq-client-twins/tubemq-client-go/metadata/partition.go
+++ b/tubemq-client-twins/tubemq-client-go/metadata/partition.go
@@ -19,18 +19,48 @@ package metadata
 
 import (
 	"strconv"
+	"strings"
 )
 
 // Partition represents the metadata of a partition.
 type Partition struct {
 	topic        string
-	Broker       *Node
+	broker       *Node
 	partitionID  int32
 	partitionKey string
 	offset       int64
 	lastConsumed bool
 }
 
+func NewPartition(partition string) (*Partition, error) {
+	var b *Node
+	var topic string
+	var partitionID int
+	var err error
+	pos := strings.Index(partition, "#")
+	if pos != -1 {
+		broker := partition[:pos]
+		b, err = NewNode(true, broker)
+		if err != nil {
+			return nil, err
+		}
+		p := partition[pos+1:]
+		pos = strings.Index(p, ":")
+		if pos != -1 {
+			topic = p[0:pos]
+			partitionID, err = strconv.Atoi(p[pos+1:])
+			if err != nil {
+				return nil, err
+			}
+		}
+	}
+	return &Partition{
+		topic:       topic,
+		broker:      b,
+		partitionID: int32(partitionID),
+	}, nil
+}
+
 // GetLastConsumed returns lastConsumed of a partition.
 func (p *Partition) GetLastConsumed() bool {
 	return p.lastConsumed
@@ -51,7 +81,15 @@ func (p *Partition) GetTopic() string {
 	return p.topic
 }
 
+func (p *Partition) GetBroker() *Node {
+	return p.broker
+}
+
 // String returns the metadata of a Partition as a string.
 func (p *Partition) String() string {
-	return p.Broker.String() + "#" + p.topic + "@" + strconv.Itoa(int(p.partitionID))
+	return p.broker.String() + "#" + p.topic + "@" + strconv.Itoa(int(p.partitionID))
+}
+
+func (p *Partition) SetLastConsumed(lastConsumed bool) {
+	p.lastConsumed = lastConsumed
 }
diff --git a/tubemq-client-twins/tubemq-client-go/metadata/subcribe_info.go b/tubemq-client-twins/tubemq-client-go/metadata/subcribe_info.go
index d76437a..90ae3ef 100644
--- a/tubemq-client-twins/tubemq-client-go/metadata/subcribe_info.go
+++ b/tubemq-client-twins/tubemq-client-go/metadata/subcribe_info.go
@@ -19,14 +19,14 @@ package metadata
 
 import (
 	"fmt"
+	"strings"
 )
 
 // SubscribeInfo represents the metadata of the subscribe info.
 type SubscribeInfo struct {
-	group         string
-	consumerID    string
-	partition     *Partition
-	qryPriorityID int32
+	group      string
+	consumerID string
+	partition  *Partition
 }
 
 // GetGroup returns the group name.
@@ -44,12 +44,48 @@ func (s *SubscribeInfo) GetPartition() *Partition {
 	return s.partition
 }
 
-// GetQryPriorityID returns the QryPriorityID.
-func (s *SubscribeInfo) GetQryPriorityID() int32 {
-	return s.qryPriorityID
-}
-
 // String returns the contents of SubscribeInfo as a string.
 func (s *SubscribeInfo) String() string {
 	return fmt.Sprintf("%s@%s-%s", s.consumerID, s.group, s.partition.String())
 }
+
+// NewSubscribeInfo constructs a SubscribeInfo from a given string.
+// If the given is invalid, it will return error.
+func NewSubscribeInfo(subscribeInfo string) (*SubscribeInfo, error) {
+	consumerID := ""
+	group := ""
+	var partition *Partition
+	var err error
+	pos := strings.Index(subscribeInfo, "#")
+	if pos != -1 {
+		consumerInfo := subscribeInfo[:pos]
+		partitionInfo := subscribeInfo[pos+1:]
+		partition, err = NewPartition(partitionInfo)
+		if err != nil {
+			return nil, err
+		}
+		pos = strings.Index(consumerInfo, "@")
+		consumerID = consumerInfo[:pos]
+		group = consumerInfo[pos+1:]
+	}
+	return &SubscribeInfo{
+		group:      group,
+		consumerID: consumerID,
+		partition:  partition,
+	}, nil
+}
+
+// SetPartition sets the partition.
+func (s *SubscribeInfo) SetPartition(partition *Partition) {
+	s.partition = partition
+}
+
+// SetGroup sets the group.
+func (s *SubscribeInfo) SetGroup(group string) {
+	s.group = group
+}
+
+// SetConsumerID sets the consumerID.
+func (s *SubscribeInfo) SetConsumerID(id string) {
+	s.consumerID = id
+}
diff --git a/tubemq-client-twins/tubemq-client-go/remote/remote.go b/tubemq-client-twins/tubemq-client-go/remote/remote.go
new file mode 100644
index 0000000..aa9a2d1
--- /dev/null
+++ b/tubemq-client-twins/tubemq-client-go/remote/remote.go
@@ -0,0 +1,331 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+// package remote defines the remote data which is returned from TubeMQ.
+package remote
+
+import (
+	"sync"
+	"time"
+
+	"github.com/apache/incubator-inlong/tubemq-client-twins/tubemq-client-go/metadata"
+	"github.com/apache/incubator-inlong/tubemq-client-twins/tubemq-client-go/util"
+)
+
+// RmtDataCache represents the data returned from TubeMQ.
+type RmtDataCache struct {
+	consumerID         string
+	groupName          string
+	underGroupCtrl     bool
+	defFlowCtrlID      int64
+	groupFlowCtrlID    int64
+	partitionSubInfo   map[string]*metadata.SubscribeInfo
+	rebalanceResults   []*metadata.ConsumerEvent
+	eventMu            sync.Mutex
+	metaMu             sync.Mutex
+	dataBookMu         sync.Mutex
+	brokerPartitions   map[*metadata.Node]map[string]bool
+	qryPriorityID      int32
+	partitions         map[string]*metadata.Partition
+	usedPartitions     map[string]int64
+	indexPartitions    map[string]bool
+	partitionTimeouts  map[string]*time.Timer
+	topicPartitions    map[string]map[string]bool
+	partitionRegBooked map[string]bool
+}
+
+// NewRmtDataCache returns a default rmtDataCache.
+func NewRmtDataCache() *RmtDataCache {
+	return &RmtDataCache{
+		defFlowCtrlID:      util.InvalidValue,
+		groupFlowCtrlID:    util.InvalidValue,
+		qryPriorityID:      int32(util.InvalidValue),
+		partitionSubInfo:   make(map[string]*metadata.SubscribeInfo),
+		rebalanceResults:   make([]*metadata.ConsumerEvent, 0, 0),
+		brokerPartitions:   make(map[*metadata.Node]map[string]bool),
+		partitions:         make(map[string]*metadata.Partition),
+		usedPartitions:     make(map[string]int64),
+		indexPartitions:    make(map[string]bool),
+		partitionTimeouts:  make(map[string]*time.Timer),
+		topicPartitions:    make(map[string]map[string]bool),
+		partitionRegBooked: make(map[string]bool),
+	}
+}
+
+// GetUnderGroupCtrl returns the underGroupCtrl.
+func (r *RmtDataCache) GetUnderGroupCtrl() bool {
+	return r.underGroupCtrl
+}
+
+// GetDefFlowCtrlID returns the defFlowCtrlID.
+func (r *RmtDataCache) GetDefFlowCtrlID() int64 {
+	return r.defFlowCtrlID
+}
+
+// GetGroupFlowCtrlID returns the groupFlowCtrlID.
+func (r *RmtDataCache) GetGroupFlowCtrlID() int64 {
+	return r.groupFlowCtrlID
+}
+
+// GetGroupName returns the group name.
+func (r *RmtDataCache) GetGroupName() string {
+	return r.groupName
+}
+
+// GetSubscribeInfo returns the partitionSubInfo.
+func (r *RmtDataCache) GetSubscribeInfo() []*metadata.SubscribeInfo {
+	r.metaMu.Lock()
+	defer r.metaMu.Unlock()
+	subInfos := make([]*metadata.SubscribeInfo, 0, len(r.partitionSubInfo))
+	for _, sub := range r.partitionSubInfo {
+		subInfos = append(subInfos, sub)
+	}
+	return subInfos
+}
+
+// GetQryPriorityID returns the QryPriorityID.
+func (r *RmtDataCache) GetQryPriorityID() int32 {
+	return r.qryPriorityID
+}
+
+// PollEventResult polls the first event result from the rebalanceResults.
+func (r *RmtDataCache) PollEventResult() *metadata.ConsumerEvent {
+	r.eventMu.Lock()
+	defer r.eventMu.Unlock()
+	if len(r.rebalanceResults) > 0 {
+		event := r.rebalanceResults[0]
+		r.rebalanceResults = r.rebalanceResults[1:]
+		return event
+	}
+	return nil
+}
+
+// GetPartitionByBroker returns the subscribed partitions of the given broker.
+func (r *RmtDataCache) GetPartitionByBroker(broker *metadata.Node) []*metadata.Partition {
+	r.metaMu.Lock()
+	defer r.metaMu.Unlock()
+
+	if partitionMap, ok := r.brokerPartitions[broker]; ok {
+		partitions := make([]*metadata.Partition, 0, len(partitionMap))
+		for partition := range partitionMap {
+			partitions = append(partitions, r.partitions[partition])
+		}
+		return partitions
+	}
+	return nil
+}
+
+// SetConsumerInfo sets the consumer information including consumerID and groupName.
+func (r *RmtDataCache) SetConsumerInfo(consumerID string, group string) {
+	r.consumerID = consumerID
+	r.groupName = group
+}
+
+// UpdateDefFlowCtrlInfo updates the defFlowCtrlInfo.
+func (r *RmtDataCache) UpdateDefFlowCtrlInfo(flowCtrlID int64, flowCtrlInfo string) {
+
+}
+
+// UpdateGroupFlowCtrlInfo updates the groupFlowCtrlInfo.
+func (r *RmtDataCache) UpdateGroupFlowCtrlInfo(qryPriorityID int32, flowCtrlID int64, flowCtrlInfo string) {
+
+}
+
+// OfferEvent offers an consumer event.
+func (r *RmtDataCache) OfferEvent(event *metadata.ConsumerEvent) {
+	r.eventMu.Lock()
+	defer r.eventMu.Unlock()
+	r.rebalanceResults = append(r.rebalanceResults, event)
+}
+
+// TakeEvent takes an event from the rebalanceResults.
+func (r *RmtDataCache) TakeEvent() *metadata.ConsumerEvent {
+	r.eventMu.Lock()
+	defer r.eventMu.Unlock()
+	if len(r.rebalanceResults) == 0 {
+		return nil
+	}
+	event := r.rebalanceResults[0]
+	r.rebalanceResults = r.rebalanceResults[1:]
+	return event
+}
+
+// ClearEvent clears all the events.
+func (r *RmtDataCache) ClearEvent() {
+	r.eventMu.Lock()
+	defer r.eventMu.Unlock()
+	r.rebalanceResults = r.rebalanceResults[:0]
+}
+
+// RemoveAndGetPartition removes the given partitions.
+func (r *RmtDataCache) RemoveAndGetPartition(subscribeInfos []*metadata.SubscribeInfo, processingRollback bool, partitions map[*metadata.Node][]*metadata.Partition) {
+	if len(subscribeInfos) == 0 {
+		return
+	}
+	r.metaMu.Lock()
+	defer r.metaMu.Unlock()
+	for _, sub := range subscribeInfos {
+		partitionKey := sub.GetPartition().GetPartitionKey()
+		if partition, ok := r.partitions[partitionKey]; ok {
+			if _, ok := r.usedPartitions[partitionKey]; ok {
+				if processingRollback {
+					partition.SetLastConsumed(false)
+				} else {
+					partition.SetLastConsumed(true)
+				}
+			}
+			if _, ok := partitions[partition.GetBroker()]; !ok {
+				partitions[partition.GetBroker()] = []*metadata.Partition{partition}
+			} else {
+				partitions[partition.GetBroker()] = append(partitions[partition.GetBroker()], partition)
+			}
+			r.removeMetaInfo(partitionKey)
+		}
+		r.resetIdlePartition(partitionKey, false)
+	}
+}
+
+func (r *RmtDataCache) removeMetaInfo(partitionKey string) {
+	if partition, ok := r.partitions[partitionKey]; ok {
+		if partitions, ok := r.topicPartitions[partition.GetTopic()]; ok {
+			delete(partitions, partitionKey)
+			if len(partitions) == 0 {
+				delete(r.topicPartitions, partition.GetTopic())
+			}
+		}
+		if partitions, ok := r.brokerPartitions[partition.GetBroker()]; ok {
+			delete(partitions, partition.GetPartitionKey())
+			if len(partitions) == 0 {
+				delete(r.brokerPartitions, partition.GetBroker())
+			}
+		}
+		delete(r.partitions, partitionKey)
+		delete(r.partitionSubInfo, partitionKey)
+	}
+}
+
+func (r *RmtDataCache) resetIdlePartition(partitionKey string, reuse bool) {
+	delete(r.usedPartitions, partitionKey)
+	if timer, ok := r.partitionTimeouts[partitionKey]; ok {
+		if !timer.Stop() {
+			<-timer.C
+		}
+		timer.Stop()
+		delete(r.partitionTimeouts, partitionKey)
+	}
+	delete(r.indexPartitions, partitionKey)
+	if reuse {
+		if _, ok := r.partitions[partitionKey]; ok {
+			r.indexPartitions[partitionKey] = true
+		}
+	}
+}
+
+// FilterPartitions returns the unsubscribed partitions.
+func (r *RmtDataCache) FilterPartitions(subInfos []*metadata.SubscribeInfo) []*metadata.Partition {
+	r.metaMu.Lock()
+	defer r.metaMu.Unlock()
+	unsubPartitions := make([]*metadata.Partition, 0, len(subInfos))
+	if len(r.partitions) == 0 {
+		for _, sub := range subInfos {
+			unsubPartitions = append(unsubPartitions, sub.GetPartition())
+		}
+	} else {
+		for _, sub := range subInfos {
+			if _, ok := r.partitions[sub.GetPartition().GetPartitionKey()]; !ok {
+				unsubPartitions = append(unsubPartitions, sub.GetPartition())
+			}
+		}
+	}
+	return unsubPartitions
+}
+
+// AddNewPartition append a new partition.
+func (r *RmtDataCache) AddNewPartition(newPartition *metadata.Partition) {
+	sub := &metadata.SubscribeInfo{}
+	sub.SetPartition(newPartition)
+	sub.SetConsumerID(r.consumerID)
+	sub.SetGroup(r.groupName)
+
+	r.metaMu.Lock()
+	defer r.metaMu.Unlock()
+	partitionKey := newPartition.GetPartitionKey()
+	if partition, ok := r.partitions[partitionKey]; !ok {
+		r.partitions[partitionKey] = partition
+		if partitions, ok := r.topicPartitions[partition.GetPartitionKey()]; !ok {
+			newPartitions := make(map[string]bool)
+			newPartitions[partitionKey] = true
+			r.topicPartitions[partition.GetTopic()] = newPartitions
+		} else if _, ok := partitions[partitionKey]; !ok {
+			partitions[partitionKey] = true
+		}
+		if partitions, ok := r.brokerPartitions[partition.GetBroker()]; !ok {
+			newPartitions := make(map[string]bool)
+			newPartitions[partitionKey] = true
+			r.brokerPartitions[partition.GetBroker()] = newPartitions
+		} else if _, ok := partitions[partitionKey]; !ok {
+			partitions[partitionKey] = true
+		}
+		r.partitionSubInfo[partitionKey] = sub
+	}
+	r.resetIdlePartition(partitionKey, true)
+}
+
+// HandleExpiredPartitions handles the expired partitions.
+func (r *RmtDataCache) HandleExpiredPartitions(wait time.Duration) {
+	r.metaMu.Lock()
+	defer r.metaMu.Unlock()
+	expired := make(map[string]bool, len(r.usedPartitions))
+	if len(r.usedPartitions) > 0 {
+		curr := time.Now().UnixNano() / int64(time.Millisecond)
+		for partition, time := range r.usedPartitions {
+			if curr-time > wait.Milliseconds() {
+				expired[partition] = true
+				if p, ok := r.partitions[partition]; ok {
+					p.SetLastConsumed(false)
+				}
+			}
+		}
+		if len(expired) > 0 {
+			for partition := range expired {
+				r.resetIdlePartition(partition, true)
+			}
+		}
+	}
+}
+
+// RemovePartition removes the given partition keys.
+func (r *RmtDataCache) RemovePartition(partitionKeys []string) {
+	r.metaMu.Lock()
+	defer r.metaMu.Unlock()
+
+	for _, partitionKey := range partitionKeys {
+		r.resetIdlePartition(partitionKey, false)
+		r.removeMetaInfo(partitionKey)
+	}
+}
+
+// IsFirstRegister returns whether the given partition is first registered.
+func (r *RmtDataCache) IsFirstRegister(partitionKey string) bool {
+	r.dataBookMu.Lock()
+	defer r.dataBookMu.Unlock()
+
+	if _, ok := r.partitionRegBooked[partitionKey]; !ok {
+		r.partitionRegBooked[partitionKey] = true
+	}
+	return r.partitionRegBooked[partitionKey]
+}
diff --git a/tubemq-client-twins/tubemq-client-go/rpc/broker.go b/tubemq-client-twins/tubemq-client-go/rpc/broker.go
index 7f9ead2..5105fd5 100644
--- a/tubemq-client-twins/tubemq-client-go/rpc/broker.go
+++ b/tubemq-client-twins/tubemq-client-go/rpc/broker.go
@@ -22,11 +22,13 @@ import (
 
 	"github.com/golang/protobuf/proto"
 
-	"github.com/apache/incubator-inlong/tubemq-client-twins/tubemq-client-go/client"
 	"github.com/apache/incubator-inlong/tubemq-client-twins/tubemq-client-go/codec"
 	"github.com/apache/incubator-inlong/tubemq-client-twins/tubemq-client-go/errs"
 	"github.com/apache/incubator-inlong/tubemq-client-twins/tubemq-client-go/metadata"
 	"github.com/apache/incubator-inlong/tubemq-client-twins/tubemq-client-go/protocol"
+	"github.com/apache/incubator-inlong/tubemq-client-twins/tubemq-client-go/remote"
+	"github.com/apache/incubator-inlong/tubemq-client-twins/tubemq-client-go/sub"
+	"github.com/apache/incubator-inlong/tubemq-client-twins/tubemq-client-go/util"
 )
 
 const (
@@ -47,14 +49,14 @@ const (
 )
 
 // RegisterRequestC2B implements the RegisterRequestC2B interface according to TubeMQ RPC protocol.
-func (c *rpcClient) RegisterRequestC2B(ctx context.Context, metadata *metadata.Metadata, sub *client.SubInfo) (*protocol.RegisterResponseB2C, error) {
+func (c *rpcClient) RegisterRequestC2B(ctx context.Context, metadata *metadata.Metadata, sub *sub.SubInfo, r *remote.RmtDataCache) (*protocol.RegisterResponseB2C, error) {
 	reqC2B := &protocol.RegisterRequestC2B{
 		OpType:        proto.Int32(register),
 		ClientId:      proto.String(sub.GetClientID()),
 		GroupName:     proto.String(metadata.GetSubscribeInfo().GetGroup()),
 		TopicName:     proto.String(metadata.GetSubscribeInfo().GetPartition().GetTopic()),
 		PartitionId:   proto.Int32(metadata.GetSubscribeInfo().GetPartition().GetPartitionID()),
-		QryPriorityId: proto.Int32(metadata.GetSubscribeInfo().GetQryPriorityID()),
+		QryPriorityId: proto.Int32(r.GetQryPriorityID()),
 		ReadStatus:    proto.Int32(metadata.GetReadStatus()),
 		AuthInfo:      sub.GetAuthorizedInfo(),
 	}
@@ -66,7 +68,7 @@ func (c *rpcClient) RegisterRequestC2B(ctx context.Context, metadata *metadata.M
 		}
 	}
 	offset := sub.GetAssignedPartOffset(metadata.GetSubscribeInfo().GetPartition().GetPartitionKey())
-	if offset != client.InValidOffset {
+	if offset != util.InvalidValue {
 		reqC2B.CurrOffset = proto.Int64(offset)
 	}
 	data, err := proto.Marshal(reqC2B)
@@ -87,7 +89,7 @@ func (c *rpcClient) RegisterRequestC2B(ctx context.Context, metadata *metadata.M
 		Timeout: proto.Int64(c.config.Net.ReadTimeout.Milliseconds()),
 	}
 
-	rspBody, err := c.doRequest(ctx, req)
+	rspBody, err := c.doRequest(ctx, metadata.GetNode().GetAddress(), req)
 	if err != nil {
 		return nil, err
 	}
@@ -101,7 +103,7 @@ func (c *rpcClient) RegisterRequestC2B(ctx context.Context, metadata *metadata.M
 }
 
 // UnregisterRequestC2B implements the UnregisterRequestC2B interface according to TubeMQ RPC protocol.
-func (c *rpcClient) UnregisterRequestC2B(ctx context.Context, metadata metadata.Metadata, sub *client.SubInfo) (*protocol.RegisterResponseB2C, error) {
+func (c *rpcClient) UnregisterRequestC2B(ctx context.Context, metadata *metadata.Metadata, sub *sub.SubInfo) (*protocol.RegisterResponseB2C, error) {
 	reqC2B := &protocol.RegisterRequestC2B{
 		OpType:      proto.Int32(unregister),
 		ClientId:    proto.String(sub.GetClientID()),
@@ -129,7 +131,7 @@ func (c *rpcClient) UnregisterRequestC2B(ctx context.Context, metadata metadata.
 		Timeout: proto.Int64(c.config.Net.ReadTimeout.Milliseconds()),
 	}
 
-	rspBody, err := c.doRequest(ctx, req)
+	rspBody, err := c.doRequest(ctx, metadata.GetNode().GetAddress(), req)
 	if err != nil {
 		return nil, err
 	}
@@ -143,7 +145,7 @@ func (c *rpcClient) UnregisterRequestC2B(ctx context.Context, metadata metadata.
 }
 
 // GetMessageRequestC2B implements the GetMessageRequestC2B interface according to TubeMQ RPC protocol.
-func (c *rpcClient) GetMessageRequestC2B(ctx context.Context, metadata *metadata.Metadata, sub *client.SubInfo, r *client.RmtDataCache) (*protocol.GetMessageResponseB2C, error) {
+func (c *rpcClient) GetMessageRequestC2B(ctx context.Context, metadata *metadata.Metadata, sub *sub.SubInfo, r *remote.RmtDataCache) (*protocol.GetMessageResponseB2C, error) {
 	reqC2B := &protocol.GetMessageRequestC2B{
 		ClientId:           proto.String(sub.GetClientID()),
 		PartitionId:        proto.Int32(metadata.GetSubscribeInfo().GetPartition().GetPartitionID()),
@@ -171,7 +173,7 @@ func (c *rpcClient) GetMessageRequestC2B(ctx context.Context, metadata *metadata
 		Timeout: proto.Int64(c.config.Net.ReadTimeout.Milliseconds()),
 	}
 
-	rspBody, err := c.doRequest(ctx, req)
+	rspBody, err := c.doRequest(ctx, metadata.GetNode().GetAddress(), req)
 	if err != nil {
 		return nil, err
 	}
@@ -185,7 +187,7 @@ func (c *rpcClient) GetMessageRequestC2B(ctx context.Context, metadata *metadata
 }
 
 // CommitOffsetRequestC2B implements the CommitOffsetRequestC2B interface according to TubeMQ RPC protocol.
-func (c *rpcClient) CommitOffsetRequestC2B(ctx context.Context, metadata *metadata.Metadata, sub *client.SubInfo) (*protocol.CommitOffsetResponseB2C, error) {
+func (c *rpcClient) CommitOffsetRequestC2B(ctx context.Context, metadata *metadata.Metadata, sub *sub.SubInfo) (*protocol.CommitOffsetResponseB2C, error) {
 	reqC2B := &protocol.CommitOffsetRequestC2B{
 		ClientId:         proto.String(sub.GetClientID()),
 		TopicName:        proto.String(metadata.GetSubscribeInfo().GetPartition().GetTopic()),
@@ -206,12 +208,12 @@ func (c *rpcClient) CommitOffsetRequestC2B(ctx context.Context, metadata *metada
 		ProtocolVer: proto.Int32(2),
 	}
 	req.RequestBody = &protocol.RequestBody{
-		Method:  proto.Int32(brokerConsumerHeartbeat),
+		Method:  proto.Int32(brokerConsumerCommit),
 		Request: data,
 		Timeout: proto.Int64(c.config.Net.ReadTimeout.Milliseconds()),
 	}
 
-	rspBody, err := c.doRequest(ctx, req)
+	rspBody, err := c.doRequest(ctx, metadata.GetNode().GetAddress(), req)
 	if err != nil {
 		return nil, err
 	}
@@ -225,12 +227,12 @@ func (c *rpcClient) CommitOffsetRequestC2B(ctx context.Context, metadata *metada
 }
 
 // HeartbeatRequestC2B implements the HeartbeatRequestC2B interface according to TubeMQ RPC protocol.
-func (c *rpcClient) HeartbeatRequestC2B(ctx context.Context, metadata *metadata.Metadata, sub *client.SubInfo, r *client.RmtDataCache) (*protocol.HeartBeatResponseB2C, error) {
+func (c *rpcClient) HeartbeatRequestC2B(ctx context.Context, metadata *metadata.Metadata, sub *sub.SubInfo, r *remote.RmtDataCache) (*protocol.HeartBeatResponseB2C, error) {
 	reqC2B := &protocol.HeartBeatRequestC2B{
 		ClientId:      proto.String(sub.GetClientID()),
 		GroupName:     proto.String(metadata.GetSubscribeInfo().GetGroup()),
 		ReadStatus:    proto.Int32(metadata.GetReadStatus()),
-		QryPriorityId: proto.Int32(metadata.GetSubscribeInfo().GetQryPriorityID()),
+		QryPriorityId: proto.Int32(r.GetQryPriorityID()),
 		AuthInfo:      sub.GetAuthorizedInfo(),
 	}
 	partitions := r.GetPartitionByBroker(metadata.GetNode())
@@ -256,7 +258,7 @@ func (c *rpcClient) HeartbeatRequestC2B(ctx context.Context, metadata *metadata.
 		Flag: proto.Int32(0),
 	}
 
-	rspBody, err := c.doRequest(ctx, req)
+	rspBody, err := c.doRequest(ctx, metadata.GetNode().GetAddress(), req)
 	if err != nil {
 		return nil, err
 	}
diff --git a/tubemq-client-twins/tubemq-client-go/rpc/client.go b/tubemq-client-twins/tubemq-client-go/rpc/client.go
index 71123c7..fab5bae 100644
--- a/tubemq-client-twins/tubemq-client-go/rpc/client.go
+++ b/tubemq-client-twins/tubemq-client-go/rpc/client.go
@@ -21,13 +21,14 @@ package rpc
 import (
 	"context"
 
-	"github.com/apache/incubator-inlong/tubemq-client-twins/tubemq-client-go/client"
 	"github.com/apache/incubator-inlong/tubemq-client-twins/tubemq-client-go/codec"
 	"github.com/apache/incubator-inlong/tubemq-client-twins/tubemq-client-go/config"
 	"github.com/apache/incubator-inlong/tubemq-client-twins/tubemq-client-go/errs"
 	"github.com/apache/incubator-inlong/tubemq-client-twins/tubemq-client-go/metadata"
 	"github.com/apache/incubator-inlong/tubemq-client-twins/tubemq-client-go/multiplexing"
 	"github.com/apache/incubator-inlong/tubemq-client-twins/tubemq-client-go/protocol"
+	"github.com/apache/incubator-inlong/tubemq-client-twins/tubemq-client-go/remote"
+	"github.com/apache/incubator-inlong/tubemq-client-twins/tubemq-client-go/sub"
 	"github.com/apache/incubator-inlong/tubemq-client-twins/tubemq-client-go/transport"
 )
 
@@ -39,21 +40,21 @@ const (
 // RPCClient is the rpc level client to interact with TubeMQ.
 type RPCClient interface {
 	// RegisterRequestC2B is the rpc request for a consumer to register to a broker.
-	RegisterRequestC2B(ctx context.Context, metadata *metadata.Metadata, sub *client.SubInfo) (*protocol.RegisterResponseB2C, error)
+	RegisterRequestC2B(ctx context.Context, metadata *metadata.Metadata, sub *sub.SubInfo, r *remote.RmtDataCache) (*protocol.RegisterResponseB2C, error)
 	// UnregisterRequestC2B is the rpc request for a consumer to unregister to a broker.
-	UnregisterRequestC2B(ctx context.Context, metadata metadata.Metadata, sub *client.SubInfo) (*protocol.RegisterResponseB2C, error)
+	UnregisterRequestC2B(ctx context.Context, metadata *metadata.Metadata, sub *sub.SubInfo) (*protocol.RegisterResponseB2C, error)
 	// GetMessageRequestC2B is the rpc request for a consumer to get message from a broker.
-	GetMessageRequestC2B(ctx context.Context, metadata *metadata.Metadata, sub *client.SubInfo, r *client.RmtDataCache) (*protocol.GetMessageResponseB2C, error)
+	GetMessageRequestC2B(ctx context.Context, metadata *metadata.Metadata, sub *sub.SubInfo, r *remote.RmtDataCache) (*protocol.GetMessageResponseB2C, error)
 	// CommitOffsetRequestC2B is the rpc request for a consumer to commit offset to a broker.
-	CommitOffsetRequestC2B(ctx context.Context, metadata *metadata.Metadata, sub *client.SubInfo) (*protocol.CommitOffsetResponseB2C, error)
+	CommitOffsetRequestC2B(ctx context.Context, metadata *metadata.Metadata, sub *sub.SubInfo) (*protocol.CommitOffsetResponseB2C, error)
 	// HeartbeatRequestC2B is the rpc request for a consumer to send heartbeat to a broker.
-	HeartbeatRequestC2B(ctx context.Context, metadata *metadata.Metadata, sub *client.SubInfo, r *client.RmtDataCache) (*protocol.HeartBeatResponseB2C, error)
+	HeartbeatRequestC2B(ctx context.Context, metadata *metadata.Metadata, sub *sub.SubInfo, r *remote.RmtDataCache) (*protocol.HeartBeatResponseB2C, error)
 	// RegisterRequestC2M is the rpc request for a consumer to register request to master.
-	RegisterRequestC2M(ctx context.Context, metadata *metadata.Metadata, sub *client.SubInfo, r *client.RmtDataCache) (*protocol.RegisterResponseM2C, error)
+	RegisterRequestC2M(ctx context.Context, metadata *metadata.Metadata, sub *sub.SubInfo, r *remote.RmtDataCache) (*protocol.RegisterResponseM2C, error)
 	// HeartRequestC2M is the rpc request for a consumer to send heartbeat to master.
-	HeartRequestC2M(ctx context.Context, metadata *metadata.Metadata, sub *client.SubInfo, r *client.RmtDataCache) (*protocol.HeartResponseM2C, error)
+	HeartRequestC2M(ctx context.Context, metadata *metadata.Metadata, sub *sub.SubInfo, r *remote.RmtDataCache) (*protocol.HeartResponseM2C, error)
 	// CloseRequestC2M is the rpc request for a consumer to be closed to master.
-	CloseRequestC2M(ctx context.Context, metadata *metadata.Metadata, sub *client.SubInfo) (*protocol.CloseResponseM2C, error)
+	CloseRequestC2M(ctx context.Context, metadata *metadata.Metadata, sub *sub.SubInfo) (*protocol.CloseResponseM2C, error)
 }
 
 // New returns a default TubeMQ rpc Client
@@ -70,8 +71,8 @@ type rpcClient struct {
 	config *config.Config
 }
 
-func (c *rpcClient) doRequest(ctx context.Context, req codec.RPCRequest) (*protocol.RspResponseBody, error) {
-	rsp, err := c.client.DoRequest(ctx, req)
+func (c *rpcClient) doRequest(ctx context.Context, address string, req codec.RPCRequest) (*protocol.RspResponseBody, error) {
+	rsp, err := c.client.DoRequest(ctx, address, req)
 	if err != nil {
 		return nil, errs.New(errs.RetRequestFailure, err.Error())
 	}
diff --git a/tubemq-client-twins/tubemq-client-go/rpc/master.go b/tubemq-client-twins/tubemq-client-go/rpc/master.go
index 9eb336a..c0d4bc3 100644
--- a/tubemq-client-twins/tubemq-client-go/rpc/master.go
+++ b/tubemq-client-twins/tubemq-client-go/rpc/master.go
@@ -22,11 +22,12 @@ import (
 
 	"github.com/golang/protobuf/proto"
 
-	"github.com/apache/incubator-inlong/tubemq-client-twins/tubemq-client-go/client"
 	"github.com/apache/incubator-inlong/tubemq-client-twins/tubemq-client-go/codec"
 	"github.com/apache/incubator-inlong/tubemq-client-twins/tubemq-client-go/errs"
 	"github.com/apache/incubator-inlong/tubemq-client-twins/tubemq-client-go/metadata"
 	"github.com/apache/incubator-inlong/tubemq-client-twins/tubemq-client-go/protocol"
+	"github.com/apache/incubator-inlong/tubemq-client-twins/tubemq-client-go/remote"
+	"github.com/apache/incubator-inlong/tubemq-client-twins/tubemq-client-go/sub"
 )
 
 const (
@@ -39,7 +40,7 @@ const (
 )
 
 // RegisterRequestRequestC2M implements the RegisterRequestRequestC2M interface according to TubeMQ RPC protocol.
-func (c *rpcClient) RegisterRequestC2M(ctx context.Context, metadata *metadata.Metadata, sub *client.SubInfo, r *client.RmtDataCache) (*protocol.RegisterResponseM2C, error) {
+func (c *rpcClient) RegisterRequestC2M(ctx context.Context, metadata *metadata.Metadata, sub *sub.SubInfo, r *remote.RmtDataCache) (*protocol.RegisterResponseM2C, error) {
 	reqC2M := &protocol.RegisterRequestC2M{
 		ClientId:         proto.String(sub.GetClientID()),
 		HostName:         proto.String(metadata.GetNode().GetHost()),
@@ -48,7 +49,7 @@ func (c *rpcClient) RegisterRequestC2M(ctx context.Context, metadata *metadata.M
 		SessionTime:      proto.Int64(sub.GetSubscribedTime()),
 		DefFlowCheckId:   proto.Int64(r.GetDefFlowCtrlID()),
 		GroupFlowCheckId: proto.Int64(r.GetGroupFlowCtrlID()),
-		QryPriorityId:    proto.Int32(metadata.GetSubscribeInfo().GetQryPriorityID()),
+		QryPriorityId:    proto.Int32(r.GetQryPriorityID()),
 		AuthInfo:         sub.GetMasterCertificateIInfo(),
 	}
 	reqC2M.TopicList = make([]string, 0, len(sub.GetTopics()))
@@ -92,7 +93,7 @@ func (c *rpcClient) RegisterRequestC2M(ctx context.Context, metadata *metadata.M
 		Timeout: proto.Int64(c.config.Net.ReadTimeout.Milliseconds()),
 	}
 
-	rspBody, err := c.doRequest(ctx, req)
+	rspBody, err := c.doRequest(ctx, metadata.GetNode().GetAddress(), req)
 	if err != nil {
 		return nil, err
 	}
@@ -106,14 +107,14 @@ func (c *rpcClient) RegisterRequestC2M(ctx context.Context, metadata *metadata.M
 }
 
 // HeartRequestC2M implements the HeartRequestC2M interface according to TubeMQ RPC protocol.
-func (c *rpcClient) HeartRequestC2M(ctx context.Context, metadata *metadata.Metadata, sub *client.SubInfo, r *client.RmtDataCache) (*protocol.HeartResponseM2C, error) {
+func (c *rpcClient) HeartRequestC2M(ctx context.Context, metadata *metadata.Metadata, sub *sub.SubInfo, r *remote.RmtDataCache) (*protocol.HeartResponseM2C, error) {
 	reqC2M := &protocol.HeartRequestC2M{
 		ClientId:            proto.String(sub.GetClientID()),
 		GroupName:           proto.String(metadata.GetSubscribeInfo().GetGroup()),
 		DefFlowCheckId:      proto.Int64(r.GetDefFlowCtrlID()),
 		GroupFlowCheckId:    proto.Int64(r.GetGroupFlowCtrlID()),
 		ReportSubscribeInfo: proto.Bool(false),
-		QryPriorityId:       proto.Int32(metadata.GetSubscribeInfo().GetQryPriorityID()),
+		QryPriorityId:       proto.Int32(r.GetQryPriorityID()),
 	}
 	event := r.PollEventResult()
 	if event != nil || metadata.GetReportTimes() {
@@ -159,7 +160,7 @@ func (c *rpcClient) HeartRequestC2M(ctx context.Context, metadata *metadata.Meta
 		Flag: proto.Int32(0),
 	}
 
-	rspBody, err := c.doRequest(ctx, req)
+	rspBody, err := c.doRequest(ctx, metadata.GetNode().GetAddress(), req)
 	if err != nil {
 		return nil, err
 	}
@@ -173,7 +174,7 @@ func (c *rpcClient) HeartRequestC2M(ctx context.Context, metadata *metadata.Meta
 }
 
 // CloseRequestC2M implements the CloseRequestC2M interface according to TubeMQ RPC protocol.
-func (c *rpcClient) CloseRequestC2M(ctx context.Context, metadata *metadata.Metadata, sub *client.SubInfo) (*protocol.CloseResponseM2C, error) {
+func (c *rpcClient) CloseRequestC2M(ctx context.Context, metadata *metadata.Metadata, sub *sub.SubInfo) (*protocol.CloseResponseM2C, error) {
 	reqC2M := &protocol.CloseRequestC2M{
 		ClientId:  proto.String(sub.GetClientID()),
 		GroupName: proto.String(metadata.GetSubscribeInfo().GetGroup()),
@@ -197,7 +198,7 @@ func (c *rpcClient) CloseRequestC2M(ctx context.Context, metadata *metadata.Meta
 		Flag: proto.Int32(0),
 	}
 
-	rspBody, err := c.doRequest(ctx, req)
+	rspBody, err := c.doRequest(ctx, metadata.GetNode().GetAddress(), req)
 	if err != nil {
 		return nil, err
 	}
diff --git a/tubemq-client-twins/tubemq-client-go/client/info.go b/tubemq-client-twins/tubemq-client-go/sub/info.go
similarity index 62%
rename from tubemq-client-twins/tubemq-client-go/client/info.go
rename to tubemq-client-twins/tubemq-client-go/sub/info.go
index 25a2686..737b08a 100644
--- a/tubemq-client-twins/tubemq-client-go/client/info.go
+++ b/tubemq-client-twins/tubemq-client-go/sub/info.go
@@ -15,16 +15,19 @@
  * limitations under the License.
  */
 
-// Package client defines the api and information
-// which can be exposed to user.
-package client
+// package sub defines the subscription information of a client.
+package sub
 
 import (
+	"strconv"
+	"time"
+
+	"github.com/apache/incubator-inlong/tubemq-client-twins/tubemq-client-go/config"
 	"github.com/apache/incubator-inlong/tubemq-client-twins/tubemq-client-go/protocol"
 )
 
 // InvalidOffset represents the offset which is invalid.
-const InValidOffset = -1
+const InValidOffset = -2
 
 // SubInfo represents the sub information of the client.
 type SubInfo struct {
@@ -46,6 +49,46 @@ type SubInfo struct {
 	masterCertificateInfo *protocol.MasterCertificateInfo
 }
 
+// NewSubInfo parses the subscription from the config to SubInfo.
+func NewSubInfo(config *config.Config) *SubInfo {
+	s := &SubInfo{
+		boundConsume:    config.Consumer.BoundConsume,
+		subscribedTime:  time.Now().UnixNano() / int64(time.Millisecond),
+		firstRegistered: false,
+		topics:          config.Consumer.Topics,
+		topicFilters:    config.Consumer.TopicFilters,
+	}
+	s.topicConds = make([]string, 0, len(config.Consumer.TopicFilters))
+	for topic, filters := range config.Consumer.TopicFilters {
+		cond := topic + "#"
+		count := 0
+		for _, filter := range filters {
+			if count > 0 {
+				cond += ","
+			}
+			cond += filter
+		}
+		s.topicConds = append(s.topicConds, cond)
+	}
+	if config.Consumer.BoundConsume {
+		s.sessionKey = config.Consumer.SessionKey
+		s.sourceCount = int32(config.Consumer.SourceCount)
+		s.selectBig = config.Consumer.SelectBig
+		assignedPartitions := config.Consumer.PartitionOffset
+		count := 0
+		for partition, offset := range assignedPartitions {
+			if count > 0 {
+				s.boundPartitions += ","
+			}
+			s.boundPartitions += partition
+			s.boundPartitions += "="
+			s.boundPartitions += strconv.Itoa(int(offset))
+			count++
+		}
+	}
+	return s
+}
+
 // GetClientID returns the client ID.
 func (s *SubInfo) GetClientID() string {
 	return s.clientID
@@ -124,6 +167,32 @@ func (s *SubInfo) GetAuthorizedInfo() *protocol.AuthorizedInfo {
 	return s.authInfo
 }
 
+// GetMasterCertifateInfo returns the masterCertificateInfo.
 func (s *SubInfo) GetMasterCertificateIInfo() *protocol.MasterCertificateInfo {
 	return s.masterCertificateInfo
 }
+
+// FirstRegistered sets the firstRegistered to true.
+func (s *SubInfo) FirstRegistered() {
+	s.firstRegistered = true
+}
+
+// SetAuthorizedInfo sets the authorizedInfo.
+func (s *SubInfo) SetAuthorizedInfo(auth *protocol.AuthorizedInfo) {
+	s.authInfo = auth
+}
+
+// SetMasterCertificateInfo sets the masterCertificateInfo.
+func (s *SubInfo) SetMasterCertificateInfo(info *protocol.MasterCertificateInfo) {
+	s.masterCertificateInfo = info
+}
+
+// SetIsNotAllocated sets the notAllocated.
+func (s *SubInfo) SetIsNotAllocated(isNotAllocated bool) {
+	s.notAllocated = isNotAllocated
+}
+
+// SetClientID sets the clientID.
+func (s *SubInfo) SetClientID(clientID string) {
+	s.clientID = clientID
+}
diff --git a/tubemq-client-twins/tubemq-client-go/transport/client.go b/tubemq-client-twins/tubemq-client-go/transport/client.go
index d816a7f..91c9692 100644
--- a/tubemq-client-twins/tubemq-client-go/transport/client.go
+++ b/tubemq-client-twins/tubemq-client-go/transport/client.go
@@ -28,8 +28,6 @@ import (
 
 // Options represents the transport options
 type Options struct {
-	Address string
-
 	CACertFile    string
 	TLSCertFile   string
 	TLSKeyFile    string
@@ -51,9 +49,9 @@ func New(opts *Options, pool *multiplexing.Pool) *Client {
 }
 
 // DoRequest sends the request and return the decoded response
-func (c *Client) DoRequest(ctx context.Context, req codec.RPCRequest) (codec.RPCResponse, error) {
+func (c *Client) DoRequest(ctx context.Context, address string, req codec.RPCRequest) (codec.RPCResponse, error) {
 	opts := &multiplexing.DialOptions{
-		Address: c.opts.Address,
+		Address: address,
 		Network: "tcp",
 	}
 	if c.opts.CACertFile != "" {
@@ -63,7 +61,7 @@ func (c *Client) DoRequest(ctx context.Context, req codec.RPCRequest) (codec.RPC
 		opts.TLSServerName = c.opts.TLSServerName
 	}
 
-	conn, err := c.pool.Get(ctx, c.opts.Address, req.GetSerialNo(), opts)
+	conn, err := c.pool.Get(ctx, address, req.GetSerialNo(), opts)
 	if err != nil {
 		return nil, err
 	}
diff --git a/tubemq-client-twins/tubemq-client-go/util/util.go b/tubemq-client-twins/tubemq-client-go/util/util.go
new file mode 100644
index 0000000..77422ac
--- /dev/null
+++ b/tubemq-client-twins/tubemq-client-go/util/util.go
@@ -0,0 +1,52 @@
+package util
+
+import (
+	"net"
+)
+
+var InvalidValue = int64(-2)
+
+func GetLocalHost() string {
+	ifaces, err := net.Interfaces()
+	if err != nil {
+		return ""
+	}
+	for _, iface := range ifaces {
+		if iface.Flags&net.FlagUp == 0 {
+			continue // interface down
+		}
+		if iface.Flags&net.FlagLoopback != 0 {
+			continue // loopback interface
+		}
+		addrs, err := iface.Addrs()
+		if err != nil {
+			return ""
+		}
+		for _, addr := range addrs {
+			var ip net.IP
+			switch v := addr.(type) {
+			case *net.IPNet:
+				ip = v.IP
+			case *net.IPAddr:
+				ip = v.IP
+			}
+			if ip == nil || ip.IsLoopback() {
+				continue
+			}
+			ip = ip.To4()
+			if ip == nil {
+				continue // not an ipv4 address
+			}
+			return ip.String()
+		}
+	}
+	return ""
+}
+
+func GenBrokerAuthenticateToken(username string, password string) string {
+	return ""
+}
+
+func GenMasterAuthenticateToken(username string, password string) string {
+	return ""
+}

[incubator-inlong] 06/09: Address review comments

Posted by go...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

gosonzhang pushed a commit to branch INLONG-25
in repository https://gitbox.apache.org/repos/asf/incubator-inlong.git

commit 2d64bb530f302ab97bbc2930197909b10e8152df
Author: Zijie Lu <ws...@gmail.com>
AuthorDate: Tue Jun 8 17:40:17 2021 +0800

    Address review comments
    
    Signed-off-by: Zijie Lu <ws...@gmail.com>
---
 .../tubemq-client-go/client/consumer.go            |   6 +-
 .../tubemq-client-go/client/consumer_impl.go       | 159 ++++++++++++---------
 .../tubemq-client-go/client/heartbeat.go           |  35 +++--
 .../tubemq-client-go/client/version.go             |   5 +
 .../tubemq-client-go/metadata/consumer_event.go    |   7 +
 .../tubemq-client-go/remote/remote.go              |  23 +--
 6 files changed, 130 insertions(+), 105 deletions(-)

diff --git a/tubemq-client-twins/tubemq-client-go/client/consumer.go b/tubemq-client-twins/tubemq-client-go/client/consumer.go
index 27a8536..a74dfa5 100644
--- a/tubemq-client-twins/tubemq-client-go/client/consumer.go
+++ b/tubemq-client-twins/tubemq-client-go/client/consumer.go
@@ -19,10 +19,6 @@
 // which can be exposed to user.
 package client
 
-const (
-	tubeMQClientVersion = "0.1.0"
-)
-
 // ConsumerResult of a consumption.
 type ConsumerResult struct {
 }
@@ -31,7 +27,7 @@ type ConsumerResult struct {
 type ConsumerOffset struct {
 }
 
-var clientIndex uint64
+var clientID uint64
 
 // Consumer is an interface that abstracts behavior of TubeMQ's consumer
 type Consumer interface {
diff --git a/tubemq-client-twins/tubemq-client-go/client/consumer_impl.go b/tubemq-client-twins/tubemq-client-go/client/consumer_impl.go
index b7312bb..674f810 100644
--- a/tubemq-client-twins/tubemq-client-go/client/consumer_impl.go
+++ b/tubemq-client-twins/tubemq-client-go/client/consumer_impl.go
@@ -61,6 +61,7 @@ type consumer struct {
 	masterHBRetry    int
 	heartbeatManager *heartbeatManager
 	unreportedTimes  int
+	done             chan struct{}
 }
 
 // NewConsumer returns a consumer which is constructed by a given config.
@@ -70,7 +71,7 @@ func NewConsumer(config *config.Config) (Consumer, error) {
 		return nil, err
 	}
 
-	clientID := newClientID(config.Consumer.Group)
+	clientID := newClient(config.Consumer.Group)
 	pool := multiplexing.NewPool()
 	opts := &transport.Options{}
 	if config.Net.TLS.Enable {
@@ -118,40 +119,18 @@ func (c *consumer) register2Master(needChange bool) error {
 		c.master = node
 	}
 	for c.master.HasNext {
-		ctx, cancel := context.WithTimeout(context.Background(), c.config.Net.ReadTimeout)
-
-		m := &metadata.Metadata{}
-		node := &metadata.Node{}
-		node.SetHost(util.GetLocalHost())
-		node.SetAddress(c.master.Address)
-		m.SetNode(node)
-		sub := &metadata.SubscribeInfo{}
-		sub.SetGroup(c.config.Consumer.Group)
-		m.SetSubscribeInfo(sub)
-
-		auth := &protocol.AuthenticateInfo{}
-		c.genMasterAuthenticateToken(auth, true)
-		mci := &protocol.MasterCertificateInfo{
-			AuthInfo: auth,
-		}
-		c.subInfo.SetMasterCertificateInfo(mci)
-
-		rsp, err := c.client.RegisterRequestC2M(ctx, m, c.subInfo, c.rmtDataCache)
+		rsp, err := c.sendRegRequest2Master()
 		if err != nil {
-			cancel()
 			return err
 		}
 		if rsp.GetSuccess() {
 			c.masterHBRetry = 0
 			c.processRegisterResponseM2C(rsp)
-			cancel()
 			return nil
 		} else if rsp.GetErrCode() == errs.RetConsumeGroupForbidden || rsp.GetErrCode() == errs.RetConsumeContentForbidden {
-			cancel()
 			return nil
 		} else {
 			c.master, err = c.selector.Select(c.config.Consumer.Masters)
-			cancel()
 			if err != nil {
 				return err
 			}
@@ -160,6 +139,30 @@ func (c *consumer) register2Master(needChange bool) error {
 	return nil
 }
 
+func (c *consumer) sendRegRequest2Master() (*protocol.RegisterResponseM2C, error) {
+	ctx, cancel := context.WithTimeout(context.Background(), c.config.Net.ReadTimeout)
+	defer cancel()
+
+	m := &metadata.Metadata{}
+	node := &metadata.Node{}
+	node.SetHost(util.GetLocalHost())
+	node.SetAddress(c.master.Address)
+	m.SetNode(node)
+	sub := &metadata.SubscribeInfo{}
+	sub.SetGroup(c.config.Consumer.Group)
+	m.SetSubscribeInfo(sub)
+
+	auth := &protocol.AuthenticateInfo{}
+	c.genMasterAuthenticateToken(auth, true)
+	mci := &protocol.MasterCertificateInfo{
+		AuthInfo: auth,
+	}
+	c.subInfo.SetMasterCertificateInfo(mci)
+
+	rsp, err := c.client.RegisterRequestC2M(ctx, m, c.subInfo, c.rmtDataCache)
+	return rsp, err
+}
+
 func (c *consumer) processRegisterResponseM2C(rsp *protocol.RegisterResponseM2C) {
 	if rsp.GetNotAllocated() {
 		c.subInfo.SetIsNotAllocated(rsp.GetNotAllocated())
@@ -202,19 +205,25 @@ func (c *consumer) GetCurrConsumedInfo() (map[string]*ConsumerOffset, error) {
 
 func (c *consumer) processRebalanceEvent() {
 	for {
-		event := c.rmtDataCache.TakeEvent()
-		if event.GetEventStatus() == int32(util.InvalidValue) && event.GetRebalanceID() == util.InvalidValue {
+		select {
+		case event, ok := <-c.rmtDataCache.EventCh:
+			if ok {
+				if event.GetEventStatus() == int32(util.InvalidValue) && event.GetRebalanceID() == util.InvalidValue {
+					break
+				}
+				c.rmtDataCache.ClearEvent()
+				switch event.GetEventType() {
+				case metadata.Disconnect, metadata.OnlyDisconnect:
+					c.disconnect2Broker(event)
+					c.rmtDataCache.OfferEventResult(event)
+				case metadata.Connect, metadata.OnlyConnect:
+					c.connect2Broker(event)
+					c.rmtDataCache.OfferEventResult(event)
+				}
+			}
+		case <-c.done:
 			break
 		}
-		c.rmtDataCache.ClearEvent()
-		switch event.GetEventType() {
-		case 2, 20:
-			c.disconnect2Broker(event)
-			c.rmtDataCache.OfferEventResult(event)
-		case 1, 10:
-			c.connect2Broker(event)
-			c.rmtDataCache.OfferEventResult(event)
-		}
 	}
 }
 
@@ -237,48 +246,41 @@ func (c *consumer) unregister2Broker(unRegPartitions map[*metadata.Node][]*metad
 
 	for _, partitions := range unRegPartitions {
 		for _, partition := range partitions {
-			ctx, cancel := context.WithTimeout(context.Background(), c.config.Net.ReadTimeout)
-
-			m := &metadata.Metadata{}
-			node := &metadata.Node{}
-			node.SetHost(util.GetLocalHost())
-			node.SetAddress(partition.GetBroker().GetAddress())
-			m.SetNode(node)
-			m.SetReadStatus(1)
-			sub := &metadata.SubscribeInfo{}
-			sub.SetGroup(c.config.Consumer.Group)
-			sub.SetConsumerID(c.clientID)
-			sub.SetPartition(partition)
-			m.SetSubscribeInfo(sub)
-
-			c.client.UnregisterRequestC2B(ctx, m, c.subInfo)
-			cancel()
+			c.sendUnregisterReq2Broker(partition)
 		}
 	}
 }
 
+func (c *consumer) sendUnregisterReq2Broker(partition *metadata.Partition) {
+	ctx, cancel := context.WithTimeout(context.Background(), c.config.Net.ReadTimeout)
+	defer cancel()
+
+	m := &metadata.Metadata{}
+	node := &metadata.Node{}
+	node.SetHost(util.GetLocalHost())
+	node.SetAddress(partition.GetBroker().GetAddress())
+	m.SetNode(node)
+	m.SetReadStatus(1)
+	sub := &metadata.SubscribeInfo{}
+	sub.SetGroup(c.config.Consumer.Group)
+	sub.SetConsumerID(c.clientID)
+	sub.SetPartition(partition)
+	m.SetSubscribeInfo(sub)
+
+	c.client.UnregisterRequestC2B(ctx, m, c.subInfo)
+}
+
 func (c *consumer) connect2Broker(event *metadata.ConsumerEvent) {
 	if len(event.GetSubscribeInfo()) > 0 {
 		unsubPartitions := c.rmtDataCache.FilterPartitions(event.GetSubscribeInfo())
 		if len(unsubPartitions) > 0 {
 			for _, partition := range unsubPartitions {
-				m := &metadata.Metadata{}
+
 				node := &metadata.Node{}
 				node.SetHost(util.GetLocalHost())
 				node.SetAddress(partition.GetBroker().GetAddress())
-				m.SetNode(node)
-				sub := &metadata.SubscribeInfo{}
-				sub.SetGroup(c.config.Consumer.Group)
-				sub.SetConsumerID(c.clientID)
-				sub.SetPartition(partition)
-				m.SetSubscribeInfo(sub)
-				isFirstRegister := c.rmtDataCache.IsFirstRegister(partition.GetPartitionKey())
-				m.SetReadStatus(c.getConsumeReadStatus(isFirstRegister))
-				auth := c.genBrokerAuthenticInfo(true)
-				c.subInfo.SetAuthorizedInfo(auth)
-
-				ctx, cancel := context.WithTimeout(context.Background(), c.config.Net.ReadTimeout)
-				rsp, err := c.client.RegisterRequestC2B(ctx, m, c.subInfo, c.rmtDataCache)
+
+				rsp, err := c.sendRegisterReq2Broker(partition, node)
 				if err != nil {
 					//todo add log
 				}
@@ -286,24 +288,39 @@ func (c *consumer) connect2Broker(event *metadata.ConsumerEvent) {
 					c.rmtDataCache.AddNewPartition(partition)
 					c.heartbeatManager.registerBroker(node)
 				}
-				cancel()
 			}
 		}
 	}
 	c.subInfo.FirstRegistered()
-	event.SetEventStatus(2)
+	event.SetEventStatus(metadata.Disconnect)
 }
 
-func newClientIndex() uint64 {
-	return atomic.AddUint64(&clientIndex, 1)
+func (c *consumer) sendRegisterReq2Broker(partition *metadata.Partition, node *metadata.Node) (*protocol.RegisterResponseB2C, error) {
+	ctx, cancel := context.WithTimeout(context.Background(), c.config.Net.ReadTimeout)
+	defer cancel()
+
+	m := &metadata.Metadata{}
+	m.SetNode(node)
+	sub := &metadata.SubscribeInfo{}
+	sub.SetGroup(c.config.Consumer.Group)
+	sub.SetConsumerID(c.clientID)
+	sub.SetPartition(partition)
+	m.SetSubscribeInfo(sub)
+	isFirstRegister := c.rmtDataCache.IsFirstRegister(partition.GetPartitionKey())
+	m.SetReadStatus(c.getConsumeReadStatus(isFirstRegister))
+	auth := c.genBrokerAuthenticInfo(true)
+	c.subInfo.SetAuthorizedInfo(auth)
+
+	rsp, err := c.client.RegisterRequestC2B(ctx, m, c.subInfo, c.rmtDataCache)
+	return rsp, err
 }
 
-func newClientID(group string) string {
+func newClient(group string) string {
 	return group + "_" +
 		util.GetLocalHost() + "_" +
 		strconv.Itoa(os.Getpid()) + "_" +
 		strconv.Itoa(int(time.Now().Unix()*1000)) + "_" +
-		strconv.Itoa(int(newClientIndex())) + "_" +
+		strconv.Itoa(int(atomic.AddUint64(&clientID, 1))) + "_" +
 		tubeMQClientVersion
 }
 
diff --git a/tubemq-client-twins/tubemq-client-go/client/heartbeat.go b/tubemq-client-twins/tubemq-client-go/client/heartbeat.go
index 9c29854..5ec0b5c 100644
--- a/tubemq-client-twins/tubemq-client-go/client/heartbeat.go
+++ b/tubemq-client-twins/tubemq-client-go/client/heartbeat.go
@@ -95,17 +95,14 @@ func (h *heartbeatManager) consumerHB2Master() {
 
 	retry := 0
 	for retry < h.consumer.config.Heartbeat.MaxRetryTimes {
-		ctx, cancel := context.WithTimeout(context.Background(), h.consumer.config.Net.ReadTimeout)
-		rsp, err := h.consumer.client.HeartRequestC2M(ctx, m, h.consumer.subInfo, h.consumer.rmtDataCache)
+		rsp, err := h.sendHeartbeatC2M(m)
 		if err != nil {
-			cancel()
+			continue
 		}
 		if rsp.GetSuccess() {
-			cancel()
 			h.processHBResponseM2C(rsp)
 			break
 		} else if rsp.GetErrCode() == errs.RetErrHBNoNode || strings.Index(rsp.GetErrMsg(), "StandbyException") != -1 {
-			cancel()
 			h.consumer.masterHBRetry++
 			address := h.consumer.master.Address
 			go h.consumer.register2Master(rsp.GetErrCode() != errs.RetErrHBNoNode)
@@ -120,7 +117,6 @@ func (h *heartbeatManager) consumerHB2Master() {
 			}
 			return
 		}
-		cancel()
 	}
 	h.mu.Lock()
 	defer h.mu.Unlock()
@@ -128,6 +124,13 @@ func (h *heartbeatManager) consumerHB2Master() {
 	hm.timer.Reset(h.nextHeartbeatInterval())
 }
 
+func (h *heartbeatManager) sendHeartbeatC2M(m *metadata.Metadata) (*protocol.HeartResponseM2C, error) {
+	ctx, cancel := context.WithTimeout(context.Background(), h.consumer.config.Net.ReadTimeout)
+	defer cancel()
+	rsp, err := h.consumer.client.HeartRequestC2M(ctx, m, h.consumer.subInfo, h.consumer.rmtDataCache)
+	return rsp, err
+}
+
 func (h *heartbeatManager) processHBResponseM2C(rsp *protocol.HeartResponseM2C) {
 	h.consumer.masterHBRetry = 0
 	h.consumer.subInfo.SetIsNotAllocated(rsp.GetNotAllocated())
@@ -158,7 +161,7 @@ func (h *heartbeatManager) processHBResponseM2C(rsp *protocol.HeartResponseM2C)
 			subscribeInfo = append(subscribeInfo, s)
 		}
 		e := metadata.NewEvent(event.GetRebalanceId(), event.GetOpType(), subscribeInfo)
-		h.consumer.rmtDataCache.OfferEvent(e)
+		h.consumer.rmtDataCache.OfferEventAndNotify(e)
 	}
 }
 
@@ -179,16 +182,12 @@ func (h *heartbeatManager) consumerHB2Broker(broker *metadata.Node) {
 		h.resetBrokerTimer(broker)
 		return
 	}
-	m := &metadata.Metadata{}
-	m.SetReadStatus(h.consumer.getConsumeReadStatus(false))
-	m.SetNode(broker)
-	ctx, cancel := context.WithTimeout(context.Background(), h.consumer.config.Net.ReadTimeout)
-	defer cancel()
 
-	rsp, err := h.consumer.client.HeartbeatRequestC2B(ctx, m, h.consumer.subInfo, h.consumer.rmtDataCache)
+	rsp, err := h.sendHeartbeatC2B(broker)
 	if err != nil {
 		return
 	}
+
 	if rsp.GetSuccess() {
 		if rsp.GetHasPartFailure() {
 			partitionKeys := make([]string, 0, len(rsp.GetFailureInfo()))
@@ -217,6 +216,16 @@ func (h *heartbeatManager) consumerHB2Broker(broker *metadata.Node) {
 	h.resetBrokerTimer(broker)
 }
 
+func (h *heartbeatManager) sendHeartbeatC2B(broker *metadata.Node) (*protocol.HeartBeatResponseB2C, error) {
+	m := &metadata.Metadata{}
+	m.SetReadStatus(h.consumer.getConsumeReadStatus(false))
+	m.SetNode(broker)
+	ctx, cancel := context.WithTimeout(context.Background(), h.consumer.config.Net.ReadTimeout)
+	defer cancel()
+	rsp, err := h.consumer.client.HeartbeatRequestC2B(ctx, m, h.consumer.subInfo, h.consumer.rmtDataCache)
+	return rsp, err
+}
+
 func (h *heartbeatManager) resetBrokerTimer(broker *metadata.Node) {
 	interval := h.consumer.config.Heartbeat.Interval
 	partitions := h.consumer.rmtDataCache.GetPartitionByBroker(broker)
diff --git a/tubemq-client-twins/tubemq-client-go/client/version.go b/tubemq-client-twins/tubemq-client-go/client/version.go
new file mode 100644
index 0000000..1828fd7
--- /dev/null
+++ b/tubemq-client-twins/tubemq-client-go/client/version.go
@@ -0,0 +1,5 @@
+package client
+
+const (
+	tubeMQClientVersion = "0.1.0"
+)
diff --git a/tubemq-client-twins/tubemq-client-go/metadata/consumer_event.go b/tubemq-client-twins/tubemq-client-go/metadata/consumer_event.go
index 6ce2915..08295c5 100644
--- a/tubemq-client-twins/tubemq-client-go/metadata/consumer_event.go
+++ b/tubemq-client-twins/tubemq-client-go/metadata/consumer_event.go
@@ -17,6 +17,13 @@
 
 package metadata
 
+const (
+	Disconnect     = 2
+	OnlyDisconnect = 20
+	Connect        = 1
+	OnlyConnect    = 10
+)
+
 // ConsumerEvent represents the metadata of a consumer event
 type ConsumerEvent struct {
 	rebalanceID   int64
diff --git a/tubemq-client-twins/tubemq-client-go/remote/remote.go b/tubemq-client-twins/tubemq-client-go/remote/remote.go
index 355e09b..bfc63b7 100644
--- a/tubemq-client-twins/tubemq-client-go/remote/remote.go
+++ b/tubemq-client-twins/tubemq-client-go/remote/remote.go
@@ -39,7 +39,6 @@ type RmtDataCache struct {
 	eventReadMu        sync.Mutex
 	metaMu             sync.Mutex
 	dataBookMu         sync.Mutex
-	eventReadCond      *sync.Cond
 	brokerPartitions   map[*metadata.Node]map[string]bool
 	qryPriorityID      int32
 	partitions         map[string]*metadata.Partition
@@ -48,6 +47,8 @@ type RmtDataCache struct {
 	partitionTimeouts  map[string]*time.Timer
 	topicPartitions    map[string]map[string]bool
 	partitionRegBooked map[string]bool
+	// EventCh is the channel for consumer to consume
+	EventCh chan *metadata.ConsumerEvent
 }
 
 // NewRmtDataCache returns a default rmtDataCache.
@@ -65,8 +66,8 @@ func NewRmtDataCache() *RmtDataCache {
 		partitionTimeouts:  make(map[string]*time.Timer),
 		topicPartitions:    make(map[string]map[string]bool),
 		partitionRegBooked: make(map[string]bool),
+		EventCh:            make(chan *metadata.ConsumerEvent, 1),
 	}
-	r.eventReadCond = sync.NewCond(&r.eventReadMu)
 	return r
 }
 
@@ -149,24 +150,14 @@ func (r *RmtDataCache) UpdateGroupFlowCtrlInfo(qryPriorityID int32, flowCtrlID i
 
 }
 
-// OfferEvent offers an consumer event and notifies the consumer method.
-func (r *RmtDataCache) OfferEvent(event *metadata.ConsumerEvent) {
+// OfferEventAndNotify offers an consumer event and notifies the consumer method and notify the consumer to consume.
+func (r *RmtDataCache) OfferEventAndNotify(event *metadata.ConsumerEvent) {
 	r.eventReadMu.Lock()
 	defer r.eventReadMu.Unlock()
 	r.rebalanceResults = append(r.rebalanceResults, event)
-	r.eventReadCond.Broadcast()
-}
-
-// TakeEvent takes an event from the rebalanceResults.
-func (r *RmtDataCache) TakeEvent() *metadata.ConsumerEvent {
-	r.eventReadMu.Lock()
-	defer r.eventReadMu.Unlock()
-	for len(r.rebalanceResults) == 0 {
-		r.eventReadCond.Wait()
-	}
-	event := r.rebalanceResults[0]
+	e := r.rebalanceResults[0]
 	r.rebalanceResults = r.rebalanceResults[1:]
-	return event
+	r.EventCh <- e
 }
 
 // ClearEvent clears all the events.

[incubator-inlong] 04/09: Add trim and use slice

Posted by go...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

gosonzhang pushed a commit to branch INLONG-25
in repository https://gitbox.apache.org/repos/asf/incubator-inlong.git

commit d5ea12531f85625bf7412ab417e5e2e536b055e1
Author: Zijie Lu <ws...@gmail.com>
AuthorDate: Fri Jun 4 18:42:04 2021 +0800

    Add trim and use slice
    
    Signed-off-by: Zijie Lu <ws...@gmail.com>
---
 .../tubemq-client-go/metadata/partition.go            |  8 ++++----
 .../tubemq-client-go/metadata/subcribe_info.go        |  8 ++++----
 tubemq-client-twins/tubemq-client-go/remote/remote.go | 19 +++++++++++++++----
 3 files changed, 23 insertions(+), 12 deletions(-)

diff --git a/tubemq-client-twins/tubemq-client-go/metadata/partition.go b/tubemq-client-twins/tubemq-client-go/metadata/partition.go
index fbe1086..3ce1ca5 100644
--- a/tubemq-client-twins/tubemq-client-go/metadata/partition.go
+++ b/tubemq-client-twins/tubemq-client-go/metadata/partition.go
@@ -39,16 +39,16 @@ func NewPartition(partition string) (*Partition, error) {
 	var err error
 	pos := strings.Index(partition, "#")
 	if pos != -1 {
-		broker := partition[:pos]
+		broker := strings.TrimSpace(partition[:pos])
 		b, err = NewNode(true, broker)
 		if err != nil {
 			return nil, err
 		}
-		p := partition[pos+1:]
+		p := strings.TrimSpace(partition[pos+1:])
 		pos = strings.Index(p, ":")
 		if pos != -1 {
-			topic = p[0:pos]
-			partitionID, err = strconv.Atoi(p[pos+1:])
+			topic = strings.TrimSpace(p[0:pos])
+			partitionID, err = strconv.Atoi(strings.TrimSpace(p[pos+1:]))
 			if err != nil {
 				return nil, err
 			}
diff --git a/tubemq-client-twins/tubemq-client-go/metadata/subcribe_info.go b/tubemq-client-twins/tubemq-client-go/metadata/subcribe_info.go
index 90ae3ef..932659a 100644
--- a/tubemq-client-twins/tubemq-client-go/metadata/subcribe_info.go
+++ b/tubemq-client-twins/tubemq-client-go/metadata/subcribe_info.go
@@ -58,15 +58,15 @@ func NewSubscribeInfo(subscribeInfo string) (*SubscribeInfo, error) {
 	var err error
 	pos := strings.Index(subscribeInfo, "#")
 	if pos != -1 {
-		consumerInfo := subscribeInfo[:pos]
-		partitionInfo := subscribeInfo[pos+1:]
+		consumerInfo := strings.TrimSpace(subscribeInfo[:pos])
+		partitionInfo := strings.TrimSpace(subscribeInfo[pos+1:])
 		partition, err = NewPartition(partitionInfo)
 		if err != nil {
 			return nil, err
 		}
 		pos = strings.Index(consumerInfo, "@")
-		consumerID = consumerInfo[:pos]
-		group = consumerInfo[pos+1:]
+		consumerID = strings.TrimSpace(consumerInfo[:pos])
+		group = strings.TrimSpace(consumerInfo[pos+1:])
 	}
 	return &SubscribeInfo{
 		group:      group,
diff --git a/tubemq-client-twins/tubemq-client-go/remote/remote.go b/tubemq-client-twins/tubemq-client-go/remote/remote.go
index 1d82c89..355e09b 100644
--- a/tubemq-client-twins/tubemq-client-go/remote/remote.go
+++ b/tubemq-client-twins/tubemq-client-go/remote/remote.go
@@ -44,7 +44,7 @@ type RmtDataCache struct {
 	qryPriorityID      int32
 	partitions         map[string]*metadata.Partition
 	usedPartitions     map[string]int64
-	indexPartitions    map[string]bool
+	indexPartitions    []string
 	partitionTimeouts  map[string]*time.Timer
 	topicPartitions    map[string]map[string]bool
 	partitionRegBooked map[string]bool
@@ -61,7 +61,7 @@ func NewRmtDataCache() *RmtDataCache {
 		brokerPartitions:   make(map[*metadata.Node]map[string]bool),
 		partitions:         make(map[string]*metadata.Partition),
 		usedPartitions:     make(map[string]int64),
-		indexPartitions:    make(map[string]bool),
+		indexPartitions:    make([]string, 0, 0),
 		partitionTimeouts:  make(map[string]*time.Timer),
 		topicPartitions:    make(map[string]map[string]bool),
 		partitionRegBooked: make(map[string]bool),
@@ -240,10 +240,10 @@ func (r *RmtDataCache) resetIdlePartition(partitionKey string, reuse bool) {
 		timer.Stop()
 		delete(r.partitionTimeouts, partitionKey)
 	}
-	delete(r.indexPartitions, partitionKey)
+	r.removeFromIndexPartitions(partitionKey)
 	if reuse {
 		if _, ok := r.partitions[partitionKey]; ok {
-			r.indexPartitions[partitionKey] = true
+			r.indexPartitions = append(r.indexPartitions, partitionKey)
 		}
 	}
 }
@@ -342,3 +342,14 @@ func (r *RmtDataCache) IsFirstRegister(partitionKey string) bool {
 	}
 	return r.partitionRegBooked[partitionKey]
 }
+
+func (r *RmtDataCache) removeFromIndexPartitions(partitionKey string) {
+	pos := 0
+	for i, p := range r.indexPartitions {
+		if p == partitionKey {
+			pos = i
+			break
+		}
+	}
+	r.indexPartitions = append(r.indexPartitions[:pos], r.indexPartitions[pos+1:]...)
+}

[incubator-inlong] 02/09: Add license and comments

Posted by go...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

gosonzhang pushed a commit to branch INLONG-25
in repository https://gitbox.apache.org/repos/asf/incubator-inlong.git

commit 845de0bb31725c0cbba509eefdf31458331edd88
Author: Zijie Lu <ws...@gmail.com>
AuthorDate: Thu Jun 3 17:48:44 2021 +0800

    Add license and comments
    
    Signed-off-by: Zijie Lu <ws...@gmail.com>
---
 tubemq-client-twins/tubemq-client-go/util/util.go | 22 ++++++++++++++++++++++
 1 file changed, 22 insertions(+)

diff --git a/tubemq-client-twins/tubemq-client-go/util/util.go b/tubemq-client-twins/tubemq-client-go/util/util.go
index 77422ac..1157d39 100644
--- a/tubemq-client-twins/tubemq-client-go/util/util.go
+++ b/tubemq-client-twins/tubemq-client-go/util/util.go
@@ -1,11 +1,31 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+// package util defines the constants and helper functions.
 package util
 
 import (
 	"net"
 )
 
+// InValidValue of TubeMQ config.
 var InvalidValue = int64(-2)
 
+// GetLocalHost returns the local host name.
 func GetLocalHost() string {
 	ifaces, err := net.Interfaces()
 	if err != nil {
@@ -43,10 +63,12 @@ func GetLocalHost() string {
 	return ""
 }
 
+// GenBrokerAuthenticateToken generates the broker authenticate token.
 func GenBrokerAuthenticateToken(username string, password string) string {
 	return ""
 }
 
+// GenMasterAuthenticateToken generates the master authenticate token.
 func GenMasterAuthenticateToken(username string, password string) string {
 	return ""
 }

[incubator-inlong] 03/09: Implement offerEventResult

Posted by go...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

gosonzhang pushed a commit to branch INLONG-25
in repository https://gitbox.apache.org/repos/asf/incubator-inlong.git

commit a2af6f05690e750b899cb5e5257010459d693959
Author: Zijie Lu <ws...@gmail.com>
AuthorDate: Fri Jun 4 10:20:54 2021 +0800

    Implement offerEventResult
    
    Signed-off-by: Zijie Lu <ws...@gmail.com>
---
 .../tubemq-client-go/client/consumer_impl.go       |  7 ++--
 .../tubemq-client-go/remote/remote.go              | 39 ++++++++++++++--------
 2 files changed, 28 insertions(+), 18 deletions(-)

diff --git a/tubemq-client-twins/tubemq-client-go/client/consumer_impl.go b/tubemq-client-twins/tubemq-client-go/client/consumer_impl.go
index c0f7e9a..b7312bb 100644
--- a/tubemq-client-twins/tubemq-client-go/client/consumer_impl.go
+++ b/tubemq-client-twins/tubemq-client-go/client/consumer_impl.go
@@ -203,9 +203,6 @@ func (c *consumer) GetCurrConsumedInfo() (map[string]*ConsumerOffset, error) {
 func (c *consumer) processRebalanceEvent() {
 	for {
 		event := c.rmtDataCache.TakeEvent()
-		if event == nil {
-			continue
-		}
 		if event.GetEventStatus() == int32(util.InvalidValue) && event.GetRebalanceID() == util.InvalidValue {
 			break
 		}
@@ -213,10 +210,10 @@ func (c *consumer) processRebalanceEvent() {
 		switch event.GetEventType() {
 		case 2, 20:
 			c.disconnect2Broker(event)
-			c.rmtDataCache.OfferEvent(event)
+			c.rmtDataCache.OfferEventResult(event)
 		case 1, 10:
 			c.connect2Broker(event)
-			c.rmtDataCache.OfferEvent(event)
+			c.rmtDataCache.OfferEventResult(event)
 		}
 	}
 }
diff --git a/tubemq-client-twins/tubemq-client-go/remote/remote.go b/tubemq-client-twins/tubemq-client-go/remote/remote.go
index aa9a2d1..1d82c89 100644
--- a/tubemq-client-twins/tubemq-client-go/remote/remote.go
+++ b/tubemq-client-twins/tubemq-client-go/remote/remote.go
@@ -35,9 +35,11 @@ type RmtDataCache struct {
 	groupFlowCtrlID    int64
 	partitionSubInfo   map[string]*metadata.SubscribeInfo
 	rebalanceResults   []*metadata.ConsumerEvent
-	eventMu            sync.Mutex
+	eventWriteMu       sync.Mutex
+	eventReadMu        sync.Mutex
 	metaMu             sync.Mutex
 	dataBookMu         sync.Mutex
+	eventReadCond      *sync.Cond
 	brokerPartitions   map[*metadata.Node]map[string]bool
 	qryPriorityID      int32
 	partitions         map[string]*metadata.Partition
@@ -50,7 +52,7 @@ type RmtDataCache struct {
 
 // NewRmtDataCache returns a default rmtDataCache.
 func NewRmtDataCache() *RmtDataCache {
-	return &RmtDataCache{
+	r := &RmtDataCache{
 		defFlowCtrlID:      util.InvalidValue,
 		groupFlowCtrlID:    util.InvalidValue,
 		qryPriorityID:      int32(util.InvalidValue),
@@ -64,6 +66,8 @@ func NewRmtDataCache() *RmtDataCache {
 		topicPartitions:    make(map[string]map[string]bool),
 		partitionRegBooked: make(map[string]bool),
 	}
+	r.eventReadCond = sync.NewCond(&r.eventReadMu)
+	return r
 }
 
 // GetUnderGroupCtrl returns the underGroupCtrl.
@@ -104,8 +108,8 @@ func (r *RmtDataCache) GetQryPriorityID() int32 {
 
 // PollEventResult polls the first event result from the rebalanceResults.
 func (r *RmtDataCache) PollEventResult() *metadata.ConsumerEvent {
-	r.eventMu.Lock()
-	defer r.eventMu.Unlock()
+	r.eventWriteMu.Lock()
+	defer r.eventWriteMu.Unlock()
 	if len(r.rebalanceResults) > 0 {
 		event := r.rebalanceResults[0]
 		r.rebalanceResults = r.rebalanceResults[1:]
@@ -145,19 +149,20 @@ func (r *RmtDataCache) UpdateGroupFlowCtrlInfo(qryPriorityID int32, flowCtrlID i
 
 }
 
-// OfferEvent offers an consumer event.
+// OfferEvent offers an consumer event and notifies the consumer method.
 func (r *RmtDataCache) OfferEvent(event *metadata.ConsumerEvent) {
-	r.eventMu.Lock()
-	defer r.eventMu.Unlock()
+	r.eventReadMu.Lock()
+	defer r.eventReadMu.Unlock()
 	r.rebalanceResults = append(r.rebalanceResults, event)
+	r.eventReadCond.Broadcast()
 }
 
 // TakeEvent takes an event from the rebalanceResults.
 func (r *RmtDataCache) TakeEvent() *metadata.ConsumerEvent {
-	r.eventMu.Lock()
-	defer r.eventMu.Unlock()
-	if len(r.rebalanceResults) == 0 {
-		return nil
+	r.eventReadMu.Lock()
+	defer r.eventReadMu.Unlock()
+	for len(r.rebalanceResults) == 0 {
+		r.eventReadCond.Wait()
 	}
 	event := r.rebalanceResults[0]
 	r.rebalanceResults = r.rebalanceResults[1:]
@@ -166,11 +171,19 @@ func (r *RmtDataCache) TakeEvent() *metadata.ConsumerEvent {
 
 // ClearEvent clears all the events.
 func (r *RmtDataCache) ClearEvent() {
-	r.eventMu.Lock()
-	defer r.eventMu.Unlock()
+	r.eventWriteMu.Lock()
+	defer r.eventWriteMu.Unlock()
 	r.rebalanceResults = r.rebalanceResults[:0]
 }
 
+// OfferEventResult offers an consumer event.
+func (r *RmtDataCache) OfferEventResult(event *metadata.ConsumerEvent) {
+	r.eventWriteMu.Lock()
+	defer r.eventWriteMu.Unlock()
+
+	r.rebalanceResults = append(r.rebalanceResults, event)
+}
+
 // RemoveAndGetPartition removes the given partitions.
 func (r *RmtDataCache) RemoveAndGetPartition(subscribeInfos []*metadata.SubscribeInfo, processingRollback bool, partitions map[*metadata.Node][]*metadata.Partition) {
 	if len(subscribeInfos) == 0 {

[incubator-inlong] 07/09: Add license

Posted by go...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

gosonzhang pushed a commit to branch INLONG-25
in repository https://gitbox.apache.org/repos/asf/incubator-inlong.git

commit 67400c701b886696e0faf10872055eef3d4b6569
Author: Zijie Lu <ws...@gmail.com>
AuthorDate: Wed Jun 9 10:13:43 2021 +0800

    Add license
    
    Signed-off-by: Zijie Lu <ws...@gmail.com>
---
 tubemq-client-twins/tubemq-client-go/client/version.go | 17 +++++++++++++++++
 1 file changed, 17 insertions(+)

diff --git a/tubemq-client-twins/tubemq-client-go/client/version.go b/tubemq-client-twins/tubemq-client-go/client/version.go
index 1828fd7..d374c87 100644
--- a/tubemq-client-twins/tubemq-client-go/client/version.go
+++ b/tubemq-client-twins/tubemq-client-go/client/version.go
@@ -1,3 +1,20 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
 package client
 
 const (

[incubator-inlong] 08/09: init the channel

Posted by go...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

gosonzhang pushed a commit to branch INLONG-25
in repository https://gitbox.apache.org/repos/asf/incubator-inlong.git

commit 07e73aebc27eddd7a05fcb12d56aa44bb56f7122
Author: Zijie Lu <ws...@gmail.com>
AuthorDate: Wed Jun 9 10:28:28 2021 +0800

    init the channel
    
    Signed-off-by: Zijie Lu <ws...@gmail.com>
---
 tubemq-client-twins/tubemq-client-go/client/consumer_impl.go | 1 +
 1 file changed, 1 insertion(+)

diff --git a/tubemq-client-twins/tubemq-client-go/client/consumer_impl.go b/tubemq-client-twins/tubemq-client-go/client/consumer_impl.go
index 674f810..30b93b4 100644
--- a/tubemq-client-twins/tubemq-client-go/client/consumer_impl.go
+++ b/tubemq-client-twins/tubemq-client-go/client/consumer_impl.go
@@ -92,6 +92,7 @@ func NewConsumer(config *config.Config) (Consumer, error) {
 		client:          client,
 		visitToken:      util.InvalidValue,
 		unreportedTimes: 0,
+		done:            make(chan struct{}),
 	}
 	c.subInfo.SetClientID(clientID)
 	hbm := newHBManager(c)

[incubator-inlong] 09/09: [INLONG-624]Go SDK Consumer Start API

Posted by go...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

gosonzhang pushed a commit to branch INLONG-25
in repository https://gitbox.apache.org/repos/asf/incubator-inlong.git

commit 65cfc8fdd9ba9aa5856eb3cba63ed2bd801f5ad2
Author: Zijie Lu <ws...@gmail.com>
AuthorDate: Thu Jun 10 10:55:19 2021 +0800

    [INLONG-624]Go SDK Consumer Start API
    
    Signed-off-by: Zijie Lu <ws...@gmail.com>
---
 .../tubemq-client-go/client/consumer.go            |  2 -
 .../tubemq-client-go/client/consumer_impl.go       | 42 ++++++-------
 .../tubemq-client-go/client/heartbeat.go           | 70 +++++++++++-----------
 3 files changed, 55 insertions(+), 59 deletions(-)

diff --git a/tubemq-client-twins/tubemq-client-go/client/consumer.go b/tubemq-client-twins/tubemq-client-go/client/consumer.go
index a74dfa5..27a63c6 100644
--- a/tubemq-client-twins/tubemq-client-go/client/consumer.go
+++ b/tubemq-client-twins/tubemq-client-go/client/consumer.go
@@ -31,8 +31,6 @@ var clientID uint64
 
 // Consumer is an interface that abstracts behavior of TubeMQ's consumer
 type Consumer interface {
-	// Start starts the consumer.
-	Start() error
 	// GetMessage receive a single message.
 	GetMessage() (*ConsumerResult, error)
 	// Confirm the consumption of a message.
diff --git a/tubemq-client-twins/tubemq-client-go/client/consumer_impl.go b/tubemq-client-twins/tubemq-client-go/client/consumer_impl.go
index 30b93b4..f4211c5 100644
--- a/tubemq-client-twins/tubemq-client-go/client/consumer_impl.go
+++ b/tubemq-client-twins/tubemq-client-go/client/consumer_impl.go
@@ -97,18 +97,13 @@ func NewConsumer(config *config.Config) (Consumer, error) {
 	c.subInfo.SetClientID(clientID)
 	hbm := newHBManager(c)
 	c.heartbeatManager = hbm
-	return c, nil
-}
-
-// Start implementation of tubeMQ consumer.
-func (c *consumer) Start() error {
-	err := c.register2Master(false)
+	err = c.register2Master(false)
 	if err != nil {
-		return err
+		return nil, err
 	}
 	c.heartbeatManager.registerMaster(c.master.Address)
 	go c.processRebalanceEvent()
-	return nil
+	return c, nil
 }
 
 func (c *consumer) register2Master(needChange bool) error {
@@ -124,18 +119,20 @@ func (c *consumer) register2Master(needChange bool) error {
 		if err != nil {
 			return err
 		}
-		if rsp.GetSuccess() {
-			c.masterHBRetry = 0
-			c.processRegisterResponseM2C(rsp)
-			return nil
-		} else if rsp.GetErrCode() == errs.RetConsumeGroupForbidden || rsp.GetErrCode() == errs.RetConsumeContentForbidden {
-			return nil
-		} else {
-			c.master, err = c.selector.Select(c.config.Consumer.Masters)
-			if err != nil {
+		if !rsp.GetSuccess() {
+			if rsp.GetErrCode() == errs.RetConsumeGroupForbidden || rsp.GetErrCode() == errs.RetConsumeContentForbidden {
+				return errs.New(rsp.GetErrCode(), rsp.GetErrMsg())
+			}
+
+			if c.master, err = c.selector.Select(c.config.Consumer.Masters); err != nil {
 				return err
 			}
+			continue
 		}
+
+		c.masterHBRetry = 0
+		c.processRegisterResponseM2C(rsp)
+		return nil
 	}
 	return nil
 }
@@ -276,7 +273,6 @@ func (c *consumer) connect2Broker(event *metadata.ConsumerEvent) {
 		unsubPartitions := c.rmtDataCache.FilterPartitions(event.GetSubscribeInfo())
 		if len(unsubPartitions) > 0 {
 			for _, partition := range unsubPartitions {
-
 				node := &metadata.Node{}
 				node.SetHost(util.GetLocalHost())
 				node.SetAddress(partition.GetBroker().GetAddress())
@@ -284,11 +280,15 @@ func (c *consumer) connect2Broker(event *metadata.ConsumerEvent) {
 				rsp, err := c.sendRegisterReq2Broker(partition, node)
 				if err != nil {
 					//todo add log
+					return
 				}
-				if rsp.GetSuccess() {
-					c.rmtDataCache.AddNewPartition(partition)
-					c.heartbeatManager.registerBroker(node)
+				if !rsp.GetSuccess() {
+					//todo add log
+					return
 				}
+
+				c.rmtDataCache.AddNewPartition(partition)
+				c.heartbeatManager.registerBroker(node)
 			}
 		}
 	}
diff --git a/tubemq-client-twins/tubemq-client-go/client/heartbeat.go b/tubemq-client-twins/tubemq-client-go/client/heartbeat.go
index 5ec0b5c..fdd0d20 100644
--- a/tubemq-client-twins/tubemq-client-go/client/heartbeat.go
+++ b/tubemq-client-twins/tubemq-client-go/client/heartbeat.go
@@ -99,24 +99,27 @@ func (h *heartbeatManager) consumerHB2Master() {
 		if err != nil {
 			continue
 		}
-		if rsp.GetSuccess() {
-			h.processHBResponseM2C(rsp)
-			break
-		} else if rsp.GetErrCode() == errs.RetErrHBNoNode || strings.Index(rsp.GetErrMsg(), "StandbyException") != -1 {
+
+		if !rsp.GetSuccess() {
 			h.consumer.masterHBRetry++
-			address := h.consumer.master.Address
-			go h.consumer.register2Master(rsp.GetErrCode() != errs.RetErrHBNoNode)
-			if rsp.GetErrCode() != errs.RetErrHBNoNode {
-				hm := h.heartbeats[address]
-				hm.numConnections--
-				if hm.numConnections == 0 {
-					h.mu.Lock()
-					delete(h.heartbeats, address)
-					h.mu.Unlock()
+			if rsp.GetErrCode() == errs.RetErrHBNoNode || strings.Index(rsp.GetErrMsg(), "StandbyException") != -1 {
+				address := h.consumer.master.Address
+				go h.consumer.register2Master(rsp.GetErrCode() != errs.RetErrHBNoNode)
+				if rsp.GetErrCode() != errs.RetErrHBNoNode {
+					hm := h.heartbeats[address]
+					hm.numConnections--
+					if hm.numConnections == 0 {
+						h.mu.Lock()
+						delete(h.heartbeats, address)
+						h.mu.Unlock()
+					}
 				}
+				return
 			}
-			return
 		}
+		h.consumer.masterHBRetry = 0
+		h.processHBResponseM2C(rsp)
+		break
 	}
 	h.mu.Lock()
 	defer h.mu.Unlock()
@@ -187,31 +190,26 @@ func (h *heartbeatManager) consumerHB2Broker(broker *metadata.Node) {
 	if err != nil {
 		return
 	}
-
-	if rsp.GetSuccess() {
-		if rsp.GetHasPartFailure() {
-			partitionKeys := make([]string, 0, len(rsp.GetFailureInfo()))
-			for _, fi := range rsp.GetFailureInfo() {
-				pos := strings.Index(fi, ":")
-				if pos == -1 {
-					continue
-				}
-				partition, err := metadata.NewPartition(fi[pos+1:])
-				if err != nil {
-					continue
-				}
-				partitionKeys = append(partitionKeys, partition.GetPartitionKey())
+	partitionKeys := make([]string, 0, len(partitions))
+	if rsp.GetErrCode() == errs.RetCertificateFailure {
+		for _, partition := range partitions {
+			partitionKeys = append(partitionKeys, partition.GetPartitionKey())
+		}
+		h.consumer.rmtDataCache.RemovePartition(partitionKeys)
+	}
+	if rsp.GetSuccess() && rsp.GetHasPartFailure() {
+		for _, fi := range rsp.GetFailureInfo() {
+			pos := strings.Index(fi, ":")
+			if pos == -1 {
+				continue
 			}
-			h.consumer.rmtDataCache.RemovePartition(partitionKeys)
-		} else {
-			if rsp.GetErrCode() == errs.RetCertificateFailure {
-				partitionKeys := make([]string, 0, len(partitions))
-				for _, partition := range partitions {
-					partitionKeys = append(partitionKeys, partition.GetPartitionKey())
-				}
-				h.consumer.rmtDataCache.RemovePartition(partitionKeys)
+			partition, err := metadata.NewPartition(fi[pos+1:])
+			if err != nil {
+				continue
 			}
+			partitionKeys = append(partitionKeys, partition.GetPartitionKey())
 		}
+		h.consumer.rmtDataCache.RemovePartition(partitionKeys)
 	}
 	h.resetBrokerTimer(broker)
 }