You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@inlong.apache.org by go...@apache.org on 2021/06/15 01:59:54 UTC

[incubator-inlong] 01/09: [INLONG-624]Go SDK consumer interface

This is an automated email from the ASF dual-hosted git repository.

gosonzhang pushed a commit to branch INLONG-25
in repository https://gitbox.apache.org/repos/asf/incubator-inlong.git

commit 0d4ed41f647eb5155c936f10f0f61113c44161b3
Author: Zijie Lu <ws...@gmail.com>
AuthorDate: Thu Jun 3 16:02:09 2021 +0800

    [INLONG-624]Go SDK consumer interface
    
    Signed-off-by: Zijie Lu <ws...@gmail.com>
---
 .../{metadata/node.go => client/consumer.go}       |  51 ++-
 .../tubemq-client-go/client/consumer_impl.go       | 359 +++++++++++++++++++++
 .../tubemq-client-go/client/heartbeat.go           | 229 +++++++++++++
 .../tubemq-client-go/client/remote.go              |  79 -----
 .../tubemq-client-go/config/config.go              |  18 +-
 .../tubemq-client-go/config/config_test.go         |  10 +-
 tubemq-client-twins/tubemq-client-go/errs/errs.go  |  12 +-
 .../tubemq-client-go/metadata/consumer_event.go    |   8 +
 .../tubemq-client-go/metadata/metadata.go          |  20 ++
 .../tubemq-client-go/metadata/node.go              |  54 ++++
 .../tubemq-client-go/metadata/partition.go         |  42 ++-
 .../tubemq-client-go/metadata/subcribe_info.go     |  54 +++-
 .../tubemq-client-go/remote/remote.go              | 331 +++++++++++++++++++
 tubemq-client-twins/tubemq-client-go/rpc/broker.go |  32 +-
 tubemq-client-twins/tubemq-client-go/rpc/client.go |  23 +-
 tubemq-client-twins/tubemq-client-go/rpc/master.go |  19 +-
 .../tubemq-client-go/{client => sub}/info.go       |  77 ++++-
 .../tubemq-client-go/transport/client.go           |   8 +-
 tubemq-client-twins/tubemq-client-go/util/util.go  |  52 +++
 19 files changed, 1299 insertions(+), 179 deletions(-)

diff --git a/tubemq-client-twins/tubemq-client-go/metadata/node.go b/tubemq-client-twins/tubemq-client-go/client/consumer.go
similarity index 52%
copy from tubemq-client-twins/tubemq-client-go/metadata/node.go
copy to tubemq-client-twins/tubemq-client-go/client/consumer.go
index 625f278..27a8536 100644
--- a/tubemq-client-twins/tubemq-client-go/metadata/node.go
+++ b/tubemq-client-twins/tubemq-client-go/client/consumer.go
@@ -15,41 +15,32 @@
  * limitations under the License.
  */
 
-package metadata
+// Package client defines the api and information
+// which can be exposed to user.
+package client
 
