You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@inlong.apache.org by GitBox <gi...@apache.org> on 2021/06/03 08:07:22 UTC

[GitHub] [incubator-inlong] TszKitLo40 opened a new pull request #480: [INLONG-624]Go SDK consumer interface

TszKitLo40 opened a new pull request #480:
URL: https://github.com/apache/incubator-inlong/pull/480


   Signed-off-by: Zijie Lu <ws...@gmail.com>


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [incubator-inlong] codecov-commenter edited a comment on pull request #480: [INLONG-624]Go SDK consumer start interface

Posted by GitBox <gi...@apache.org>.
codecov-commenter edited a comment on pull request #480:
URL: https://github.com/apache/incubator-inlong/pull/480#issuecomment-853744479


   # [Codecov](https://codecov.io/gh/apache/incubator-inlong/pull/480?src=pr&el=h1&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation) Report
   > Merging [#480](https://codecov.io/gh/apache/incubator-inlong/pull/480?src=pr&el=desc&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation) (294924e) into [INLONG-25](https://codecov.io/gh/apache/incubator-inlong/commit/6fd764444c824cae9595a5551c2d7a7443a3e4e6?el=desc&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation) (6fd7644) will **decrease** coverage by `0.03%`.
   > The diff coverage is `n/a`.
   
   [![Impacted file tree graph](https://codecov.io/gh/apache/incubator-inlong/pull/480/graphs/tree.svg?width=650&height=150&src=pr&token=1EUK92O9K2&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation)](https://codecov.io/gh/apache/incubator-inlong/pull/480?src=pr&el=tree&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation)
   
   ```diff
   @@              Coverage Diff               @@
   ##             INLONG-25    #480      +/-   ##
   ==============================================
   - Coverage         7.52%   7.48%   -0.04%     
   + Complexity         481     479       -2     
   ==============================================
     Files              267     267              
     Lines            29500   29500              
     Branches          4843    4843              
   ==============================================
   - Hits              2219    2209      -10     
   - Misses           26808   26816       +8     
   - Partials           473     475       +2     
   ```
   
   
   | [Impacted Files](https://codecov.io/gh/apache/incubator-inlong/pull/480?src=pr&el=tree&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation) | Coverage Δ | |
   |---|---|---|
   | [.../java/org/apache/flume/sink/tubemq/TubemqSink.java](https://codecov.io/gh/apache/incubator-inlong/pull/480/diff?src=pr&el=tree&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation#diff-dHViZW1xLWNvbm5lY3RvcnMvdHViZW1xLWNvbm5lY3Rvci1mbHVtZS9zcmMvbWFpbi9qYXZhL29yZy9hcGFjaGUvZmx1bWUvc2luay90dWJlbXEvVHViZW1xU2luay5qYXZh) | `51.42% <0.00%> (-4.00%)` | :arrow_down: |
   | [.../apache/tubemq/corebase/policies/FlowCtrlItem.java](https://codecov.io/gh/apache/incubator-inlong/pull/480/diff?src=pr&el=tree&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation#diff-dHViZW1xLWNvcmUvc3JjL21haW4vamF2YS9vcmcvYXBhY2hlL3R1YmVtcS9jb3JlYmFzZS9wb2xpY2llcy9GbG93Q3RybEl0ZW0uamF2YQ==) | `38.88% <0.00%> (-1.12%)` | :arrow_down: |
   | [.../tubemq/corebase/policies/FlowCtrlRuleHandler.java](https://codecov.io/gh/apache/incubator-inlong/pull/480/diff?src=pr&el=tree&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation#diff-dHViZW1xLWNvcmUvc3JjL21haW4vamF2YS9vcmcvYXBhY2hlL3R1YmVtcS9jb3JlYmFzZS9wb2xpY2llcy9GbG93Q3RybFJ1bGVIYW5kbGVyLmphdmE=) | `43.98% <0.00%> (-0.59%)` | :arrow_down: |
   
   ------
   
   [Continue to review full report at Codecov](https://codecov.io/gh/apache/incubator-inlong/pull/480?src=pr&el=continue&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation).
   > **Legend** - [Click here to learn more](https://docs.codecov.io/docs/codecov-delta?utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation)
   > `Δ = absolute <relative> (impact)`, `ø = not affected`, `? = missing data`
   > Powered by [Codecov](https://codecov.io/gh/apache/incubator-inlong/pull/480?src=pr&el=footer&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation). Last update [6fd7644...294924e](https://codecov.io/gh/apache/incubator-inlong/pull/480?src=pr&el=lastupdated&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation). Read the [comment docs](https://docs.codecov.io/docs/pull-request-comments?utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation).
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [incubator-inlong] codecov-commenter commented on pull request #480: [INLONG-624]Go SDK consumer interface

Posted by GitBox <gi...@apache.org>.
codecov-commenter commented on pull request #480:
URL: https://github.com/apache/incubator-inlong/pull/480#issuecomment-853744479


   # [Codecov](https://codecov.io/gh/apache/incubator-inlong/pull/480?src=pr&el=h1&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation) Report
   > Merging [#480](https://codecov.io/gh/apache/incubator-inlong/pull/480?src=pr&el=desc&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation) (d3d07ce) into [INLONG-25](https://codecov.io/gh/apache/incubator-inlong/commit/6fd764444c824cae9595a5551c2d7a7443a3e4e6?el=desc&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation) (6fd7644) will **decrease** coverage by `0.01%`.
   > The diff coverage is `n/a`.
   
   [![Impacted file tree graph](https://codecov.io/gh/apache/incubator-inlong/pull/480/graphs/tree.svg?width=650&height=150&src=pr&token=1EUK92O9K2&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation)](https://codecov.io/gh/apache/incubator-inlong/pull/480?src=pr&el=tree&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation)
   
   ```diff
   @@              Coverage Diff               @@
   ##             INLONG-25    #480      +/-   ##
   ==============================================
   - Coverage         7.52%   7.51%   -0.02%     
   + Complexity         481     479       -2     
   ==============================================
     Files              267     267              
     Lines            29500   29500              
     Branches          4843    4843              
   ==============================================
   - Hits              2219    2216       -3     
   - Misses           26808   26810       +2     
   - Partials           473     474       +1     
   ```
   
   
   | [Impacted Files](https://codecov.io/gh/apache/incubator-inlong/pull/480?src=pr&el=tree&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation) | Coverage Δ | |
   |---|---|---|
   | [.../apache/tubemq/corebase/policies/FlowCtrlItem.java](https://codecov.io/gh/apache/incubator-inlong/pull/480/diff?src=pr&el=tree&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation#diff-dHViZW1xLWNvcmUvc3JjL21haW4vamF2YS9vcmcvYXBhY2hlL3R1YmVtcS9jb3JlYmFzZS9wb2xpY2llcy9GbG93Q3RybEl0ZW0uamF2YQ==) | `38.88% <0.00%> (-1.12%)` | :arrow_down: |
   | [.../tubemq/corebase/policies/FlowCtrlRuleHandler.java](https://codecov.io/gh/apache/incubator-inlong/pull/480/diff?src=pr&el=tree&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation#diff-dHViZW1xLWNvcmUvc3JjL21haW4vamF2YS9vcmcvYXBhY2hlL3R1YmVtcS9jb3JlYmFzZS9wb2xpY2llcy9GbG93Q3RybFJ1bGVIYW5kbGVyLmphdmE=) | `43.98% <0.00%> (-0.59%)` | :arrow_down: |
   
   ------
   
   [Continue to review full report at Codecov](https://codecov.io/gh/apache/incubator-inlong/pull/480?src=pr&el=continue&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation).
   > **Legend** - [Click here to learn more](https://docs.codecov.io/docs/codecov-delta?utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation)
   > `Δ = absolute <relative> (impact)`, `ø = not affected`, `? = missing data`
   > Powered by [Codecov](https://codecov.io/gh/apache/incubator-inlong/pull/480?src=pr&el=footer&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation). Last update [6fd7644...d3d07ce](https://codecov.io/gh/apache/incubator-inlong/pull/480?src=pr&el=lastupdated&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation). Read the [comment docs](https://docs.codecov.io/docs/pull-request-comments?utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation).
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [incubator-inlong] codecov-commenter edited a comment on pull request #480: [INLONG-624]Go SDK consumer start interface

Posted by GitBox <gi...@apache.org>.
codecov-commenter edited a comment on pull request #480:
URL: https://github.com/apache/incubator-inlong/pull/480#issuecomment-853744479


   # [Codecov](https://codecov.io/gh/apache/incubator-inlong/pull/480?src=pr&el=h1&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation) Report
   > Merging [#480](https://codecov.io/gh/apache/incubator-inlong/pull/480?src=pr&el=desc&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation) (6752399) into [INLONG-25](https://codecov.io/gh/apache/incubator-inlong/commit/6fd764444c824cae9595a5551c2d7a7443a3e4e6?el=desc&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation) (6fd7644) will **not change** coverage.
   > The diff coverage is `n/a`.
   
   [![Impacted file tree graph](https://codecov.io/gh/apache/incubator-inlong/pull/480/graphs/tree.svg?width=650&height=150&src=pr&token=1EUK92O9K2&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation)](https://codecov.io/gh/apache/incubator-inlong/pull/480?src=pr&el=tree&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation)
   
   ```diff
   @@             Coverage Diff             @@
   ##             INLONG-25    #480   +/-   ##
   ===========================================
     Coverage         7.52%   7.52%           
     Complexity         481     481           
   ===========================================
     Files              267     267           
     Lines            29500   29500           
     Branches          4843    4843           
   ===========================================
     Hits              2219    2219           
     Misses           26808   26808           
     Partials           473     473           
   ```
   
   
   
   ------
   
   [Continue to review full report at Codecov](https://codecov.io/gh/apache/incubator-inlong/pull/480?src=pr&el=continue&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation).
   > **Legend** - [Click here to learn more](https://docs.codecov.io/docs/codecov-delta?utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation)
   > `Δ = absolute <relative> (impact)`, `ø = not affected`, `? = missing data`
   > Powered by [Codecov](https://codecov.io/gh/apache/incubator-inlong/pull/480?src=pr&el=footer&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation). Last update [6fd7644...6752399](https://codecov.io/gh/apache/incubator-inlong/pull/480?src=pr&el=lastupdated&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation). Read the [comment docs](https://docs.codecov.io/docs/pull-request-comments?utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation).
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [incubator-inlong] charlely commented on a change in pull request #480: [INLONG-624]Go SDK consumer start interface

Posted by GitBox <gi...@apache.org>.
charlely commented on a change in pull request #480:
URL: https://github.com/apache/incubator-inlong/pull/480#discussion_r647076775



##########
File path: tubemq-client-twins/tubemq-client-go/client/consumer.go
##########
@@ -0,0 +1,46 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+// Package client defines the api and information
+// which can be exposed to user.
+package client
+
+const (
+	tubeMQClientVersion = "0.1.0"

Review comment:
       Need a separate file to define version

##########
File path: tubemq-client-twins/tubemq-client-go/client/consumer_impl.go
##########
@@ -0,0 +1,356 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package client
+
+import (
+	"context"
+	"os"
+	"strconv"
+	"sync/atomic"
+	"time"
+
+	"github.com/golang/protobuf/proto"
+
+	"github.com/apache/incubator-inlong/tubemq-client-twins/tubemq-client-go/config"
+	"github.com/apache/incubator-inlong/tubemq-client-twins/tubemq-client-go/errs"
+	"github.com/apache/incubator-inlong/tubemq-client-twins/tubemq-client-go/metadata"
+	"github.com/apache/incubator-inlong/tubemq-client-twins/tubemq-client-go/multiplexing"
+	"github.com/apache/incubator-inlong/tubemq-client-twins/tubemq-client-go/protocol"
+	"github.com/apache/incubator-inlong/tubemq-client-twins/tubemq-client-go/remote"
+	"github.com/apache/incubator-inlong/tubemq-client-twins/tubemq-client-go/rpc"
+	"github.com/apache/incubator-inlong/tubemq-client-twins/tubemq-client-go/selector"
+	"github.com/apache/incubator-inlong/tubemq-client-twins/tubemq-client-go/sub"
+	"github.com/apache/incubator-inlong/tubemq-client-twins/tubemq-client-go/transport"
+	"github.com/apache/incubator-inlong/tubemq-client-twins/tubemq-client-go/util"
+)
+
+const (
+	consumeStatusNormal        = 0
+	consumeStatusFromMax       = 1
+	consumeStatusFromMaxAlways = 2
+)
+
+type consumer struct {
+	clientID         string
+	config           *config.Config
+	subInfo          *sub.SubInfo
+	rmtDataCache     *remote.RmtDataCache
+	visitToken       int64
+	authorizedInfo   string
+	nextAuth2Master  int32
+	nextAuth2Broker  int32
+	master           *selector.Node
+	client           rpc.RPCClient
+	selector         selector.Selector
+	lastMasterHb     int64
+	masterHBRetry    int
+	heartbeatManager *heartbeatManager
+	unreportedTimes  int
+}
+
+// NewConsumer returns a consumer which is constructed by a given config.
+func NewConsumer(config *config.Config) (Consumer, error) {
+	selector, err := selector.Get("ip")
+	if err != nil {
+		return nil, err
+	}
+
+	clientID := newClientID(config.Consumer.Group)
+	pool := multiplexing.NewPool()
+	opts := &transport.Options{}
+	if config.Net.TLS.Enable {
+		opts.CACertFile = config.Net.TLS.CACertFile
+		opts.TLSCertFile = config.Net.TLS.TLSCertFile
+		opts.TLSKeyFile = config.Net.TLS.TLSKeyFile
+		opts.TLSServerName = config.Net.TLS.TLSServerName
+	}
+	client := rpc.New(pool, opts)
+	r := remote.NewRmtDataCache()
+	r.SetConsumerInfo(clientID, config.Consumer.Group)
+	c := &consumer{
+		config:          config,
+		clientID:        clientID,
+		subInfo:         sub.NewSubInfo(config),
+		rmtDataCache:    r,
+		selector:        selector,
+		client:          client,
+		visitToken:      util.InvalidValue,
+		unreportedTimes: 0,
+	}
+	c.subInfo.SetClientID(clientID)
+	hbm := newHBManager(c)
+	c.heartbeatManager = hbm
+	return c, nil
+}
+
+// Start implementation of tubeMQ consumer.
+func (c *consumer) Start() error {
+	err := c.register2Master(false)
+	if err != nil {
+		return err
+	}
+	c.heartbeatManager.registerMaster(c.master.Address)
+	go c.processRebalanceEvent()
+	return nil
+}
+
+func (c *consumer) register2Master(needChange bool) error {
+	if needChange {
+		node, err := c.selector.Select(c.config.Consumer.Masters)
+		if err != nil {
+			return err
+		}
+		c.master = node
+	}
+	for c.master.HasNext {
+		ctx, cancel := context.WithTimeout(context.Background(), c.config.Net.ReadTimeout)
+
+		m := &metadata.Metadata{}
+		node := &metadata.Node{}
+		node.SetHost(util.GetLocalHost())
+		node.SetAddress(c.master.Address)
+		m.SetNode(node)
+		sub := &metadata.SubscribeInfo{}
+		sub.SetGroup(c.config.Consumer.Group)
+		m.SetSubscribeInfo(sub)
+
+		auth := &protocol.AuthenticateInfo{}
+		c.genMasterAuthenticateToken(auth, true)
+		mci := &protocol.MasterCertificateInfo{
+			AuthInfo: auth,
+		}
+		c.subInfo.SetMasterCertificateInfo(mci)
+
+		rsp, err := c.client.RegisterRequestC2M(ctx, m, c.subInfo, c.rmtDataCache)
+		if err != nil {
+			cancel()
+			return err
+		}
+		if rsp.GetSuccess() {
+			c.masterHBRetry = 0
+			c.processRegisterResponseM2C(rsp)
+			cancel()
+			return nil
+		} else if rsp.GetErrCode() == errs.RetConsumeGroupForbidden || rsp.GetErrCode() == errs.RetConsumeContentForbidden {
+			cancel()
+			return nil
+		} else {
+			c.master, err = c.selector.Select(c.config.Consumer.Masters)
+			cancel()
+			if err != nil {
+				return err
+			}
+		}
+	}
+	return nil
+}
+
+func (c *consumer) processRegisterResponseM2C(rsp *protocol.RegisterResponseM2C) {
+	if rsp.GetNotAllocated() {
+		c.subInfo.SetIsNotAllocated(rsp.GetNotAllocated())
+	}
+	if rsp.GetDefFlowCheckId() != 0 || rsp.GetDefFlowCheckId() != 0 {
+		if rsp.GetDefFlowCheckId() != 0 {
+			c.rmtDataCache.UpdateDefFlowCtrlInfo(rsp.GetDefFlowCheckId(), rsp.GetDefFlowControlInfo())
+		}
+		qryPriorityID := c.rmtDataCache.GetQryPriorityID()
+		if rsp.GetQryPriorityId() != 0 {
+			qryPriorityID = rsp.GetQryPriorityId()
+		}
+		c.rmtDataCache.UpdateGroupFlowCtrlInfo(qryPriorityID, rsp.GetGroupFlowCheckId(), rsp.GetGroupFlowControlInfo())
+	}
+	if rsp.GetAuthorizedInfo() != nil {
+		c.processAuthorizedToken(rsp.GetAuthorizedInfo())
+	}
+	c.lastMasterHb = time.Now().UnixNano() / int64(time.Millisecond)
+}
+
+func (c *consumer) processAuthorizedToken(info *protocol.MasterAuthorizedInfo) {
+	atomic.StoreInt64(&c.visitToken, info.GetVisitAuthorizedToken())
+	c.authorizedInfo = info.GetAuthAuthorizedToken()
+}
+
+// GetMessage implementation of TubeMQ consumer.
+func (c *consumer) GetMessage() (*ConsumerResult, error) {
+	panic("implement me")
+}
+
+// Confirm implementation of TubeMQ consumer.
+func (c *consumer) Confirm(confirmContext string, consumed bool) (*ConsumerResult, error) {
+	panic("implement me")
+}
+
+// GetCurrConsumedInfo implementation of TubeMQ consumer.
+func (c *consumer) GetCurrConsumedInfo() (map[string]*ConsumerOffset, error) {
+	panic("implement me")
+}
+
+func (c *consumer) processRebalanceEvent() {
+	for {
+		event := c.rmtDataCache.TakeEvent()
+		if event.GetEventStatus() == int32(util.InvalidValue) && event.GetRebalanceID() == util.InvalidValue {
+			break
+		}
+		c.rmtDataCache.ClearEvent()
+		switch event.GetEventType() {
+		case 2, 20:
+			c.disconnect2Broker(event)
+			c.rmtDataCache.OfferEventResult(event)
+		case 1, 10:

Review comment:
       const value cann't use magic number.

##########
File path: tubemq-client-twins/tubemq-client-go/client/consumer_impl.go
##########
@@ -0,0 +1,356 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package client
+
+import (
+	"context"
+	"os"
+	"strconv"
+	"sync/atomic"
+	"time"
+
+	"github.com/golang/protobuf/proto"
+
+	"github.com/apache/incubator-inlong/tubemq-client-twins/tubemq-client-go/config"
+	"github.com/apache/incubator-inlong/tubemq-client-twins/tubemq-client-go/errs"
+	"github.com/apache/incubator-inlong/tubemq-client-twins/tubemq-client-go/metadata"
+	"github.com/apache/incubator-inlong/tubemq-client-twins/tubemq-client-go/multiplexing"
+	"github.com/apache/incubator-inlong/tubemq-client-twins/tubemq-client-go/protocol"
+	"github.com/apache/incubator-inlong/tubemq-client-twins/tubemq-client-go/remote"
+	"github.com/apache/incubator-inlong/tubemq-client-twins/tubemq-client-go/rpc"
+	"github.com/apache/incubator-inlong/tubemq-client-twins/tubemq-client-go/selector"
+	"github.com/apache/incubator-inlong/tubemq-client-twins/tubemq-client-go/sub"
+	"github.com/apache/incubator-inlong/tubemq-client-twins/tubemq-client-go/transport"
+	"github.com/apache/incubator-inlong/tubemq-client-twins/tubemq-client-go/util"
+)
+
+const (
+	consumeStatusNormal        = 0
+	consumeStatusFromMax       = 1
+	consumeStatusFromMaxAlways = 2
+)
+
+type consumer struct {
+	clientID         string
+	config           *config.Config
+	subInfo          *sub.SubInfo
+	rmtDataCache     *remote.RmtDataCache
+	visitToken       int64
+	authorizedInfo   string
+	nextAuth2Master  int32
+	nextAuth2Broker  int32
+	master           *selector.Node
+	client           rpc.RPCClient
+	selector         selector.Selector
+	lastMasterHb     int64
+	masterHBRetry    int
+	heartbeatManager *heartbeatManager
+	unreportedTimes  int
+}
+
+// NewConsumer returns a consumer which is constructed by a given config.
+func NewConsumer(config *config.Config) (Consumer, error) {
+	selector, err := selector.Get("ip")
+	if err != nil {
+		return nil, err
+	}
+
+	clientID := newClientID(config.Consumer.Group)
+	pool := multiplexing.NewPool()
+	opts := &transport.Options{}
+	if config.Net.TLS.Enable {
+		opts.CACertFile = config.Net.TLS.CACertFile
+		opts.TLSCertFile = config.Net.TLS.TLSCertFile
+		opts.TLSKeyFile = config.Net.TLS.TLSKeyFile
+		opts.TLSServerName = config.Net.TLS.TLSServerName
+	}
+	client := rpc.New(pool, opts)
+	r := remote.NewRmtDataCache()
+	r.SetConsumerInfo(clientID, config.Consumer.Group)
+	c := &consumer{
+		config:          config,
+		clientID:        clientID,
+		subInfo:         sub.NewSubInfo(config),
+		rmtDataCache:    r,
+		selector:        selector,
+		client:          client,
+		visitToken:      util.InvalidValue,
+		unreportedTimes: 0,
+	}
+	c.subInfo.SetClientID(clientID)
+	hbm := newHBManager(c)
+	c.heartbeatManager = hbm
+	return c, nil
+}
+
+// Start implementation of tubeMQ consumer.
+func (c *consumer) Start() error {
+	err := c.register2Master(false)
+	if err != nil {
+		return err
+	}
+	c.heartbeatManager.registerMaster(c.master.Address)
+	go c.processRebalanceEvent()
+	return nil
+}
+
+func (c *consumer) register2Master(needChange bool) error {
+	if needChange {
+		node, err := c.selector.Select(c.config.Consumer.Masters)
+		if err != nil {
+			return err
+		}
+		c.master = node
+	}
+	for c.master.HasNext {
+		ctx, cancel := context.WithTimeout(context.Background(), c.config.Net.ReadTimeout)
+
+		m := &metadata.Metadata{}
+		node := &metadata.Node{}
+		node.SetHost(util.GetLocalHost())
+		node.SetAddress(c.master.Address)
+		m.SetNode(node)
+		sub := &metadata.SubscribeInfo{}
+		sub.SetGroup(c.config.Consumer.Group)
+		m.SetSubscribeInfo(sub)
+
+		auth := &protocol.AuthenticateInfo{}
+		c.genMasterAuthenticateToken(auth, true)
+		mci := &protocol.MasterCertificateInfo{
+			AuthInfo: auth,
+		}
+		c.subInfo.SetMasterCertificateInfo(mci)
+
+		rsp, err := c.client.RegisterRequestC2M(ctx, m, c.subInfo, c.rmtDataCache)
+		if err != nil {
+			cancel()
+			return err
+		}
+		if rsp.GetSuccess() {
+			c.masterHBRetry = 0
+			c.processRegisterResponseM2C(rsp)
+			cancel()
+			return nil
+		} else if rsp.GetErrCode() == errs.RetConsumeGroupForbidden || rsp.GetErrCode() == errs.RetConsumeContentForbidden {
+			cancel()
+			return nil
+		} else {
+			c.master, err = c.selector.Select(c.config.Consumer.Masters)
+			cancel()
+			if err != nil {
+				return err
+			}
+		}
+	}
+	return nil
+}
+
+func (c *consumer) processRegisterResponseM2C(rsp *protocol.RegisterResponseM2C) {
+	if rsp.GetNotAllocated() {
+		c.subInfo.SetIsNotAllocated(rsp.GetNotAllocated())
+	}
+	if rsp.GetDefFlowCheckId() != 0 || rsp.GetDefFlowCheckId() != 0 {
+		if rsp.GetDefFlowCheckId() != 0 {
+			c.rmtDataCache.UpdateDefFlowCtrlInfo(rsp.GetDefFlowCheckId(), rsp.GetDefFlowControlInfo())
+		}
+		qryPriorityID := c.rmtDataCache.GetQryPriorityID()
+		if rsp.GetQryPriorityId() != 0 {
+			qryPriorityID = rsp.GetQryPriorityId()
+		}
+		c.rmtDataCache.UpdateGroupFlowCtrlInfo(qryPriorityID, rsp.GetGroupFlowCheckId(), rsp.GetGroupFlowControlInfo())
+	}
+	if rsp.GetAuthorizedInfo() != nil {
+		c.processAuthorizedToken(rsp.GetAuthorizedInfo())
+	}
+	c.lastMasterHb = time.Now().UnixNano() / int64(time.Millisecond)
+}
+
+func (c *consumer) processAuthorizedToken(info *protocol.MasterAuthorizedInfo) {
+	atomic.StoreInt64(&c.visitToken, info.GetVisitAuthorizedToken())
+	c.authorizedInfo = info.GetAuthAuthorizedToken()
+}
+
+// GetMessage implementation of TubeMQ consumer.
+func (c *consumer) GetMessage() (*ConsumerResult, error) {
+	panic("implement me")
+}
+
+// Confirm implementation of TubeMQ consumer.
+func (c *consumer) Confirm(confirmContext string, consumed bool) (*ConsumerResult, error) {
+	panic("implement me")
+}
+
+// GetCurrConsumedInfo implementation of TubeMQ consumer.
+func (c *consumer) GetCurrConsumedInfo() (map[string]*ConsumerOffset, error) {
+	panic("implement me")
+}
+
+func (c *consumer) processRebalanceEvent() {
+	for {

Review comment:
       Why not use channels

##########
File path: tubemq-client-twins/tubemq-client-go/client/consumer.go
##########
@@ -0,0 +1,46 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+// Package client defines the api and information
+// which can be exposed to user.
+package client
+
+const (
+	tubeMQClientVersion = "0.1.0"
+)
+
+// ConsumerResult of a consumption.
+type ConsumerResult struct {
+}
+
+// ConsumerOffset of a consumption,
+type ConsumerOffset struct {
+}
+
+var clientIndex uint64

Review comment:
       This should be the allocator of clientid, index is more like array coordinates.

##########
File path: tubemq-client-twins/tubemq-client-go/client/consumer_impl.go
##########
@@ -0,0 +1,356 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package client
+
+import (
+	"context"
+	"os"
+	"strconv"
+	"sync/atomic"
+	"time"
+
+	"github.com/golang/protobuf/proto"
+
+	"github.com/apache/incubator-inlong/tubemq-client-twins/tubemq-client-go/config"
+	"github.com/apache/incubator-inlong/tubemq-client-twins/tubemq-client-go/errs"
+	"github.com/apache/incubator-inlong/tubemq-client-twins/tubemq-client-go/metadata"
+	"github.com/apache/incubator-inlong/tubemq-client-twins/tubemq-client-go/multiplexing"
+	"github.com/apache/incubator-inlong/tubemq-client-twins/tubemq-client-go/protocol"
+	"github.com/apache/incubator-inlong/tubemq-client-twins/tubemq-client-go/remote"
+	"github.com/apache/incubator-inlong/tubemq-client-twins/tubemq-client-go/rpc"
+	"github.com/apache/incubator-inlong/tubemq-client-twins/tubemq-client-go/selector"
+	"github.com/apache/incubator-inlong/tubemq-client-twins/tubemq-client-go/sub"
+	"github.com/apache/incubator-inlong/tubemq-client-twins/tubemq-client-go/transport"
+	"github.com/apache/incubator-inlong/tubemq-client-twins/tubemq-client-go/util"
+)
+
+const (
+	consumeStatusNormal        = 0
+	consumeStatusFromMax       = 1
+	consumeStatusFromMaxAlways = 2
+)
+
+type consumer struct {
+	clientID         string
+	config           *config.Config
+	subInfo          *sub.SubInfo
+	rmtDataCache     *remote.RmtDataCache
+	visitToken       int64
+	authorizedInfo   string
+	nextAuth2Master  int32
+	nextAuth2Broker  int32
+	master           *selector.Node
+	client           rpc.RPCClient
+	selector         selector.Selector
+	lastMasterHb     int64
+	masterHBRetry    int
+	heartbeatManager *heartbeatManager
+	unreportedTimes  int
+}
+
+// NewConsumer returns a consumer which is constructed by a given config.
+func NewConsumer(config *config.Config) (Consumer, error) {
+	selector, err := selector.Get("ip")
+	if err != nil {
+		return nil, err
+	}
+
+	clientID := newClientID(config.Consumer.Group)
+	pool := multiplexing.NewPool()
+	opts := &transport.Options{}
+	if config.Net.TLS.Enable {
+		opts.CACertFile = config.Net.TLS.CACertFile
+		opts.TLSCertFile = config.Net.TLS.TLSCertFile
+		opts.TLSKeyFile = config.Net.TLS.TLSKeyFile
+		opts.TLSServerName = config.Net.TLS.TLSServerName
+	}
+	client := rpc.New(pool, opts)
+	r := remote.NewRmtDataCache()
+	r.SetConsumerInfo(clientID, config.Consumer.Group)
+	c := &consumer{
+		config:          config,
+		clientID:        clientID,
+		subInfo:         sub.NewSubInfo(config),
+		rmtDataCache:    r,
+		selector:        selector,
+		client:          client,
+		visitToken:      util.InvalidValue,
+		unreportedTimes: 0,
+	}
+	c.subInfo.SetClientID(clientID)
+	hbm := newHBManager(c)
+	c.heartbeatManager = hbm
+	return c, nil
+}
+
+// Start implementation of tubeMQ consumer.
+func (c *consumer) Start() error {
+	err := c.register2Master(false)
+	if err != nil {
+		return err
+	}
+	c.heartbeatManager.registerMaster(c.master.Address)
+	go c.processRebalanceEvent()
+	return nil
+}
+
+func (c *consumer) register2Master(needChange bool) error {
+	if needChange {
+		node, err := c.selector.Select(c.config.Consumer.Masters)
+		if err != nil {
+			return err
+		}
+		c.master = node
+	}
+	for c.master.HasNext {
+		ctx, cancel := context.WithTimeout(context.Background(), c.config.Net.ReadTimeout)

Review comment:
       Need use defer cancel(), can define other function to register one master.

##########
File path: tubemq-client-twins/tubemq-client-go/client/consumer_impl.go
##########
@@ -0,0 +1,356 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package client
+
+import (
+	"context"
+	"os"
+	"strconv"
+	"sync/atomic"
+	"time"
+
+	"github.com/golang/protobuf/proto"
+
+	"github.com/apache/incubator-inlong/tubemq-client-twins/tubemq-client-go/config"
+	"github.com/apache/incubator-inlong/tubemq-client-twins/tubemq-client-go/errs"
+	"github.com/apache/incubator-inlong/tubemq-client-twins/tubemq-client-go/metadata"
+	"github.com/apache/incubator-inlong/tubemq-client-twins/tubemq-client-go/multiplexing"
+	"github.com/apache/incubator-inlong/tubemq-client-twins/tubemq-client-go/protocol"
+	"github.com/apache/incubator-inlong/tubemq-client-twins/tubemq-client-go/remote"
+	"github.com/apache/incubator-inlong/tubemq-client-twins/tubemq-client-go/rpc"
+	"github.com/apache/incubator-inlong/tubemq-client-twins/tubemq-client-go/selector"
+	"github.com/apache/incubator-inlong/tubemq-client-twins/tubemq-client-go/sub"
+	"github.com/apache/incubator-inlong/tubemq-client-twins/tubemq-client-go/transport"
+	"github.com/apache/incubator-inlong/tubemq-client-twins/tubemq-client-go/util"
+)
+
+const (
+	consumeStatusNormal        = 0
+	consumeStatusFromMax       = 1
+	consumeStatusFromMaxAlways = 2
+)
+
+type consumer struct {
+	clientID         string
+	config           *config.Config
+	subInfo          *sub.SubInfo
+	rmtDataCache     *remote.RmtDataCache
+	visitToken       int64
+	authorizedInfo   string
+	nextAuth2Master  int32
+	nextAuth2Broker  int32
+	master           *selector.Node
+	client           rpc.RPCClient
+	selector         selector.Selector
+	lastMasterHb     int64
+	masterHBRetry    int
+	heartbeatManager *heartbeatManager
+	unreportedTimes  int
+}
+
+// NewConsumer returns a consumer which is constructed by a given config.
+func NewConsumer(config *config.Config) (Consumer, error) {
+	selector, err := selector.Get("ip")
+	if err != nil {
+		return nil, err
+	}
+
+	clientID := newClientID(config.Consumer.Group)
+	pool := multiplexing.NewPool()
+	opts := &transport.Options{}
+	if config.Net.TLS.Enable {
+		opts.CACertFile = config.Net.TLS.CACertFile
+		opts.TLSCertFile = config.Net.TLS.TLSCertFile
+		opts.TLSKeyFile = config.Net.TLS.TLSKeyFile
+		opts.TLSServerName = config.Net.TLS.TLSServerName
+	}
+	client := rpc.New(pool, opts)
+	r := remote.NewRmtDataCache()
+	r.SetConsumerInfo(clientID, config.Consumer.Group)
+	c := &consumer{
+		config:          config,
+		clientID:        clientID,
+		subInfo:         sub.NewSubInfo(config),
+		rmtDataCache:    r,
+		selector:        selector,
+		client:          client,
+		visitToken:      util.InvalidValue,
+		unreportedTimes: 0,
+	}
+	c.subInfo.SetClientID(clientID)
+	hbm := newHBManager(c)
+	c.heartbeatManager = hbm
+	return c, nil
+}
+
+// Start implementation of tubeMQ consumer.
+func (c *consumer) Start() error {
+	err := c.register2Master(false)
+	if err != nil {
+		return err
+	}
+	c.heartbeatManager.registerMaster(c.master.Address)
+	go c.processRebalanceEvent()
+	return nil
+}
+
+func (c *consumer) register2Master(needChange bool) error {
+	if needChange {
+		node, err := c.selector.Select(c.config.Consumer.Masters)
+		if err != nil {
+			return err
+		}
+		c.master = node
+	}
+	for c.master.HasNext {
+		ctx, cancel := context.WithTimeout(context.Background(), c.config.Net.ReadTimeout)
+
+		m := &metadata.Metadata{}
+		node := &metadata.Node{}
+		node.SetHost(util.GetLocalHost())
+		node.SetAddress(c.master.Address)
+		m.SetNode(node)
+		sub := &metadata.SubscribeInfo{}
+		sub.SetGroup(c.config.Consumer.Group)
+		m.SetSubscribeInfo(sub)
+
+		auth := &protocol.AuthenticateInfo{}
+		c.genMasterAuthenticateToken(auth, true)
+		mci := &protocol.MasterCertificateInfo{
+			AuthInfo: auth,
+		}
+		c.subInfo.SetMasterCertificateInfo(mci)
+
+		rsp, err := c.client.RegisterRequestC2M(ctx, m, c.subInfo, c.rmtDataCache)
+		if err != nil {
+			cancel()
+			return err
+		}
+		if rsp.GetSuccess() {
+			c.masterHBRetry = 0
+			c.processRegisterResponseM2C(rsp)
+			cancel()
+			return nil
+		} else if rsp.GetErrCode() == errs.RetConsumeGroupForbidden || rsp.GetErrCode() == errs.RetConsumeContentForbidden {
+			cancel()
+			return nil
+		} else {
+			c.master, err = c.selector.Select(c.config.Consumer.Masters)
+			cancel()
+			if err != nil {
+				return err
+			}
+		}
+	}
+	return nil
+}
+
+func (c *consumer) processRegisterResponseM2C(rsp *protocol.RegisterResponseM2C) {
+	if rsp.GetNotAllocated() {
+		c.subInfo.SetIsNotAllocated(rsp.GetNotAllocated())
+	}
+	if rsp.GetDefFlowCheckId() != 0 || rsp.GetDefFlowCheckId() != 0 {
+		if rsp.GetDefFlowCheckId() != 0 {
+			c.rmtDataCache.UpdateDefFlowCtrlInfo(rsp.GetDefFlowCheckId(), rsp.GetDefFlowControlInfo())
+		}
+		qryPriorityID := c.rmtDataCache.GetQryPriorityID()
+		if rsp.GetQryPriorityId() != 0 {
+			qryPriorityID = rsp.GetQryPriorityId()
+		}
+		c.rmtDataCache.UpdateGroupFlowCtrlInfo(qryPriorityID, rsp.GetGroupFlowCheckId(), rsp.GetGroupFlowControlInfo())
+	}
+	if rsp.GetAuthorizedInfo() != nil {
+		c.processAuthorizedToken(rsp.GetAuthorizedInfo())
+	}
+	c.lastMasterHb = time.Now().UnixNano() / int64(time.Millisecond)
+}
+
+func (c *consumer) processAuthorizedToken(info *protocol.MasterAuthorizedInfo) {
+	atomic.StoreInt64(&c.visitToken, info.GetVisitAuthorizedToken())
+	c.authorizedInfo = info.GetAuthAuthorizedToken()
+}
+
+// GetMessage implementation of TubeMQ consumer.
+func (c *consumer) GetMessage() (*ConsumerResult, error) {
+	panic("implement me")
+}
+
+// Confirm implementation of TubeMQ consumer.
+func (c *consumer) Confirm(confirmContext string, consumed bool) (*ConsumerResult, error) {
+	panic("implement me")
+}
+
+// GetCurrConsumedInfo implementation of TubeMQ consumer.
+func (c *consumer) GetCurrConsumedInfo() (map[string]*ConsumerOffset, error) {
+	panic("implement me")
+}
+
+func (c *consumer) processRebalanceEvent() {
+	for {

Review comment:
       The for loop will run to full cpu

##########
File path: tubemq-client-twins/tubemq-client-go/client/consumer_impl.go
##########
@@ -0,0 +1,356 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package client
+
+import (
+	"context"
+	"os"
+	"strconv"
+	"sync/atomic"
+	"time"
+
+	"github.com/golang/protobuf/proto"
+
+	"github.com/apache/incubator-inlong/tubemq-client-twins/tubemq-client-go/config"
+	"github.com/apache/incubator-inlong/tubemq-client-twins/tubemq-client-go/errs"
+	"github.com/apache/incubator-inlong/tubemq-client-twins/tubemq-client-go/metadata"
+	"github.com/apache/incubator-inlong/tubemq-client-twins/tubemq-client-go/multiplexing"
+	"github.com/apache/incubator-inlong/tubemq-client-twins/tubemq-client-go/protocol"
+	"github.com/apache/incubator-inlong/tubemq-client-twins/tubemq-client-go/remote"
+	"github.com/apache/incubator-inlong/tubemq-client-twins/tubemq-client-go/rpc"
+	"github.com/apache/incubator-inlong/tubemq-client-twins/tubemq-client-go/selector"
+	"github.com/apache/incubator-inlong/tubemq-client-twins/tubemq-client-go/sub"
+	"github.com/apache/incubator-inlong/tubemq-client-twins/tubemq-client-go/transport"
+	"github.com/apache/incubator-inlong/tubemq-client-twins/tubemq-client-go/util"
+)
+
+const (
+	consumeStatusNormal        = 0
+	consumeStatusFromMax       = 1
+	consumeStatusFromMaxAlways = 2
+)
+
+type consumer struct {
+	clientID         string
+	config           *config.Config
+	subInfo          *sub.SubInfo
+	rmtDataCache     *remote.RmtDataCache
+	visitToken       int64
+	authorizedInfo   string
+	nextAuth2Master  int32
+	nextAuth2Broker  int32
+	master           *selector.Node
+	client           rpc.RPCClient
+	selector         selector.Selector
+	lastMasterHb     int64
+	masterHBRetry    int
+	heartbeatManager *heartbeatManager
+	unreportedTimes  int
+}
+
+// NewConsumer returns a consumer which is constructed by a given config.
+func NewConsumer(config *config.Config) (Consumer, error) {
+	selector, err := selector.Get("ip")
+	if err != nil {
+		return nil, err
+	}
+
+	clientID := newClientID(config.Consumer.Group)
+	pool := multiplexing.NewPool()
+	opts := &transport.Options{}
+	if config.Net.TLS.Enable {
+		opts.CACertFile = config.Net.TLS.CACertFile
+		opts.TLSCertFile = config.Net.TLS.TLSCertFile
+		opts.TLSKeyFile = config.Net.TLS.TLSKeyFile
+		opts.TLSServerName = config.Net.TLS.TLSServerName
+	}
+	client := rpc.New(pool, opts)
+	r := remote.NewRmtDataCache()
+	r.SetConsumerInfo(clientID, config.Consumer.Group)
+	c := &consumer{
+		config:          config,
+		clientID:        clientID,
+		subInfo:         sub.NewSubInfo(config),
+		rmtDataCache:    r,
+		selector:        selector,
+		client:          client,
+		visitToken:      util.InvalidValue,
+		unreportedTimes: 0,
+	}
+	c.subInfo.SetClientID(clientID)
+	hbm := newHBManager(c)
+	c.heartbeatManager = hbm
+	return c, nil
+}
+
+// Start implementation of tubeMQ consumer.
+func (c *consumer) Start() error {
+	err := c.register2Master(false)
+	if err != nil {
+		return err
+	}
+	c.heartbeatManager.registerMaster(c.master.Address)
+	go c.processRebalanceEvent()
+	return nil
+}
+
+func (c *consumer) register2Master(needChange bool) error {
+	if needChange {
+		node, err := c.selector.Select(c.config.Consumer.Masters)
+		if err != nil {
+			return err
+		}
+		c.master = node
+	}
+	for c.master.HasNext {
+		ctx, cancel := context.WithTimeout(context.Background(), c.config.Net.ReadTimeout)
+
+		m := &metadata.Metadata{}
+		node := &metadata.Node{}
+		node.SetHost(util.GetLocalHost())
+		node.SetAddress(c.master.Address)
+		m.SetNode(node)
+		sub := &metadata.SubscribeInfo{}
+		sub.SetGroup(c.config.Consumer.Group)
+		m.SetSubscribeInfo(sub)
+
+		auth := &protocol.AuthenticateInfo{}
+		c.genMasterAuthenticateToken(auth, true)
+		mci := &protocol.MasterCertificateInfo{
+			AuthInfo: auth,
+		}
+		c.subInfo.SetMasterCertificateInfo(mci)
+
+		rsp, err := c.client.RegisterRequestC2M(ctx, m, c.subInfo, c.rmtDataCache)
+		if err != nil {
+			cancel()
+			return err
+		}
+		if rsp.GetSuccess() {
+			c.masterHBRetry = 0
+			c.processRegisterResponseM2C(rsp)
+			cancel()
+			return nil
+		} else if rsp.GetErrCode() == errs.RetConsumeGroupForbidden || rsp.GetErrCode() == errs.RetConsumeContentForbidden {
+			cancel()
+			return nil
+		} else {
+			c.master, err = c.selector.Select(c.config.Consumer.Masters)
+			cancel()
+			if err != nil {
+				return err
+			}
+		}
+	}
+	return nil
+}
+
+func (c *consumer) processRegisterResponseM2C(rsp *protocol.RegisterResponseM2C) {
+	if rsp.GetNotAllocated() {
+		c.subInfo.SetIsNotAllocated(rsp.GetNotAllocated())
+	}
+	if rsp.GetDefFlowCheckId() != 0 || rsp.GetDefFlowCheckId() != 0 {
+		if rsp.GetDefFlowCheckId() != 0 {
+			c.rmtDataCache.UpdateDefFlowCtrlInfo(rsp.GetDefFlowCheckId(), rsp.GetDefFlowControlInfo())
+		}
+		qryPriorityID := c.rmtDataCache.GetQryPriorityID()
+		if rsp.GetQryPriorityId() != 0 {
+			qryPriorityID = rsp.GetQryPriorityId()
+		}
+		c.rmtDataCache.UpdateGroupFlowCtrlInfo(qryPriorityID, rsp.GetGroupFlowCheckId(), rsp.GetGroupFlowControlInfo())
+	}
+	if rsp.GetAuthorizedInfo() != nil {
+		c.processAuthorizedToken(rsp.GetAuthorizedInfo())
+	}
+	c.lastMasterHb = time.Now().UnixNano() / int64(time.Millisecond)
+}
+
+func (c *consumer) processAuthorizedToken(info *protocol.MasterAuthorizedInfo) {
+	atomic.StoreInt64(&c.visitToken, info.GetVisitAuthorizedToken())
+	c.authorizedInfo = info.GetAuthAuthorizedToken()
+}
+
+// GetMessage implementation of TubeMQ consumer.
+func (c *consumer) GetMessage() (*ConsumerResult, error) {
+	panic("implement me")
+}
+
+// Confirm implementation of TubeMQ consumer.
+func (c *consumer) Confirm(confirmContext string, consumed bool) (*ConsumerResult, error) {
+	panic("implement me")
+}
+
+// GetCurrConsumedInfo implementation of TubeMQ consumer.
+func (c *consumer) GetCurrConsumedInfo() (map[string]*ConsumerOffset, error) {
+	panic("implement me")
+}
+
+func (c *consumer) processRebalanceEvent() {
+	for {
+		event := c.rmtDataCache.TakeEvent()
+		if event.GetEventStatus() == int32(util.InvalidValue) && event.GetRebalanceID() == util.InvalidValue {
+			break
+		}
+		c.rmtDataCache.ClearEvent()
+		switch event.GetEventType() {
+		case 2, 20:

Review comment:
       const value cann't use magic number.




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [incubator-inlong] codecov-commenter edited a comment on pull request #480: [INLONG-624]Go SDK consumer start interface

Posted by GitBox <gi...@apache.org>.
codecov-commenter edited a comment on pull request #480:
URL: https://github.com/apache/incubator-inlong/pull/480#issuecomment-853744479


   # [Codecov](https://codecov.io/gh/apache/incubator-inlong/pull/480?src=pr&el=h1&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation) Report
   > Merging [#480](https://codecov.io/gh/apache/incubator-inlong/pull/480?src=pr&el=desc&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation) (b0bd1a5) into [INLONG-25](https://codecov.io/gh/apache/incubator-inlong/commit/6fd764444c824cae9595a5551c2d7a7443a3e4e6?el=desc&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation) (6fd7644) will **decrease** coverage by `0.01%`.
   > The diff coverage is `n/a`.
   
   [![Impacted file tree graph](https://codecov.io/gh/apache/incubator-inlong/pull/480/graphs/tree.svg?width=650&height=150&src=pr&token=1EUK92O9K2&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation)](https://codecov.io/gh/apache/incubator-inlong/pull/480?src=pr&el=tree&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation)
   
   ```diff
   @@              Coverage Diff               @@
   ##             INLONG-25    #480      +/-   ##
   ==============================================
   - Coverage         7.52%   7.51%   -0.02%     
   + Complexity         481     479       -2     
   ==============================================
     Files              267     267              
     Lines            29500   29500              
     Branches          4843    4843              
   ==============================================
   - Hits              2219    2216       -3     
   - Misses           26808   26810       +2     
   - Partials           473     474       +1     
   ```
   
   
   | [Impacted Files](https://codecov.io/gh/apache/incubator-inlong/pull/480?src=pr&el=tree&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation) | Coverage Δ | |
   |---|---|---|
   | [.../apache/tubemq/corebase/policies/FlowCtrlItem.java](https://codecov.io/gh/apache/incubator-inlong/pull/480/diff?src=pr&el=tree&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation#diff-dHViZW1xLWNvcmUvc3JjL21haW4vamF2YS9vcmcvYXBhY2hlL3R1YmVtcS9jb3JlYmFzZS9wb2xpY2llcy9GbG93Q3RybEl0ZW0uamF2YQ==) | `38.88% <0.00%> (-1.12%)` | :arrow_down: |
   | [.../tubemq/corebase/policies/FlowCtrlRuleHandler.java](https://codecov.io/gh/apache/incubator-inlong/pull/480/diff?src=pr&el=tree&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation#diff-dHViZW1xLWNvcmUvc3JjL21haW4vamF2YS9vcmcvYXBhY2hlL3R1YmVtcS9jb3JlYmFzZS9wb2xpY2llcy9GbG93Q3RybFJ1bGVIYW5kbGVyLmphdmE=) | `43.98% <0.00%> (-0.59%)` | :arrow_down: |
   
   ------
   
   [Continue to review full report at Codecov](https://codecov.io/gh/apache/incubator-inlong/pull/480?src=pr&el=continue&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation).
   > **Legend** - [Click here to learn more](https://docs.codecov.io/docs/codecov-delta?utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation)
   > `Δ = absolute <relative> (impact)`, `ø = not affected`, `? = missing data`
   > Powered by [Codecov](https://codecov.io/gh/apache/incubator-inlong/pull/480?src=pr&el=footer&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation). Last update [6fd7644...b0bd1a5](https://codecov.io/gh/apache/incubator-inlong/pull/480?src=pr&el=lastupdated&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation). Read the [comment docs](https://docs.codecov.io/docs/pull-request-comments?utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation).
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [incubator-inlong] gosonzhang merged pull request #480: [INLONG-624]Go SDK consumer start interface

Posted by GitBox <gi...@apache.org>.
gosonzhang merged pull request #480:
URL: https://github.com/apache/incubator-inlong/pull/480


   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [incubator-inlong] codecov-commenter edited a comment on pull request #480: [INLONG-624]Go SDK consumer start interface

Posted by GitBox <gi...@apache.org>.
codecov-commenter edited a comment on pull request #480:
URL: https://github.com/apache/incubator-inlong/pull/480#issuecomment-853744479


   # [Codecov](https://codecov.io/gh/apache/incubator-inlong/pull/480?src=pr&el=h1&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation) Report
   > Merging [#480](https://codecov.io/gh/apache/incubator-inlong/pull/480?src=pr&el=desc&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation) (39a681b) into [INLONG-25](https://codecov.io/gh/apache/incubator-inlong/commit/6fd764444c824cae9595a5551c2d7a7443a3e4e6?el=desc&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation) (6fd7644) will **decrease** coverage by `0.01%`.
   > The diff coverage is `n/a`.
   
   > :exclamation: Current head 39a681b differs from pull request most recent head 294924e. Consider uploading reports for the commit 294924e to get more accurate results
   [![Impacted file tree graph](https://codecov.io/gh/apache/incubator-inlong/pull/480/graphs/tree.svg?width=650&height=150&src=pr&token=1EUK92O9K2&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation)](https://codecov.io/gh/apache/incubator-inlong/pull/480?src=pr&el=tree&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation)
   
   ```diff
   @@              Coverage Diff               @@
   ##             INLONG-25    #480      +/-   ##
   ==============================================
   - Coverage         7.52%   7.51%   -0.02%     
   + Complexity         481     479       -2     
   ==============================================
     Files              267     267              
     Lines            29500   29500              
     Branches          4843    4843              
   ==============================================
   - Hits              2219    2216       -3     
   - Misses           26808   26810       +2     
   - Partials           473     474       +1     
   ```
   
   
   | [Impacted Files](https://codecov.io/gh/apache/incubator-inlong/pull/480?src=pr&el=tree&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation) | Coverage Δ | |
   |---|---|---|
   | [.../apache/tubemq/corebase/policies/FlowCtrlItem.java](https://codecov.io/gh/apache/incubator-inlong/pull/480/diff?src=pr&el=tree&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation#diff-dHViZW1xLWNvcmUvc3JjL21haW4vamF2YS9vcmcvYXBhY2hlL3R1YmVtcS9jb3JlYmFzZS9wb2xpY2llcy9GbG93Q3RybEl0ZW0uamF2YQ==) | `38.88% <0.00%> (-1.12%)` | :arrow_down: |
   | [.../tubemq/corebase/policies/FlowCtrlRuleHandler.java](https://codecov.io/gh/apache/incubator-inlong/pull/480/diff?src=pr&el=tree&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation#diff-dHViZW1xLWNvcmUvc3JjL21haW4vamF2YS9vcmcvYXBhY2hlL3R1YmVtcS9jb3JlYmFzZS9wb2xpY2llcy9GbG93Q3RybFJ1bGVIYW5kbGVyLmphdmE=) | `43.98% <0.00%> (-0.59%)` | :arrow_down: |
   
   ------
   
   [Continue to review full report at Codecov](https://codecov.io/gh/apache/incubator-inlong/pull/480?src=pr&el=continue&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation).
   > **Legend** - [Click here to learn more](https://docs.codecov.io/docs/codecov-delta?utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation)
   > `Δ = absolute <relative> (impact)`, `ø = not affected`, `? = missing data`
   > Powered by [Codecov](https://codecov.io/gh/apache/incubator-inlong/pull/480?src=pr&el=footer&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation). Last update [6fd7644...294924e](https://codecov.io/gh/apache/incubator-inlong/pull/480?src=pr&el=lastupdated&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation). Read the [comment docs](https://docs.codecov.io/docs/pull-request-comments?utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation).
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [incubator-inlong] codecov-commenter edited a comment on pull request #480: [INLONG-624]Go SDK consumer start interface

Posted by GitBox <gi...@apache.org>.
codecov-commenter edited a comment on pull request #480:
URL: https://github.com/apache/incubator-inlong/pull/480#issuecomment-853744479


   # [Codecov](https://codecov.io/gh/apache/incubator-inlong/pull/480?src=pr&el=h1&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation) Report
   > Merging [#480](https://codecov.io/gh/apache/incubator-inlong/pull/480?src=pr&el=desc&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation) (a161297) into [INLONG-25](https://codecov.io/gh/apache/incubator-inlong/commit/6fd764444c824cae9595a5551c2d7a7443a3e4e6?el=desc&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation) (6fd7644) will **decrease** coverage by `0.01%`.
   > The diff coverage is `n/a`.
   
   [![Impacted file tree graph](https://codecov.io/gh/apache/incubator-inlong/pull/480/graphs/tree.svg?width=650&height=150&src=pr&token=1EUK92O9K2&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation)](https://codecov.io/gh/apache/incubator-inlong/pull/480?src=pr&el=tree&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation)
   
   ```diff
   @@              Coverage Diff               @@
   ##             INLONG-25    #480      +/-   ##
   ==============================================
   - Coverage         7.52%   7.51%   -0.02%     
   + Complexity         481     479       -2     
   ==============================================
     Files              267     267              
     Lines            29500   29500              
     Branches          4843    4843              
   ==============================================
   - Hits              2219    2216       -3     
   - Misses           26808   26810       +2     
   - Partials           473     474       +1     
   ```
   
   
   | [Impacted Files](https://codecov.io/gh/apache/incubator-inlong/pull/480?src=pr&el=tree&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation) | Coverage Δ | |
   |---|---|---|
   | [.../apache/tubemq/corebase/policies/FlowCtrlItem.java](https://codecov.io/gh/apache/incubator-inlong/pull/480/diff?src=pr&el=tree&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation#diff-dHViZW1xLWNvcmUvc3JjL21haW4vamF2YS9vcmcvYXBhY2hlL3R1YmVtcS9jb3JlYmFzZS9wb2xpY2llcy9GbG93Q3RybEl0ZW0uamF2YQ==) | `38.88% <0.00%> (-1.12%)` | :arrow_down: |
   | [.../tubemq/corebase/policies/FlowCtrlRuleHandler.java](https://codecov.io/gh/apache/incubator-inlong/pull/480/diff?src=pr&el=tree&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation#diff-dHViZW1xLWNvcmUvc3JjL21haW4vamF2YS9vcmcvYXBhY2hlL3R1YmVtcS9jb3JlYmFzZS9wb2xpY2llcy9GbG93Q3RybFJ1bGVIYW5kbGVyLmphdmE=) | `43.98% <0.00%> (-0.59%)` | :arrow_down: |
   
   ------
   
   [Continue to review full report at Codecov](https://codecov.io/gh/apache/incubator-inlong/pull/480?src=pr&el=continue&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation).
   > **Legend** - [Click here to learn more](https://docs.codecov.io/docs/codecov-delta?utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation)
   > `Δ = absolute <relative> (impact)`, `ø = not affected`, `? = missing data`
   > Powered by [Codecov](https://codecov.io/gh/apache/incubator-inlong/pull/480?src=pr&el=footer&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation). Last update [6fd7644...a161297](https://codecov.io/gh/apache/incubator-inlong/pull/480?src=pr&el=lastupdated&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation). Read the [comment docs](https://docs.codecov.io/docs/pull-request-comments?utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation).
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [incubator-inlong] codecov-commenter edited a comment on pull request #480: [INLONG-624]Go SDK consumer start interface

Posted by GitBox <gi...@apache.org>.
codecov-commenter edited a comment on pull request #480:
URL: https://github.com/apache/incubator-inlong/pull/480#issuecomment-853744479


   # [Codecov](https://codecov.io/gh/apache/incubator-inlong/pull/480?src=pr&el=h1&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation) Report
   > Merging [#480](https://codecov.io/gh/apache/incubator-inlong/pull/480?src=pr&el=desc&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation) (74d4a1d) into [INLONG-25](https://codecov.io/gh/apache/incubator-inlong/commit/6fd764444c824cae9595a5551c2d7a7443a3e4e6?el=desc&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation) (6fd7644) will **decrease** coverage by `0.01%`.
   > The diff coverage is `n/a`.
   
   [![Impacted file tree graph](https://codecov.io/gh/apache/incubator-inlong/pull/480/graphs/tree.svg?width=650&height=150&src=pr&token=1EUK92O9K2&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation)](https://codecov.io/gh/apache/incubator-inlong/pull/480?src=pr&el=tree&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation)
   
   ```diff
   @@              Coverage Diff               @@
   ##             INLONG-25    #480      +/-   ##
   ==============================================
   - Coverage         7.52%   7.50%   -0.02%     
   + Complexity         481     478       -3     
   ==============================================
     Files              267     267              
     Lines            29500   29500              
     Branches          4843    4843              
   ==============================================
   - Hits              2219    2215       -4     
   - Misses           26808   26813       +5     
   + Partials           473     472       -1     
   ```
   
   
   | [Impacted Files](https://codecov.io/gh/apache/incubator-inlong/pull/480?src=pr&el=tree&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation) | Coverage Δ | |
   |---|---|---|
   | [.../apache/tubemq/corebase/policies/FlowCtrlItem.java](https://codecov.io/gh/apache/incubator-inlong/pull/480/diff?src=pr&el=tree&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation#diff-dHViZW1xLWNvcmUvc3JjL21haW4vamF2YS9vcmcvYXBhY2hlL3R1YmVtcS9jb3JlYmFzZS9wb2xpY2llcy9GbG93Q3RybEl0ZW0uamF2YQ==) | `38.88% <0.00%> (-1.12%)` | :arrow_down: |
   | [.../tubemq/corebase/policies/FlowCtrlRuleHandler.java](https://codecov.io/gh/apache/incubator-inlong/pull/480/diff?src=pr&el=tree&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation#diff-dHViZW1xLWNvcmUvc3JjL21haW4vamF2YS9vcmcvYXBhY2hlL3R1YmVtcS9jb3JlYmFzZS9wb2xpY2llcy9GbG93Q3RybFJ1bGVIYW5kbGVyLmphdmE=) | `43.98% <0.00%> (-0.59%)` | :arrow_down: |
   | [.../producer/qltystats/DefaultBrokerRcvQltyStats.java](https://codecov.io/gh/apache/incubator-inlong/pull/480/diff?src=pr&el=tree&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation#diff-dHViZW1xLWNsaWVudC9zcmMvbWFpbi9qYXZhL29yZy9hcGFjaGUvdHViZW1xL2NsaWVudC9wcm9kdWNlci9xbHR5c3RhdHMvRGVmYXVsdEJyb2tlclJjdlFsdHlTdGF0cy5qYXZh) | `45.31% <0.00%> (-0.40%)` | :arrow_down: |
   
   ------
   
   [Continue to review full report at Codecov](https://codecov.io/gh/apache/incubator-inlong/pull/480?src=pr&el=continue&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation).
   > **Legend** - [Click here to learn more](https://docs.codecov.io/docs/codecov-delta?utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation)
   > `Δ = absolute <relative> (impact)`, `ø = not affected`, `? = missing data`
   > Powered by [Codecov](https://codecov.io/gh/apache/incubator-inlong/pull/480?src=pr&el=footer&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation). Last update [6fd7644...74d4a1d](https://codecov.io/gh/apache/incubator-inlong/pull/480?src=pr&el=lastupdated&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation). Read the [comment docs](https://docs.codecov.io/docs/pull-request-comments?utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation).
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [incubator-inlong] codecov-commenter edited a comment on pull request #480: [INLONG-624]Go SDK consumer start interface

Posted by GitBox <gi...@apache.org>.
codecov-commenter edited a comment on pull request #480:
URL: https://github.com/apache/incubator-inlong/pull/480#issuecomment-853744479


   # [Codecov](https://codecov.io/gh/apache/incubator-inlong/pull/480?src=pr&el=h1&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation) Report
   > Merging [#480](https://codecov.io/gh/apache/incubator-inlong/pull/480?src=pr&el=desc&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation) (b0bd1a5) into [INLONG-25](https://codecov.io/gh/apache/incubator-inlong/commit/6fd764444c824cae9595a5551c2d7a7443a3e4e6?el=desc&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation) (6fd7644) will **decrease** coverage by `0.01%`.
   > The diff coverage is `n/a`.
   
   [![Impacted file tree graph](https://codecov.io/gh/apache/incubator-inlong/pull/480/graphs/tree.svg?width=650&height=150&src=pr&token=1EUK92O9K2&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation)](https://codecov.io/gh/apache/incubator-inlong/pull/480?src=pr&el=tree&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation)
   
   ```diff
   @@              Coverage Diff               @@
   ##             INLONG-25    #480      +/-   ##
   ==============================================
   - Coverage         7.52%   7.51%   -0.02%     
   + Complexity         481     479       -2     
   ==============================================
     Files              267     267              
     Lines            29500   29500              
     Branches          4843    4843              
   ==============================================
   - Hits              2219    2216       -3     
   - Misses           26808   26810       +2     
   - Partials           473     474       +1     
   ```
   
   
   | [Impacted Files](https://codecov.io/gh/apache/incubator-inlong/pull/480?src=pr&el=tree&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation) | Coverage Δ | |
   |---|---|---|
   | [.../apache/tubemq/corebase/policies/FlowCtrlItem.java](https://codecov.io/gh/apache/incubator-inlong/pull/480/diff?src=pr&el=tree&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation#diff-dHViZW1xLWNvcmUvc3JjL21haW4vamF2YS9vcmcvYXBhY2hlL3R1YmVtcS9jb3JlYmFzZS9wb2xpY2llcy9GbG93Q3RybEl0ZW0uamF2YQ==) | `38.88% <0.00%> (-1.12%)` | :arrow_down: |
   | [.../tubemq/corebase/policies/FlowCtrlRuleHandler.java](https://codecov.io/gh/apache/incubator-inlong/pull/480/diff?src=pr&el=tree&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation#diff-dHViZW1xLWNvcmUvc3JjL21haW4vamF2YS9vcmcvYXBhY2hlL3R1YmVtcS9jb3JlYmFzZS9wb2xpY2llcy9GbG93Q3RybFJ1bGVIYW5kbGVyLmphdmE=) | `43.98% <0.00%> (-0.59%)` | :arrow_down: |
   
   ------
   
   [Continue to review full report at Codecov](https://codecov.io/gh/apache/incubator-inlong/pull/480?src=pr&el=continue&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation).
   > **Legend** - [Click here to learn more](https://docs.codecov.io/docs/codecov-delta?utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation)
   > `Δ = absolute <relative> (impact)`, `ø = not affected`, `? = missing data`
   > Powered by [Codecov](https://codecov.io/gh/apache/incubator-inlong/pull/480?src=pr&el=footer&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation). Last update [6fd7644...b0bd1a5](https://codecov.io/gh/apache/incubator-inlong/pull/480?src=pr&el=lastupdated&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation). Read the [comment docs](https://docs.codecov.io/docs/pull-request-comments?utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation).
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [incubator-inlong] codecov-commenter commented on pull request #480: [INLONG-624]Go SDK consumer interface

Posted by GitBox <gi...@apache.org>.
codecov-commenter commented on pull request #480:
URL: https://github.com/apache/incubator-inlong/pull/480#issuecomment-853744479


   # [Codecov](https://codecov.io/gh/apache/incubator-inlong/pull/480?src=pr&el=h1&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation) Report
   > Merging [#480](https://codecov.io/gh/apache/incubator-inlong/pull/480?src=pr&el=desc&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation) (d3d07ce) into [INLONG-25](https://codecov.io/gh/apache/incubator-inlong/commit/6fd764444c824cae9595a5551c2d7a7443a3e4e6?el=desc&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation) (6fd7644) will **decrease** coverage by `0.01%`.
   > The diff coverage is `n/a`.
   
   [![Impacted file tree graph](https://codecov.io/gh/apache/incubator-inlong/pull/480/graphs/tree.svg?width=650&height=150&src=pr&token=1EUK92O9K2&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation)](https://codecov.io/gh/apache/incubator-inlong/pull/480?src=pr&el=tree&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation)
   
   ```diff
   @@              Coverage Diff               @@
   ##             INLONG-25    #480      +/-   ##
   ==============================================
   - Coverage         7.52%   7.51%   -0.02%     
   + Complexity         481     479       -2     
   ==============================================
     Files              267     267              
     Lines            29500   29500              
     Branches          4843    4843              
   ==============================================
   - Hits              2219    2216       -3     
   - Misses           26808   26810       +2     
   - Partials           473     474       +1     
   ```
   
   
   | [Impacted Files](https://codecov.io/gh/apache/incubator-inlong/pull/480?src=pr&el=tree&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation) | Coverage Δ | |
   |---|---|---|
   | [.../apache/tubemq/corebase/policies/FlowCtrlItem.java](https://codecov.io/gh/apache/incubator-inlong/pull/480/diff?src=pr&el=tree&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation#diff-dHViZW1xLWNvcmUvc3JjL21haW4vamF2YS9vcmcvYXBhY2hlL3R1YmVtcS9jb3JlYmFzZS9wb2xpY2llcy9GbG93Q3RybEl0ZW0uamF2YQ==) | `38.88% <0.00%> (-1.12%)` | :arrow_down: |
   | [.../tubemq/corebase/policies/FlowCtrlRuleHandler.java](https://codecov.io/gh/apache/incubator-inlong/pull/480/diff?src=pr&el=tree&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation#diff-dHViZW1xLWNvcmUvc3JjL21haW4vamF2YS9vcmcvYXBhY2hlL3R1YmVtcS9jb3JlYmFzZS9wb2xpY2llcy9GbG93Q3RybFJ1bGVIYW5kbGVyLmphdmE=) | `43.98% <0.00%> (-0.59%)` | :arrow_down: |
   
   ------
   
   [Continue to review full report at Codecov](https://codecov.io/gh/apache/incubator-inlong/pull/480?src=pr&el=continue&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation).
   > **Legend** - [Click here to learn more](https://docs.codecov.io/docs/codecov-delta?utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation)
   > `Δ = absolute <relative> (impact)`, `ø = not affected`, `? = missing data`
   > Powered by [Codecov](https://codecov.io/gh/apache/incubator-inlong/pull/480?src=pr&el=footer&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation). Last update [6fd7644...d3d07ce](https://codecov.io/gh/apache/incubator-inlong/pull/480?src=pr&el=lastupdated&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation). Read the [comment docs](https://docs.codecov.io/docs/pull-request-comments?utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation).
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org