You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@inlong.apache.org by go...@apache.org on 2021/09/15 05:07:21 UTC

[incubator-inlong] branch INLONG-25 updated: [INLONG-1555]Record the consumer config to the log (#1557)

This is an automated email from the ASF dual-hosted git repository.

gosonzhang pushed a commit to branch INLONG-25
in repository https://gitbox.apache.org/repos/asf/incubator-inlong.git


The following commit(s) were added to refs/heads/INLONG-25 by this push:
     new 4b372a2  [INLONG-1555]Record the consumer config to the log (#1557)
4b372a2 is described below

commit 4b372a23fd76ffdc5f2d47df0d2fc90b3285c210
Author: Zijie Lu <ws...@gmail.com>
AuthorDate: Wed Sep 15 13:07:18 2021 +0800

    [INLONG-1555]Record the consumer config to the log (#1557)
    
    Signed-off-by: Zijie Lu <ws...@gmail.com>
---
 tubemq-client-twins/tubemq-client-go/client/consumer_impl.go | 3 ++-
 tubemq-client-twins/tubemq-client-go/config/config.go        | 9 +++++++++
 tubemq-client-twins/tubemq-client-go/remote/remote.go        | 4 ++--
 3 files changed, 13 insertions(+), 3 deletions(-)

diff --git a/tubemq-client-twins/tubemq-client-go/client/consumer_impl.go b/tubemq-client-twins/tubemq-client-go/client/consumer_impl.go
index f8ef878..db56472 100644
--- a/tubemq-client-twins/tubemq-client-go/client/consumer_impl.go
+++ b/tubemq-client-twins/tubemq-client-go/client/consumer_impl.go
@@ -72,6 +72,7 @@ type consumer struct {
 
 // NewConsumer returns a consumer which is constructed by a given config.
 func NewConsumer(config *config.Config) (Consumer, error) {
+	log.Infof("The config of the consumer is %s", config)
 	selector, err := selector.Get("ip")
 	if err != nil {
 		return nil, err
@@ -148,7 +149,7 @@ func (c *consumer) register2Master(needChange bool) error {
 			}
 			continue
 		}
-		log.Info("register2Master response %s", rsp.String())
+		log.Infof("register2Master response %s", rsp.String())
 
 		c.masterHBRetry = 0
 		c.processRegisterResponseM2C(rsp)
diff --git a/tubemq-client-twins/tubemq-client-go/config/config.go b/tubemq-client-twins/tubemq-client-go/config/config.go
index a807b6b..30079fe 100644
--- a/tubemq-client-twins/tubemq-client-go/config/config.go
+++ b/tubemq-client-twins/tubemq-client-go/config/config.go
@@ -19,6 +19,7 @@
 package config
 
 import (
+	"encoding/json"
 	"fmt"
 	"net/url"
 	"strconv"
@@ -148,6 +149,14 @@ func NewDefaultConfig() *Config {
 	return c
 }
 
+func (c *Config) String() string {
+	bytes, err := json.Marshal(c)
+	if err != nil {
+		return err.Error()
+	}
+	return string(bytes)
+}
+
 // ParseAddress parses the address to user-defined config.
 func ParseAddress(address string) (config *Config, err error) {
 	c := NewDefaultConfig()
diff --git a/tubemq-client-twins/tubemq-client-go/remote/remote.go b/tubemq-client-twins/tubemq-client-go/remote/remote.go
index 01d96ca..b7c65d5 100644
--- a/tubemq-client-twins/tubemq-client-go/remote/remote.go
+++ b/tubemq-client-twins/tubemq-client-go/remote/remote.go
@@ -372,7 +372,7 @@ func (r *RmtDataCache) GetCurConsumeStatus() int32 {
 		return errs.RetErrNoPartAssigned
 	}
 	if len(r.indexPartitions) == 0 {
-		if len(r.usedPartitions) == 0 {
+		if len(r.usedPartitions) > 0 {
 			return errs.RetErrAllPartInUse
 		} else {
 			return errs.RetErrAllPartWaiting
@@ -391,7 +391,7 @@ func (r *RmtDataCache) SelectPartition() (*metadata.Partition, int64, error) {
 		return nil, 0, errs.ErrNoPartAssigned
 	} else {
 		if len(r.indexPartitions) == 0 {
-			if len(r.usedPartitions) == 0 {
+			if len(r.usedPartitions) > 0 {
 				return nil, 0, errs.ErrAllPartInUse
 			} else {
 				return nil, 0, errs.ErrAllPartWaiting