You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@inlong.apache.org by GitBox <gi...@apache.org> on 2021/06/03 08:07:22 UTC
[GitHub] [incubator-inlong] TszKitLo40 opened a new pull request #480: [INLONG-624]Go SDK consumer interface
TszKitLo40 opened a new pull request #480:
URL: https://github.com/apache/incubator-inlong/pull/480
Signed-off-by: Zijie Lu <ws...@gmail.com>
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
For queries about this service, please contact Infrastructure at:
users@infra.apache.org
[GitHub] [incubator-inlong] codecov-commenter edited a comment on pull request #480: [INLONG-624]Go SDK consumer start interface
Posted by GitBox <gi...@apache.org>.
codecov-commenter edited a comment on pull request #480:
URL: https://github.com/apache/incubator-inlong/pull/480#issuecomment-853744479
# [Codecov](https://codecov.io/gh/apache/incubator-inlong/pull/480?src=pr&el=h1&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation) Report
> Merging [#480](https://codecov.io/gh/apache/incubator-inlong/pull/480?src=pr&el=desc&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation) (294924e) into [INLONG-25](https://codecov.io/gh/apache/incubator-inlong/commit/6fd764444c824cae9595a5551c2d7a7443a3e4e6?el=desc&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation) (6fd7644) will **decrease** coverage by `0.03%`.
> The diff coverage is `n/a`.
[![Impacted file tree graph](https://codecov.io/gh/apache/incubator-inlong/pull/480/graphs/tree.svg?width=650&height=150&src=pr&token=1EUK92O9K2&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation)](https://codecov.io/gh/apache/incubator-inlong/pull/480?src=pr&el=tree&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation)
```diff
@@ Coverage Diff @@
## INLONG-25 #480 +/- ##
==============================================
- Coverage 7.52% 7.48% -0.04%
+ Complexity 481 479 -2
==============================================
Files 267 267
Lines 29500 29500
Branches 4843 4843
==============================================
- Hits 2219 2209 -10
- Misses 26808 26816 +8
- Partials 473 475 +2
```
| [Impacted Files](https://codecov.io/gh/apache/incubator-inlong/pull/480?src=pr&el=tree&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation) | Coverage Δ | |
|---|---|---|
| [.../java/org/apache/flume/sink/tubemq/TubemqSink.java](https://codecov.io/gh/apache/incubator-inlong/pull/480/diff?src=pr&el=tree&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation#diff-dHViZW1xLWNvbm5lY3RvcnMvdHViZW1xLWNvbm5lY3Rvci1mbHVtZS9zcmMvbWFpbi9qYXZhL29yZy9hcGFjaGUvZmx1bWUvc2luay90dWJlbXEvVHViZW1xU2luay5qYXZh) | `51.42% <0.00%> (-4.00%)` | :arrow_down: |
| [.../apache/tubemq/corebase/policies/FlowCtrlItem.java](https://codecov.io/gh/apache/incubator-inlong/pull/480/diff?src=pr&el=tree&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation#diff-dHViZW1xLWNvcmUvc3JjL21haW4vamF2YS9vcmcvYXBhY2hlL3R1YmVtcS9jb3JlYmFzZS9wb2xpY2llcy9GbG93Q3RybEl0ZW0uamF2YQ==) | `38.88% <0.00%> (-1.12%)` | :arrow_down: |
| [.../tubemq/corebase/policies/FlowCtrlRuleHandler.java](https://codecov.io/gh/apache/incubator-inlong/pull/480/diff?src=pr&el=tree&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation#diff-dHViZW1xLWNvcmUvc3JjL21haW4vamF2YS9vcmcvYXBhY2hlL3R1YmVtcS9jb3JlYmFzZS9wb2xpY2llcy9GbG93Q3RybFJ1bGVIYW5kbGVyLmphdmE=) | `43.98% <0.00%> (-0.59%)` | :arrow_down: |
------
[Continue to review full report at Codecov](https://codecov.io/gh/apache/incubator-inlong/pull/480?src=pr&el=continue&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation).
> **Legend** - [Click here to learn more](https://docs.codecov.io/docs/codecov-delta?utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation)
> `Δ = absolute <relative> (impact)`, `ø = not affected`, `? = missing data`
> Powered by [Codecov](https://codecov.io/gh/apache/incubator-inlong/pull/480?src=pr&el=footer&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation). Last update [6fd7644...294924e](https://codecov.io/gh/apache/incubator-inlong/pull/480?src=pr&el=lastupdated&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation). Read the [comment docs](https://docs.codecov.io/docs/pull-request-comments?utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation).
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
For queries about this service, please contact Infrastructure at:
users@infra.apache.org
[GitHub] [incubator-inlong] codecov-commenter commented on pull request #480: [INLONG-624]Go SDK consumer interface
Posted by GitBox <gi...@apache.org>.
codecov-commenter commented on pull request #480:
URL: https://github.com/apache/incubator-inlong/pull/480#issuecomment-853744479
# [Codecov](https://codecov.io/gh/apache/incubator-inlong/pull/480?src=pr&el=h1&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation) Report
> Merging [#480](https://codecov.io/gh/apache/incubator-inlong/pull/480?src=pr&el=desc&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation) (d3d07ce) into [INLONG-25](https://codecov.io/gh/apache/incubator-inlong/commit/6fd764444c824cae9595a5551c2d7a7443a3e4e6?el=desc&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation) (6fd7644) will **decrease** coverage by `0.01%`.
> The diff coverage is `n/a`.
[![Impacted file tree graph](https://codecov.io/gh/apache/incubator-inlong/pull/480/graphs/tree.svg?width=650&height=150&src=pr&token=1EUK92O9K2&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation)](https://codecov.io/gh/apache/incubator-inlong/pull/480?src=pr&el=tree&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation)
```diff
@@ Coverage Diff @@
## INLONG-25 #480 +/- ##
==============================================
- Coverage 7.52% 7.51% -0.02%
+ Complexity 481 479 -2
==============================================
Files 267 267
Lines 29500 29500
Branches 4843 4843
==============================================
- Hits 2219 2216 -3
- Misses 26808 26810 +2
- Partials 473 474 +1
```
| [Impacted Files](https://codecov.io/gh/apache/incubator-inlong/pull/480?src=pr&el=tree&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation) | Coverage Δ | |
|---|---|---|
| [.../apache/tubemq/corebase/policies/FlowCtrlItem.java](https://codecov.io/gh/apache/incubator-inlong/pull/480/diff?src=pr&el=tree&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation#diff-dHViZW1xLWNvcmUvc3JjL21haW4vamF2YS9vcmcvYXBhY2hlL3R1YmVtcS9jb3JlYmFzZS9wb2xpY2llcy9GbG93Q3RybEl0ZW0uamF2YQ==) | `38.88% <0.00%> (-1.12%)` | :arrow_down: |
| [.../tubemq/corebase/policies/FlowCtrlRuleHandler.java](https://codecov.io/gh/apache/incubator-inlong/pull/480/diff?src=pr&el=tree&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation#diff-dHViZW1xLWNvcmUvc3JjL21haW4vamF2YS9vcmcvYXBhY2hlL3R1YmVtcS9jb3JlYmFzZS9wb2xpY2llcy9GbG93Q3RybFJ1bGVIYW5kbGVyLmphdmE=) | `43.98% <0.00%> (-0.59%)` | :arrow_down: |
------
[Continue to review full report at Codecov](https://codecov.io/gh/apache/incubator-inlong/pull/480?src=pr&el=continue&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation).
> **Legend** - [Click here to learn more](https://docs.codecov.io/docs/codecov-delta?utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation)
> `Δ = absolute <relative> (impact)`, `ø = not affected`, `? = missing data`
> Powered by [Codecov](https://codecov.io/gh/apache/incubator-inlong/pull/480?src=pr&el=footer&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation). Last update [6fd7644...d3d07ce](https://codecov.io/gh/apache/incubator-inlong/pull/480?src=pr&el=lastupdated&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation). Read the [comment docs](https://docs.codecov.io/docs/pull-request-comments?utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation).
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
For queries about this service, please contact Infrastructure at:
users@infra.apache.org
[GitHub] [incubator-inlong] codecov-commenter edited a comment on pull request #480: [INLONG-624]Go SDK consumer start interface
Posted by GitBox <gi...@apache.org>.
codecov-commenter edited a comment on pull request #480:
URL: https://github.com/apache/incubator-inlong/pull/480#issuecomment-853744479
# [Codecov](https://codecov.io/gh/apache/incubator-inlong/pull/480?src=pr&el=h1&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation) Report
> Merging [#480](https://codecov.io/gh/apache/incubator-inlong/pull/480?src=pr&el=desc&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation) (6752399) into [INLONG-25](https://codecov.io/gh/apache/incubator-inlong/commit/6fd764444c824cae9595a5551c2d7a7443a3e4e6?el=desc&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation) (6fd7644) will **not change** coverage.
> The diff coverage is `n/a`.
[![Impacted file tree graph](https://codecov.io/gh/apache/incubator-inlong/pull/480/graphs/tree.svg?width=650&height=150&src=pr&token=1EUK92O9K2&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation)](https://codecov.io/gh/apache/incubator-inlong/pull/480?src=pr&el=tree&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation)
```diff
@@ Coverage Diff @@
## INLONG-25 #480 +/- ##
===========================================
Coverage 7.52% 7.52%
Complexity 481 481
===========================================
Files 267 267
Lines 29500 29500
Branches 4843 4843
===========================================
Hits 2219 2219
Misses 26808 26808
Partials 473 473
```
------
[Continue to review full report at Codecov](https://codecov.io/gh/apache/incubator-inlong/pull/480?src=pr&el=continue&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation).
> **Legend** - [Click here to learn more](https://docs.codecov.io/docs/codecov-delta?utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation)
> `Δ = absolute <relative> (impact)`, `ø = not affected`, `? = missing data`
> Powered by [Codecov](https://codecov.io/gh/apache/incubator-inlong/pull/480?src=pr&el=footer&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation). Last update [6fd7644...6752399](https://codecov.io/gh/apache/incubator-inlong/pull/480?src=pr&el=lastupdated&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation). Read the [comment docs](https://docs.codecov.io/docs/pull-request-comments?utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation).
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
For queries about this service, please contact Infrastructure at:
users@infra.apache.org
[GitHub] [incubator-inlong] charlely commented on a change in pull request #480: [INLONG-624]Go SDK consumer start interface
Posted by GitBox <gi...@apache.org>.
charlely commented on a change in pull request #480:
URL: https://github.com/apache/incubator-inlong/pull/480#discussion_r647076775
##########
File path: tubemq-client-twins/tubemq-client-go/client/consumer.go
##########
@@ -0,0 +1,46 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+// Package client defines the api and information
+// which can be exposed to user.
+package client
+
+const (
+ tubeMQClientVersion = "0.1.0"
Review comment:
Need a separate file to define version
##########
File path: tubemq-client-twins/tubemq-client-go/client/consumer_impl.go
##########
@@ -0,0 +1,356 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package client
+
+import (
+ "context"
+ "os"
+ "strconv"
+ "sync/atomic"
+ "time"
+
+ "github.com/golang/protobuf/proto"
+
+ "github.com/apache/incubator-inlong/tubemq-client-twins/tubemq-client-go/config"
+ "github.com/apache/incubator-inlong/tubemq-client-twins/tubemq-client-go/errs"
+ "github.com/apache/incubator-inlong/tubemq-client-twins/tubemq-client-go/metadata"
+ "github.com/apache/incubator-inlong/tubemq-client-twins/tubemq-client-go/multiplexing"
+ "github.com/apache/incubator-inlong/tubemq-client-twins/tubemq-client-go/protocol"
+ "github.com/apache/incubator-inlong/tubemq-client-twins/tubemq-client-go/remote"
+ "github.com/apache/incubator-inlong/tubemq-client-twins/tubemq-client-go/rpc"
+ "github.com/apache/incubator-inlong/tubemq-client-twins/tubemq-client-go/selector"
+ "github.com/apache/incubator-inlong/tubemq-client-twins/tubemq-client-go/sub"
+ "github.com/apache/incubator-inlong/tubemq-client-twins/tubemq-client-go/transport"
+ "github.com/apache/incubator-inlong/tubemq-client-twins/tubemq-client-go/util"
+)
+
+const (
+ consumeStatusNormal = 0
+ consumeStatusFromMax = 1
+ consumeStatusFromMaxAlways = 2
+)
+
+type consumer struct {
+ clientID string
+ config *config.Config
+ subInfo *sub.SubInfo
+ rmtDataCache *remote.RmtDataCache
+ visitToken int64
+ authorizedInfo string
+ nextAuth2Master int32
+ nextAuth2Broker int32
+ master *selector.Node
+ client rpc.RPCClient
+ selector selector.Selector
+ lastMasterHb int64
+ masterHBRetry int
+ heartbeatManager *heartbeatManager
+ unreportedTimes int
+}
+
+// NewConsumer returns a consumer which is constructed by a given config.
+func NewConsumer(config *config.Config) (Consumer, error) {
+ selector, err := selector.Get("ip")
+ if err != nil {
+ return nil, err
+ }
+
+ clientID := newClientID(config.Consumer.Group)
+ pool := multiplexing.NewPool()
+ opts := &transport.Options{}
+ if config.Net.TLS.Enable {
+ opts.CACertFile = config.Net.TLS.CACertFile
+ opts.TLSCertFile = config.Net.TLS.TLSCertFile
+ opts.TLSKeyFile = config.Net.TLS.TLSKeyFile
+ opts.TLSServerName = config.Net.TLS.TLSServerName
+ }
+ client := rpc.New(pool, opts)
+ r := remote.NewRmtDataCache()
+ r.SetConsumerInfo(clientID, config.Consumer.Group)
+ c := &consumer{
+ config: config,
+ clientID: clientID,
+ subInfo: sub.NewSubInfo(config),
+ rmtDataCache: r,
+ selector: selector,
+ client: client,
+ visitToken: util.InvalidValue,
+ unreportedTimes: 0,
+ }
+ c.subInfo.SetClientID(clientID)
+ hbm := newHBManager(c)
+ c.heartbeatManager = hbm
+ return c, nil
+}
+
+// Start implementation of tubeMQ consumer.
+func (c *consumer) Start() error {
+ err := c.register2Master(false)
+ if err != nil {
+ return err
+ }
+ c.heartbeatManager.registerMaster(c.master.Address)
+ go c.processRebalanceEvent()
+ return nil
+}
+
+func (c *consumer) register2Master(needChange bool) error {
+ if needChange {
+ node, err := c.selector.Select(c.config.Consumer.Masters)
+ if err != nil {
+ return err
+ }
+ c.master = node
+ }
+ for c.master.HasNext {
+ ctx, cancel := context.WithTimeout(context.Background(), c.config.Net.ReadTimeout)
+
+ m := &metadata.Metadata{}
+ node := &metadata.Node{}
+ node.SetHost(util.GetLocalHost())
+ node.SetAddress(c.master.Address)
+ m.SetNode(node)
+ sub := &metadata.SubscribeInfo{}
+ sub.SetGroup(c.config.Consumer.Group)
+ m.SetSubscribeInfo(sub)
+
+ auth := &protocol.AuthenticateInfo{}
+ c.genMasterAuthenticateToken(auth, true)
+ mci := &protocol.MasterCertificateInfo{
+ AuthInfo: auth,
+ }
+ c.subInfo.SetMasterCertificateInfo(mci)
+
+ rsp, err := c.client.RegisterRequestC2M(ctx, m, c.subInfo, c.rmtDataCache)
+ if err != nil {
+ cancel()
+ return err
+ }
+ if rsp.GetSuccess() {
+ c.masterHBRetry = 0
+ c.processRegisterResponseM2C(rsp)
+ cancel()
+ return nil
+ } else if rsp.GetErrCode() == errs.RetConsumeGroupForbidden || rsp.GetErrCode() == errs.RetConsumeContentForbidden {
+ cancel()
+ return nil
+ } else {
+ c.master, err = c.selector.Select(c.config.Consumer.Masters)
+ cancel()
+ if err != nil {
+ return err
+ }
+ }
+ }
+ return nil
+}
+
+func (c *consumer) processRegisterResponseM2C(rsp *protocol.RegisterResponseM2C) {
+ if rsp.GetNotAllocated() {
+ c.subInfo.SetIsNotAllocated(rsp.GetNotAllocated())
+ }
+ if rsp.GetDefFlowCheckId() != 0 || rsp.GetDefFlowCheckId() != 0 {
+ if rsp.GetDefFlowCheckId() != 0 {
+ c.rmtDataCache.UpdateDefFlowCtrlInfo(rsp.GetDefFlowCheckId(), rsp.GetDefFlowControlInfo())
+ }
+ qryPriorityID := c.rmtDataCache.GetQryPriorityID()
+ if rsp.GetQryPriorityId() != 0 {
+ qryPriorityID = rsp.GetQryPriorityId()
+ }
+ c.rmtDataCache.UpdateGroupFlowCtrlInfo(qryPriorityID, rsp.GetGroupFlowCheckId(), rsp.GetGroupFlowControlInfo())
+ }
+ if rsp.GetAuthorizedInfo() != nil {
+ c.processAuthorizedToken(rsp.GetAuthorizedInfo())
+ }
+ c.lastMasterHb = time.Now().UnixNano() / int64(time.Millisecond)
+}
+
+func (c *consumer) processAuthorizedToken(info *protocol.MasterAuthorizedInfo) {
+ atomic.StoreInt64(&c.visitToken, info.GetVisitAuthorizedToken())
+ c.authorizedInfo = info.GetAuthAuthorizedToken()
+}
+
+// GetMessage implementation of TubeMQ consumer.
+func (c *consumer) GetMessage() (*ConsumerResult, error) {
+ panic("implement me")
+}
+
+// Confirm implementation of TubeMQ consumer.
+func (c *consumer) Confirm(confirmContext string, consumed bool) (*ConsumerResult, error) {
+ panic("implement me")
+}
+
+// GetCurrConsumedInfo implementation of TubeMQ consumer.
+func (c *consumer) GetCurrConsumedInfo() (map[string]*ConsumerOffset, error) {
+ panic("implement me")
+}
+
+func (c *consumer) processRebalanceEvent() {
+ for {
+ event := c.rmtDataCache.TakeEvent()
+ if event.GetEventStatus() == int32(util.InvalidValue) && event.GetRebalanceID() == util.InvalidValue {
+ break
+ }
+ c.rmtDataCache.ClearEvent()
+ switch event.GetEventType() {
+ case 2, 20:
+ c.disconnect2Broker(event)
+ c.rmtDataCache.OfferEventResult(event)
+ case 1, 10:
Review comment:
const value cann't use magic number.
##########
File path: tubemq-client-twins/tubemq-client-go/client/consumer_impl.go
##########
@@ -0,0 +1,356 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package client
+
+import (
+ "context"
+ "os"
+ "strconv"
+ "sync/atomic"
+ "time"
+
+ "github.com/golang/protobuf/proto"
+
+ "github.com/apache/incubator-inlong/tubemq-client-twins/tubemq-client-go/config"
+ "github.com/apache/incubator-inlong/tubemq-client-twins/tubemq-client-go/errs"
+ "github.com/apache/incubator-inlong/tubemq-client-twins/tubemq-client-go/metadata"
+ "github.com/apache/incubator-inlong/tubemq-client-twins/tubemq-client-go/multiplexing"
+ "github.com/apache/incubator-inlong/tubemq-client-twins/tubemq-client-go/protocol"
+ "github.com/apache/incubator-inlong/tubemq-client-twins/tubemq-client-go/remote"
+ "github.com/apache/incubator-inlong/tubemq-client-twins/tubemq-client-go/rpc"
+ "github.com/apache/incubator-inlong/tubemq-client-twins/tubemq-client-go/selector"
+ "github.com/apache/incubator-inlong/tubemq-client-twins/tubemq-client-go/sub"
+ "github.com/apache/incubator-inlong/tubemq-client-twins/tubemq-client-go/transport"
+ "github.com/apache/incubator-inlong/tubemq-client-twins/tubemq-client-go/util"
+)
+
+const (
+ consumeStatusNormal = 0
+ consumeStatusFromMax = 1
+ consumeStatusFromMaxAlways = 2
+)
+
+type consumer struct {
+ clientID string
+ config *config.Config
+ subInfo *sub.SubInfo
+ rmtDataCache *remote.RmtDataCache
+ visitToken int64
+ authorizedInfo string
+ nextAuth2Master int32
+ nextAuth2Broker int32
+ master *selector.Node
+ client rpc.RPCClient
+ selector selector.Selector
+ lastMasterHb int64
+ masterHBRetry int
+ heartbeatManager *heartbeatManager
+ unreportedTimes int
+}
+
+// NewConsumer returns a consumer which is constructed by a given config.
+func NewConsumer(config *config.Config) (Consumer, error) {
+ selector, err := selector.Get("ip")
+ if err != nil {
+ return nil, err
+ }
+
+ clientID := newClientID(config.Consumer.Group)
+ pool := multiplexing.NewPool()
+ opts := &transport.Options{}
+ if config.Net.TLS.Enable {
+ opts.CACertFile = config.Net.TLS.CACertFile
+ opts.TLSCertFile = config.Net.TLS.TLSCertFile
+ opts.TLSKeyFile = config.Net.TLS.TLSKeyFile
+ opts.TLSServerName = config.Net.TLS.TLSServerName
+ }
+ client := rpc.New(pool, opts)
+ r := remote.NewRmtDataCache()
+ r.SetConsumerInfo(clientID, config.Consumer.Group)
+ c := &consumer{
+ config: config,
+ clientID: clientID,
+ subInfo: sub.NewSubInfo(config),
+ rmtDataCache: r,
+ selector: selector,
+ client: client,
+ visitToken: util.InvalidValue,
+ unreportedTimes: 0,
+ }
+ c.subInfo.SetClientID(clientID)
+ hbm := newHBManager(c)
+ c.heartbeatManager = hbm
+ return c, nil
+}
+
+// Start implementation of tubeMQ consumer.
+func (c *consumer) Start() error {
+ err := c.register2Master(false)
+ if err != nil {
+ return err
+ }
+ c.heartbeatManager.registerMaster(c.master.Address)
+ go c.processRebalanceEvent()
+ return nil
+}
+
+func (c *consumer) register2Master(needChange bool) error {
+ if needChange {
+ node, err := c.selector.Select(c.config.Consumer.Masters)
+ if err != nil {
+ return err
+ }
+ c.master = node
+ }
+ for c.master.HasNext {
+ ctx, cancel := context.WithTimeout(context.Background(), c.config.Net.ReadTimeout)
+
+ m := &metadata.Metadata{}
+ node := &metadata.Node{}
+ node.SetHost(util.GetLocalHost())
+ node.SetAddress(c.master.Address)
+ m.SetNode(node)
+ sub := &metadata.SubscribeInfo{}
+ sub.SetGroup(c.config.Consumer.Group)
+ m.SetSubscribeInfo(sub)
+
+ auth := &protocol.AuthenticateInfo{}
+ c.genMasterAuthenticateToken(auth, true)
+ mci := &protocol.MasterCertificateInfo{
+ AuthInfo: auth,
+ }
+ c.subInfo.SetMasterCertificateInfo(mci)
+
+ rsp, err := c.client.RegisterRequestC2M(ctx, m, c.subInfo, c.rmtDataCache)
+ if err != nil {
+ cancel()
+ return err
+ }
+ if rsp.GetSuccess() {
+ c.masterHBRetry = 0
+ c.processRegisterResponseM2C(rsp)
+ cancel()
+ return nil
+ } else if rsp.GetErrCode() == errs.RetConsumeGroupForbidden || rsp.GetErrCode() == errs.RetConsumeContentForbidden {
+ cancel()
+ return nil
+ } else {
+ c.master, err = c.selector.Select(c.config.Consumer.Masters)
+ cancel()
+ if err != nil {
+ return err
+ }
+ }
+ }
+ return nil
+}
+
+func (c *consumer) processRegisterResponseM2C(rsp *protocol.RegisterResponseM2C) {
+ if rsp.GetNotAllocated() {
+ c.subInfo.SetIsNotAllocated(rsp.GetNotAllocated())
+ }
+ if rsp.GetDefFlowCheckId() != 0 || rsp.GetDefFlowCheckId() != 0 {
+ if rsp.GetDefFlowCheckId() != 0 {
+ c.rmtDataCache.UpdateDefFlowCtrlInfo(rsp.GetDefFlowCheckId(), rsp.GetDefFlowControlInfo())
+ }
+ qryPriorityID := c.rmtDataCache.GetQryPriorityID()
+ if rsp.GetQryPriorityId() != 0 {
+ qryPriorityID = rsp.GetQryPriorityId()
+ }
+ c.rmtDataCache.UpdateGroupFlowCtrlInfo(qryPriorityID, rsp.GetGroupFlowCheckId(), rsp.GetGroupFlowControlInfo())
+ }
+ if rsp.GetAuthorizedInfo() != nil {
+ c.processAuthorizedToken(rsp.GetAuthorizedInfo())
+ }
+ c.lastMasterHb = time.Now().UnixNano() / int64(time.Millisecond)
+}
+
+func (c *consumer) processAuthorizedToken(info *protocol.MasterAuthorizedInfo) {
+ atomic.StoreInt64(&c.visitToken, info.GetVisitAuthorizedToken())
+ c.authorizedInfo = info.GetAuthAuthorizedToken()
+}
+
+// GetMessage implementation of TubeMQ consumer.
+func (c *consumer) GetMessage() (*ConsumerResult, error) {
+ panic("implement me")
+}
+
+// Confirm implementation of TubeMQ consumer.
+func (c *consumer) Confirm(confirmContext string, consumed bool) (*ConsumerResult, error) {
+ panic("implement me")
+}
+
+// GetCurrConsumedInfo implementation of TubeMQ consumer.
+func (c *consumer) GetCurrConsumedInfo() (map[string]*ConsumerOffset, error) {
+ panic("implement me")
+}
+
+func (c *consumer) processRebalanceEvent() {
+ for {
Review comment:
Why not use channels
##########
File path: tubemq-client-twins/tubemq-client-go/client/consumer.go
##########
@@ -0,0 +1,46 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+// Package client defines the api and information
+// which can be exposed to user.
+package client
+
+const (
+ tubeMQClientVersion = "0.1.0"
+)
+
+// ConsumerResult of a consumption.
+type ConsumerResult struct {
+}
+
+// ConsumerOffset of a consumption,
+type ConsumerOffset struct {
+}
+
+var clientIndex uint64
Review comment:
This should be the allocator of clientid, index is more like array coordinates.
##########
File path: tubemq-client-twins/tubemq-client-go/client/consumer_impl.go
##########
@@ -0,0 +1,356 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package client
+
+import (
+ "context"
+ "os"
+ "strconv"
+ "sync/atomic"
+ "time"
+
+ "github.com/golang/protobuf/proto"
+
+ "github.com/apache/incubator-inlong/tubemq-client-twins/tubemq-client-go/config"
+ "github.com/apache/incubator-inlong/tubemq-client-twins/tubemq-client-go/errs"
+ "github.com/apache/incubator-inlong/tubemq-client-twins/tubemq-client-go/metadata"
+ "github.com/apache/incubator-inlong/tubemq-client-twins/tubemq-client-go/multiplexing"
+ "github.com/apache/incubator-inlong/tubemq-client-twins/tubemq-client-go/protocol"
+ "github.com/apache/incubator-inlong/tubemq-client-twins/tubemq-client-go/remote"
+ "github.com/apache/incubator-inlong/tubemq-client-twins/tubemq-client-go/rpc"
+ "github.com/apache/incubator-inlong/tubemq-client-twins/tubemq-client-go/selector"
+ "github.com/apache/incubator-inlong/tubemq-client-twins/tubemq-client-go/sub"
+ "github.com/apache/incubator-inlong/tubemq-client-twins/tubemq-client-go/transport"
+ "github.com/apache/incubator-inlong/tubemq-client-twins/tubemq-client-go/util"
+)
+
+const (
+ consumeStatusNormal = 0
+ consumeStatusFromMax = 1
+ consumeStatusFromMaxAlways = 2
+)
+
+type consumer struct {
+ clientID string
+ config *config.Config
+ subInfo *sub.SubInfo
+ rmtDataCache *remote.RmtDataCache
+ visitToken int64
+ authorizedInfo string
+ nextAuth2Master int32
+ nextAuth2Broker int32
+ master *selector.Node
+ client rpc.RPCClient
+ selector selector.Selector
+ lastMasterHb int64
+ masterHBRetry int
+ heartbeatManager *heartbeatManager
+ unreportedTimes int
+}
+
+// NewConsumer returns a consumer which is constructed by a given config.
+func NewConsumer(config *config.Config) (Consumer, error) {
+ selector, err := selector.Get("ip")
+ if err != nil {
+ return nil, err
+ }
+
+ clientID := newClientID(config.Consumer.Group)
+ pool := multiplexing.NewPool()
+ opts := &transport.Options{}
+ if config.Net.TLS.Enable {
+ opts.CACertFile = config.Net.TLS.CACertFile
+ opts.TLSCertFile = config.Net.TLS.TLSCertFile
+ opts.TLSKeyFile = config.Net.TLS.TLSKeyFile
+ opts.TLSServerName = config.Net.TLS.TLSServerName
+ }
+ client := rpc.New(pool, opts)
+ r := remote.NewRmtDataCache()
+ r.SetConsumerInfo(clientID, config.Consumer.Group)
+ c := &consumer{
+ config: config,
+ clientID: clientID,
+ subInfo: sub.NewSubInfo(config),
+ rmtDataCache: r,
+ selector: selector,
+ client: client,
+ visitToken: util.InvalidValue,
+ unreportedTimes: 0,
+ }
+ c.subInfo.SetClientID(clientID)
+ hbm := newHBManager(c)
+ c.heartbeatManager = hbm
+ return c, nil
+}
+
+// Start implementation of tubeMQ consumer.
+func (c *consumer) Start() error {
+ err := c.register2Master(false)
+ if err != nil {
+ return err
+ }
+ c.heartbeatManager.registerMaster(c.master.Address)
+ go c.processRebalanceEvent()
+ return nil
+}
+
+func (c *consumer) register2Master(needChange bool) error {
+ if needChange {
+ node, err := c.selector.Select(c.config.Consumer.Masters)
+ if err != nil {
+ return err
+ }
+ c.master = node
+ }
+ for c.master.HasNext {
+ ctx, cancel := context.WithTimeout(context.Background(), c.config.Net.ReadTimeout)
Review comment:
Need use defer cancel(), can define other function to register one master.
##########
File path: tubemq-client-twins/tubemq-client-go/client/consumer_impl.go
##########
@@ -0,0 +1,356 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package client
+
+import (
+ "context"
+ "os"
+ "strconv"
+ "sync/atomic"
+ "time"
+
+ "github.com/golang/protobuf/proto"
+
+ "github.com/apache/incubator-inlong/tubemq-client-twins/tubemq-client-go/config"
+ "github.com/apache/incubator-inlong/tubemq-client-twins/tubemq-client-go/errs"
+ "github.com/apache/incubator-inlong/tubemq-client-twins/tubemq-client-go/metadata"
+ "github.com/apache/incubator-inlong/tubemq-client-twins/tubemq-client-go/multiplexing"
+ "github.com/apache/incubator-inlong/tubemq-client-twins/tubemq-client-go/protocol"
+ "github.com/apache/incubator-inlong/tubemq-client-twins/tubemq-client-go/remote"
+ "github.com/apache/incubator-inlong/tubemq-client-twins/tubemq-client-go/rpc"
+ "github.com/apache/incubator-inlong/tubemq-client-twins/tubemq-client-go/selector"
+ "github.com/apache/incubator-inlong/tubemq-client-twins/tubemq-client-go/sub"
+ "github.com/apache/incubator-inlong/tubemq-client-twins/tubemq-client-go/transport"
+ "github.com/apache/incubator-inlong/tubemq-client-twins/tubemq-client-go/util"
+)
+
+const (
+ consumeStatusNormal = 0
+ consumeStatusFromMax = 1
+ consumeStatusFromMaxAlways = 2
+)
+
+type consumer struct {
+ clientID string
+ config *config.Config
+ subInfo *sub.SubInfo
+ rmtDataCache *remote.RmtDataCache
+ visitToken int64
+ authorizedInfo string
+ nextAuth2Master int32
+ nextAuth2Broker int32
+ master *selector.Node
+ client rpc.RPCClient
+ selector selector.Selector
+ lastMasterHb int64
+ masterHBRetry int
+ heartbeatManager *heartbeatManager
+ unreportedTimes int
+}
+
+// NewConsumer returns a consumer which is constructed by a given config.
+func NewConsumer(config *config.Config) (Consumer, error) {
+ selector, err := selector.Get("ip")
+ if err != nil {
+ return nil, err
+ }
+
+ clientID := newClientID(config.Consumer.Group)
+ pool := multiplexing.NewPool()
+ opts := &transport.Options{}
+ if config.Net.TLS.Enable {
+ opts.CACertFile = config.Net.TLS.CACertFile
+ opts.TLSCertFile = config.Net.TLS.TLSCertFile
+ opts.TLSKeyFile = config.Net.TLS.TLSKeyFile
+ opts.TLSServerName = config.Net.TLS.TLSServerName
+ }
+ client := rpc.New(pool, opts)
+ r := remote.NewRmtDataCache()
+ r.SetConsumerInfo(clientID, config.Consumer.Group)
+ c := &consumer{
+ config: config,
+ clientID: clientID,
+ subInfo: sub.NewSubInfo(config),
+ rmtDataCache: r,
+ selector: selector,
+ client: client,
+ visitToken: util.InvalidValue,
+ unreportedTimes: 0,
+ }
+ c.subInfo.SetClientID(clientID)
+ hbm := newHBManager(c)
+ c.heartbeatManager = hbm
+ return c, nil
+}
+
+// Start implementation of tubeMQ consumer.
+func (c *consumer) Start() error {
+ err := c.register2Master(false)
+ if err != nil {
+ return err
+ }
+ c.heartbeatManager.registerMaster(c.master.Address)
+ go c.processRebalanceEvent()
+ return nil
+}
+
+func (c *consumer) register2Master(needChange bool) error {
+ if needChange {
+ node, err := c.selector.Select(c.config.Consumer.Masters)
+ if err != nil {
+ return err
+ }
+ c.master = node
+ }
+ for c.master.HasNext {
+ ctx, cancel := context.WithTimeout(context.Background(), c.config.Net.ReadTimeout)
+
+ m := &metadata.Metadata{}
+ node := &metadata.Node{}
+ node.SetHost(util.GetLocalHost())
+ node.SetAddress(c.master.Address)
+ m.SetNode(node)
+ sub := &metadata.SubscribeInfo{}
+ sub.SetGroup(c.config.Consumer.Group)
+ m.SetSubscribeInfo(sub)
+
+ auth := &protocol.AuthenticateInfo{}
+ c.genMasterAuthenticateToken(auth, true)
+ mci := &protocol.MasterCertificateInfo{
+ AuthInfo: auth,
+ }
+ c.subInfo.SetMasterCertificateInfo(mci)
+
+ rsp, err := c.client.RegisterRequestC2M(ctx, m, c.subInfo, c.rmtDataCache)
+ if err != nil {
+ cancel()
+ return err
+ }
+ if rsp.GetSuccess() {
+ c.masterHBRetry = 0
+ c.processRegisterResponseM2C(rsp)
+ cancel()
+ return nil
+ } else if rsp.GetErrCode() == errs.RetConsumeGroupForbidden || rsp.GetErrCode() == errs.RetConsumeContentForbidden {
+ cancel()
+ return nil
+ } else {
+ c.master, err = c.selector.Select(c.config.Consumer.Masters)
+ cancel()
+ if err != nil {
+ return err
+ }
+ }
+ }
+ return nil
+}
+
+func (c *consumer) processRegisterResponseM2C(rsp *protocol.RegisterResponseM2C) {
+ if rsp.GetNotAllocated() {
+ c.subInfo.SetIsNotAllocated(rsp.GetNotAllocated())
+ }
+ if rsp.GetDefFlowCheckId() != 0 || rsp.GetDefFlowCheckId() != 0 {
+ if rsp.GetDefFlowCheckId() != 0 {
+ c.rmtDataCache.UpdateDefFlowCtrlInfo(rsp.GetDefFlowCheckId(), rsp.GetDefFlowControlInfo())
+ }
+ qryPriorityID := c.rmtDataCache.GetQryPriorityID()
+ if rsp.GetQryPriorityId() != 0 {
+ qryPriorityID = rsp.GetQryPriorityId()
+ }
+ c.rmtDataCache.UpdateGroupFlowCtrlInfo(qryPriorityID, rsp.GetGroupFlowCheckId(), rsp.GetGroupFlowControlInfo())
+ }
+ if rsp.GetAuthorizedInfo() != nil {
+ c.processAuthorizedToken(rsp.GetAuthorizedInfo())
+ }
+ c.lastMasterHb = time.Now().UnixNano() / int64(time.Millisecond)
+}
+
+func (c *consumer) processAuthorizedToken(info *protocol.MasterAuthorizedInfo) {
+ atomic.StoreInt64(&c.visitToken, info.GetVisitAuthorizedToken())
+ c.authorizedInfo = info.GetAuthAuthorizedToken()
+}
+
+// GetMessage implementation of TubeMQ consumer.
+func (c *consumer) GetMessage() (*ConsumerResult, error) {
+ panic("implement me")
+}
+
+// Confirm implementation of TubeMQ consumer.
+func (c *consumer) Confirm(confirmContext string, consumed bool) (*ConsumerResult, error) {
+ panic("implement me")
+}
+
+// GetCurrConsumedInfo implementation of TubeMQ consumer.
+func (c *consumer) GetCurrConsumedInfo() (map[string]*ConsumerOffset, error) {
+ panic("implement me")
+}
+
+func (c *consumer) processRebalanceEvent() {
+ for {
Review comment:
The for loop will run to full cpu
##########
File path: tubemq-client-twins/tubemq-client-go/client/consumer_impl.go
##########
@@ -0,0 +1,356 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package client
+
+import (
+ "context"
+ "os"
+ "strconv"
+ "sync/atomic"
+ "time"
+
+ "github.com/golang/protobuf/proto"
+
+ "github.com/apache/incubator-inlong/tubemq-client-twins/tubemq-client-go/config"
+ "github.com/apache/incubator-inlong/tubemq-client-twins/tubemq-client-go/errs"
+ "github.com/apache/incubator-inlong/tubemq-client-twins/tubemq-client-go/metadata"
+ "github.com/apache/incubator-inlong/tubemq-client-twins/tubemq-client-go/multiplexing"
+ "github.com/apache/incubator-inlong/tubemq-client-twins/tubemq-client-go/protocol"
+ "github.com/apache/incubator-inlong/tubemq-client-twins/tubemq-client-go/remote"
+ "github.com/apache/incubator-inlong/tubemq-client-twins/tubemq-client-go/rpc"
+ "github.com/apache/incubator-inlong/tubemq-client-twins/tubemq-client-go/selector"
+ "github.com/apache/incubator-inlong/tubemq-client-twins/tubemq-client-go/sub"
+ "github.com/apache/incubator-inlong/tubemq-client-twins/tubemq-client-go/transport"
+ "github.com/apache/incubator-inlong/tubemq-client-twins/tubemq-client-go/util"
+)
+
+const (
+ consumeStatusNormal = 0
+ consumeStatusFromMax = 1
+ consumeStatusFromMaxAlways = 2
+)
+
+type consumer struct {
+ clientID string
+ config *config.Config
+ subInfo *sub.SubInfo
+ rmtDataCache *remote.RmtDataCache
+ visitToken int64
+ authorizedInfo string
+ nextAuth2Master int32
+ nextAuth2Broker int32
+ master *selector.Node
+ client rpc.RPCClient
+ selector selector.Selector
+ lastMasterHb int64
+ masterHBRetry int
+ heartbeatManager *heartbeatManager
+ unreportedTimes int
+}
+
+// NewConsumer returns a consumer which is constructed by a given config.
+func NewConsumer(config *config.Config) (Consumer, error) {
+ selector, err := selector.Get("ip")
+ if err != nil {
+ return nil, err
+ }
+
+ clientID := newClientID(config.Consumer.Group)
+ pool := multiplexing.NewPool()
+ opts := &transport.Options{}
+ if config.Net.TLS.Enable {
+ opts.CACertFile = config.Net.TLS.CACertFile
+ opts.TLSCertFile = config.Net.TLS.TLSCertFile
+ opts.TLSKeyFile = config.Net.TLS.TLSKeyFile
+ opts.TLSServerName = config.Net.TLS.TLSServerName
+ }
+ client := rpc.New(pool, opts)
+ r := remote.NewRmtDataCache()
+ r.SetConsumerInfo(clientID, config.Consumer.Group)
+ c := &consumer{
+ config: config,
+ clientID: clientID,
+ subInfo: sub.NewSubInfo(config),
+ rmtDataCache: r,
+ selector: selector,
+ client: client,
+ visitToken: util.InvalidValue,
+ unreportedTimes: 0,
+ }
+ c.subInfo.SetClientID(clientID)
+ hbm := newHBManager(c)
+ c.heartbeatManager = hbm
+ return c, nil
+}
+
+// Start implementation of tubeMQ consumer.
+func (c *consumer) Start() error {
+ err := c.register2Master(false)
+ if err != nil {
+ return err
+ }
+ c.heartbeatManager.registerMaster(c.master.Address)
+ go c.processRebalanceEvent()
+ return nil
+}
+
+func (c *consumer) register2Master(needChange bool) error {
+ if needChange {
+ node, err := c.selector.Select(c.config.Consumer.Masters)
+ if err != nil {
+ return err
+ }
+ c.master = node
+ }
+ for c.master.HasNext {
+ ctx, cancel := context.WithTimeout(context.Background(), c.config.Net.ReadTimeout)
+
+ m := &metadata.Metadata{}
+ node := &metadata.Node{}
+ node.SetHost(util.GetLocalHost())
+ node.SetAddress(c.master.Address)
+ m.SetNode(node)
+ sub := &metadata.SubscribeInfo{}
+ sub.SetGroup(c.config.Consumer.Group)
+ m.SetSubscribeInfo(sub)
+
+ auth := &protocol.AuthenticateInfo{}
+ c.genMasterAuthenticateToken(auth, true)
+ mci := &protocol.MasterCertificateInfo{
+ AuthInfo: auth,
+ }
+ c.subInfo.SetMasterCertificateInfo(mci)
+
+ rsp, err := c.client.RegisterRequestC2M(ctx, m, c.subInfo, c.rmtDataCache)
+ if err != nil {
+ cancel()
+ return err
+ }
+ if rsp.GetSuccess() {
+ c.masterHBRetry = 0
+ c.processRegisterResponseM2C(rsp)
+ cancel()
+ return nil
+ } else if rsp.GetErrCode() == errs.RetConsumeGroupForbidden || rsp.GetErrCode() == errs.RetConsumeContentForbidden {
+ cancel()
+ return nil
+ } else {
+ c.master, err = c.selector.Select(c.config.Consumer.Masters)
+ cancel()
+ if err != nil {
+ return err
+ }
+ }
+ }
+ return nil
+}
+
+func (c *consumer) processRegisterResponseM2C(rsp *protocol.RegisterResponseM2C) {
+ if rsp.GetNotAllocated() {
+ c.subInfo.SetIsNotAllocated(rsp.GetNotAllocated())
+ }
+ if rsp.GetDefFlowCheckId() != 0 || rsp.GetDefFlowCheckId() != 0 {
+ if rsp.GetDefFlowCheckId() != 0 {
+ c.rmtDataCache.UpdateDefFlowCtrlInfo(rsp.GetDefFlowCheckId(), rsp.GetDefFlowControlInfo())
+ }
+ qryPriorityID := c.rmtDataCache.GetQryPriorityID()
+ if rsp.GetQryPriorityId() != 0 {
+ qryPriorityID = rsp.GetQryPriorityId()
+ }
+ c.rmtDataCache.UpdateGroupFlowCtrlInfo(qryPriorityID, rsp.GetGroupFlowCheckId(), rsp.GetGroupFlowControlInfo())
+ }
+ if rsp.GetAuthorizedInfo() != nil {
+ c.processAuthorizedToken(rsp.GetAuthorizedInfo())
+ }
+ c.lastMasterHb = time.Now().UnixNano() / int64(time.Millisecond)
+}
+
+func (c *consumer) processAuthorizedToken(info *protocol.MasterAuthorizedInfo) {
+ atomic.StoreInt64(&c.visitToken, info.GetVisitAuthorizedToken())
+ c.authorizedInfo = info.GetAuthAuthorizedToken()
+}
+
+// GetMessage implementation of TubeMQ consumer.
+func (c *consumer) GetMessage() (*ConsumerResult, error) {
+ panic("implement me")
+}
+
+// Confirm implementation of TubeMQ consumer.
+func (c *consumer) Confirm(confirmContext string, consumed bool) (*ConsumerResult, error) {
+ panic("implement me")
+}
+
+// GetCurrConsumedInfo implementation of TubeMQ consumer.
+func (c *consumer) GetCurrConsumedInfo() (map[string]*ConsumerOffset, error) {
+ panic("implement me")
+}
+
+func (c *consumer) processRebalanceEvent() {
+ for {
+ event := c.rmtDataCache.TakeEvent()
+ if event.GetEventStatus() == int32(util.InvalidValue) && event.GetRebalanceID() == util.InvalidValue {
+ break
+ }
+ c.rmtDataCache.ClearEvent()
+ switch event.GetEventType() {
+ case 2, 20:
Review comment:
const value cann't use magic number.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
For queries about this service, please contact Infrastructure at:
users@infra.apache.org
[GitHub] [incubator-inlong] codecov-commenter edited a comment on pull request #480: [INLONG-624]Go SDK consumer start interface
Posted by GitBox <gi...@apache.org>.
codecov-commenter edited a comment on pull request #480:
URL: https://github.com/apache/incubator-inlong/pull/480#issuecomment-853744479
# [Codecov](https://codecov.io/gh/apache/incubator-inlong/pull/480?src=pr&el=h1&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation) Report
> Merging [#480](https://codecov.io/gh/apache/incubator-inlong/pull/480?src=pr&el=desc&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation) (b0bd1a5) into [INLONG-25](https://codecov.io/gh/apache/incubator-inlong/commit/6fd764444c824cae9595a5551c2d7a7443a3e4e6?el=desc&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation) (6fd7644) will **decrease** coverage by `0.01%`.
> The diff coverage is `n/a`.
[![Impacted file tree graph](https://codecov.io/gh/apache/incubator-inlong/pull/480/graphs/tree.svg?width=650&height=150&src=pr&token=1EUK92O9K2&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation)](https://codecov.io/gh/apache/incubator-inlong/pull/480?src=pr&el=tree&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation)
```diff
@@ Coverage Diff @@
## INLONG-25 #480 +/- ##
==============================================
- Coverage 7.52% 7.51% -0.02%
+ Complexity 481 479 -2
==============================================
Files 267 267
Lines 29500 29500
Branches 4843 4843
==============================================
- Hits 2219 2216 -3
- Misses 26808 26810 +2
- Partials 473 474 +1
```
| [Impacted Files](https://codecov.io/gh/apache/incubator-inlong/pull/480?src=pr&el=tree&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation) | Coverage Δ | |
|---|---|---|
| [.../apache/tubemq/corebase/policies/FlowCtrlItem.java](https://codecov.io/gh/apache/incubator-inlong/pull/480/diff?src=pr&el=tree&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation#diff-dHViZW1xLWNvcmUvc3JjL21haW4vamF2YS9vcmcvYXBhY2hlL3R1YmVtcS9jb3JlYmFzZS9wb2xpY2llcy9GbG93Q3RybEl0ZW0uamF2YQ==) | `38.88% <0.00%> (-1.12%)` | :arrow_down: |
| [.../tubemq/corebase/policies/FlowCtrlRuleHandler.java](https://codecov.io/gh/apache/incubator-inlong/pull/480/diff?src=pr&el=tree&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation#diff-dHViZW1xLWNvcmUvc3JjL21haW4vamF2YS9vcmcvYXBhY2hlL3R1YmVtcS9jb3JlYmFzZS9wb2xpY2llcy9GbG93Q3RybFJ1bGVIYW5kbGVyLmphdmE=) | `43.98% <0.00%> (-0.59%)` | :arrow_down: |
------
[Continue to review full report at Codecov](https://codecov.io/gh/apache/incubator-inlong/pull/480?src=pr&el=continue&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation).
> **Legend** - [Click here to learn more](https://docs.codecov.io/docs/codecov-delta?utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation)
> `Δ = absolute <relative> (impact)`, `ø = not affected`, `? = missing data`
> Powered by [Codecov](https://codecov.io/gh/apache/incubator-inlong/pull/480?src=pr&el=footer&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation). Last update [6fd7644...b0bd1a5](https://codecov.io/gh/apache/incubator-inlong/pull/480?src=pr&el=lastupdated&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation). Read the [comment docs](https://docs.codecov.io/docs/pull-request-comments?utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation).
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
For queries about this service, please contact Infrastructure at:
users@infra.apache.org
[GitHub] [incubator-inlong] gosonzhang merged pull request #480: [INLONG-624]Go SDK consumer start interface
Posted by GitBox <gi...@apache.org>.
gosonzhang merged pull request #480:
URL: https://github.com/apache/incubator-inlong/pull/480
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
For queries about this service, please contact Infrastructure at:
users@infra.apache.org
[GitHub] [incubator-inlong] codecov-commenter edited a comment on pull request #480: [INLONG-624]Go SDK consumer start interface
Posted by GitBox <gi...@apache.org>.
codecov-commenter edited a comment on pull request #480:
URL: https://github.com/apache/incubator-inlong/pull/480#issuecomment-853744479
# [Codecov](https://codecov.io/gh/apache/incubator-inlong/pull/480?src=pr&el=h1&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation) Report
> Merging [#480](https://codecov.io/gh/apache/incubator-inlong/pull/480?src=pr&el=desc&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation) (39a681b) into [INLONG-25](https://codecov.io/gh/apache/incubator-inlong/commit/6fd764444c824cae9595a5551c2d7a7443a3e4e6?el=desc&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation) (6fd7644) will **decrease** coverage by `0.01%`.
> The diff coverage is `n/a`.
> :exclamation: Current head 39a681b differs from pull request most recent head 294924e. Consider uploading reports for the commit 294924e to get more accurate results
[![Impacted file tree graph](https://codecov.io/gh/apache/incubator-inlong/pull/480/graphs/tree.svg?width=650&height=150&src=pr&token=1EUK92O9K2&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation)](https://codecov.io/gh/apache/incubator-inlong/pull/480?src=pr&el=tree&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation)
```diff
@@ Coverage Diff @@
## INLONG-25 #480 +/- ##
==============================================
- Coverage 7.52% 7.51% -0.02%
+ Complexity 481 479 -2
==============================================
Files 267 267
Lines 29500 29500
Branches 4843 4843
==============================================
- Hits 2219 2216 -3
- Misses 26808 26810 +2
- Partials 473 474 +1
```
| [Impacted Files](https://codecov.io/gh/apache/incubator-inlong/pull/480?src=pr&el=tree&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation) | Coverage Δ | |
|---|---|---|
| [.../apache/tubemq/corebase/policies/FlowCtrlItem.java](https://codecov.io/gh/apache/incubator-inlong/pull/480/diff?src=pr&el=tree&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation#diff-dHViZW1xLWNvcmUvc3JjL21haW4vamF2YS9vcmcvYXBhY2hlL3R1YmVtcS9jb3JlYmFzZS9wb2xpY2llcy9GbG93Q3RybEl0ZW0uamF2YQ==) | `38.88% <0.00%> (-1.12%)` | :arrow_down: |
| [.../tubemq/corebase/policies/FlowCtrlRuleHandler.java](https://codecov.io/gh/apache/incubator-inlong/pull/480/diff?src=pr&el=tree&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation#diff-dHViZW1xLWNvcmUvc3JjL21haW4vamF2YS9vcmcvYXBhY2hlL3R1YmVtcS9jb3JlYmFzZS9wb2xpY2llcy9GbG93Q3RybFJ1bGVIYW5kbGVyLmphdmE=) | `43.98% <0.00%> (-0.59%)` | :arrow_down: |
------
[Continue to review full report at Codecov](https://codecov.io/gh/apache/incubator-inlong/pull/480?src=pr&el=continue&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation).
> **Legend** - [Click here to learn more](https://docs.codecov.io/docs/codecov-delta?utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation)
> `Δ = absolute <relative> (impact)`, `ø = not affected`, `? = missing data`
> Powered by [Codecov](https://codecov.io/gh/apache/incubator-inlong/pull/480?src=pr&el=footer&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation). Last update [6fd7644...294924e](https://codecov.io/gh/apache/incubator-inlong/pull/480?src=pr&el=lastupdated&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation). Read the [comment docs](https://docs.codecov.io/docs/pull-request-comments?utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation).
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
For queries about this service, please contact Infrastructure at:
users@infra.apache.org
[GitHub] [incubator-inlong] codecov-commenter edited a comment on pull request #480: [INLONG-624]Go SDK consumer start interface
Posted by GitBox <gi...@apache.org>.
codecov-commenter edited a comment on pull request #480:
URL: https://github.com/apache/incubator-inlong/pull/480#issuecomment-853744479
# [Codecov](https://codecov.io/gh/apache/incubator-inlong/pull/480?src=pr&el=h1&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation) Report
> Merging [#480](https://codecov.io/gh/apache/incubator-inlong/pull/480?src=pr&el=desc&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation) (a161297) into [INLONG-25](https://codecov.io/gh/apache/incubator-inlong/commit/6fd764444c824cae9595a5551c2d7a7443a3e4e6?el=desc&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation) (6fd7644) will **decrease** coverage by `0.01%`.
> The diff coverage is `n/a`.
[![Impacted file tree graph](https://codecov.io/gh/apache/incubator-inlong/pull/480/graphs/tree.svg?width=650&height=150&src=pr&token=1EUK92O9K2&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation)](https://codecov.io/gh/apache/incubator-inlong/pull/480?src=pr&el=tree&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation)
```diff
@@ Coverage Diff @@
## INLONG-25 #480 +/- ##
==============================================
- Coverage 7.52% 7.51% -0.02%
+ Complexity 481 479 -2
==============================================
Files 267 267
Lines 29500 29500
Branches 4843 4843
==============================================
- Hits 2219 2216 -3
- Misses 26808 26810 +2
- Partials 473 474 +1
```
| [Impacted Files](https://codecov.io/gh/apache/incubator-inlong/pull/480?src=pr&el=tree&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation) | Coverage Δ | |
|---|---|---|
| [.../apache/tubemq/corebase/policies/FlowCtrlItem.java](https://codecov.io/gh/apache/incubator-inlong/pull/480/diff?src=pr&el=tree&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation#diff-dHViZW1xLWNvcmUvc3JjL21haW4vamF2YS9vcmcvYXBhY2hlL3R1YmVtcS9jb3JlYmFzZS9wb2xpY2llcy9GbG93Q3RybEl0ZW0uamF2YQ==) | `38.88% <0.00%> (-1.12%)` | :arrow_down: |
| [.../tubemq/corebase/policies/FlowCtrlRuleHandler.java](https://codecov.io/gh/apache/incubator-inlong/pull/480/diff?src=pr&el=tree&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation#diff-dHViZW1xLWNvcmUvc3JjL21haW4vamF2YS9vcmcvYXBhY2hlL3R1YmVtcS9jb3JlYmFzZS9wb2xpY2llcy9GbG93Q3RybFJ1bGVIYW5kbGVyLmphdmE=) | `43.98% <0.00%> (-0.59%)` | :arrow_down: |
------
[Continue to review full report at Codecov](https://codecov.io/gh/apache/incubator-inlong/pull/480?src=pr&el=continue&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation).
> **Legend** - [Click here to learn more](https://docs.codecov.io/docs/codecov-delta?utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation)
> `Δ = absolute <relative> (impact)`, `ø = not affected`, `? = missing data`
> Powered by [Codecov](https://codecov.io/gh/apache/incubator-inlong/pull/480?src=pr&el=footer&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation). Last update [6fd7644...a161297](https://codecov.io/gh/apache/incubator-inlong/pull/480?src=pr&el=lastupdated&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation). Read the [comment docs](https://docs.codecov.io/docs/pull-request-comments?utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation).
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
For queries about this service, please contact Infrastructure at:
users@infra.apache.org
[GitHub] [incubator-inlong] codecov-commenter edited a comment on pull request #480: [INLONG-624]Go SDK consumer start interface
Posted by GitBox <gi...@apache.org>.
codecov-commenter edited a comment on pull request #480:
URL: https://github.com/apache/incubator-inlong/pull/480#issuecomment-853744479
# [Codecov](https://codecov.io/gh/apache/incubator-inlong/pull/480?src=pr&el=h1&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation) Report
> Merging [#480](https://codecov.io/gh/apache/incubator-inlong/pull/480?src=pr&el=desc&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation) (74d4a1d) into [INLONG-25](https://codecov.io/gh/apache/incubator-inlong/commit/6fd764444c824cae9595a5551c2d7a7443a3e4e6?el=desc&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation) (6fd7644) will **decrease** coverage by `0.01%`.
> The diff coverage is `n/a`.
[![Impacted file tree graph](https://codecov.io/gh/apache/incubator-inlong/pull/480/graphs/tree.svg?width=650&height=150&src=pr&token=1EUK92O9K2&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation)](https://codecov.io/gh/apache/incubator-inlong/pull/480?src=pr&el=tree&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation)
```diff
@@ Coverage Diff @@
## INLONG-25 #480 +/- ##
==============================================
- Coverage 7.52% 7.50% -0.02%
+ Complexity 481 478 -3
==============================================
Files 267 267
Lines 29500 29500
Branches 4843 4843
==============================================
- Hits 2219 2215 -4
- Misses 26808 26813 +5
+ Partials 473 472 -1
```
| [Impacted Files](https://codecov.io/gh/apache/incubator-inlong/pull/480?src=pr&el=tree&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation) | Coverage Δ | |
|---|---|---|
| [.../apache/tubemq/corebase/policies/FlowCtrlItem.java](https://codecov.io/gh/apache/incubator-inlong/pull/480/diff?src=pr&el=tree&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation#diff-dHViZW1xLWNvcmUvc3JjL21haW4vamF2YS9vcmcvYXBhY2hlL3R1YmVtcS9jb3JlYmFzZS9wb2xpY2llcy9GbG93Q3RybEl0ZW0uamF2YQ==) | `38.88% <0.00%> (-1.12%)` | :arrow_down: |
| [.../tubemq/corebase/policies/FlowCtrlRuleHandler.java](https://codecov.io/gh/apache/incubator-inlong/pull/480/diff?src=pr&el=tree&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation#diff-dHViZW1xLWNvcmUvc3JjL21haW4vamF2YS9vcmcvYXBhY2hlL3R1YmVtcS9jb3JlYmFzZS9wb2xpY2llcy9GbG93Q3RybFJ1bGVIYW5kbGVyLmphdmE=) | `43.98% <0.00%> (-0.59%)` | :arrow_down: |
| [.../producer/qltystats/DefaultBrokerRcvQltyStats.java](https://codecov.io/gh/apache/incubator-inlong/pull/480/diff?src=pr&el=tree&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation#diff-dHViZW1xLWNsaWVudC9zcmMvbWFpbi9qYXZhL29yZy9hcGFjaGUvdHViZW1xL2NsaWVudC9wcm9kdWNlci9xbHR5c3RhdHMvRGVmYXVsdEJyb2tlclJjdlFsdHlTdGF0cy5qYXZh) | `45.31% <0.00%> (-0.40%)` | :arrow_down: |
------
[Continue to review full report at Codecov](https://codecov.io/gh/apache/incubator-inlong/pull/480?src=pr&el=continue&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation).
> **Legend** - [Click here to learn more](https://docs.codecov.io/docs/codecov-delta?utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation)
> `Δ = absolute <relative> (impact)`, `ø = not affected`, `? = missing data`
> Powered by [Codecov](https://codecov.io/gh/apache/incubator-inlong/pull/480?src=pr&el=footer&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation). Last update [6fd7644...74d4a1d](https://codecov.io/gh/apache/incubator-inlong/pull/480?src=pr&el=lastupdated&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation). Read the [comment docs](https://docs.codecov.io/docs/pull-request-comments?utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation).
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
For queries about this service, please contact Infrastructure at:
users@infra.apache.org
[GitHub] [incubator-inlong] codecov-commenter edited a comment on pull request #480: [INLONG-624]Go SDK consumer start interface
Posted by GitBox <gi...@apache.org>.
codecov-commenter edited a comment on pull request #480:
URL: https://github.com/apache/incubator-inlong/pull/480#issuecomment-853744479
# [Codecov](https://codecov.io/gh/apache/incubator-inlong/pull/480?src=pr&el=h1&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation) Report
> Merging [#480](https://codecov.io/gh/apache/incubator-inlong/pull/480?src=pr&el=desc&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation) (b0bd1a5) into [INLONG-25](https://codecov.io/gh/apache/incubator-inlong/commit/6fd764444c824cae9595a5551c2d7a7443a3e4e6?el=desc&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation) (6fd7644) will **decrease** coverage by `0.01%`.
> The diff coverage is `n/a`.
[![Impacted file tree graph](https://codecov.io/gh/apache/incubator-inlong/pull/480/graphs/tree.svg?width=650&height=150&src=pr&token=1EUK92O9K2&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation)](https://codecov.io/gh/apache/incubator-inlong/pull/480?src=pr&el=tree&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation)
```diff
@@ Coverage Diff @@
## INLONG-25 #480 +/- ##
==============================================
- Coverage 7.52% 7.51% -0.02%
+ Complexity 481 479 -2
==============================================
Files 267 267
Lines 29500 29500
Branches 4843 4843
==============================================
- Hits 2219 2216 -3
- Misses 26808 26810 +2
- Partials 473 474 +1
```
| [Impacted Files](https://codecov.io/gh/apache/incubator-inlong/pull/480?src=pr&el=tree&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation) | Coverage Δ | |
|---|---|---|
| [.../apache/tubemq/corebase/policies/FlowCtrlItem.java](https://codecov.io/gh/apache/incubator-inlong/pull/480/diff?src=pr&el=tree&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation#diff-dHViZW1xLWNvcmUvc3JjL21haW4vamF2YS9vcmcvYXBhY2hlL3R1YmVtcS9jb3JlYmFzZS9wb2xpY2llcy9GbG93Q3RybEl0ZW0uamF2YQ==) | `38.88% <0.00%> (-1.12%)` | :arrow_down: |
| [.../tubemq/corebase/policies/FlowCtrlRuleHandler.java](https://codecov.io/gh/apache/incubator-inlong/pull/480/diff?src=pr&el=tree&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation#diff-dHViZW1xLWNvcmUvc3JjL21haW4vamF2YS9vcmcvYXBhY2hlL3R1YmVtcS9jb3JlYmFzZS9wb2xpY2llcy9GbG93Q3RybFJ1bGVIYW5kbGVyLmphdmE=) | `43.98% <0.00%> (-0.59%)` | :arrow_down: |
------
[Continue to review full report at Codecov](https://codecov.io/gh/apache/incubator-inlong/pull/480?src=pr&el=continue&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation).
> **Legend** - [Click here to learn more](https://docs.codecov.io/docs/codecov-delta?utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation)
> `Δ = absolute <relative> (impact)`, `ø = not affected`, `? = missing data`
> Powered by [Codecov](https://codecov.io/gh/apache/incubator-inlong/pull/480?src=pr&el=footer&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation). Last update [6fd7644...b0bd1a5](https://codecov.io/gh/apache/incubator-inlong/pull/480?src=pr&el=lastupdated&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation). Read the [comment docs](https://docs.codecov.io/docs/pull-request-comments?utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation).
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
For queries about this service, please contact Infrastructure at:
users@infra.apache.org
[GitHub] [incubator-inlong] codecov-commenter commented on pull request #480: [INLONG-624]Go SDK consumer interface
Posted by GitBox <gi...@apache.org>.
codecov-commenter commented on pull request #480:
URL: https://github.com/apache/incubator-inlong/pull/480#issuecomment-853744479
# [Codecov](https://codecov.io/gh/apache/incubator-inlong/pull/480?src=pr&el=h1&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation) Report
> Merging [#480](https://codecov.io/gh/apache/incubator-inlong/pull/480?src=pr&el=desc&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation) (d3d07ce) into [INLONG-25](https://codecov.io/gh/apache/incubator-inlong/commit/6fd764444c824cae9595a5551c2d7a7443a3e4e6?el=desc&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation) (6fd7644) will **decrease** coverage by `0.01%`.
> The diff coverage is `n/a`.
[![Impacted file tree graph](https://codecov.io/gh/apache/incubator-inlong/pull/480/graphs/tree.svg?width=650&height=150&src=pr&token=1EUK92O9K2&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation)](https://codecov.io/gh/apache/incubator-inlong/pull/480?src=pr&el=tree&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation)
```diff
@@ Coverage Diff @@
## INLONG-25 #480 +/- ##
==============================================
- Coverage 7.52% 7.51% -0.02%
+ Complexity 481 479 -2
==============================================
Files 267 267
Lines 29500 29500
Branches 4843 4843
==============================================
- Hits 2219 2216 -3
- Misses 26808 26810 +2
- Partials 473 474 +1
```
| [Impacted Files](https://codecov.io/gh/apache/incubator-inlong/pull/480?src=pr&el=tree&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation) | Coverage Δ | |
|---|---|---|
| [.../apache/tubemq/corebase/policies/FlowCtrlItem.java](https://codecov.io/gh/apache/incubator-inlong/pull/480/diff?src=pr&el=tree&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation#diff-dHViZW1xLWNvcmUvc3JjL21haW4vamF2YS9vcmcvYXBhY2hlL3R1YmVtcS9jb3JlYmFzZS9wb2xpY2llcy9GbG93Q3RybEl0ZW0uamF2YQ==) | `38.88% <0.00%> (-1.12%)` | :arrow_down: |
| [.../tubemq/corebase/policies/FlowCtrlRuleHandler.java](https://codecov.io/gh/apache/incubator-inlong/pull/480/diff?src=pr&el=tree&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation#diff-dHViZW1xLWNvcmUvc3JjL21haW4vamF2YS9vcmcvYXBhY2hlL3R1YmVtcS9jb3JlYmFzZS9wb2xpY2llcy9GbG93Q3RybFJ1bGVIYW5kbGVyLmphdmE=) | `43.98% <0.00%> (-0.59%)` | :arrow_down: |
------
[Continue to review full report at Codecov](https://codecov.io/gh/apache/incubator-inlong/pull/480?src=pr&el=continue&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation).
> **Legend** - [Click here to learn more](https://docs.codecov.io/docs/codecov-delta?utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation)
> `Δ = absolute <relative> (impact)`, `ø = not affected`, `? = missing data`
> Powered by [Codecov](https://codecov.io/gh/apache/incubator-inlong/pull/480?src=pr&el=footer&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation). Last update [6fd7644...d3d07ce](https://codecov.io/gh/apache/incubator-inlong/pull/480?src=pr&el=lastupdated&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation). Read the [comment docs](https://docs.codecov.io/docs/pull-request-comments?utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation).
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
For queries about this service, please contact Infrastructure at:
users@infra.apache.org