-import (
-	"strconv"
+const (
+	tubeMQClientVersion = "0.1.0"
 )
 
-// Node represents the metadata of a node.
-type Node struct {
-	id      uint32
-	host    string
-	port    uint32
-	address string
+// ConsumerResult of a consumption.
+type ConsumerResult struct {
 }
 
-// GetID returns the id of a node.
-func (n *Node) GetID() uint32 {
-	return n.id
+// ConsumerOffset of a consumption.
+type ConsumerOffset struct {
 }
 
-// GetPort returns the port of a node.
-func (n *Node) GetPort() uint32 {
-	return n.port
-}
-
-// GetHost returns the hostname of a node.
-func (n *Node) GetHost() string {
-	return n.host
-}
-
-// GetAddress returns the address of a node.
-func (n *Node) GetAddress() string {
-	return n.address
-}
-
-// String returns the metadata of a node as a string.
-func (n *Node) String() string {
-	return strconv.Itoa(int(n.id)) + ":" + n.host + ":" + strconv.Itoa(int(n.port))
+var clientIndex uint64
+
+// Consumer is an interface that abstracts behavior of TubeMQ's consumer
+type Consumer interface {
+	// Start starts the consumer.
+	Start() error
+	// GetMessage receives a single message.
+	GetMessage() (*ConsumerResult, error)
+	// Confirm the consumption of a message.
+	Confirm(confirmContext string, consumed bool) (*ConsumerResult, error)
+	// GetCurrConsumedInfo returns the consumptions of the consumer.
+	GetCurrConsumedInfo() (map[string]*ConsumerOffset, error)
 }
diff --git a/tubemq-client-twins/tubemq-client-go/client/consumer_impl.go b/tubemq-client-twins/tubemq-client-go/client/consumer_impl.go
new file mode 100644
index 0000000..c0f7e9a
--- /dev/null
+++ b/tubemq-client-twins/tubemq-client-go/client/consumer_impl.go
@@ -0,0 +1,359 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package client
+
+import (
+	"context"
+	"os"
+	"strconv"
+	"sync/atomic"
+	"time"
+
+	"github.com/golang/protobuf/proto"
+
+	"github.com/apache/incubator-inlong/tubemq-client-twins/tubemq-client-go/config"
+	"github.com/apache/incubator-inlong/tubemq-client-twins/tubemq-client-go/errs"
+	"github.com/apache/incubator-inlong/tubemq-client-twins/tubemq-client-go/metadata"
+	"github.com/apache/incubator-inlong/tubemq-client-twins/tubemq-client-go/multiplexing"
+	"github.com/apache/incubator-inlong/tubemq-client-twins/tubemq-client-go/protocol"
+	"github.com/apache/incubator-inlong/tubemq-client-twins/tubemq-client-go/remote"
+	"github.com/apache/incubator-inlong/tubemq-client-twins/tubemq-client-go/rpc"
+	"github.com/apache/incubator-inlong/tubemq-client-twins/tubemq-client-go/selector"
+	"github.com/apache/incubator-inlong/tubemq-client-twins/tubemq-client-go/sub"
+	"github.com/apache/incubator-inlong/tubemq-client-twins/tubemq-client-go/transport"
+	"github.com/apache/incubator-inlong/tubemq-client-twins/tubemq-client-go/util"
+)
+
+const (
+	consumeStatusNormal        = 0
+	consumeStatusFromMax       = 1
+	consumeStatusFromMaxAlways = 2
+)
+
+type consumer struct {
+	clientID         string
+	config           *config.Config
+	subInfo          *sub.SubInfo
+	rmtDataCache     *remote.RmtDataCache
+	visitToken       int64
+	authorizedInfo   string
+	nextAuth2Master  int32
+	nextAuth2Broker  int32
+	master           *selector.Node
+	client           rpc.RPCClient
+	selector         selector.Selector
+	lastMasterHb     int64
+	masterHBRetry    int
+	heartbeatManager *heartbeatManager
+	unreportedTimes  int
+}
+
+// NewConsumer returns a consumer which is constructed by a given config.
+func NewConsumer(config *config.Config) (Consumer, error) {
+	selector, err := selector.Get("ip")
+	if err != nil {
+		return nil, err
+	}
+
+	clientID := newClientID(config.Consumer.Group)
+	pool := multiplexing.NewPool()
+	opts := &transport.Options{}
+	if config.Net.TLS.Enable {
+		opts.CACertFile = config.Net.TLS.CACertFile
+		opts.TLSCertFile = config.Net.TLS.TLSCertFile
+		opts.TLSKeyFile = config.Net.TLS.TLSKeyFile
+		opts.TLSServerName = config.Net.TLS.TLSServerName
+	}
+	client := rpc.New(pool, opts)
+	r := remote.NewRmtDataCache()
+	r.SetConsumerInfo(clientID, config.Consumer.Group)
+	c := &consumer{
+		config:          config,
+		clientID:        clientID,
+		subInfo:         sub.NewSubInfo(config),
+		rmtDataCache:    r,
+		selector:        selector,
+		client:          client,
+		visitToken:      util.InvalidValue,
+		unreportedTimes: 0,
+	}
+	c.subInfo.SetClientID(clientID)
+	hbm := newHBManager(c)
+	c.heartbeatManager = hbm
+	return c, nil
+}
+
+// Start implementation of tubeMQ consumer.
+func (c *consumer) Start() error {
+	err := c.register2Master(false)
+	if err != nil {
+		return err
+	}
+	c.heartbeatManager.registerMaster(c.master.Address)
+	go c.processRebalanceEvent()
+	return nil
+}
+
+// register2Master registers the consumer with a master. A master is selected
+// first when needChange is set or none has been selected yet; another one is
+// selected whenever a registration is rejected with a non-forbidden error code.
+func (c *consumer) register2Master(needChange bool) error {
+	// c.master is nil on the first call from Start, so select one before dereferencing it.
+	if needChange || c.master == nil {
+		node, err := c.selector.Select(c.config.Consumer.Masters)
+		if err != nil {
+			return err
+		}
+		c.master = node
+	}
+	for c.master.HasNext {
+		ctx, cancel := context.WithTimeout(context.Background(), c.config.Net.ReadTimeout)
+
+		m := &metadata.Metadata{}
+		node := &metadata.Node{}
+		node.SetHost(util.GetLocalHost())
+		node.SetAddress(c.master.Address)
+		m.SetNode(node)
+		sub := &metadata.SubscribeInfo{}
+		sub.SetGroup(c.config.Consumer.Group)
+		m.SetSubscribeInfo(sub)
+
+		auth := &protocol.AuthenticateInfo{}
+		c.genMasterAuthenticateToken(auth, true)
+		mci := &protocol.MasterCertificateInfo{
+			AuthInfo: auth,
+		}
+		c.subInfo.SetMasterCertificateInfo(mci)
+
+		rsp, err := c.client.RegisterRequestC2M(ctx, m, c.subInfo, c.rmtDataCache)
+		cancel() // the context only bounds the RPC above
+		if err != nil {
+			return err
+		}
+		if rsp.GetSuccess() {
+			c.masterHBRetry = 0
+			c.processRegisterResponseM2C(rsp)
+			return nil
+		}
+		if rsp.GetErrCode() == errs.RetConsumeGroupForbidden || rsp.GetErrCode() == errs.RetConsumeContentForbidden {
+			return nil
+		}
+		if c.master, err = c.selector.Select(c.config.Consumer.Masters); err != nil {
+			return err
+		}
+	}
+	return nil
+}
+
+func (c *consumer) processRegisterResponseM2C(rsp *protocol.RegisterResponseM2C) {
+	if rsp.GetNotAllocated() {
+		c.subInfo.SetIsNotAllocated(rsp.GetNotAllocated())
+	}
+	if rsp.GetDefFlowCheckId() != 0 || rsp.GetGroupFlowCheckId() != 0 { // either check id set: refresh flow control
+		if rsp.GetDefFlowCheckId() != 0 {
+			c.rmtDataCache.UpdateDefFlowCtrlInfo(rsp.GetDefFlowCheckId(), rsp.GetDefFlowControlInfo())
+		}
+		qryPriorityID := c.rmtDataCache.GetQryPriorityID() // keep the cached id unless the response carries one
+		if rsp.GetQryPriorityId() != 0 {
+			qryPriorityID = rsp.GetQryPriorityId()
+		}
+		c.rmtDataCache.UpdateGroupFlowCtrlInfo(qryPriorityID, rsp.GetGroupFlowCheckId(), rsp.GetGroupFlowControlInfo())
+	}
+	if rsp.GetAuthorizedInfo() != nil {
+		c.processAuthorizedToken(rsp.GetAuthorizedInfo())
+	}
+	c.lastMasterHb = time.Now().UnixNano() / int64(time.Millisecond) // stored in milliseconds
+}
+
+func (c *consumer) processAuthorizedToken(info *protocol.MasterAuthorizedInfo) {
+	atomic.StoreInt64(&c.visitToken, info.GetVisitAuthorizedToken())
+	c.authorizedInfo = info.GetAuthAuthorizedToken()
+}
+
+// GetMessage implementation of TubeMQ consumer.
+func (c *consumer) GetMessage() (*ConsumerResult, error) {
+	panic("implement me")
+}
+
+// Confirm implementation of TubeMQ consumer.
+func (c *consumer) Confirm(confirmContext string, consumed bool) (*ConsumerResult, error) {
+	panic("implement me")
+}
+
+// GetCurrConsumedInfo implementation of TubeMQ consumer.
+func (c *consumer) GetCurrConsumedInfo() (map[string]*ConsumerOffset, error) {
+	panic("implement me")
+}
+
+func (c *consumer) processRebalanceEvent() {
+	for {
+		event := c.rmtDataCache.TakeEvent()
+		if event == nil {
+			continue
+		}
+		if event.GetEventStatus() == int32(util.InvalidValue) && event.GetRebalanceID() == util.InvalidValue {
+			break
+		}
+		c.rmtDataCache.ClearEvent()
+		switch event.GetEventType() {
+		case 2, 20:
+			c.disconnect2Broker(event)
+			c.rmtDataCache.OfferEvent(event)
+		case 1, 10:
+			c.connect2Broker(event)
+			c.rmtDataCache.OfferEvent(event)
+		}
+	}
+}
+
+func (c *consumer) disconnect2Broker(event *metadata.ConsumerEvent) {
+	subscribeInfo := event.GetSubscribeInfo()
+	if len(subscribeInfo) > 0 {
+		removedPartitions := make(map[*metadata.Node][]*metadata.Partition)
+		c.rmtDataCache.RemoveAndGetPartition(subscribeInfo, c.config.Consumer.RollbackIfConfirmTimeout, removedPartitions)
+		if len(removedPartitions) > 0 {
+			c.unregister2Broker(removedPartitions)
+		}
+	}
+	event.SetEventStatus(2)
+}
+
+func (c *consumer) unregister2Broker(unRegPartitions map[*metadata.Node][]*metadata.Partition) {
+	if len(unRegPartitions) == 0 {
+		return
+	}
+
+	for _, partitions := range unRegPartitions {
+		for _, partition := range partitions {
+			ctx, cancel := context.WithTimeout(context.Background(), c.config.Net.ReadTimeout)
+
+			m := &metadata.Metadata{}
+			node := &metadata.Node{}
+			node.SetHost(util.GetLocalHost())
+			node.SetAddress(partition.GetBroker().GetAddress())
+			m.SetNode(node)
+			m.SetReadStatus(1)
+			sub := &metadata.SubscribeInfo{}
+			sub.SetGroup(c.config.Consumer.Group)
+			sub.SetConsumerID(c.clientID)
+			sub.SetPartition(partition)
+			m.SetSubscribeInfo(sub)
+
+			c.client.UnregisterRequestC2B(ctx, m, c.subInfo)
+			cancel()
+		}
+	}
+}
+
+func (c *consumer) connect2Broker(event *metadata.ConsumerEvent) {
+	if len(event.GetSubscribeInfo()) > 0 {
+		unsubPartitions := c.rmtDataCache.FilterPartitions(event.GetSubscribeInfo())
+		if len(unsubPartitions) > 0 {
+			for _, partition := range unsubPartitions {
+				m := &metadata.Metadata{}
+				node := &metadata.Node{}
+				node.SetHost(util.GetLocalHost())
+				node.SetAddress(partition.GetBroker().GetAddress())
+				m.SetNode(node)
+				sub := &metadata.SubscribeInfo{}
+				sub.SetGroup(c.config.Consumer.Group)
+				sub.SetConsumerID(c.clientID)
+				sub.SetPartition(partition)
+				m.SetSubscribeInfo(sub)
+				isFirstRegister := c.rmtDataCache.IsFirstRegister(partition.GetPartitionKey())
+				m.SetReadStatus(c.getConsumeReadStatus(isFirstRegister))
+				auth := c.genBrokerAuthenticInfo(true)
+				c.subInfo.SetAuthorizedInfo(auth)
+
+				ctx, cancel := context.WithTimeout(context.Background(), c.config.Net.ReadTimeout)
+				rsp, err := c.client.RegisterRequestC2B(ctx, m, c.subInfo, c.rmtDataCache)
+				if err != nil {
+					//todo add log
+				}
+				if rsp.GetSuccess() {
+					c.rmtDataCache.AddNewPartition(partition)
+					c.heartbeatManager.registerBroker(node)
+				}
+				cancel()
+			}
+		}
+	}
+	c.subInfo.FirstRegistered()
+	event.SetEventStatus(2)
+}
+
+func newClientIndex() uint64 {
+	return atomic.AddUint64(&clientIndex, 1)
+}
+
+func newClientID(group string) string {
+	return group + "_" +
+		util.GetLocalHost() + "_" +
+		strconv.Itoa(os.Getpid()) + "_" +
+		strconv.Itoa(int(time.Now().Unix()*1000)) + "_" +
+		strconv.Itoa(int(newClientIndex())) + "_" +
+		tubeMQClientVersion
+}
+
+func (c *consumer) genBrokerAuthenticInfo(force bool) *protocol.AuthorizedInfo {
+	needAdd := false
+	auth := &protocol.AuthorizedInfo{}
+	if c.config.Net.Auth.Enable {
+		if force {
+			needAdd = true
+			atomic.StoreInt32(&c.nextAuth2Broker, 0)
+		} else if atomic.LoadInt32(&c.nextAuth2Broker) == 1 {
+			if atomic.CompareAndSwapInt32(&c.nextAuth2Broker, 1, 0) {
+				needAdd = true
+			}
+		}
+		if needAdd {
+			authToken := util.GenBrokerAuthenticateToken(c.config.Net.Auth.UserName, c.config.Net.Auth.Password)
+			auth.AuthAuthorizedToken = proto.String(authToken)
+		}
+	}
+	return auth
+}
+
+func (c *consumer) genMasterAuthenticateToken(auth *protocol.AuthenticateInfo, force bool) {
+	needAdd := false
+	if c.config.Net.Auth.Enable {
+		if force {
+			needAdd = true
+			atomic.StoreInt32(&c.nextAuth2Master, 0)
+		} else if atomic.LoadInt32(&c.nextAuth2Master) == 1 {
+			if atomic.CompareAndSwapInt32(&c.nextAuth2Master, 1, 0) {
+				needAdd = true
+			}
+		}
+		if needAdd {
+		}
+	}
+}
+
+func (c *consumer) getConsumeReadStatus(isFirstReg bool) int32 {
+	readStatus := consumeStatusNormal
+	if isFirstReg {
+		if c.config.Consumer.ConsumePosition == 0 {
+			readStatus = consumeStatusFromMax
+		} else if c.config.Consumer.ConsumePosition > 0 {
+			readStatus = consumeStatusFromMaxAlways
+		}
+	}
+	return int32(readStatus)
+}
diff --git a/tubemq-client-twins/tubemq-client-go/client/heartbeat.go b/tubemq-client-twins/tubemq-client-go/client/heartbeat.go
new file mode 100644
index 0000000..9c29854
--- /dev/null
+++ b/tubemq-client-twins/tubemq-client-go/client/heartbeat.go
@@ -0,0 +1,229 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package client
+
+import (
+	"context"
+	"strings"
+	"sync"
+	"sync/atomic"
+	"time"
+
+	"github.com/apache/incubator-inlong/tubemq-client-twins/tubemq-client-go/errs"
+	"github.com/apache/incubator-inlong/tubemq-client-twins/tubemq-client-go/metadata"
+	"github.com/apache/incubator-inlong/tubemq-client-twins/tubemq-client-go/protocol"
+	"github.com/apache/incubator-inlong/tubemq-client-twins/tubemq-client-go/util"
+)
+
+type heartbeatMetadata struct {
+	numConnections int
+	timer          *time.Timer
+}
+
+type heartbeatManager struct {
+	consumer   *consumer
+	heartbeats map[string]*heartbeatMetadata
+	mu         sync.Mutex
+}
+
+func newHBManager(consumer *consumer) *heartbeatManager {
+	return &heartbeatManager{
+		consumer:   consumer,
+		heartbeats: make(map[string]*heartbeatMetadata),
+	}
+}
+
+func (h *heartbeatManager) registerMaster(address string) {
+	h.mu.Lock()
+	defer h.mu.Unlock()
+	if hm, ok := h.heartbeats[address]; ok {
+		hm.numConnections++ // already tracked: only count the extra registration
+		return
+	}
+	h.heartbeats[address] = &heartbeatMetadata{
+		numConnections: 1, // the registration being made right now
+		timer:          time.AfterFunc(h.consumer.config.Heartbeat.Interval/2, h.consumerHB2Master),
+	}
+}
+
+// registerBroker counts a registration for the broker's address and, for a new address, arms its heartbeat timer.
+func (h *heartbeatManager) registerBroker(broker *metadata.Node) {
+	h.mu.Lock()
+	defer h.mu.Unlock()
+	if hm, ok := h.heartbeats[broker.GetAddress()]; ok {
+		hm.numConnections++ // already tracked: only count the extra registration
+		return
+	}
+	h.heartbeats[broker.GetAddress()] = &heartbeatMetadata{
+		numConnections: 1, // the registration being made right now
+		timer:          time.AfterFunc(h.consumer.config.Heartbeat.Interval, func() { h.consumerHB2Broker(broker) }),
+	}
+}
+
+// consumerHB2Master sends a heartbeat to the master, trying at most Heartbeat.MaxRetryTimes times, then re-arms the timer.
+func (h *heartbeatManager) consumerHB2Master() {
+	if time.Now().UnixNano()/int64(time.Millisecond)-h.consumer.lastMasterHb > 30000 {
+		h.consumer.rmtDataCache.HandleExpiredPartitions(h.consumer.config.Consumer.MaxConfirmWait)
+	}
+	m := &metadata.Metadata{}
+	node := &metadata.Node{}
+	node.SetHost(util.GetLocalHost())
+	node.SetAddress(h.consumer.master.Address)
+	m.SetNode(node)
+	sub := &metadata.SubscribeInfo{}
+	sub.SetGroup(h.consumer.config.Consumer.Group)
+	m.SetSubscribeInfo(sub)
+	h.consumer.unreportedTimes++
+	if h.consumer.unreportedTimes > h.consumer.config.Consumer.MaxSubInfoReportInterval {
+		m.SetReportTimes(true)
+		h.consumer.unreportedTimes = 0
+	}
+	for retry := 0; retry < h.consumer.config.Heartbeat.MaxRetryTimes; retry++ {
+		ctx, cancel := context.WithTimeout(context.Background(), h.consumer.config.Net.ReadTimeout)
+		rsp, err := h.consumer.client.HeartRequestC2M(ctx, m, h.consumer.subInfo, h.consumer.rmtDataCache)
+		cancel() // the context only bounds the RPC above
+		if err != nil {
+			continue // count the failed attempt instead of looping forever
+		}
+		if rsp.GetSuccess() {
+			h.processHBResponseM2C(rsp)
+			break
+		}
+		if rsp.GetErrCode() == errs.RetErrHBNoNode || strings.Contains(rsp.GetErrMsg(), "StandbyException") {
+			h.consumer.masterHBRetry++
+			address := h.consumer.master.Address // captured before the re-registration may switch masters
+			go h.consumer.register2Master(rsp.GetErrCode() != errs.RetErrHBNoNode)
+			if rsp.GetErrCode() != errs.RetErrHBNoNode {
+				h.mu.Lock() // the map is shared with the register and broker heartbeat paths
+				if hm := h.heartbeats[address]; hm != nil {
+					hm.numConnections--
+					if hm.numConnections == 0 {
+						delete(h.heartbeats, address)
+					}
+				}
+				h.mu.Unlock()
+			}
+			return
+		}
+	}
+	h.mu.Lock()
+	defer h.mu.Unlock()
+	if hm := h.heartbeats[h.consumer.master.Address]; hm != nil {
+		hm.timer.Reset(h.nextHeartbeatInterval())
+	}
+}
+
+func (h *heartbeatManager) processHBResponseM2C(rsp *protocol.HeartResponseM2C) {
+	h.consumer.masterHBRetry = 0
+	h.consumer.subInfo.SetIsNotAllocated(rsp.GetNotAllocated())
+	if rsp.GetDefFlowCheckId() != 0 || rsp.GetGroupFlowCheckId() != 0 {
+		if rsp.GetDefFlowCheckId() != 0 {
+			h.consumer.rmtDataCache.UpdateDefFlowCtrlInfo(rsp.GetDefFlowCheckId(), rsp.GetDefFlowControlInfo())
+		}
+		qryPriorityID := h.consumer.rmtDataCache.GetQryPriorityID()
+		if rsp.GetQryPriorityId() != 0 {
+			qryPriorityID = rsp.GetQryPriorityId()
+		}
+		h.consumer.rmtDataCache.UpdateGroupFlowCtrlInfo(qryPriorityID, rsp.GetGroupFlowCheckId(), rsp.GetGroupFlowControlInfo())
+	}
+	if rsp.GetAuthorizedInfo() != nil {
+		h.consumer.processAuthorizedToken(rsp.GetAuthorizedInfo())
+	}
+	if rsp.GetRequireAuth() {
+		atomic.StoreInt32(&h.consumer.nextAuth2Master, 1)
+	}
+	if rsp.GetEvent() != nil {
+		event := rsp.GetEvent()
+		subscribeInfo := make([]*metadata.SubscribeInfo, 0, len(event.GetSubscribeInfo()))
+		for _, sub := range event.GetSubscribeInfo() {
+			s, err := metadata.NewSubscribeInfo(sub)
+			if err != nil {
+				continue
+			}
+			subscribeInfo = append(subscribeInfo, s)
+		}
+		e := metadata.NewEvent(event.GetRebalanceId(), event.GetOpType(), subscribeInfo)
+		h.consumer.rmtDataCache.OfferEvent(e)
+	}
+}
+
+func (h *heartbeatManager) nextHeartbeatInterval() time.Duration {
+	interval := h.consumer.config.Heartbeat.Interval
+	if h.consumer.masterHBRetry >= h.consumer.config.Heartbeat.MaxRetryTimes {
+		interval = h.consumer.config.Heartbeat.AfterFail
+	}
+	return interval
+}
+
+func (h *heartbeatManager) consumerHB2Broker(broker *metadata.Node) {
+	h.mu.Lock()
+	defer h.mu.Unlock()
+
+	partitions := h.consumer.rmtDataCache.GetPartitionByBroker(broker)
+	if len(partitions) == 0 {
+		h.resetBrokerTimer(broker)
+		return
+	}
+	m := &metadata.Metadata{}
+	m.SetReadStatus(h.consumer.getConsumeReadStatus(false))
+	m.SetNode(broker)
+	ctx, cancel := context.WithTimeout(context.Background(), h.consumer.config.Net.ReadTimeout)
+	defer cancel()
+
+	rsp, err := h.consumer.client.HeartbeatRequestC2B(ctx, m, h.consumer.subInfo, h.consumer.rmtDataCache)
+	if err != nil {
+		return
+	}
+	if rsp.GetSuccess() {
+		if rsp.GetHasPartFailure() {
+			partitionKeys := make([]string, 0, len(rsp.GetFailureInfo()))
+			for _, fi := range rsp.GetFailureInfo() {
+				pos := strings.Index(fi, ":")
+				if pos == -1 {
+					continue
+				}
+				partition, err := metadata.NewPartition(fi[pos+1:])
+				if err != nil {
+					continue
+				}
+				partitionKeys = append(partitionKeys, partition.GetPartitionKey())
+			}
+			h.consumer.rmtDataCache.RemovePartition(partitionKeys)
+		} else {
+			if rsp.GetErrCode() == errs.RetCertificateFailure {
+				partitionKeys := make([]string, 0, len(partitions))
+				for _, partition := range partitions {
+					partitionKeys = append(partitionKeys, partition.GetPartitionKey())
+				}
+				h.consumer.rmtDataCache.RemovePartition(partitionKeys)
+			}
+		}
+	}
+	h.resetBrokerTimer(broker)
+}
+
+func (h *heartbeatManager) resetBrokerTimer(broker *metadata.Node) {
+	interval := h.consumer.config.Heartbeat.Interval
+	partitions := h.consumer.rmtDataCache.GetPartitionByBroker(broker)
+	if len(partitions) == 0 {
+		delete(h.heartbeats, broker.GetAddress())
+	} else {
+		hm := h.heartbeats[broker.GetAddress()]
+		hm.timer.Reset(interval)
+	}
+}
diff --git a/tubemq-client-twins/tubemq-client-go/client/remote.go b/tubemq-client-twins/tubemq-client-go/client/remote.go
deleted file mode 100644
index e36b69f..0000000
--- a/tubemq-client-twins/tubemq-client-go/client/remote.go
+++ /dev/null
@@ -1,79 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- * <p>
- * http://www.apache.org/licenses/LICENSE-2.0
- * <p>
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-// Package client defines the api and information
-// which can be exposed to user.
-package client
-
-import (
-	"sync"
-
-	"github.com/apache/incubator-inlong/tubemq-client-twins/tubemq-client-go/metadata"
-)
-
-// RmtDataCache represents the data returned from TubeMQ.
-type RmtDataCache struct {
-	consumerID         string
-	groupName          string
-	underGroupCtrl     bool
-	defFlowCtrlID      int64
-	groupFlowCtrlID    int64
-	subscribeInfo      []*metadata.SubscribeInfo
-	rebalanceResults   []*metadata.ConsumerEvent
-	mu                 sync.Mutex
-	brokerToPartitions map[*metadata.Node][]*metadata.Partition
-}
-
-// GetUnderGroupCtrl returns the underGroupCtrl.
-func (r *RmtDataCache) GetUnderGroupCtrl() bool {
-	return r.underGroupCtrl
-}
-
-// GetDefFlowCtrlID returns the defFlowCtrlID.
-func (r *RmtDataCache) GetDefFlowCtrlID() int64 {
-	return r.defFlowCtrlID
-}
-
-// GetGroupFlowCtrlID returns the groupFlowCtrlID.
-func (r *RmtDataCache) GetGroupFlowCtrlID() int64 {
-	return r.groupFlowCtrlID
-}
-
-// GetSubscribeInfo returns the subscribeInfo.
-func (r *RmtDataCache) GetSubscribeInfo() []*metadata.SubscribeInfo {
-	return r.subscribeInfo
-}
-
-// PollEventResult polls the first event result from the rebalanceResults.
-func (r *RmtDataCache) PollEventResult() *metadata.ConsumerEvent {
-	r.mu.Lock()
-	defer r.mu.Unlock()
-	if len(r.rebalanceResults) > 0 {
-		event := r.rebalanceResults[0]
-		r.rebalanceResults = r.rebalanceResults[1:]
-		return event
-	}
-	return nil
-}
-
-// GetPartitionByBroker returns the
-func (r *RmtDataCache) GetPartitionByBroker(node *metadata.Node) []*metadata.Partition {
-	if partitions, ok := r.brokerToPartitions[node]; ok {
-		return partitions
-	}
-	return nil
-}
diff --git a/tubemq-client-twins/tubemq-client-go/config/config.go b/tubemq-client-twins/tubemq-client-go/config/config.go
index 47a360a..7a93831 100644
--- a/tubemq-client-twins/tubemq-client-go/config/config.go
+++ b/tubemq-client-twins/tubemq-client-go/config/config.go
@@ -29,7 +29,7 @@ import (
 // Config defines multiple configuration options.
 // Refer to: https://github.com/apache/incubator-inlong/blob/3249de37acf054a9c43677131cfbb09fc6d366d1/tubemq-client/src/main/java/org/apache/tubemq/client/config/ConsumerConfig.java
 type Config struct {
-	// Net is the namespace for network-level properties used by Broker and Master.
+	// Net is the namespace for network-level properties used by broker and Master.
 	Net struct {
 		// ReadTimeout represents how long to wait for a response.
 		ReadTimeout time.Duration
@@ -61,9 +61,13 @@ type Config struct {
 	// used by the consumer
 	Consumer struct {
 		// Masters is the addresses of master.
-		Masters []string
-		// Topic of the consumption.
-		Topic string
+		Masters string
+		// Topics of the consumption.
+		Topics []string
+		// TopicFilters is the map of topic to filters.
+		TopicFilters map[string][]string
+		// PartitionOffset is the map of partition to its corresponding offset.
+		PartitionOffset map[string]int64
 		// ConsumerPosition is the initial offset to use if no offset was previously committed.
 		ConsumePosition int
 		// Group is the consumer group name.
@@ -152,7 +156,7 @@ func ParseAddress(address string) (config *Config, err error) {
 		return nil, fmt.Errorf("address format invalid: address: %v, token: %v", address, tokens)
 	}
 
-	c.Consumer.Masters = strings.Split(tokens[0], ",")
+	c.Consumer.Masters = tokens[0]
 
 	tokens = strings.Split(tokens[1], "&")
 	if len(tokens) == 0 {
@@ -190,8 +194,8 @@ func getConfigFromToken(config *Config, values []string) error {
 		config.Net.TLS.TLSServerName = values[1]
 	case "group":
 		config.Consumer.Group = values[1]
-	case "topic":
-		config.Consumer.Topic = values[1]
+	case "topics":
+		config.Consumer.Topics = strings.Split(values[1], ",")
 	case "consumePosition":
 		config.Consumer.ConsumePosition, err = strconv.Atoi(values[1])
 	case "boundConsume":
diff --git a/tubemq-client-twins/tubemq-client-go/config/config_test.go b/tubemq-client-twins/tubemq-client-go/config/config_test.go
index 56bc600..a8ac703 100644
--- a/tubemq-client-twins/tubemq-client-go/config/config_test.go
+++ b/tubemq-client-twins/tubemq-client-go/config/config_test.go
@@ -25,11 +25,11 @@ import (
 )
 
 func TestParseAddress(t *testing.T) {
-	address := "127.0.0.1:9092,127.0.0.1:9093?topic=Topic&group=Group&tlsEnable=false&msgNotFoundWait=10000&heartbeatMaxRetryTimes=6"
+	address := "127.0.0.1:9092,127.0.0.1:9093?topics=Topic1,Topic2&group=Group&tlsEnable=false&msgNotFoundWait=10000&heartbeatMaxRetryTimes=6"
 	c, err := ParseAddress(address)
 	assert.Nil(t, err)
-	assert.Equal(t, c.Consumer.Masters, []string{"127.0.0.1:9092", "127.0.0.1:9093"})
-	assert.Equal(t, c.Consumer.Topic, "Topic")
+	assert.Equal(t, c.Consumer.Masters, "127.0.0.1:9092,127.0.0.1:9093")
+	assert.Equal(t, c.Consumer.Topics, []string{"Topic1", "Topic2"})
 	assert.Equal(t, c.Consumer.Group, "Group")
 	assert.Equal(t, c.Consumer.MsgNotFoundWait, 10000*time.Millisecond)
 
@@ -41,11 +41,11 @@ func TestParseAddress(t *testing.T) {
 	_, err = ParseAddress(address)
 	assert.NotNil(t, err)
 
-	address = "127.0.0.1:9092,127.0.0.1:9093?Topic=Topic&ttt"
+	address = "127.0.0.1:9092,127.0.0.1:9093?topics=Topic&ttt"
 	_, err = ParseAddress(address)
 	assert.NotNil(t, err)
 
-	address = "127.0.0.1:9092,127.0.0.1:9093?Topic=Topic&ttt=ttt"
+	address = "127.0.0.1:9092,127.0.0.1:9093?topics=Topic&ttt=ttt"
 	_, err = ParseAddress(address)
 	assert.NotNil(t, err)
 }
diff --git a/tubemq-client-twins/tubemq-client-go/errs/errs.go b/tubemq-client-twins/tubemq-client-go/errs/errs.go
index 00b3c81..eb45179 100644
--- a/tubemq-client-twins/tubemq-client-go/errs/errs.go
+++ b/tubemq-client-twins/tubemq-client-go/errs/errs.go
@@ -33,12 +33,18 @@ const (
 	RetAssertionFailure = 4
 	// RetRequestFailure represents the error code of request error.
 	RetRequestFailure = 5
-	// RetSelectorNotExist = 6
-	RetSelectorNotExist = 6
+	// RetSelectorNotExist represents the selector not exists.
+	RetSelectorNotExist        = 6
+	RetErrHBNoNode             = 411
+	RetCertificateFailure      = 415
+	RetConsumeGroupForbidden   = 450
+	RetConsumeContentForbidden = 455
 )
 
 // ErrAssertionFailure represents RetAssertionFailure error.
-var ErrAssertionFailure = New(RetAssertionFailure, "AssertionFailure")
+var (
+	ErrAssertionFailure = New(RetAssertionFailure, "AssertionFailure")
+)
 
 // Error provides a TubeMQ-specific error container
 type Error struct {
diff --git a/tubemq-client-twins/tubemq-client-go/metadata/consumer_event.go b/tubemq-client-twins/tubemq-client-go/metadata/consumer_event.go
index 2b4ce49..6ce2915 100644
--- a/tubemq-client-twins/tubemq-client-go/metadata/consumer_event.go
+++ b/tubemq-client-twins/tubemq-client-go/metadata/consumer_event.go
@@ -25,6 +25,14 @@ type ConsumerEvent struct {
 	subscribeInfo []*SubscribeInfo
 }
 
+func NewEvent(rebalanceID int64, eventType int32, subscribeInfo []*SubscribeInfo) *ConsumerEvent {
+	return &ConsumerEvent{
+		rebalanceID:   rebalanceID,
+		eventType:     eventType,
+		subscribeInfo: subscribeInfo,
+	}
+}
+
 // GetRebalanceID returns the rebalanceID of a consumer event.
 func (c *ConsumerEvent) GetRebalanceID() int64 {
 	return c.rebalanceID
diff --git a/tubemq-client-twins/tubemq-client-go/metadata/metadata.go b/tubemq-client-twins/tubemq-client-go/metadata/metadata.go
index 861a06e..f64674a 100644
--- a/tubemq-client-twins/tubemq-client-go/metadata/metadata.go
+++ b/tubemq-client-twins/tubemq-client-go/metadata/metadata.go
@@ -45,3 +45,23 @@ func (m *Metadata) GetReadStatus() int32 {
 func (m *Metadata) GetReportTimes() bool {
 	return m.reportTimes
 }
+
+// SetNode replaces the node stored in the Metadata with the given one.
+func (m *Metadata) SetNode(node *Node) {
+	m.node = node
+}
+
+// SetSubscribeInfo replaces the subscribeInfo stored in the Metadata with sub.
+func (m *Metadata) SetSubscribeInfo(sub *SubscribeInfo) {
+	m.subscribeInfo = sub
+}
+
+// SetReadStatus sets the readStatus.
+func (m *Metadata) SetReadStatus(status int32) {
+	m.readStatus = status
+}
+
+// SetReportTimes sets the reportTimes flag of the Metadata.
+func (m *Metadata) SetReportTimes(reportTimes bool) {
+	m.reportTimes = reportTimes
+}
diff --git a/tubemq-client-twins/tubemq-client-go/metadata/node.go b/tubemq-client-twins/tubemq-client-go/metadata/node.go
index 625f278..bcfda82 100644
--- a/tubemq-client-twins/tubemq-client-go/metadata/node.go
+++ b/tubemq-client-twins/tubemq-client-go/metadata/node.go
@@ -19,6 +19,7 @@ package metadata
 
 import (
 	"strconv"
+	"strings"
 )
 
 // Node represents the metadata of a node.
@@ -29,6 +30,43 @@ type Node struct {
 	address string
 }
 
+// NewNode constructs a node from a given string.
+// A broker node is parsed as "id:host[:port]" and any other node as "host[:port]".
+// When the port is omitted it defaults to 8123.
+// If the given string is invalid, it will return error.
+func NewNode(isBroker bool, node string) (*Node, error) {
+	res := strings.Split(node, ":")
+	nodeID := 0
+	host := ""
+	port := 8123
+	var err error
+	if isBroker {
+		// A broker needs at least "id:host". Without this guard res[1] panics
+		// with an index-out-of-range instead of reporting the bad input.
+		if len(res) < 2 {
+			// strconv.NumError is reused here so that no additional import is needed.
+			return nil, &strconv.NumError{Func: "NewNode", Num: node, Err: strconv.ErrSyntax}
+		}
+		nodeID, err = strconv.Atoi(res[0])
+		if err != nil {
+			return nil, err
+		}
+		host = res[1]
+		if len(res) >= 3 {
+			port, err = strconv.Atoi(res[2])
+			if err != nil {
+				return nil, err
+			}
+		}
+	} else {
+		// strings.Split always yields at least one element, so res[0] is safe.
+		host = res[0]
+		if len(res) >= 2 {
+			port, err = strconv.Atoi(res[1])
+			if err != nil {
+				return nil, err
+			}
+		}
+	}
+	return &Node{
+		id:      uint32(nodeID),
+		host:    host,
+		port:    uint32(port),
+		address: host + ":" + strconv.Itoa(port),
+	}, nil
+}
+
 // GetID returns the id of a node.
 func (n *Node) GetID() uint32 {
 	return n.id
@@ -53,3 +91,19 @@ func (n *Node) GetAddress() string {
 func (n *Node) String() string {
 	return strconv.Itoa(int(n.id)) + ":" + n.host + ":" + strconv.Itoa(int(n.port))
 }
+
+// SetHost sets the host.
+// Only the host field is written; the address field is left unchanged.
+func (n *Node) SetHost(host string) {
+	n.host = host
+}
+
+// SetAddress sets the address and updates the port from it.
+// The address is expected in "host:port" form; the port is taken from the
+// second ":"-separated field. An address without a port, or with a
+// non-numeric port, yields an error and leaves the node untouched.
+func (n *Node) SetAddress(address string) error {
+	fields := strings.Split(address, ":")
+	// Indexing fields[1] without this check panics for an address with no ":".
+	if len(fields) < 2 {
+		// strconv.NumError is reused here so that no additional import is needed.
+		return &strconv.NumError{Func: "SetAddress", Num: address, Err: strconv.ErrSyntax}
+	}
+	port, err := strconv.Atoi(fields[1])
+	if err != nil {
+		return err
+	}
+	n.address = address
+	n.port = uint32(port)
+	return nil
+}
diff --git a/tubemq-client-twins/tubemq-client-go/metadata/partition.go b/tubemq-client-twins/tubemq-client-go/metadata/partition.go
index a180214..fbe1086 100644
--- a/tubemq-client-twins/tubemq-client-go/metadata/partition.go
+++ b/tubemq-client-twins/tubemq-client-go/metadata/partition.go
@@ -19,18 +19,48 @@ package metadata
 
 import (
 	"strconv"
+	"strings"
 )
 
 // Partition represents the metadata of a partition.
 type Partition struct {
 	topic        string
-	Broker       *Node
+	broker       *Node
 	partitionID  int32
 	partitionKey string
 	offset       int64
 	lastConsumed bool
 }
 
+// NewPartition constructs a Partition from a string of the form
+// "<broker>#<topic>:<partitionID>", where <broker> is parsed by NewNode as a
+// broker node. An error is returned when the broker part or the partition id
+// cannot be parsed. When the "#" separator is missing the returned Partition
+// is empty (nil broker); when the ":" after the topic is missing only the
+// broker is populated. Only topic, broker and partitionID are filled in.
+func NewPartition(partition string) (*Partition, error) {
+	result := &Partition{}
+	sep := strings.Index(partition, "#")
+	if sep == -1 {
+		return result, nil
+	}
+	node, err := NewNode(true, partition[:sep])
+	if err != nil {
+		return nil, err
+	}
+	result.broker = node
+	rest := partition[sep+1:]
+	colon := strings.Index(rest, ":")
+	if colon == -1 {
+		return result, nil
+	}
+	id, err := strconv.Atoi(rest[colon+1:])
+	if err != nil {
+		return nil, err
+	}
+	result.topic = rest[:colon]
+	result.partitionID = int32(id)
+	return result, nil
+}
+
 // GetLastConsumed returns lastConsumed of a partition.
 func (p *Partition) GetLastConsumed() bool {
 	return p.lastConsumed
@@ -51,7 +81,15 @@ func (p *Partition) GetTopic() string {
 	return p.topic
 }
 
+// GetBroker returns the broker node of a partition.
+func (p *Partition) GetBroker() *Node {
+	return p.broker
+}
+
 // String returns the metadata of a Partition as a string.
 func (p *Partition) String() string {
-	return p.Broker.String() + "#" + p.topic + "@" + strconv.Itoa(int(p.partitionID))
+	return p.broker.String() + "#" + p.topic + "@" + strconv.Itoa(int(p.partitionID))
+}
+
+// SetLastConsumed sets lastConsumed of a partition.
+func (p *Partition) SetLastConsumed(lastConsumed bool) {
+	p.lastConsumed = lastConsumed
 }
diff --git a/tubemq-client-twins/tubemq-client-go/metadata/subcribe_info.go b/tubemq-client-twins/tubemq-client-go/metadata/subcribe_info.go
index d76437a..90ae3ef 100644
--- a/tubemq-client-twins/tubemq-client-go/metadata/subcribe_info.go
+++ b/tubemq-client-twins/tubemq-client-go/metadata/subcribe_info.go
@@ -19,14 +19,14 @@ package metadata
 
 import (
 	"fmt"
+	"strings"
 )
 
 // SubscribeInfo represents the metadata of the subscribe info.
 type SubscribeInfo struct {
-	group         string
-	consumerID    string
-	partition     *Partition
-	qryPriorityID int32
+	group      string
+	consumerID string
+	partition  *Partition
 }
 
 // GetGroup returns the group name.
@@ -44,12 +44,48 @@ func (s *SubscribeInfo) GetPartition() *Partition {
 	return s.partition
 }
 
-// GetQryPriorityID returns the QryPriorityID.
-func (s *SubscribeInfo) GetQryPriorityID() int32 {
-	return s.qryPriorityID
-}
-
 // String returns the contents of SubscribeInfo as a string.
 func (s *SubscribeInfo) String() string {
 	return fmt.Sprintf("%s@%s-%s", s.consumerID, s.group, s.partition.String())
 }
+
+// NewSubscribeInfo constructs a SubscribeInfo from a given string of the form
+// "<consumerID>@<group>#<partition>", where <partition> is parsed by NewPartition.
+// If the given string is invalid, it will return error. A string without any
+// "#" yields an empty SubscribeInfo (nil partition) and no error.
+func NewSubscribeInfo(subscribeInfo string) (*SubscribeInfo, error) {
+	consumerID := ""
+	group := ""
+	var partition *Partition
+	var err error
+	pos := strings.Index(subscribeInfo, "#")
+	if pos != -1 {
+		consumerInfo := subscribeInfo[:pos]
+		partitionInfo := subscribeInfo[pos+1:]
+		partition, err = NewPartition(partitionInfo)
+		if err != nil {
+			return nil, err
+		}
+		pos = strings.Index(consumerInfo, "@")
+		// Slicing with pos == -1 would panic, so reject the input explicitly.
+		if pos == -1 {
+			return nil, fmt.Errorf("invalid subscribe info %q: missing '@' between consumerID and group", subscribeInfo)
+		}
+		consumerID = consumerInfo[:pos]
+		group = consumerInfo[pos+1:]
+	}
+	return &SubscribeInfo{
+		group:      group,
+		consumerID: consumerID,
+		partition:  partition,
+	}, nil
+}
+
+// SetPartition replaces the partition this SubscribeInfo refers to.
+func (s *SubscribeInfo) SetPartition(partition *Partition) {
+	s.partition = partition
+}
+
+// SetGroup sets the group name of the SubscribeInfo.
+func (s *SubscribeInfo) SetGroup(group string) {
+	s.group = group
+}
+
+// SetConsumerID sets the consumerID of the SubscribeInfo.
+func (s *SubscribeInfo) SetConsumerID(id string) {
+	s.consumerID = id
+}
diff --git a/tubemq-client-twins/tubemq-client-go/remote/remote.go b/tubemq-client-twins/tubemq-client-go/remote/remote.go
new file mode 100644
index 0000000..aa9a2d1
--- /dev/null
+++ b/tubemq-client-twins/tubemq-client-go/remote/remote.go
@@ -0,0 +1,331 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+// Package remote defines the remote data which is returned from TubeMQ.
+package remote
+
+import (
+	"sync"
+	"time"
+
+	"github.com/apache/incubator-inlong/tubemq-client-twins/tubemq-client-go/metadata"
+	"github.com/apache/incubator-inlong/tubemq-client-twins/tubemq-client-go/util"
+)
+
+// RmtDataCache represents the data returned from TubeMQ.
+//
+// Locking as used by the methods in this file:
+//   - eventMu is held while rebalanceResults is read or modified.
+//   - metaMu is held by the exported methods that read or modify the
+//     partition bookkeeping maps (partitions, partitionSubInfo,
+//     brokerPartitions, topicPartitions, usedPartitions, indexPartitions,
+//     partitionTimeouts).
+//   - dataBookMu is held while partitionRegBooked is accessed.
+//
+// The plain getters and SetConsumerInfo access their fields without a lock.
+// brokerPartitions is keyed by *metadata.Node, so lookups match on pointer identity.
+type RmtDataCache struct {
+	consumerID         string
+	groupName          string
+	underGroupCtrl     bool
+	defFlowCtrlID      int64
+	groupFlowCtrlID    int64
+	partitionSubInfo   map[string]*metadata.SubscribeInfo
+	rebalanceResults   []*metadata.ConsumerEvent
+	eventMu            sync.Mutex
+	metaMu             sync.Mutex
+	dataBookMu         sync.Mutex
+	brokerPartitions   map[*metadata.Node]map[string]bool
+	qryPriorityID      int32
+	partitions         map[string]*metadata.Partition
+	usedPartitions     map[string]int64
+	indexPartitions    map[string]bool
+	partitionTimeouts  map[string]*time.Timer
+	topicPartitions    map[string]map[string]bool
+	partitionRegBooked map[string]bool
+}
+
+// NewRmtDataCache returns a default rmtDataCache.
+// The flow control ids and the query priority id start at util.InvalidValue,
+// and every map and the event slice is allocated empty so the cache can be
+// written to immediately. consumerID and groupName are left empty.
+func NewRmtDataCache() *RmtDataCache {
+	return &RmtDataCache{
+		defFlowCtrlID:      util.InvalidValue,
+		groupFlowCtrlID:    util.InvalidValue,
+		qryPriorityID:      int32(util.InvalidValue),
+		partitionSubInfo:   make(map[string]*metadata.SubscribeInfo),
+		rebalanceResults:   make([]*metadata.ConsumerEvent, 0, 0),
+		brokerPartitions:   make(map[*metadata.Node]map[string]bool),
+		partitions:         make(map[string]*metadata.Partition),
+		usedPartitions:     make(map[string]int64),
+		indexPartitions:    make(map[string]bool),
+		partitionTimeouts:  make(map[string]*time.Timer),
+		topicPartitions:    make(map[string]map[string]bool),
+		partitionRegBooked: make(map[string]bool),
+	}
+}
+
+// GetUnderGroupCtrl returns the underGroupCtrl flag.
+// The field is read without taking any lock.
+func (r *RmtDataCache) GetUnderGroupCtrl() bool {
+	return r.underGroupCtrl
+}
+
+// GetDefFlowCtrlID returns the defFlowCtrlID.
+// The field is read without taking any lock.
+func (r *RmtDataCache) GetDefFlowCtrlID() int64 {
+	return r.defFlowCtrlID
+}
+
+// GetGroupFlowCtrlID returns the groupFlowCtrlID.
+// The field is read without taking any lock.
+func (r *RmtDataCache) GetGroupFlowCtrlID() int64 {
+	return r.groupFlowCtrlID
+}
+
+// GetGroupName returns the group name.
+// The field is read without taking any lock.
+func (r *RmtDataCache) GetGroupName() string {
+	return r.groupName
+}
+
+// GetSubscribeInfo returns a new slice holding every SubscribeInfo currently
+// stored in partitionSubInfo. The map is read under metaMu; the order of the
+// result follows map iteration and is therefore unspecified.
+func (r *RmtDataCache) GetSubscribeInfo() []*metadata.SubscribeInfo {
+	r.metaMu.Lock()
+	defer r.metaMu.Unlock()
+	result := make([]*metadata.SubscribeInfo, len(r.partitionSubInfo))
+	next := 0
+	for _, info := range r.partitionSubInfo {
+		result[next] = info
+		next++
+	}
+	return result
+}
+
+// GetQryPriorityID returns the qryPriorityID.
+// The field is read without taking any lock.
+func (r *RmtDataCache) GetQryPriorityID() int32 {
+	return r.qryPriorityID
+}
+
+// PollEventResult removes and returns the oldest event in rebalanceResults.
+// It returns nil when no event is queued. The queue is accessed under eventMu.
+func (r *RmtDataCache) PollEventResult() *metadata.ConsumerEvent {
+	r.eventMu.Lock()
+	defer r.eventMu.Unlock()
+	if len(r.rebalanceResults) == 0 {
+		return nil
+	}
+	head := r.rebalanceResults[0]
+	r.rebalanceResults = r.rebalanceResults[1:]
+	return head
+}
+
+// GetPartitionByBroker returns the subscribed partitions of the given broker,
+// or nil when the broker has no entry in brokerPartitions. The broker is
+// looked up by pointer, because the map is keyed by *metadata.Node. Each
+// partition key recorded for the broker is resolved through r.partitions.
+func (r *RmtDataCache) GetPartitionByBroker(broker *metadata.Node) []*metadata.Partition {
+	r.metaMu.Lock()
+	defer r.metaMu.Unlock()
+
+	keys, found := r.brokerPartitions[broker]
+	if !found {
+		return nil
+	}
+	result := make([]*metadata.Partition, 0, len(keys))
+	for key := range keys {
+		result = append(result, r.partitions[key])
+	}
+	return result
+}
+
+// SetConsumerInfo sets the consumer information including consumerID and groupName.
+// Both fields are written without taking any lock.
+func (r *RmtDataCache) SetConsumerInfo(consumerID string, group string) {
+	r.consumerID = consumerID
+	r.groupName = group
+}
+
+// UpdateDefFlowCtrlInfo updates the defFlowCtrlInfo.
+// TODO: not implemented yet; the body is empty and both arguments are ignored.
+func (r *RmtDataCache) UpdateDefFlowCtrlInfo(flowCtrlID int64, flowCtrlInfo string) {
+
+}
+
+// UpdateGroupFlowCtrlInfo updates the groupFlowCtrlInfo.
+// TODO: not implemented yet; the body is empty and all arguments are ignored.
+func (r *RmtDataCache) UpdateGroupFlowCtrlInfo(qryPriorityID int32, flowCtrlID int64, flowCtrlInfo string) {
+
+}
+
+// OfferEvent appends a consumer event to the end of rebalanceResults.
+// The queue is modified under eventMu.
+func (r *RmtDataCache) OfferEvent(event *metadata.ConsumerEvent) {
+	r.eventMu.Lock()
+	defer r.eventMu.Unlock()
+	r.rebalanceResults = append(r.rebalanceResults, event)
+}
+
+// TakeEvent removes and returns the first event of rebalanceResults, or nil
+// when the queue is empty. It does not block. The queue is accessed under eventMu.
+func (r *RmtDataCache) TakeEvent() *metadata.ConsumerEvent {
+	r.eventMu.Lock()
+	defer r.eventMu.Unlock()
+	var head *metadata.ConsumerEvent
+	if pending := r.rebalanceResults; len(pending) > 0 {
+		head, r.rebalanceResults = pending[0], pending[1:]
+	}
+	return head
+}
+
+// ClearEvent clears all the events.
+// The slice is truncated to length zero under eventMu; its backing array is kept.
+func (r *RmtDataCache) ClearEvent() {
+	r.eventMu.Lock()
+	defer r.eventMu.Unlock()
+	r.rebalanceResults = r.rebalanceResults[:0]
+}
+
+// RemoveAndGetPartition removes the partitions named by subscribeInfos from
+// the cache and collects every removed partition into the caller-supplied
+// partitions map, grouped by broker. The map must be non-nil whenever a
+// cached partition matches. For a partition that is currently marked as used,
+// lastConsumed is set to the negation of processingRollback before removal.
+// The idle/used state of every listed partition key is reset, whether or not
+// the partition was cached. Nothing happens for an empty subscribeInfos.
+func (r *RmtDataCache) RemoveAndGetPartition(subscribeInfos []*metadata.SubscribeInfo, processingRollback bool, partitions map[*metadata.Node][]*metadata.Partition) {
+	if len(subscribeInfos) == 0 {
+		return
+	}
+	r.metaMu.Lock()
+	defer r.metaMu.Unlock()
+	for _, sub := range subscribeInfos {
+		key := sub.GetPartition().GetPartitionKey()
+		if cached, found := r.partitions[key]; found {
+			if _, inUse := r.usedPartitions[key]; inUse {
+				cached.SetLastConsumed(!processingRollback)
+			}
+			broker := cached.GetBroker()
+			// append on a missing key starts from a nil slice, which creates the entry.
+			partitions[broker] = append(partitions[broker], cached)
+			r.removeMetaInfo(key)
+		}
+		r.resetIdlePartition(key, false)
+	}
+}
+
+// removeMetaInfo drops the partition identified by partitionKey from
+// partitions, partitionSubInfo, and the per-topic and per-broker indexes,
+// deleting an index entry entirely once it becomes empty. It is a no-op when
+// the key is not in r.partitions. It takes no lock itself; the callers in
+// this file hold metaMu.
+func (r *RmtDataCache) removeMetaInfo(partitionKey string) {
+	partition, known := r.partitions[partitionKey]
+	if !known {
+		return
+	}
+	topic := partition.GetTopic()
+	if byTopic, ok := r.topicPartitions[topic]; ok {
+		delete(byTopic, partitionKey)
+		if len(byTopic) == 0 {
+			delete(r.topicPartitions, topic)
+		}
+	}
+	broker := partition.GetBroker()
+	if byBroker, ok := r.brokerPartitions[broker]; ok {
+		delete(byBroker, partition.GetPartitionKey())
+		if len(byBroker) == 0 {
+			delete(r.brokerPartitions, broker)
+		}
+	}
+	delete(r.partitions, partitionKey)
+	delete(r.partitionSubInfo, partitionKey)
+}
+
+// resetIdlePartition clears the in-use state of the given partition key: it
+// removes the key from usedPartitions, stops and forgets its timeout timer,
+// and removes it from indexPartitions. When reuse is true and the partition
+// is still present in r.partitions, the key is put back into indexPartitions.
+// It takes no lock itself; the callers in this file hold metaMu.
+func (r *RmtDataCache) resetIdlePartition(partitionKey string, reuse bool) {
+	delete(r.usedPartitions, partitionKey)
+	if timer, ok := r.partitionTimeouts[partitionKey]; ok {
+		if !timer.Stop() {
+			// The timer has already fired or been stopped. Drain a pending
+			// tick without blocking: an unconditional receive would hang
+			// forever (while metaMu is held) if the tick was already consumed
+			// or the timer has no channel.
+			select {
+			case <-timer.C:
+			default:
+			}
+		}
+		delete(r.partitionTimeouts, partitionKey)
+	}
+	delete(r.indexPartitions, partitionKey)
+	if reuse {
+		if _, ok := r.partitions[partitionKey]; ok {
+			r.indexPartitions[partitionKey] = true
+		}
+	}
+}
+
+// FilterPartitions returns the partitions of subInfos whose partition key is
+// not yet present in r.partitions, i.e. the unsubscribed partitions. The
+// result keeps the order of subInfos and is never nil. r.partitions is read
+// under metaMu.
+func (r *RmtDataCache) FilterPartitions(subInfos []*metadata.SubscribeInfo) []*metadata.Partition {
+	r.metaMu.Lock()
+	defer r.metaMu.Unlock()
+	pending := make([]*metadata.Partition, 0, len(subInfos))
+	for _, info := range subInfos {
+		// With an empty cache no key is ever found, so every partition is kept.
+		if _, subscribed := r.partitions[info.GetPartition().GetPartitionKey()]; subscribed {
+			continue
+		}
+		pending = append(pending, info.GetPartition())
+	}
+	return pending
+}
+
+// AddNewPartition registers newPartition in the cache if its partition key is
+// not known yet: the partition is stored, indexed under its topic and under
+// its broker, and a SubscribeInfo built from the cache's consumerID and
+// groupName is recorded for it. In all cases the partition's idle state is
+// reset and the key is made available again through indexPartitions.
+func (r *RmtDataCache) AddNewPartition(newPartition *metadata.Partition) {
+	sub := &metadata.SubscribeInfo{}
+	sub.SetPartition(newPartition)
+	sub.SetConsumerID(r.consumerID)
+	sub.SetGroup(r.groupName)
+
+	r.metaMu.Lock()
+	defer r.metaMu.Unlock()
+	partitionKey := newPartition.GetPartitionKey()
+	if _, ok := r.partitions[partitionKey]; !ok {
+		// Store newPartition itself: the value returned by a failed map
+		// lookup is nil and must be neither stored nor dereferenced.
+		r.partitions[partitionKey] = newPartition
+
+		// The per-topic index is keyed by topic name, not by partition key.
+		topic := newPartition.GetTopic()
+		if _, ok := r.topicPartitions[topic]; !ok {
+			r.topicPartitions[topic] = make(map[string]bool)
+		}
+		r.topicPartitions[topic][partitionKey] = true
+
+		broker := newPartition.GetBroker()
+		if _, ok := r.brokerPartitions[broker]; !ok {
+			r.brokerPartitions[broker] = make(map[string]bool)
+		}
+		r.brokerPartitions[broker][partitionKey] = true
+
+		r.partitionSubInfo[partitionKey] = sub
+	}
+	r.resetIdlePartition(partitionKey, true)
+}
+
+// HandleExpiredPartitions handles the expired partitions. A used partition is
+// expired when the value stored for it in usedPartitions lies more than wait
+// (in milliseconds) before the current Unix time in milliseconds. Every
+// expired partition that is still cached gets lastConsumed set to false, and
+// its idle state is then reset with reuse enabled. Runs under metaMu.
+func (r *RmtDataCache) HandleExpiredPartitions(wait time.Duration) {
+	r.metaMu.Lock()
+	defer r.metaMu.Unlock()
+	if len(r.usedPartitions) == 0 {
+		return
+	}
+	nowMs := time.Now().UnixNano() / int64(time.Millisecond)
+	limitMs := wait.Milliseconds()
+	// Collect first, reset afterwards: resetIdlePartition deletes from
+	// usedPartitions, which is being iterated here.
+	var expiredKeys []string
+	for key, usedAt := range r.usedPartitions {
+		if nowMs-usedAt <= limitMs {
+			continue
+		}
+		expiredKeys = append(expiredKeys, key)
+		if p, ok := r.partitions[key]; ok {
+			p.SetLastConsumed(false)
+		}
+	}
+	for _, key := range expiredKeys {
+		r.resetIdlePartition(key, true)
+	}
+}
+
+// RemovePartition removes the given partition keys.
+// For each key the idle/used state is reset without reuse and the partition's
+// metadata is then dropped from the cache. Runs under metaMu.
+func (r *RmtDataCache) RemovePartition(partitionKeys []string) {
+	r.metaMu.Lock()
+	defer r.metaMu.Unlock()
+
+	for _, partitionKey := range partitionKeys {
+		r.resetIdlePartition(partitionKey, false)
+		r.removeMetaInfo(partitionKey)
+	}
+}
+
+// IsFirstRegister returns whether the given partition is first registered.
+// A key not seen before is recorded as true in partitionRegBooked and true is
+// returned; for a known key the recorded value is returned. The map is
+// accessed under dataBookMu.
+// NOTE(review): nothing in this file ever stores false for a key, so as
+// written the result is always true - confirm whether a "booked" setter is
+// still to be added.
+func (r *RmtDataCache) IsFirstRegister(partitionKey string) bool {
+	r.dataBookMu.Lock()
+	defer r.dataBookMu.Unlock()
+
+	first, seen := r.partitionRegBooked[partitionKey]
+	if !seen {
+		first = true
+		r.partitionRegBooked[partitionKey] = first
+	}
+	return first
+}
diff --git a/tubemq-client-twins/tubemq-client-go/rpc/broker.go b/tubemq-client-twins/tubemq-client-go/rpc/broker.go
index 7f9ead2..5105fd5 100644
--- a/tubemq-client-twins/tubemq-client-go/rpc/broker.go
+++ b/tubemq-client-twins/tubemq-client-go/rpc/broker.go
@@ -22,11 +22,13 @@ import (
 
 	"github.com/golang/protobuf/proto"
 
-	"github.com/apache/incubator-inlong/tubemq-client-twins/tubemq-client-go/client"
 	"github.com/apache/incubator-inlong/tubemq-client-twins/tubemq-client-go/codec"
 	"github.com/apache/incubator-inlong/tubemq-client-twins/tubemq-client-go/errs"
 	"github.com/apache/incubator-inlong/tubemq-client-twins/tubemq-client-go/metadata"
 	"github.com/apache/incubator-inlong/tubemq-client-twins/tubemq-client-go/protocol"
+	"github.com/apache/incubator-inlong/tubemq-client-twins/tubemq-client-go/remote"
+	"github.com/apache/incubator-inlong/tubemq-client-twins/tubemq-client-go/sub"
+	"github.com/apache/incubator-inlong/tubemq-client-twins/tubemq-client-go/util"
 )
 
 const (
@@ -47,14 +49,14 @@ const (
 )
 
 // RegisterRequestC2B implements the RegisterRequestC2B interface according to TubeMQ RPC protocol.
-func (c *rpcClient) RegisterRequestC2B(ctx context.Context, metadata *metadata.Metadata, sub *client.SubInfo) (*protocol.RegisterResponseB2C, error) {
+func (c *rpcClient) RegisterRequestC2B(ctx context.Context, metadata *metadata.Metadata, sub *sub.SubInfo, r *remote.RmtDataCache) (*protocol.RegisterResponseB2C, error) {
 	reqC2B := &protocol.RegisterRequestC2B{
 		OpType:        proto.Int32(register),
 		ClientId:      proto.String(sub.GetClientID()),
 		GroupName:     proto.String(metadata.GetSubscribeInfo().GetGroup()),
 		TopicName:     proto.String(metadata.GetSubscribeInfo().GetPartition().GetTopic()),
 		PartitionId:   proto.Int32(metadata.GetSubscribeInfo().GetPartition().GetPartitionID()),
-		QryPriorityId: proto.Int32(metadata.GetSubscribeInfo().GetQryPriorityID()),
+		QryPriorityId: proto.Int32(r.GetQryPriorityID()),
 		ReadStatus:    proto.Int32(metadata.GetReadStatus()),
 		AuthInfo:      sub.GetAuthorizedInfo(),
 	}
@@ -66,7 +68,7 @@ func (c *rpcClient) RegisterRequestC2B(ctx context.Context, metadata *metadata.M
 		}
 	}
 	offset := sub.GetAssignedPartOffset(metadata.GetSubscribeInfo().GetPartition().GetPartitionKey())
-	if offset != client.InValidOffset {
+	if offset != util.InvalidValue {
 		reqC2B.CurrOffset = proto.Int64(offset)
 	}
 	data, err := proto.Marshal(reqC2B)
@@ -87,7 +89,7 @@ func (c *rpcClient) RegisterRequestC2B(ctx context.Context, metadata *metadata.M
 		Timeout: proto.Int64(c.config.Net.ReadTimeout.Milliseconds()),
 	}
 
-	rspBody, err := c.doRequest(ctx, req)
+	rspBody, err := c.doRequest(ctx, metadata.GetNode().GetAddress(), req)
 	if err != nil {
 		return nil, err
 	}
@@ -101,7 +103,7 @@ func (c *rpcClient) RegisterRequestC2B(ctx context.Context, metadata *metadata.M
 }
 
 // UnregisterRequestC2B implements the UnregisterRequestC2B interface according to TubeMQ RPC protocol.
-func (c *rpcClient) UnregisterRequestC2B(ctx context.Context, metadata metadata.Metadata, sub *client.SubInfo) (*protocol.RegisterResponseB2C, error) {
+func (c *rpcClient) UnregisterRequestC2B(ctx context.Context, metadata *metadata.Metadata, sub *sub.SubInfo) (*protocol.RegisterResponseB2C, error) {
 	reqC2B := &protocol.RegisterRequestC2B{
 		OpType:      proto.Int32(unregister),
 		ClientId:    proto.String(sub.GetClientID()),
@@ -129,7 +131,7 @@ func (c *rpcClient) UnregisterRequestC2B(ctx context.Context, metadata metadata.
 		Timeout: proto.Int64(c.config.Net.ReadTimeout.Milliseconds()),
 	}
 
-	rspBody, err := c.doRequest(ctx, req)
+	rspBody, err := c.doRequest(ctx, metadata.GetNode().GetAddress(), req)
 	if err != nil {
 		return nil, err
 	}
@@ -143,7 +145,7 @@ func (c *rpcClient) UnregisterRequestC2B(ctx context.Context, metadata metadata.
 }
 
 // GetMessageRequestC2B implements the GetMessageRequestC2B interface according to TubeMQ RPC protocol.
-func (c *rpcClient) GetMessageRequestC2B(ctx context.Context, metadata *metadata.Metadata, sub *client.SubInfo, r *client.RmtDataCache) (*protocol.GetMessageResponseB2C, error) {
+func (c *rpcClient) GetMessageRequestC2B(ctx context.Context, metadata *metadata.Metadata, sub *sub.SubInfo, r *remote.RmtDataCache) (*protocol.GetMessageResponseB2C, error) {
 	reqC2B := &protocol.GetMessageRequestC2B{
 		ClientId:           proto.String(sub.GetClientID()),
 		PartitionId:        proto.Int32(metadata.GetSubscribeInfo().GetPartition().GetPartitionID()),
@@ -171,7 +173,7 @@ func (c *rpcClient) GetMessageRequestC2B(ctx context.Context, metadata *metadata
 		Timeout: proto.Int64(c.config.Net.ReadTimeout.Milliseconds()),
 	}
 
-	rspBody, err := c.doRequest(ctx, req)
+	rspBody, err := c.doRequest(ctx, metadata.GetNode().GetAddress(), req)
 	if err != nil {
 		return nil, err
 	}
@@ -185,7 +187,7 @@ func (c *rpcClient) GetMessageRequestC2B(ctx context.Context, metadata *metadata
 }
 
 // CommitOffsetRequestC2B implements the CommitOffsetRequestC2B interface according to TubeMQ RPC protocol.
-func (c *rpcClient) CommitOffsetRequestC2B(ctx context.Context, metadata *metadata.Metadata, sub *client.SubInfo) (*protocol.CommitOffsetResponseB2C, error) {
+func (c *rpcClient) CommitOffsetRequestC2B(ctx context.Context, metadata *metadata.Metadata, sub *sub.SubInfo) (*protocol.CommitOffsetResponseB2C, error) {
 	reqC2B := &protocol.CommitOffsetRequestC2B{
 		ClientId:         proto.String(sub.GetClientID()),
 		TopicName:        proto.String(metadata.GetSubscribeInfo().GetPartition().GetTopic()),
@@ -206,12 +208,12 @@ func (c *rpcClient) CommitOffsetRequestC2B(ctx context.Context, metadata *metada
 		ProtocolVer: proto.Int32(2),
 	}
 	req.RequestBody = &protocol.RequestBody{
-		Method:  proto.Int32(brokerConsumerHeartbeat),
+		Method:  proto.Int32(brokerConsumerCommit),
 		Request: data,
 		Timeout: proto.Int64(c.config.Net.ReadTimeout.Milliseconds()),
 	}
 
-	rspBody, err := c.doRequest(ctx, req)
+	rspBody, err := c.doRequest(ctx, metadata.GetNode().GetAddress(), req)
 	if err != nil {
 		return nil, err
 	}
@@ -225,12 +227,12 @@ func (c *rpcClient) CommitOffsetRequestC2B(ctx context.Context, metadata *metada
 }
 
 // HeartbeatRequestC2B implements the HeartbeatRequestC2B interface according to TubeMQ RPC protocol.
-func (c *rpcClient) HeartbeatRequestC2B(ctx context.Context, metadata *metadata.Metadata, sub *client.SubInfo, r *client.RmtDataCache) (*protocol.HeartBeatResponseB2C, error) {
+func (c *rpcClient) HeartbeatRequestC2B(ctx context.Context, metadata *metadata.Metadata, sub *sub.SubInfo, r *remote.RmtDataCache) (*protocol.HeartBeatResponseB2C, error) {
 	reqC2B := &protocol.HeartBeatRequestC2B{
 		ClientId:      proto.String(sub.GetClientID()),
 		GroupName:     proto.String(metadata.GetSubscribeInfo().GetGroup()),
 		ReadStatus:    proto.Int32(metadata.GetReadStatus()),
-		QryPriorityId: proto.Int32(metadata.GetSubscribeInfo().GetQryPriorityID()),
+		QryPriorityId: proto.Int32(r.GetQryPriorityID()),
 		AuthInfo:      sub.GetAuthorizedInfo(),
 	}
 	partitions := r.GetPartitionByBroker(metadata.GetNode())
@@ -256,7 +258,7 @@ func (c *rpcClient) HeartbeatRequestC2B(ctx context.Context, metadata *metadata.
 		Flag: proto.Int32(0),
 	}
 
-	rspBody, err := c.doRequest(ctx, req)
+	rspBody, err := c.doRequest(ctx, metadata.GetNode().GetAddress(), req)
 	if err != nil {
 		return nil, err
 	}
diff --git a/tubemq-client-twins/tubemq-client-go/rpc/client.go b/tubemq-client-twins/tubemq-client-go/rpc/client.go
index 71123c7..fab5bae 100644
--- a/tubemq-client-twins/tubemq-client-go/rpc/client.go
+++ b/tubemq-client-twins/tubemq-client-go/rpc/client.go
@@ -21,13 +21,14 @@ package rpc
 import (
 	"context"
 
-	"github.com/apache/incubator-inlong/tubemq-client-twins/tubemq-client-go/client"
 	"github.com/apache/incubator-inlong/tubemq-client-twins/tubemq-client-go/codec"
 	"github.com/apache/incubator-inlong/tubemq-client-twins/tubemq-client-go/config"
 	"github.com/apache/incubator-inlong/tubemq-client-twins/tubemq-client-go/errs"
 	"github.com/apache/incubator-inlong/tubemq-client-twins/tubemq-client-go/metadata"
 	"github.com/apache/incubator-inlong/tubemq-client-twins/tubemq-client-go/multiplexing"
 	"github.com/apache/incubator-inlong/tubemq-client-twins/tubemq-client-go/protocol"
+	"github.com/apache/incubator-inlong/tubemq-client-twins/tubemq-client-go/remote"
+	"github.com/apache/incubator-inlong/tubemq-client-twins/tubemq-client-go/sub"
 	"github.com/apache/incubator-inlong/tubemq-client-twins/tubemq-client-go/transport"
 )
 
@@ -39,21 +40,21 @@ const (
 // RPCClient is the rpc level client to interact with TubeMQ.
 type RPCClient interface {
 	// RegisterRequestC2B is the rpc request for a consumer to register to a broker.
-	RegisterRequestC2B(ctx context.Context, metadata *metadata.Metadata, sub *client.SubInfo) (*protocol.RegisterResponseB2C, error)
+	RegisterRequestC2B(ctx context.Context, metadata *metadata.Metadata, sub *sub.SubInfo, r *remote.RmtDataCache) (*protocol.RegisterResponseB2C, error)
 	// UnregisterRequestC2B is the rpc request for a consumer to unregister to a broker.
-	UnregisterRequestC2B(ctx context.Context, metadata metadata.Metadata, sub *client.SubInfo) (*protocol.RegisterResponseB2C, error)
+	UnregisterRequestC2B(ctx context.Context, metadata *metadata.Metadata, sub *sub.SubInfo) (*protocol.RegisterResponseB2C, error)
 	// GetMessageRequestC2B is the rpc request for a consumer to get message from a broker.
-	GetMessageRequestC2B(ctx context.Context, metadata *metadata.Metadata, sub *client.SubInfo, r *client.RmtDataCache) (*protocol.GetMessageResponseB2C, error)
+	GetMessageRequestC2B(ctx context.Context, metadata *metadata.Metadata, sub *sub.SubInfo, r *remote.RmtDataCache) (*protocol.GetMessageResponseB2C, error)
 	// CommitOffsetRequestC2B is the rpc request for a consumer to commit offset to a broker.
-	CommitOffsetRequestC2B(ctx context.Context, metadata *metadata.Metadata, sub *client.SubInfo) (*protocol.CommitOffsetResponseB2C, error)
+	CommitOffsetRequestC2B(ctx context.Context, metadata *metadata.Metadata, sub *sub.SubInfo) (*protocol.CommitOffsetResponseB2C, error)
 	// HeartbeatRequestC2B is the rpc request for a consumer to send heartbeat to a broker.
-	HeartbeatRequestC2B(ctx context.Context, metadata *metadata.Metadata, sub *client.SubInfo, r *client.RmtDataCache) (*protocol.HeartBeatResponseB2C, error)
+	HeartbeatRequestC2B(ctx context.Context, metadata *metadata.Metadata, sub *sub.SubInfo, r *remote.RmtDataCache) (*protocol.HeartBeatResponseB2C, error)
 	// RegisterRequestC2M is the rpc request for a consumer to register request to master.
-	RegisterRequestC2M(ctx context.Context, metadata *metadata.Metadata, sub *client.SubInfo, r *client.RmtDataCache) (*protocol.RegisterResponseM2C, error)
+	RegisterRequestC2M(ctx context.Context, metadata *metadata.Metadata, sub *sub.SubInfo, r *remote.RmtDataCache) (*protocol.RegisterResponseM2C, error)
 	// HeartRequestC2M is the rpc request for a consumer to send heartbeat to master.
-	HeartRequestC2M(ctx context.Context, metadata *metadata.Metadata, sub *client.SubInfo, r *client.RmtDataCache) (*protocol.HeartResponseM2C, error)
+	HeartRequestC2M(ctx context.Context, metadata *metadata.Metadata, sub *sub.SubInfo, r *remote.RmtDataCache) (*protocol.HeartResponseM2C, error)
 	// CloseRequestC2M is the rpc request for a consumer to be closed to master.
-	CloseRequestC2M(ctx context.Context, metadata *metadata.Metadata, sub *client.SubInfo) (*protocol.CloseResponseM2C, error)
+	CloseRequestC2M(ctx context.Context, metadata *metadata.Metadata, sub *sub.SubInfo) (*protocol.CloseResponseM2C, error)
 }
 
 // New returns a default TubeMQ rpc Client
@@ -70,8 +71,8 @@ type rpcClient struct {
 	config *config.Config
 }
 
-func (c *rpcClient) doRequest(ctx context.Context, req codec.RPCRequest) (*protocol.RspResponseBody, error) {
-	rsp, err := c.client.DoRequest(ctx, req)
+func (c *rpcClient) doRequest(ctx context.Context, address string, req codec.RPCRequest) (*protocol.RspResponseBody, error) {
+	rsp, err := c.client.DoRequest(ctx, address, req)
 	if err != nil {
 		return nil, errs.New(errs.RetRequestFailure, err.Error())
 	}
diff --git a/tubemq-client-twins/tubemq-client-go/rpc/master.go b/tubemq-client-twins/tubemq-client-go/rpc/master.go
index 9eb336a..c0d4bc3 100644
--- a/tubemq-client-twins/tubemq-client-go/rpc/master.go
+++ b/tubemq-client-twins/tubemq-client-go/rpc/master.go
@@ -22,11 +22,12 @@ import (
 
 	"github.com/golang/protobuf/proto"
 
-	"github.com/apache/incubator-inlong/tubemq-client-twins/tubemq-client-go/client"
 	"github.com/apache/incubator-inlong/tubemq-client-twins/tubemq-client-go/codec"
 	"github.com/apache/incubator-inlong/tubemq-client-twins/tubemq-client-go/errs"
 	"github.com/apache/incubator-inlong/tubemq-client-twins/tubemq-client-go/metadata"
 	"github.com/apache/incubator-inlong/tubemq-client-twins/tubemq-client-go/protocol"
+	"github.com/apache/incubator-inlong/tubemq-client-twins/tubemq-client-go/remote"
+	"github.com/apache/incubator-inlong/tubemq-client-twins/tubemq-client-go/sub"
 )
 
 const (
@@ -39,7 +40,7 @@ const (
 )
 
 // RegisterRequestRequestC2M implements the RegisterRequestRequestC2M interface according to TubeMQ RPC protocol.
-func (c *rpcClient) RegisterRequestC2M(ctx context.Context, metadata *metadata.Metadata, sub *client.SubInfo, r *client.RmtDataCache) (*protocol.RegisterResponseM2C, error) {
+func (c *rpcClient) RegisterRequestC2M(ctx context.Context, metadata *metadata.Metadata, sub *sub.SubInfo, r *remote.RmtDataCache) (*protocol.RegisterResponseM2C, error) {
 	reqC2M := &protocol.RegisterRequestC2M{
 		ClientId:         proto.String(sub.GetClientID()),
 		HostName:         proto.String(metadata.GetNode().GetHost()),
@@ -48,7 +49,7 @@ func (c *rpcClient) RegisterRequestC2M(ctx context.Context, metadata *metadata.M
 		SessionTime:      proto.Int64(sub.GetSubscribedTime()),
 		DefFlowCheckId:   proto.Int64(r.GetDefFlowCtrlID()),
 		GroupFlowCheckId: proto.Int64(r.GetGroupFlowCtrlID()),
-		QryPriorityId:    proto.Int32(metadata.GetSubscribeInfo().GetQryPriorityID()),
+		QryPriorityId:    proto.Int32(r.GetQryPriorityID()),
 		AuthInfo:         sub.GetMasterCertificateIInfo(),
 	}
 	reqC2M.TopicList = make([]string, 0, len(sub.GetTopics()))
@@ -92,7 +93,7 @@ func (c *rpcClient) RegisterRequestC2M(ctx context.Context, metadata *metadata.M
 		Timeout: proto.Int64(c.config.Net.ReadTimeout.Milliseconds()),
 	}
 
-	rspBody, err := c.doRequest(ctx, req)
+	rspBody, err := c.doRequest(ctx, metadata.GetNode().GetAddress(), req)
 	if err != nil {
 		return nil, err
 	}
@@ -106,14 +107,14 @@ func (c *rpcClient) RegisterRequestC2M(ctx context.Context, metadata *metadata.M
 }
 
 // HeartRequestC2M implements the HeartRequestC2M interface according to TubeMQ RPC protocol.
-func (c *rpcClient) HeartRequestC2M(ctx context.Context, metadata *metadata.Metadata, sub *client.SubInfo, r *client.RmtDataCache) (*protocol.HeartResponseM2C, error) {
+func (c *rpcClient) HeartRequestC2M(ctx context.Context, metadata *metadata.Metadata, sub *sub.SubInfo, r *remote.RmtDataCache) (*protocol.HeartResponseM2C, error) {
 	reqC2M := &protocol.HeartRequestC2M{
 		ClientId:            proto.String(sub.GetClientID()),
 		GroupName:           proto.String(metadata.GetSubscribeInfo().GetGroup()),
 		DefFlowCheckId:      proto.Int64(r.GetDefFlowCtrlID()),
 		GroupFlowCheckId:    proto.Int64(r.GetGroupFlowCtrlID()),
 		ReportSubscribeInfo: proto.Bool(false),
-		QryPriorityId:       proto.Int32(metadata.GetSubscribeInfo().GetQryPriorityID()),
+		QryPriorityId:       proto.Int32(r.GetQryPriorityID()),
 	}
 	event := r.PollEventResult()
 	if event != nil || metadata.GetReportTimes() {
@@ -159,7 +160,7 @@ func (c *rpcClient) HeartRequestC2M(ctx context.Context, metadata *metadata.Meta
 		Flag: proto.Int32(0),
 	}
 
-	rspBody, err := c.doRequest(ctx, req)
+	rspBody, err := c.doRequest(ctx, metadata.GetNode().GetAddress(), req)
 	if err != nil {
 		return nil, err
 	}
@@ -173,7 +174,7 @@ func (c *rpcClient) HeartRequestC2M(ctx context.Context, metadata *metadata.Meta
 }
 
 // CloseRequestC2M implements the CloseRequestC2M interface according to TubeMQ RPC protocol.
-func (c *rpcClient) CloseRequestC2M(ctx context.Context, metadata *metadata.Metadata, sub *client.SubInfo) (*protocol.CloseResponseM2C, error) {
+func (c *rpcClient) CloseRequestC2M(ctx context.Context, metadata *metadata.Metadata, sub *sub.SubInfo) (*protocol.CloseResponseM2C, error) {
 	reqC2M := &protocol.CloseRequestC2M{
 		ClientId:  proto.String(sub.GetClientID()),
 		GroupName: proto.String(metadata.GetSubscribeInfo().GetGroup()),
@@ -197,7 +198,7 @@ func (c *rpcClient) CloseRequestC2M(ctx context.Context, metadata *metadata.Meta
 		Flag: proto.Int32(0),
 	}
 
-	rspBody, err := c.doRequest(ctx, req)
+	rspBody, err := c.doRequest(ctx, metadata.GetNode().GetAddress(), req)
 	if err != nil {
 		return nil, err
 	}
diff --git a/tubemq-client-twins/tubemq-client-go/client/info.go b/tubemq-client-twins/tubemq-client-go/sub/info.go
similarity index 62%
rename from tubemq-client-twins/tubemq-client-go/client/info.go
rename to tubemq-client-twins/tubemq-client-go/sub/info.go
index 25a2686..737b08a 100644
--- a/tubemq-client-twins/tubemq-client-go/client/info.go
+++ b/tubemq-client-twins/tubemq-client-go/sub/info.go
@@ -15,16 +15,19 @@
  * limitations under the License.
  */
 
-// Package client defines the api and information
-// which can be exposed to user.
-package client
+// Package sub defines the subscription information of a client.
+package sub
 
 import (
+	"strconv"
+	"time"
+
+	"github.com/apache/incubator-inlong/tubemq-client-twins/tubemq-client-go/config"
 	"github.com/apache/incubator-inlong/tubemq-client-twins/tubemq-client-go/protocol"
 )
 
 // InvalidOffset represents the offset which is invalid.
-const InValidOffset = -1
+const InValidOffset = -2
 
 // SubInfo represents the sub information of the client.
 type SubInfo struct {
@@ -46,6 +49,46 @@ type SubInfo struct {
 	masterCertificateInfo *protocol.MasterCertificateInfo
 }
 
+// NewSubInfo builds a SubInfo from the consumer settings held in config.
+func NewSubInfo(config *config.Config) *SubInfo {
+	s := &SubInfo{
+		boundConsume:    config.Consumer.BoundConsume,
+		subscribedTime:  time.Now().UnixNano() / int64(time.Millisecond),
+		firstRegistered: false,
+		topics:          config.Consumer.Topics,
+		topicFilters:    config.Consumer.TopicFilters,
+	}
+	s.topicConds = make([]string, 0, len(config.Consumer.TopicFilters))
+	for topic, filters := range config.Consumer.TopicFilters {
+		cond := topic + "#"
+		count := 0
+		for _, filter := range filters {
+			if count > 0 {
+				cond += ","
+			}
+			cond += filter
+			count++ // advance so that every filter after the first gets a "," separator
+		}
+		s.topicConds = append(s.topicConds, cond)
+	}
+	if config.Consumer.BoundConsume {
+		s.sessionKey = config.Consumer.SessionKey
+		s.sourceCount = int32(config.Consumer.SourceCount)
+		s.selectBig = config.Consumer.SelectBig
+		assignedPartitions := config.Consumer.PartitionOffset
+		// boundPartitions is encoded as "partition=offset,partition=offset,...".
+		count := 0
+		for partition, offset := range assignedPartitions {
+			if count > 0 {
+				s.boundPartitions += ","
+			}
+			s.boundPartitions += partition + "=" + strconv.Itoa(int(offset))
+			count++
+		}
+	}
+	return s
+}
+
 // GetClientID returns the client ID.
 func (s *SubInfo) GetClientID() string {
 	return s.clientID
@@ -124,6 +167,32 @@ func (s *SubInfo) GetAuthorizedInfo() *protocol.AuthorizedInfo {
 	return s.authInfo
 }
 
+// GetMasterCertificateIInfo returns the masterCertificateInfo.
 func (s *SubInfo) GetMasterCertificateIInfo() *protocol.MasterCertificateInfo {
 	return s.masterCertificateInfo
 }
+
+// FirstRegistered sets the firstRegistered to true.
+func (s *SubInfo) FirstRegistered() {
+	s.firstRegistered = true
+}
+
+// SetAuthorizedInfo sets the authorizedInfo.
+func (s *SubInfo) SetAuthorizedInfo(auth *protocol.AuthorizedInfo) {
+	s.authInfo = auth
+}
+
+// SetMasterCertificateInfo sets the masterCertificateInfo.
+func (s *SubInfo) SetMasterCertificateInfo(info *protocol.MasterCertificateInfo) {
+	s.masterCertificateInfo = info
+}
+
+// SetIsNotAllocated sets the notAllocated.
+func (s *SubInfo) SetIsNotAllocated(isNotAllocated bool) {
+	s.notAllocated = isNotAllocated
+}
+
+// SetClientID sets the clientID.
+func (s *SubInfo) SetClientID(clientID string) {
+	s.clientID = clientID
+}
diff --git a/tubemq-client-twins/tubemq-client-go/transport/client.go b/tubemq-client-twins/tubemq-client-go/transport/client.go
index d816a7f..91c9692 100644
--- a/tubemq-client-twins/tubemq-client-go/transport/client.go
+++ b/tubemq-client-twins/tubemq-client-go/transport/client.go
@@ -28,8 +28,6 @@ import (
 
 // Options represents the transport options
 type Options struct {
-	Address string
-
 	CACertFile    string
 	TLSCertFile   string
 	TLSKeyFile    string
@@ -51,9 +49,9 @@ func New(opts *Options, pool *multiplexing.Pool) *Client {
 }
 
 // DoRequest sends the request and return the decoded response
-func (c *Client) DoRequest(ctx context.Context, req codec.RPCRequest) (codec.RPCResponse, error) {
+func (c *Client) DoRequest(ctx context.Context, address string, req codec.RPCRequest) (codec.RPCResponse, error) {
 	opts := &multiplexing.DialOptions{
-		Address: c.opts.Address,
+		Address: address,
 		Network: "tcp",
 	}
 	if c.opts.CACertFile != "" {
@@ -63,7 +61,7 @@ func (c *Client) DoRequest(ctx context.Context, req codec.RPCRequest) (codec.RPC
 		opts.TLSServerName = c.opts.TLSServerName
 	}
 
-	conn, err := c.pool.Get(ctx, c.opts.Address, req.GetSerialNo(), opts)
+	conn, err := c.pool.Get(ctx, address, req.GetSerialNo(), opts)
 	if err != nil {
 		return nil, err
 	}
diff --git a/tubemq-client-twins/tubemq-client-go/util/util.go b/tubemq-client-twins/tubemq-client-go/util/util.go
new file mode 100644
index 0000000..77422ac
--- /dev/null
+++ b/tubemq-client-twins/tubemq-client-go/util/util.go
@@ -0,0 +1,52 @@
+package util
+
+import (
+	"net"
+)
+
+var InvalidValue = int64(-2)
+
+// GetLocalHost returns the first non-loopback IPv4 address found on a
+// network interface that is up. It returns "" when the interfaces cannot
+// be listed or when no such address exists.
+func GetLocalHost() string {
+	ifaces, err := net.Interfaces()
+	if err != nil {
+		return ""
+	}
+	for _, iface := range ifaces {
+		if iface.Flags&net.FlagUp == 0 || iface.Flags&net.FlagLoopback != 0 {
+			continue // interface is down or is a loopback interface
+		}
+		addrs, err := iface.Addrs()
+		if err != nil {
+			continue // this interface is unusable; another one may still have an address
+		}
+		for _, addr := range addrs {
+			var ip net.IP
+			switch v := addr.(type) {
+			case *net.IPNet:
+				ip = v.IP
+			case *net.IPAddr:
+				ip = v.IP
+			}
+			if ip == nil || ip.IsLoopback() {
+				continue
+			}
+			ip = ip.To4()
+			if ip == nil {
+				continue // not an ipv4 address
+			}
+			return ip.String()
+		}
+	}
+	return ""
+}
+
+func GenBrokerAuthenticateToken(username string, password string) string {
+	return "" // placeholder: username and password are ignored and an empty token is returned
+}
+
+func GenMasterAuthenticateToken(username string, password string) string {
+	return "" // placeholder: username and password are ignored and an empty token is returned
+}