You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@inlong.apache.org by go...@apache.org on 2021/09/15 05:07:21 UTC
[incubator-inlong] branch INLONG-25 updated: [INLONG-1555]Record
the consumer config to the log (#1557)
This is an automated email from the ASF dual-hosted git repository.
gosonzhang pushed a commit to branch INLONG-25
in repository https://gitbox.apache.org/repos/asf/incubator-inlong.git
The following commit(s) were added to refs/heads/INLONG-25 by this push:
new 4b372a2 [INLONG-1555]Record the consumer config to the log (#1557)
4b372a2 is described below
commit 4b372a23fd76ffdc5f2d47df0d2fc90b3285c210
Author: Zijie Lu <ws...@gmail.com>
AuthorDate: Wed Sep 15 13:07:18 2021 +0800
[INLONG-1555]Record the consumer config to the log (#1557)
Signed-off-by: Zijie Lu <ws...@gmail.com>
---
tubemq-client-twins/tubemq-client-go/client/consumer_impl.go | 3 ++-
tubemq-client-twins/tubemq-client-go/config/config.go | 9 +++++++++
tubemq-client-twins/tubemq-client-go/remote/remote.go | 4 ++--
3 files changed, 13 insertions(+), 3 deletions(-)
diff --git a/tubemq-client-twins/tubemq-client-go/client/consumer_impl.go b/tubemq-client-twins/tubemq-client-go/client/consumer_impl.go
index f8ef878..db56472 100644
--- a/tubemq-client-twins/tubemq-client-go/client/consumer_impl.go
+++ b/tubemq-client-twins/tubemq-client-go/client/consumer_impl.go
@@ -72,6 +72,7 @@ type consumer struct {
// NewConsumer returns a consumer which is constructed by a given config.
func NewConsumer(config *config.Config) (Consumer, error) {
+ log.Infof("The config of the consumer is %s", config)
selector, err := selector.Get("ip")
if err != nil {
return nil, err
@@ -148,7 +149,7 @@ func (c *consumer) register2Master(needChange bool) error {
}
continue
}
- log.Info("register2Master response %s", rsp.String())
+ log.Infof("register2Master response %s", rsp.String())
c.masterHBRetry = 0
c.processRegisterResponseM2C(rsp)
diff --git a/tubemq-client-twins/tubemq-client-go/config/config.go b/tubemq-client-twins/tubemq-client-go/config/config.go
index a807b6b..30079fe 100644
--- a/tubemq-client-twins/tubemq-client-go/config/config.go
+++ b/tubemq-client-twins/tubemq-client-go/config/config.go
@@ -19,6 +19,7 @@
package config
import (
+ "encoding/json"
"fmt"
"net/url"
"strconv"
@@ -148,6 +149,14 @@ func NewDefaultConfig() *Config {
return c
}
+func (c *Config) String() string {
+ bytes, err := json.Marshal(c)
+ if err != nil {
+ return err.Error()
+ }
+ return string(bytes)
+}
+
// ParseAddress parses the address to user-defined config.
func ParseAddress(address string) (config *Config, err error) {
c := NewDefaultConfig()
diff --git a/tubemq-client-twins/tubemq-client-go/remote/remote.go b/tubemq-client-twins/tubemq-client-go/remote/remote.go
index 01d96ca..b7c65d5 100644
--- a/tubemq-client-twins/tubemq-client-go/remote/remote.go
+++ b/tubemq-client-twins/tubemq-client-go/remote/remote.go
@@ -372,7 +372,7 @@ func (r *RmtDataCache) GetCurConsumeStatus() int32 {
return errs.RetErrNoPartAssigned
}
if len(r.indexPartitions) == 0 {
- if len(r.usedPartitions) == 0 {
+ if len(r.usedPartitions) > 0 {
return errs.RetErrAllPartInUse
} else {
return errs.RetErrAllPartWaiting
@@ -391,7 +391,7 @@ func (r *RmtDataCache) SelectPartition() (*metadata.Partition, int64, error) {
return nil, 0, errs.ErrNoPartAssigned
} else {
if len(r.indexPartitions) == 0 {
- if len(r.usedPartitions) == 0 {
+ if len(r.usedPartitions) > 0 {
return nil, 0, errs.ErrAllPartInUse
} else {
return nil, 0, errs.ErrAllPartWaiting