You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@inlong.apache.org by GitBox <gi...@apache.org> on 2021/06/08 03:24:19 UTC

[GitHub] [incubator-inlong] charlely commented on a change in pull request #480: [INLONG-624]Go SDK consumer start interface

charlely commented on a change in pull request #480:
URL: https://github.com/apache/incubator-inlong/pull/480#discussion_r647076775



##########
File path: tubemq-client-twins/tubemq-client-go/client/consumer.go
##########
@@ -0,0 +1,46 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+// Package client defines the api and information
+// which can be exposed to user.
+package client
+
+const (
+	tubeMQClientVersion = "0.1.0"

Review comment:
       The version constant should be defined in a separate file.

##########
File path: tubemq-client-twins/tubemq-client-go/client/consumer_impl.go
##########
@@ -0,0 +1,356 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package client
+
+import (
+	"context"
+	"os"
+	"strconv"
+	"sync/atomic"
+	"time"
+
+	"github.com/golang/protobuf/proto"
+
+	"github.com/apache/incubator-inlong/tubemq-client-twins/tubemq-client-go/config"
+	"github.com/apache/incubator-inlong/tubemq-client-twins/tubemq-client-go/errs"
+	"github.com/apache/incubator-inlong/tubemq-client-twins/tubemq-client-go/metadata"
+	"github.com/apache/incubator-inlong/tubemq-client-twins/tubemq-client-go/multiplexing"
+	"github.com/apache/incubator-inlong/tubemq-client-twins/tubemq-client-go/protocol"
+	"github.com/apache/incubator-inlong/tubemq-client-twins/tubemq-client-go/remote"
+	"github.com/apache/incubator-inlong/tubemq-client-twins/tubemq-client-go/rpc"
+	"github.com/apache/incubator-inlong/tubemq-client-twins/tubemq-client-go/selector"
+	"github.com/apache/incubator-inlong/tubemq-client-twins/tubemq-client-go/sub"
+	"github.com/apache/incubator-inlong/tubemq-client-twins/tubemq-client-go/transport"
+	"github.com/apache/incubator-inlong/tubemq-client-twins/tubemq-client-go/util"
+)
+
+const (
+	consumeStatusNormal        = 0
+	consumeStatusFromMax       = 1
+	consumeStatusFromMaxAlways = 2
+)
+
+type consumer struct {
+	clientID         string
+	config           *config.Config
+	subInfo          *sub.SubInfo
+	rmtDataCache     *remote.RmtDataCache
+	visitToken       int64
+	authorizedInfo   string
+	nextAuth2Master  int32
+	nextAuth2Broker  int32
+	master           *selector.Node
+	client           rpc.RPCClient
+	selector         selector.Selector
+	lastMasterHb     int64
+	masterHBRetry    int
+	heartbeatManager *heartbeatManager
+	unreportedTimes  int
+}
+
+// NewConsumer returns a consumer which is constructed by a given config.
+func NewConsumer(config *config.Config) (Consumer, error) {
+	selector, err := selector.Get("ip")
+	if err != nil {
+		return nil, err
+	}
+
+	clientID := newClientID(config.Consumer.Group)
+	pool := multiplexing.NewPool()
+	opts := &transport.Options{}
+	if config.Net.TLS.Enable {
+		opts.CACertFile = config.Net.TLS.CACertFile
+		opts.TLSCertFile = config.Net.TLS.TLSCertFile
+		opts.TLSKeyFile = config.Net.TLS.TLSKeyFile
+		opts.TLSServerName = config.Net.TLS.TLSServerName
+	}
+	client := rpc.New(pool, opts)
+	r := remote.NewRmtDataCache()
+	r.SetConsumerInfo(clientID, config.Consumer.Group)
+	c := &consumer{
+		config:          config,
+		clientID:        clientID,
+		subInfo:         sub.NewSubInfo(config),
+		rmtDataCache:    r,
+		selector:        selector,
+		client:          client,
+		visitToken:      util.InvalidValue,
+		unreportedTimes: 0,
+	}
+	c.subInfo.SetClientID(clientID)
+	hbm := newHBManager(c)
+	c.heartbeatManager = hbm
+	return c, nil
+}
+
+// Start implementation of tubeMQ consumer.
+func (c *consumer) Start() error {
+	err := c.register2Master(false)
+	if err != nil {
+		return err
+	}
+	c.heartbeatManager.registerMaster(c.master.Address)
+	go c.processRebalanceEvent()
+	return nil
+}
+
+func (c *consumer) register2Master(needChange bool) error {
+	if needChange {
+		node, err := c.selector.Select(c.config.Consumer.Masters)
+		if err != nil {
+			return err
+		}
+		c.master = node
+	}
+	for c.master.HasNext {
+		ctx, cancel := context.WithTimeout(context.Background(), c.config.Net.ReadTimeout)
+
+		m := &metadata.Metadata{}
+		node := &metadata.Node{}
+		node.SetHost(util.GetLocalHost())
+		node.SetAddress(c.master.Address)
+		m.SetNode(node)
+		sub := &metadata.SubscribeInfo{}
+		sub.SetGroup(c.config.Consumer.Group)
+		m.SetSubscribeInfo(sub)
+
+		auth := &protocol.AuthenticateInfo{}
+		c.genMasterAuthenticateToken(auth, true)
+		mci := &protocol.MasterCertificateInfo{
+			AuthInfo: auth,
+		}
+		c.subInfo.SetMasterCertificateInfo(mci)
+
+		rsp, err := c.client.RegisterRequestC2M(ctx, m, c.subInfo, c.rmtDataCache)
+		if err != nil {
+			cancel()
+			return err
+		}
+		if rsp.GetSuccess() {
+			c.masterHBRetry = 0
+			c.processRegisterResponseM2C(rsp)
+			cancel()
+			return nil
+		} else if rsp.GetErrCode() == errs.RetConsumeGroupForbidden || rsp.GetErrCode() == errs.RetConsumeContentForbidden {
+			cancel()
+			return nil
+		} else {
+			c.master, err = c.selector.Select(c.config.Consumer.Masters)
+			cancel()
+			if err != nil {
+				return err
+			}
+		}
+	}
+	return nil
+}
+
+func (c *consumer) processRegisterResponseM2C(rsp *protocol.RegisterResponseM2C) {
+	if rsp.GetNotAllocated() {
+		c.subInfo.SetIsNotAllocated(rsp.GetNotAllocated())
+	}
+	if rsp.GetDefFlowCheckId() != 0 || rsp.GetDefFlowCheckId() != 0 {
+		if rsp.GetDefFlowCheckId() != 0 {
+			c.rmtDataCache.UpdateDefFlowCtrlInfo(rsp.GetDefFlowCheckId(), rsp.GetDefFlowControlInfo())
+		}
+		qryPriorityID := c.rmtDataCache.GetQryPriorityID()
+		if rsp.GetQryPriorityId() != 0 {
+			qryPriorityID = rsp.GetQryPriorityId()
+		}
+		c.rmtDataCache.UpdateGroupFlowCtrlInfo(qryPriorityID, rsp.GetGroupFlowCheckId(), rsp.GetGroupFlowControlInfo())
+	}
+	if rsp.GetAuthorizedInfo() != nil {
+		c.processAuthorizedToken(rsp.GetAuthorizedInfo())
+	}
+	c.lastMasterHb = time.Now().UnixNano() / int64(time.Millisecond)
+}
+
+func (c *consumer) processAuthorizedToken(info *protocol.MasterAuthorizedInfo) {
+	atomic.StoreInt64(&c.visitToken, info.GetVisitAuthorizedToken())
+	c.authorizedInfo = info.GetAuthAuthorizedToken()
+}
+
+// GetMessage implementation of TubeMQ consumer.
+func (c *consumer) GetMessage() (*ConsumerResult, error) {
+	panic("implement me")
+}
+
+// Confirm implementation of TubeMQ consumer.
+func (c *consumer) Confirm(confirmContext string, consumed bool) (*ConsumerResult, error) {
+	panic("implement me")
+}
+
+// GetCurrConsumedInfo implementation of TubeMQ consumer.
+func (c *consumer) GetCurrConsumedInfo() (map[string]*ConsumerOffset, error) {
+	panic("implement me")
+}
+
+func (c *consumer) processRebalanceEvent() {
+	for {
+		event := c.rmtDataCache.TakeEvent()
+		if event.GetEventStatus() == int32(util.InvalidValue) && event.GetRebalanceID() == util.InvalidValue {
+			break
+		}
+		c.rmtDataCache.ClearEvent()
+		switch event.GetEventType() {
+		case 2, 20:
+			c.disconnect2Broker(event)
+			c.rmtDataCache.OfferEventResult(event)
+		case 1, 10:

Review comment:
       Don't use magic numbers for these values; define named constants instead.

##########
File path: tubemq-client-twins/tubemq-client-go/client/consumer_impl.go
##########
@@ -0,0 +1,356 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package client
+
+import (
+	"context"
+	"os"
+	"strconv"
+	"sync/atomic"
+	"time"
+
+	"github.com/golang/protobuf/proto"
+
+	"github.com/apache/incubator-inlong/tubemq-client-twins/tubemq-client-go/config"
+	"github.com/apache/incubator-inlong/tubemq-client-twins/tubemq-client-go/errs"
+	"github.com/apache/incubator-inlong/tubemq-client-twins/tubemq-client-go/metadata"
+	"github.com/apache/incubator-inlong/tubemq-client-twins/tubemq-client-go/multiplexing"
+	"github.com/apache/incubator-inlong/tubemq-client-twins/tubemq-client-go/protocol"
+	"github.com/apache/incubator-inlong/tubemq-client-twins/tubemq-client-go/remote"
+	"github.com/apache/incubator-inlong/tubemq-client-twins/tubemq-client-go/rpc"
+	"github.com/apache/incubator-inlong/tubemq-client-twins/tubemq-client-go/selector"
+	"github.com/apache/incubator-inlong/tubemq-client-twins/tubemq-client-go/sub"
+	"github.com/apache/incubator-inlong/tubemq-client-twins/tubemq-client-go/transport"
+	"github.com/apache/incubator-inlong/tubemq-client-twins/tubemq-client-go/util"
+)
+
+const (
+	consumeStatusNormal        = 0
+	consumeStatusFromMax       = 1
+	consumeStatusFromMaxAlways = 2
+)
+
+type consumer struct {
+	clientID         string
+	config           *config.Config
+	subInfo          *sub.SubInfo
+	rmtDataCache     *remote.RmtDataCache
+	visitToken       int64
+	authorizedInfo   string
+	nextAuth2Master  int32
+	nextAuth2Broker  int32
+	master           *selector.Node
+	client           rpc.RPCClient
+	selector         selector.Selector
+	lastMasterHb     int64
+	masterHBRetry    int
+	heartbeatManager *heartbeatManager
+	unreportedTimes  int
+}
+
+// NewConsumer returns a consumer which is constructed by a given config.
+func NewConsumer(config *config.Config) (Consumer, error) {
+	selector, err := selector.Get("ip")
+	if err != nil {
+		return nil, err
+	}
+
+	clientID := newClientID(config.Consumer.Group)
+	pool := multiplexing.NewPool()
+	opts := &transport.Options{}
+	if config.Net.TLS.Enable {
+		opts.CACertFile = config.Net.TLS.CACertFile
+		opts.TLSCertFile = config.Net.TLS.TLSCertFile
+		opts.TLSKeyFile = config.Net.TLS.TLSKeyFile
+		opts.TLSServerName = config.Net.TLS.TLSServerName
+	}
+	client := rpc.New(pool, opts)
+	r := remote.NewRmtDataCache()
+	r.SetConsumerInfo(clientID, config.Consumer.Group)
+	c := &consumer{
+		config:          config,
+		clientID:        clientID,
+		subInfo:         sub.NewSubInfo(config),
+		rmtDataCache:    r,
+		selector:        selector,
+		client:          client,
+		visitToken:      util.InvalidValue,
+		unreportedTimes: 0,
+	}
+	c.subInfo.SetClientID(clientID)
+	hbm := newHBManager(c)
+	c.heartbeatManager = hbm
+	return c, nil
+}
+
+// Start implementation of tubeMQ consumer.
+func (c *consumer) Start() error {
+	err := c.register2Master(false)
+	if err != nil {
+		return err
+	}
+	c.heartbeatManager.registerMaster(c.master.Address)
+	go c.processRebalanceEvent()
+	return nil
+}
+
+func (c *consumer) register2Master(needChange bool) error {
+	if needChange {
+		node, err := c.selector.Select(c.config.Consumer.Masters)
+		if err != nil {
+			return err
+		}
+		c.master = node
+	}
+	for c.master.HasNext {
+		ctx, cancel := context.WithTimeout(context.Background(), c.config.Net.ReadTimeout)
+
+		m := &metadata.Metadata{}
+		node := &metadata.Node{}
+		node.SetHost(util.GetLocalHost())
+		node.SetAddress(c.master.Address)
+		m.SetNode(node)
+		sub := &metadata.SubscribeInfo{}
+		sub.SetGroup(c.config.Consumer.Group)
+		m.SetSubscribeInfo(sub)
+
+		auth := &protocol.AuthenticateInfo{}
+		c.genMasterAuthenticateToken(auth, true)
+		mci := &protocol.MasterCertificateInfo{
+			AuthInfo: auth,
+		}
+		c.subInfo.SetMasterCertificateInfo(mci)
+
+		rsp, err := c.client.RegisterRequestC2M(ctx, m, c.subInfo, c.rmtDataCache)
+		if err != nil {
+			cancel()
+			return err
+		}
+		if rsp.GetSuccess() {
+			c.masterHBRetry = 0
+			c.processRegisterResponseM2C(rsp)
+			cancel()
+			return nil
+		} else if rsp.GetErrCode() == errs.RetConsumeGroupForbidden || rsp.GetErrCode() == errs.RetConsumeContentForbidden {
+			cancel()
+			return nil
+		} else {
+			c.master, err = c.selector.Select(c.config.Consumer.Masters)
+			cancel()
+			if err != nil {
+				return err
+			}
+		}
+	}
+	return nil
+}
+
+func (c *consumer) processRegisterResponseM2C(rsp *protocol.RegisterResponseM2C) {
+	if rsp.GetNotAllocated() {
+		c.subInfo.SetIsNotAllocated(rsp.GetNotAllocated())
+	}
+	if rsp.GetDefFlowCheckId() != 0 || rsp.GetDefFlowCheckId() != 0 {
+		if rsp.GetDefFlowCheckId() != 0 {
+			c.rmtDataCache.UpdateDefFlowCtrlInfo(rsp.GetDefFlowCheckId(), rsp.GetDefFlowControlInfo())
+		}
+		qryPriorityID := c.rmtDataCache.GetQryPriorityID()
+		if rsp.GetQryPriorityId() != 0 {
+			qryPriorityID = rsp.GetQryPriorityId()
+		}
+		c.rmtDataCache.UpdateGroupFlowCtrlInfo(qryPriorityID, rsp.GetGroupFlowCheckId(), rsp.GetGroupFlowControlInfo())
+	}
+	if rsp.GetAuthorizedInfo() != nil {
+		c.processAuthorizedToken(rsp.GetAuthorizedInfo())
+	}
+	c.lastMasterHb = time.Now().UnixNano() / int64(time.Millisecond)
+}
+
+func (c *consumer) processAuthorizedToken(info *protocol.MasterAuthorizedInfo) {
+	atomic.StoreInt64(&c.visitToken, info.GetVisitAuthorizedToken())
+	c.authorizedInfo = info.GetAuthAuthorizedToken()
+}
+
+// GetMessage implementation of TubeMQ consumer.
+func (c *consumer) GetMessage() (*ConsumerResult, error) {
+	panic("implement me")
+}
+
+// Confirm implementation of TubeMQ consumer.
+func (c *consumer) Confirm(confirmContext string, consumed bool) (*ConsumerResult, error) {
+	panic("implement me")
+}
+
+// GetCurrConsumedInfo implementation of TubeMQ consumer.
+func (c *consumer) GetCurrConsumedInfo() (map[string]*ConsumerOffset, error) {
+	panic("implement me")
+}
+
+func (c *consumer) processRebalanceEvent() {
+	for {

Review comment:
       Why not use channels?

##########
File path: tubemq-client-twins/tubemq-client-go/client/consumer.go
##########
@@ -0,0 +1,46 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+// Package client defines the api and information
+// which can be exposed to user.
+package client
+
+const (
+	tubeMQClientVersion = "0.1.0"
+)
+
+// ConsumerResult of a consumption.
+type ConsumerResult struct {
+}
+
+// ConsumerOffset of a consumption,
+type ConsumerOffset struct {
+}
+
+var clientIndex uint64

Review comment:
       This is really the allocator for client IDs; "index" sounds more like an array coordinate.

##########
File path: tubemq-client-twins/tubemq-client-go/client/consumer_impl.go
##########
@@ -0,0 +1,356 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package client
+
+import (
+	"context"
+	"os"
+	"strconv"
+	"sync/atomic"
+	"time"
+
+	"github.com/golang/protobuf/proto"
+
+	"github.com/apache/incubator-inlong/tubemq-client-twins/tubemq-client-go/config"
+	"github.com/apache/incubator-inlong/tubemq-client-twins/tubemq-client-go/errs"
+	"github.com/apache/incubator-inlong/tubemq-client-twins/tubemq-client-go/metadata"
+	"github.com/apache/incubator-inlong/tubemq-client-twins/tubemq-client-go/multiplexing"
+	"github.com/apache/incubator-inlong/tubemq-client-twins/tubemq-client-go/protocol"
+	"github.com/apache/incubator-inlong/tubemq-client-twins/tubemq-client-go/remote"
+	"github.com/apache/incubator-inlong/tubemq-client-twins/tubemq-client-go/rpc"
+	"github.com/apache/incubator-inlong/tubemq-client-twins/tubemq-client-go/selector"
+	"github.com/apache/incubator-inlong/tubemq-client-twins/tubemq-client-go/sub"
+	"github.com/apache/incubator-inlong/tubemq-client-twins/tubemq-client-go/transport"
+	"github.com/apache/incubator-inlong/tubemq-client-twins/tubemq-client-go/util"
+)
+
+const (
+	consumeStatusNormal        = 0
+	consumeStatusFromMax       = 1
+	consumeStatusFromMaxAlways = 2
+)
+
+type consumer struct {
+	clientID         string
+	config           *config.Config
+	subInfo          *sub.SubInfo
+	rmtDataCache     *remote.RmtDataCache
+	visitToken       int64
+	authorizedInfo   string
+	nextAuth2Master  int32
+	nextAuth2Broker  int32
+	master           *selector.Node
+	client           rpc.RPCClient
+	selector         selector.Selector
+	lastMasterHb     int64
+	masterHBRetry    int
+	heartbeatManager *heartbeatManager
+	unreportedTimes  int
+}
+
+// NewConsumer returns a consumer which is constructed by a given config.
+func NewConsumer(config *config.Config) (Consumer, error) {
+	selector, err := selector.Get("ip")
+	if err != nil {
+		return nil, err
+	}
+
+	clientID := newClientID(config.Consumer.Group)
+	pool := multiplexing.NewPool()
+	opts := &transport.Options{}
+	if config.Net.TLS.Enable {
+		opts.CACertFile = config.Net.TLS.CACertFile
+		opts.TLSCertFile = config.Net.TLS.TLSCertFile
+		opts.TLSKeyFile = config.Net.TLS.TLSKeyFile
+		opts.TLSServerName = config.Net.TLS.TLSServerName
+	}
+	client := rpc.New(pool, opts)
+	r := remote.NewRmtDataCache()
+	r.SetConsumerInfo(clientID, config.Consumer.Group)
+	c := &consumer{
+		config:          config,
+		clientID:        clientID,
+		subInfo:         sub.NewSubInfo(config),
+		rmtDataCache:    r,
+		selector:        selector,
+		client:          client,
+		visitToken:      util.InvalidValue,
+		unreportedTimes: 0,
+	}
+	c.subInfo.SetClientID(clientID)
+	hbm := newHBManager(c)
+	c.heartbeatManager = hbm
+	return c, nil
+}
+
+// Start implementation of tubeMQ consumer.
+func (c *consumer) Start() error {
+	err := c.register2Master(false)
+	if err != nil {
+		return err
+	}
+	c.heartbeatManager.registerMaster(c.master.Address)
+	go c.processRebalanceEvent()
+	return nil
+}
+
+func (c *consumer) register2Master(needChange bool) error {
+	if needChange {
+		node, err := c.selector.Select(c.config.Consumer.Masters)
+		if err != nil {
+			return err
+		}
+		c.master = node
+	}
+	for c.master.HasNext {
+		ctx, cancel := context.WithTimeout(context.Background(), c.config.Net.ReadTimeout)

Review comment:
       Use `defer cancel()` here; you can define a separate function that registers with a single master.

##########
File path: tubemq-client-twins/tubemq-client-go/client/consumer_impl.go
##########
@@ -0,0 +1,356 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package client
+
+import (
+	"context"
+	"os"
+	"strconv"
+	"sync/atomic"
+	"time"
+
+	"github.com/golang/protobuf/proto"
+
+	"github.com/apache/incubator-inlong/tubemq-client-twins/tubemq-client-go/config"
+	"github.com/apache/incubator-inlong/tubemq-client-twins/tubemq-client-go/errs"
+	"github.com/apache/incubator-inlong/tubemq-client-twins/tubemq-client-go/metadata"
+	"github.com/apache/incubator-inlong/tubemq-client-twins/tubemq-client-go/multiplexing"
+	"github.com/apache/incubator-inlong/tubemq-client-twins/tubemq-client-go/protocol"
+	"github.com/apache/incubator-inlong/tubemq-client-twins/tubemq-client-go/remote"
+	"github.com/apache/incubator-inlong/tubemq-client-twins/tubemq-client-go/rpc"
+	"github.com/apache/incubator-inlong/tubemq-client-twins/tubemq-client-go/selector"
+	"github.com/apache/incubator-inlong/tubemq-client-twins/tubemq-client-go/sub"
+	"github.com/apache/incubator-inlong/tubemq-client-twins/tubemq-client-go/transport"
+	"github.com/apache/incubator-inlong/tubemq-client-twins/tubemq-client-go/util"
+)
+
+const (
+	consumeStatusNormal        = 0
+	consumeStatusFromMax       = 1
+	consumeStatusFromMaxAlways = 2
+)
+
+type consumer struct {
+	clientID         string
+	config           *config.Config
+	subInfo          *sub.SubInfo
+	rmtDataCache     *remote.RmtDataCache
+	visitToken       int64
+	authorizedInfo   string
+	nextAuth2Master  int32
+	nextAuth2Broker  int32
+	master           *selector.Node
+	client           rpc.RPCClient
+	selector         selector.Selector
+	lastMasterHb     int64
+	masterHBRetry    int
+	heartbeatManager *heartbeatManager
+	unreportedTimes  int
+}
+
+// NewConsumer returns a consumer which is constructed by a given config.
+func NewConsumer(config *config.Config) (Consumer, error) {
+	selector, err := selector.Get("ip")
+	if err != nil {
+		return nil, err
+	}
+
+	clientID := newClientID(config.Consumer.Group)
+	pool := multiplexing.NewPool()
+	opts := &transport.Options{}
+	if config.Net.TLS.Enable {
+		opts.CACertFile = config.Net.TLS.CACertFile
+		opts.TLSCertFile = config.Net.TLS.TLSCertFile
+		opts.TLSKeyFile = config.Net.TLS.TLSKeyFile
+		opts.TLSServerName = config.Net.TLS.TLSServerName
+	}
+	client := rpc.New(pool, opts)
+	r := remote.NewRmtDataCache()
+	r.SetConsumerInfo(clientID, config.Consumer.Group)
+	c := &consumer{
+		config:          config,
+		clientID:        clientID,
+		subInfo:         sub.NewSubInfo(config),
+		rmtDataCache:    r,
+		selector:        selector,
+		client:          client,
+		visitToken:      util.InvalidValue,
+		unreportedTimes: 0,
+	}
+	c.subInfo.SetClientID(clientID)
+	hbm := newHBManager(c)
+	c.heartbeatManager = hbm
+	return c, nil
+}
+
+// Start implementation of tubeMQ consumer.
+func (c *consumer) Start() error {
+	err := c.register2Master(false)
+	if err != nil {
+		return err
+	}
+	c.heartbeatManager.registerMaster(c.master.Address)
+	go c.processRebalanceEvent()
+	return nil
+}
+
+func (c *consumer) register2Master(needChange bool) error {
+	if needChange {
+		node, err := c.selector.Select(c.config.Consumer.Masters)
+		if err != nil {
+			return err
+		}
+		c.master = node
+	}
+	for c.master.HasNext {
+		ctx, cancel := context.WithTimeout(context.Background(), c.config.Net.ReadTimeout)
+
+		m := &metadata.Metadata{}
+		node := &metadata.Node{}
+		node.SetHost(util.GetLocalHost())
+		node.SetAddress(c.master.Address)
+		m.SetNode(node)
+		sub := &metadata.SubscribeInfo{}
+		sub.SetGroup(c.config.Consumer.Group)
+		m.SetSubscribeInfo(sub)
+
+		auth := &protocol.AuthenticateInfo{}
+		c.genMasterAuthenticateToken(auth, true)
+		mci := &protocol.MasterCertificateInfo{
+			AuthInfo: auth,
+		}
+		c.subInfo.SetMasterCertificateInfo(mci)
+
+		rsp, err := c.client.RegisterRequestC2M(ctx, m, c.subInfo, c.rmtDataCache)
+		if err != nil {
+			cancel()
+			return err
+		}
+		if rsp.GetSuccess() {
+			c.masterHBRetry = 0
+			c.processRegisterResponseM2C(rsp)
+			cancel()
+			return nil
+		} else if rsp.GetErrCode() == errs.RetConsumeGroupForbidden || rsp.GetErrCode() == errs.RetConsumeContentForbidden {
+			cancel()
+			return nil
+		} else {
+			c.master, err = c.selector.Select(c.config.Consumer.Masters)
+			cancel()
+			if err != nil {
+				return err
+			}
+		}
+	}
+	return nil
+}
+
+func (c *consumer) processRegisterResponseM2C(rsp *protocol.RegisterResponseM2C) {
+	if rsp.GetNotAllocated() {
+		c.subInfo.SetIsNotAllocated(rsp.GetNotAllocated())
+	}
+	if rsp.GetDefFlowCheckId() != 0 || rsp.GetDefFlowCheckId() != 0 {
+		if rsp.GetDefFlowCheckId() != 0 {
+			c.rmtDataCache.UpdateDefFlowCtrlInfo(rsp.GetDefFlowCheckId(), rsp.GetDefFlowControlInfo())
+		}
+		qryPriorityID := c.rmtDataCache.GetQryPriorityID()
+		if rsp.GetQryPriorityId() != 0 {
+			qryPriorityID = rsp.GetQryPriorityId()
+		}
+		c.rmtDataCache.UpdateGroupFlowCtrlInfo(qryPriorityID, rsp.GetGroupFlowCheckId(), rsp.GetGroupFlowControlInfo())
+	}
+	if rsp.GetAuthorizedInfo() != nil {
+		c.processAuthorizedToken(rsp.GetAuthorizedInfo())
+	}
+	c.lastMasterHb = time.Now().UnixNano() / int64(time.Millisecond)
+}
+
+func (c *consumer) processAuthorizedToken(info *protocol.MasterAuthorizedInfo) {
+	atomic.StoreInt64(&c.visitToken, info.GetVisitAuthorizedToken())
+	c.authorizedInfo = info.GetAuthAuthorizedToken()
+}
+
+// GetMessage implementation of TubeMQ consumer.
+func (c *consumer) GetMessage() (*ConsumerResult, error) {
+	panic("implement me")
+}
+
+// Confirm implementation of TubeMQ consumer.
+func (c *consumer) Confirm(confirmContext string, consumed bool) (*ConsumerResult, error) {
+	panic("implement me")
+}
+
+// GetCurrConsumedInfo implementation of TubeMQ consumer.
+func (c *consumer) GetCurrConsumedInfo() (map[string]*ConsumerOffset, error) {
+	panic("implement me")
+}
+
+func (c *consumer) processRebalanceEvent() {
+	for {

Review comment:
       This for loop will spin and use up a full CPU.

##########
File path: tubemq-client-twins/tubemq-client-go/client/consumer_impl.go
##########
@@ -0,0 +1,356 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package client
+
+import (
+	"context"
+	"os"
+	"strconv"
+	"sync/atomic"
+	"time"
+
+	"github.com/golang/protobuf/proto"
+
+	"github.com/apache/incubator-inlong/tubemq-client-twins/tubemq-client-go/config"
+	"github.com/apache/incubator-inlong/tubemq-client-twins/tubemq-client-go/errs"
+	"github.com/apache/incubator-inlong/tubemq-client-twins/tubemq-client-go/metadata"
+	"github.com/apache/incubator-inlong/tubemq-client-twins/tubemq-client-go/multiplexing"
+	"github.com/apache/incubator-inlong/tubemq-client-twins/tubemq-client-go/protocol"
+	"github.com/apache/incubator-inlong/tubemq-client-twins/tubemq-client-go/remote"
+	"github.com/apache/incubator-inlong/tubemq-client-twins/tubemq-client-go/rpc"
+	"github.com/apache/incubator-inlong/tubemq-client-twins/tubemq-client-go/selector"
+	"github.com/apache/incubator-inlong/tubemq-client-twins/tubemq-client-go/sub"
+	"github.com/apache/incubator-inlong/tubemq-client-twins/tubemq-client-go/transport"
+	"github.com/apache/incubator-inlong/tubemq-client-twins/tubemq-client-go/util"
+)
+
+const (
+	consumeStatusNormal        = 0
+	consumeStatusFromMax       = 1
+	consumeStatusFromMaxAlways = 2
+)
+
+type consumer struct {
+	clientID         string
+	config           *config.Config
+	subInfo          *sub.SubInfo
+	rmtDataCache     *remote.RmtDataCache
+	visitToken       int64
+	authorizedInfo   string
+	nextAuth2Master  int32
+	nextAuth2Broker  int32
+	master           *selector.Node
+	client           rpc.RPCClient
+	selector         selector.Selector
+	lastMasterHb     int64
+	masterHBRetry    int
+	heartbeatManager *heartbeatManager
+	unreportedTimes  int
+}
+
+// NewConsumer returns a consumer which is constructed by a given config.
+func NewConsumer(config *config.Config) (Consumer, error) {
+	selector, err := selector.Get("ip")
+	if err != nil {
+		return nil, err
+	}
+
+	clientID := newClientID(config.Consumer.Group)
+	pool := multiplexing.NewPool()
+	opts := &transport.Options{}
+	if config.Net.TLS.Enable {
+		opts.CACertFile = config.Net.TLS.CACertFile
+		opts.TLSCertFile = config.Net.TLS.TLSCertFile
+		opts.TLSKeyFile = config.Net.TLS.TLSKeyFile
+		opts.TLSServerName = config.Net.TLS.TLSServerName
+	}
+	client := rpc.New(pool, opts)
+	r := remote.NewRmtDataCache()
+	r.SetConsumerInfo(clientID, config.Consumer.Group)
+	c := &consumer{
+		config:          config,
+		clientID:        clientID,
+		subInfo:         sub.NewSubInfo(config),
+		rmtDataCache:    r,
+		selector:        selector,
+		client:          client,
+		visitToken:      util.InvalidValue,
+		unreportedTimes: 0,
+	}
+	c.subInfo.SetClientID(clientID)
+	hbm := newHBManager(c)
+	c.heartbeatManager = hbm
+	return c, nil
+}
+
+// Start implementation of tubeMQ consumer.
+func (c *consumer) Start() error {
+	err := c.register2Master(false)
+	if err != nil {
+		return err
+	}
+	c.heartbeatManager.registerMaster(c.master.Address)
+	go c.processRebalanceEvent()
+	return nil
+}
+
+func (c *consumer) register2Master(needChange bool) error {
+	if needChange {
+		node, err := c.selector.Select(c.config.Consumer.Masters)
+		if err != nil {
+			return err
+		}
+		c.master = node
+	}
+	for c.master.HasNext {
+		ctx, cancel := context.WithTimeout(context.Background(), c.config.Net.ReadTimeout)
+
+		m := &metadata.Metadata{}
+		node := &metadata.Node{}
+		node.SetHost(util.GetLocalHost())
+		node.SetAddress(c.master.Address)
+		m.SetNode(node)
+		sub := &metadata.SubscribeInfo{}
+		sub.SetGroup(c.config.Consumer.Group)
+		m.SetSubscribeInfo(sub)
+
+		auth := &protocol.AuthenticateInfo{}
+		c.genMasterAuthenticateToken(auth, true)
+		mci := &protocol.MasterCertificateInfo{
+			AuthInfo: auth,
+		}
+		c.subInfo.SetMasterCertificateInfo(mci)
+
+		rsp, err := c.client.RegisterRequestC2M(ctx, m, c.subInfo, c.rmtDataCache)
+		if err != nil {
+			cancel()
+			return err
+		}
+		if rsp.GetSuccess() {
+			c.masterHBRetry = 0
+			c.processRegisterResponseM2C(rsp)
+			cancel()
+			return nil
+		} else if rsp.GetErrCode() == errs.RetConsumeGroupForbidden || rsp.GetErrCode() == errs.RetConsumeContentForbidden {
+			cancel()
+			return nil
+		} else {
+			c.master, err = c.selector.Select(c.config.Consumer.Masters)
+			cancel()
+			if err != nil {
+				return err
+			}
+		}
+	}
+	return nil
+}
+
+func (c *consumer) processRegisterResponseM2C(rsp *protocol.RegisterResponseM2C) {
+	if rsp.GetNotAllocated() {
+		c.subInfo.SetIsNotAllocated(rsp.GetNotAllocated())
+	}
+	if rsp.GetDefFlowCheckId() != 0 || rsp.GetDefFlowCheckId() != 0 {
+		if rsp.GetDefFlowCheckId() != 0 {
+			c.rmtDataCache.UpdateDefFlowCtrlInfo(rsp.GetDefFlowCheckId(), rsp.GetDefFlowControlInfo())
+		}
+		qryPriorityID := c.rmtDataCache.GetQryPriorityID()
+		if rsp.GetQryPriorityId() != 0 {
+			qryPriorityID = rsp.GetQryPriorityId()
+		}
+		c.rmtDataCache.UpdateGroupFlowCtrlInfo(qryPriorityID, rsp.GetGroupFlowCheckId(), rsp.GetGroupFlowControlInfo())
+	}
+	if rsp.GetAuthorizedInfo() != nil {
+		c.processAuthorizedToken(rsp.GetAuthorizedInfo())
+	}
+	c.lastMasterHb = time.Now().UnixNano() / int64(time.Millisecond)
+}
+
+func (c *consumer) processAuthorizedToken(info *protocol.MasterAuthorizedInfo) {
+	atomic.StoreInt64(&c.visitToken, info.GetVisitAuthorizedToken())
+	c.authorizedInfo = info.GetAuthAuthorizedToken()
+}
+
+// GetMessage implementation of TubeMQ consumer.
+func (c *consumer) GetMessage() (*ConsumerResult, error) {
+	panic("implement me")
+}
+
+// Confirm implementation of TubeMQ consumer.
+func (c *consumer) Confirm(confirmContext string, consumed bool) (*ConsumerResult, error) {
+	panic("implement me")
+}
+
+// GetCurrConsumedInfo implementation of TubeMQ consumer.
+func (c *consumer) GetCurrConsumedInfo() (map[string]*ConsumerOffset, error) {
+	panic("implement me")
+}
+
+func (c *consumer) processRebalanceEvent() {
+	for {
+		event := c.rmtDataCache.TakeEvent()
+		if event.GetEventStatus() == int32(util.InvalidValue) && event.GetRebalanceID() == util.InvalidValue {
+			break
+		}
+		c.rmtDataCache.ClearEvent()
+		switch event.GetEventType() {
+		case 2, 20:

Review comment:
       Don't use magic numbers for these values; define named constants instead.




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org