You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@inlong.apache.org by go...@apache.org on 2021/07/06 08:19:32 UTC

[incubator-inlong] branch INLONG-25 updated: [INLONG-637]Go SDK log (#496)

This is an automated email from the ASF dual-hosted git repository.

gosonzhang pushed a commit to branch INLONG-25
in repository https://gitbox.apache.org/repos/asf/incubator-inlong.git


The following commit(s) were added to refs/heads/INLONG-25 by this push:
     new 7b94fb8  [INLONG-637]Go SDK log (#496)
7b94fb8 is described below

commit 7b94fb831a402a86c1f9a874abea10bf5adb042d
Author: Zijie Lu <ws...@gmail.com>
AuthorDate: Tue Jul 6 16:19:22 2021 +0800

    [INLONG-637]Go SDK log (#496)
---
 .../tubemq-client-go/client/consumer_impl.go       |  33 ++-
 .../tubemq-client-go/client/heartbeat.go           |   8 +
 .../tubemq-client-go/flowctrl/handler.go           |  11 +-
 tubemq-client-twins/tubemq-client-go/go.mod        |   2 +
 tubemq-client-twins/tubemq-client-go/log/config.go |  41 +++
 tubemq-client-twins/tubemq-client-go/log/log.go    | 113 ++++++++
 .../tubemq-client-go/log/log_test.go               |  57 ++++
 tubemq-client-twins/tubemq-client-go/log/logger.go |  78 ++++++
 .../tubemq-client-go/log/warpper_zaplog.go         | 305 +++++++++++++++++++++
 9 files changed, 639 insertions(+), 9 deletions(-)

diff --git a/tubemq-client-twins/tubemq-client-go/client/consumer_impl.go b/tubemq-client-twins/tubemq-client-go/client/consumer_impl.go
index 8163de4..51295f8 100644
--- a/tubemq-client-twins/tubemq-client-go/client/consumer_impl.go
+++ b/tubemq-client-twins/tubemq-client-go/client/consumer_impl.go
@@ -32,6 +32,7 @@ import (
 
 	"github.com/apache/incubator-inlong/tubemq-client-twins/tubemq-client-go/config"
 	"github.com/apache/incubator-inlong/tubemq-client-twins/tubemq-client-go/errs"
+	"github.com/apache/incubator-inlong/tubemq-client-twins/tubemq-client-go/log"
 	"github.com/apache/incubator-inlong/tubemq-client-twins/tubemq-client-go/metadata"
 	"github.com/apache/incubator-inlong/tubemq-client-twins/tubemq-client-go/multiplexing"
 	"github.com/apache/incubator-inlong/tubemq-client-twins/tubemq-client-go/protocol"
@@ -107,6 +108,7 @@ func NewConsumer(config *config.Config) (Consumer, error) {
 	}
 	c.heartbeatManager.registerMaster(c.master.Address)
 	go c.processRebalanceEvent()
+	log.Infof("[CONSUMER] start consumer success, client=%s", clientID)
 	return c, nil
 }
 
@@ -121,13 +123,18 @@ func (c *consumer) register2Master(needChange bool) error {
 	for c.master.HasNext {
 		rsp, err := c.sendRegRequest2Master()
 		if err != nil {
+			log.Infof("[CONSUMER]register2Master error %s", err.Error())
 			return err
 		}
+
+		log.Info("register2Master response %s", rsp.String())
 		if !rsp.GetSuccess() {
 			if rsp.GetErrCode() == errs.RetConsumeGroupForbidden || rsp.GetErrCode() == errs.RetConsumeContentForbidden {
+				log.Warnf("[CONSUMER] register2master(%s) failure exist register, client=%s, error: %s", c.master.Address, c.clientID, rsp.ErrMsg)
 				return errs.New(rsp.GetErrCode(), rsp.GetErrMsg())
 			}
 
+			log.Warnf("[CONSUMER] register2master(%s) failure, client=%s, error: %s", c.master.Address, c.clientID, rsp.ErrMsg)
 			if c.master, err = c.selector.Select(c.config.Consumer.Masters); err != nil {
 				return err
 			}
@@ -222,6 +229,7 @@ func (c *consumer) GetMessage() (*ConsumerResult, error) {
 	defer cancel()
 	rsp, err := c.client.GetMessageRequestC2B(ctx, m, c.subInfo, c.rmtDataCache)
 	if err != nil {
+		log.Infof("[CONSUMER]GetMessage error %s", err.Error())
 		return nil, err
 	}
 	cs := &ConsumerResult{
@@ -261,6 +269,7 @@ func (c *consumer) Confirm(confirmContext string, consumed bool) (*ConsumerResul
 
 	rsp, err := c.sendConfirmReq2Broker(partition)
 	if err != nil {
+		log.Infof("[CONSUMER]Confirm error %s", err.Error())
 		return nil, err
 	}
 
@@ -336,6 +345,7 @@ func (c *consumer) GetClientID() string {
 
 // Close implementation of TubeMQ consumer.
 func (c *consumer) Close() error {
+	log.Infof("[CONSUMER]Begin to close consumer, client=%s", c.clientID)
 	close(c.done)
 	err := c.close2Master()
 	if err != nil {
@@ -344,10 +354,12 @@ func (c *consumer) Close() error {
 	c.closeAllBrokers()
 	c.heartbeatManager.close()
 	c.client.Close()
+	log.Infof("[CONSUMER]Consumer has been closed successfully, client=%s", c.clientID)
 	return nil
 }
 
 func (c *consumer) processRebalanceEvent() {
+	log.Info("[CONSUMER]Rebalance event Handler starts!")
 	for {
 		select {
 		case event, ok := <-c.rmtDataCache.EventCh:
@@ -363,12 +375,15 @@ func (c *consumer) processRebalanceEvent() {
 				}
 			}
 		case <-c.done:
+			log.Infof("[CONSUMER]Rebalance done, client=%s", c.clientID)
 			break
 		}
 	}
+	log.Info("[CONSUMER] Rebalance event Handler stopped!")
 }
 
 func (c *consumer) disconnect2Broker(event *metadata.ConsumerEvent) {
+	log.Tracef("[disconnect2Broker] connect event begin, client id=%s", c.clientID)
 	subscribeInfo := event.GetSubscribeInfo()
 	if len(subscribeInfo) > 0 {
 		removedPartitions := make(map[*metadata.Node][]*metadata.Partition)
@@ -377,7 +392,8 @@ func (c *consumer) disconnect2Broker(event *metadata.ConsumerEvent) {
 			c.unregister2Broker(removedPartitions)
 		}
 	}
-	event.SetEventStatus(2)
+	event.SetEventStatus(metadata.Disconnect)
+	log.Tracef("[disconnect2Broker] connect event finished, client id=%s", c.clientID)
 }
 
 func (c *consumer) unregister2Broker(unRegPartitions map[*metadata.Node][]*metadata.Partition) {
@@ -387,6 +403,7 @@ func (c *consumer) unregister2Broker(unRegPartitions map[*metadata.Node][]*metad
 
 	for _, partitions := range unRegPartitions {
 		for _, partition := range partitions {
+			log.Tracef("unregister2Brokers, partition key=%s", partition.GetPartitionKey())
 			c.sendUnregisterReq2Broker(partition)
 		}
 	}
@@ -412,6 +429,7 @@ func (c *consumer) sendUnregisterReq2Broker(partition *metadata.Partition) {
 }
 
 func (c *consumer) connect2Broker(event *metadata.ConsumerEvent) {
+	log.Tracef("[connect2Broker] connect event begin, client id=%s", c.clientID)
 	if len(event.GetSubscribeInfo()) > 0 {
 		unsubPartitions := c.rmtDataCache.FilterPartitions(event.GetSubscribeInfo())
 		if len(unsubPartitions) > 0 {
@@ -422,10 +440,11 @@ func (c *consumer) connect2Broker(event *metadata.ConsumerEvent) {
 
 				rsp, err := c.sendRegisterReq2Broker(partition, node)
 				if err != nil {
-					//todo add log
+					log.Warnf("[connect2Broker] error %s", err.Error())
+					continue
 				}
 				if !rsp.GetSuccess() {
-					//todo add log
+					log.Warnf("[connect2Broker] err code:%d, err msg: %s", rsp.ErrCode, rsp.ErrMsg)
 					return
 				}
 
@@ -436,6 +455,7 @@ func (c *consumer) connect2Broker(event *metadata.ConsumerEvent) {
 	}
 	c.subInfo.FirstRegistered()
 	event.SetEventStatus(metadata.Disconnect)
+	log.Tracef("[connect2Broker] connect event finished, client id=%s", c.clientID)
 }
 
 func (c *consumer) sendRegisterReq2Broker(partition *metadata.Partition, node *metadata.Node) (*protocol.RegisterResponseB2C, error) {
@@ -508,8 +528,10 @@ func (c *consumer) getConsumeReadStatus(isFirstReg bool) int32 {
 	if isFirstReg {
 		if c.config.Consumer.ConsumePosition == 0 {
 			readStatus = consumeStatusFromMax
+			log.Infof("[Consumer From Max Offset], client id=", c.clientID)
 		} else if c.config.Consumer.ConsumePosition > 0 {
 			readStatus = consumeStatusFromMaxAlways
+			log.Infof("[Consumer From Max Offset Always], client id=", c.clientID)
 		}
 	}
 	return int32(readStatus)
@@ -556,6 +578,7 @@ func (c *consumer) processGetMessageRspB2C(pi *PeerInfo, filtered bool, partitio
 		cd := metadata.NewConsumeData(now, 200, escLimit, int32(msgSize), 0, dataDleVal, rsp.GetRequireSlow())
 		c.rmtDataCache.BookConsumeData(partition.GetPartitionKey(), cd)
 		pi.currOffset = currOffset
+		log.Tracef("[CONSUMER] getMessage count=%ld, from %s, client=%s", len(msgs), partition.GetPartitionKey(), c.clientID)
 		return msgs, nil
 	case errs.RetErrHBNoNode, errs.RetCertificateFailure, errs.RetErrDuplicatePartition:
 		partitionKey, _, err := util.ParseConfirmContext(confirmContext)
@@ -646,6 +669,7 @@ func (c *consumer) convertMessages(filtered bool, topic string, rsp *protocol.Ge
 }
 
 func (c *consumer) close2Master() error {
+	log.Infof("[CONSUMER] close2Master begin, client id=%s", c.clientID)
 	ctx, cancel := context.WithTimeout(context.Background(), c.config.Net.ReadTimeout)
 	defer cancel()
 
@@ -670,12 +694,15 @@ func (c *consumer) close2Master() error {
 	if !rsp.GetSuccess() {
 		return errs.New(rsp.GetErrCode(), rsp.GetErrMsg())
 	}
+	log.Infof("[CONSUMER] close2Master finished, client id=%s", c.clientID)
 	return nil
 }
 
 func (c *consumer) closeAllBrokers() {
+	log.Infof("[CONSUMER] closeAllBrokers begin, client id=%s", c.clientID)
 	partitions := c.rmtDataCache.GetAllClosedBrokerParts()
 	if len(partitions) > 0 {
 		c.unregister2Broker(partitions)
 	}
+	log.Infof("[CONSUMER] closeAllBrokers end, client id=%s", c.clientID)
 }
diff --git a/tubemq-client-twins/tubemq-client-go/client/heartbeat.go b/tubemq-client-twins/tubemq-client-go/client/heartbeat.go
index 88ac12c..b450734 100644
--- a/tubemq-client-twins/tubemq-client-go/client/heartbeat.go
+++ b/tubemq-client-twins/tubemq-client-go/client/heartbeat.go
@@ -25,6 +25,7 @@ import (
 	"time"
 
 	"github.com/apache/incubator-inlong/tubemq-client-twins/tubemq-client-go/errs"
+	"github.com/apache/incubator-inlong/tubemq-client-twins/tubemq-client-go/log"
 	"github.com/apache/incubator-inlong/tubemq-client-twins/tubemq-client-go/metadata"
 	"github.com/apache/incubator-inlong/tubemq-client-twins/tubemq-client-go/protocol"
 	"github.com/apache/incubator-inlong/tubemq-client-twins/tubemq-client-go/util"
@@ -103,6 +104,7 @@ func (h *heartbeatManager) consumerHB2Master() {
 		if !rsp.GetSuccess() {
 			h.consumer.masterHBRetry++
 			if rsp.GetErrCode() == errs.RetErrHBNoNode || strings.Index(rsp.GetErrMsg(), "StandbyException") != -1 {
+				log.Warnf("[CONSUMER] hb2master found no-node or standby, re-register, client=%s", h.consumer.clientID)
 				address := h.consumer.master.Address
 				go h.consumer.register2Master(rsp.GetErrCode() != errs.RetErrHBNoNode)
 				if rsp.GetErrCode() != errs.RetErrHBNoNode {
@@ -113,7 +115,9 @@ func (h *heartbeatManager) consumerHB2Master() {
 						delete(h.heartbeats, address)
 						h.mu.Unlock()
 					}
+					return
 				}
+				log.Warnf("[CONSUMER] heartBeat2Master failure to (%s) : %s, client=%s", h.consumer.master.Address, rsp.ErrMsg, h.consumer.clientID)
 				return
 			}
 		}
@@ -188,6 +192,7 @@ func (h *heartbeatManager) consumerHB2Broker(broker *metadata.Node) {
 
 	rsp, err := h.sendHeartbeatC2B(broker)
 	if err != nil {
+		log.Warnf("[Heartbeat2Broker] request network to failure %s", err.Error())
 		return
 	}
 	partitionKeys := make([]string, 0, len(partitions))
@@ -195,6 +200,7 @@ func (h *heartbeatManager) consumerHB2Broker(broker *metadata.Node) {
 		for _, partition := range partitions {
 			partitionKeys = append(partitionKeys, partition.GetPartitionKey())
 		}
+		log.Warnf("[Heartbeat2Broker] request (%s) CertificateFailure", broker.GetAddress())
 		h.consumer.rmtDataCache.RemovePartition(partitionKeys)
 	}
 	if rsp.GetSuccess() && rsp.GetHasPartFailure() {
@@ -207,10 +213,12 @@ func (h *heartbeatManager) consumerHB2Broker(broker *metadata.Node) {
 			if err != nil {
 				continue
 			}
+			log.Tracef("[Heartbeat2Broker] found partition(%s) hb failure!", partition.GetPartitionKey())
 			partitionKeys = append(partitionKeys, partition.GetPartitionKey())
 		}
 		h.consumer.rmtDataCache.RemovePartition(partitionKeys)
 	}
+	log.Tracef("[Heartbeat2Broker] out hb response process, add broker(%s) timer!", broker.GetAddress())
 	h.resetBrokerTimer(broker)
 }
 
diff --git a/tubemq-client-twins/tubemq-client-go/flowctrl/handler.go b/tubemq-client-twins/tubemq-client-go/flowctrl/handler.go
index d9fd9bc..52bb732 100644
--- a/tubemq-client-twins/tubemq-client-go/flowctrl/handler.go
+++ b/tubemq-client-twins/tubemq-client-go/flowctrl/handler.go
@@ -29,6 +29,7 @@ import (
 	"sync/atomic"
 	"time"
 
+	"github.com/apache/incubator-inlong/tubemq-client-twins/tubemq-client-go/log"
 	"github.com/apache/incubator-inlong/tubemq-client-twins/tubemq-client-go/util"
 )
 
@@ -103,13 +104,11 @@ func (h *RuleHandler) UpdateDefFlowCtrlInfo(isDefault bool, qrypriorityID int64,
 	}
 	h.lastUpdate = time.Now().UnixNano() / int64(time.Millisecond)
 	if isDefault {
-		// todo log
-		// LOG_INFO("[Flow Ctrl] Default FlowCtrl's flowctrl_id from %ld to %ld\n", curr_flowctrl_id,
-		//             flowctrl_id)
+		log.Infof("[Flow Ctrl] Default FlowCtrl's flow ctrl id from %d to %d", atomic.LoadInt64(&h.flowCtrlID),
+			flowCtrlID)
 	} else {
-		// todo log
-		// LOG_INFO("[Flow Ctrl] Group FlowCtrl's flowctrl_id from %ld to %ld\n", curr_flowctrl_id,
-		//             flowctrl_id)
+		log.Infof("[Flow Ctrl] Group FlowCtrl's flow ctrl id from %d to %d", atomic.LoadInt64(&h.flowCtrlID),
+			flowCtrlID)
 	}
 	return nil
 }
diff --git a/tubemq-client-twins/tubemq-client-go/go.mod b/tubemq-client-twins/tubemq-client-go/go.mod
index 62698f0..4df5077 100644
--- a/tubemq-client-twins/tubemq-client-go/go.mod
+++ b/tubemq-client-twins/tubemq-client-go/go.mod
@@ -20,5 +20,7 @@ go 1.14
 require (
 	github.com/golang/protobuf v1.4.3
 	github.com/stretchr/testify v1.7.0
+	go.uber.org/zap v1.16.0
 	google.golang.org/protobuf v1.23.0
+	gopkg.in/natefinch/lumberjack.v2 v2.0.0
 )
diff --git a/tubemq-client-twins/tubemq-client-go/log/config.go b/tubemq-client-twins/tubemq-client-go/log/config.go
new file mode 100644
index 0000000..b71cf9d
--- /dev/null
+++ b/tubemq-client-twins/tubemq-client-go/log/config.go
@@ -0,0 +1,41 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package log
+
+// OutputConfig defines the output config which can be reconfigured by user.
+type OutputConfig struct {
+	// LogPath is the path for the log.
+	LogPath string
+	// Level is the log level.
+	Level string
+	// MaxSize is the max size, in megabytes, for a rolling log.
+	MaxSize int
+	// MaxBackups is the maximum number of old log files to retain.
+	MaxBackups int
+	// MaxAge is the maximum number of days to retain old log files based on the
+	// timestamp encoded in their filename.
+	MaxAge int
+}
+
+var defaultConfig = &OutputConfig{
+	LogPath:    "../log/tubemq",
+	MaxSize:    100,
+	MaxBackups: 5,
+	MaxAge:     3,
+	Level:      "error",
+}
diff --git a/tubemq-client-twins/tubemq-client-go/log/log.go b/tubemq-client-twins/tubemq-client-go/log/log.go
new file mode 100644
index 0000000..2a4de81
--- /dev/null
+++ b/tubemq-client-twins/tubemq-client-go/log/log.go
@@ -0,0 +1,113 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+// Package log defines the logger for the sdk.
+package log
+
+var traceEnabled = false
+
+// EnableTrace enables trace level log.
+func EnableTrace() {
+	traceEnabled = true
+}
+
+var defaultLogger = newDefaultLogger()
+
+func newDefaultLogger() Logger {
+	return newZapLog(defaultConfig)
+}
+
+// NewLogger will return a configured zap logger.
+func NewLogger(config *OutputConfig) Logger {
+	defaultLogger = newZapLog(config)
+	return defaultLogger
+}
+
+// SetLogger sets the default logger to the given logger.
+func SetLogger(logger Logger) {
+	defaultLogger = logger
+}
+
+// WithFields is the proxy to call the WithFields of defaultLogger.
+func WithFields(fields ...string) Logger {
+	return defaultLogger.WithFields(fields...)
+}
+
+// Trace logs to TRACE level log. Arguments are handled in the manner of fmt.Print.
+func Trace(args ...interface{}) {
+	if traceEnabled {
+		defaultLogger.Trace(args...)
+	}
+}
+
+// Tracef logs to TRACE level log. Arguments are handled in the manner of fmt.Printf.
+func Tracef(format string, args ...interface{}) {
+	if traceEnabled {
+		defaultLogger.Tracef(format, args...)
+	}
+}
+
+// Debug logs to DEBUG level log. Arguments are handled in the manner of fmt.Print.
+func Debug(args ...interface{}) {
+	defaultLogger.Debug(args...)
+}
+
+// Debugf logs to DEBUG level log. Arguments are handled in the manner of fmt.Printf.
+func Debugf(format string, args ...interface{}) {
+	defaultLogger.Debugf(format, args...)
+}
+
+// Info logs to INFO level log. Arguments are handled in the manner of fmt.Print.
+func Info(args ...interface{}) {
+	defaultLogger.Info(args...)
+}
+
+// Infof logs to INFO level log. Arguments are handled in the manner of fmt.Printf.
+func Infof(format string, args ...interface{}) {
+	defaultLogger.Infof(format, args...)
+}
+
+// Warn logs to WARNING level log. Arguments are handled in the manner of fmt.Print.
+func Warn(args ...interface{}) {
+	defaultLogger.Warn(args...)
+}
+
+// Warnf logs to WARNING level log. Arguments are handled in the manner of fmt.Printf.
+func Warnf(format string, args ...interface{}) {
+	defaultLogger.Warnf(format, args...)
+}
+
+// Error logs to ERROR level log. Arguments are handled in the manner of fmt.Print.
+func Error(args ...interface{}) {
+	defaultLogger.Error(args...)
+}
+
+// Errorf logs to ERROR level log. Arguments are handled in the manner of fmt.Printf.
+func Errorf(format string, args ...interface{}) {
+	defaultLogger.Errorf(format, args...)
+}
+
+// Fatal logs to ERROR level log. Arguments are handled in the manner of fmt.Print.
+// Note that all Fatal logs will exit with os.Exit(1).
+func Fatal(args ...interface{}) {
+	defaultLogger.Fatal(args...)
+}
+
+// Fatalf logs to ERROR level log. Arguments are handled in the manner of fmt.Printf.
+func Fatalf(format string, args ...interface{}) {
+	defaultLogger.Fatalf(format, args...)
+}
diff --git a/tubemq-client-twins/tubemq-client-go/log/log_test.go b/tubemq-client-twins/tubemq-client-go/log/log_test.go
new file mode 100644
index 0000000..d855907
--- /dev/null
+++ b/tubemq-client-twins/tubemq-client-go/log/log_test.go
@@ -0,0 +1,57 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package log
+
+import (
+	"bufio"
+	"io"
+	"os"
+	"regexp"
+	"strings"
+	"testing"
+
+	"github.com/stretchr/testify/assert"
+)
+
+func TestDefaultLog(t *testing.T) {
+	config := defaultConfig
+	Errorf("HelloWorld")
+	f, err := os.Open(config.LogPath)
+	defer f.Close()
+	assert.Nil(t, err)
+	r := bufio.NewReader(f)
+	lines := 0
+	for {
+		content, _, err := r.ReadLine()
+		if err == io.EOF {
+			break
+		}
+		assert.Nil(t, err)
+		words := strings.Fields(string(content))
+		assert.Equal(t, len(words), 5)
+		assert.Regexp(t, regexp.MustCompile(`[\d]{4}-[\d]{2}-[\d]{2}`), words[0])
+		assert.Regexp(t, regexp.MustCompile(`[\d]{2}:[\d]{2}:[\d]{2}`), words[1])
+		assert.Equal(t, "ERROR", words[2])
+		assert.Regexp(t, regexp.MustCompile(`[\w]+/[\w]+.go:[\d]+`), words[3])
+		assert.Equal(t, "HelloWorld", words[4])
+		lines++
+	}
+	assert.Equal(t, 1, lines)
+	err = os.Remove(config.LogPath)
+	assert.Nil(t, err)
+}
diff --git a/tubemq-client-twins/tubemq-client-go/log/logger.go b/tubemq-client-twins/tubemq-client-go/log/logger.go
new file mode 100644
index 0000000..fd70ef1
--- /dev/null
+++ b/tubemq-client-twins/tubemq-client-go/log/logger.go
@@ -0,0 +1,78 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package log
+
+// Level log level
+type Level int
+
+// log level const
+const (
+	LevelNil Level = iota
+	LevelTrace
+	LevelDebug
+	LevelInfo
+	LevelWarn
+	LevelError
+	LevelFatal
+)
+
+// LevelNames log level name map
+var LevelNames = map[string]Level{
+	"trace": LevelTrace,
+	"debug": LevelDebug,
+	"info":  LevelInfo,
+	"warn":  LevelWarn,
+	"error": LevelError,
+	"fatal": LevelFatal,
+}
+
+type Logger interface {
+	// Trace logs to TRACE log. Arguments are handled in the manner of fmt.Print.
+	Trace(args ...interface{})
+	// Tracef logs to TRACE log. Arguments are handled in the manner of fmt.Printf.
+	Tracef(format string, args ...interface{})
+	// Debug logs to DEBUG log. Arguments are handled in the manner of fmt.Print.
+	Debug(args ...interface{})
+	// Debugf logs to DEBUG log. Arguments are handled in the manner of fmt.Printf.
+	Debugf(format string, args ...interface{})
+	// Info logs to INFO log. Arguments are handled in the manner of fmt.Print.
+	Info(args ...interface{})
+	// Infof logs to INFO log. Arguments are handled in the manner of fmt.Printf.
+	Infof(format string, args ...interface{})
+	// Warn logs to WARNING log. Arguments are handled in the manner of fmt.Print.
+	Warn(args ...interface{})
+	// Warnf logs to WARNING log. Arguments are handled in the manner of fmt.Printf.
+	Warnf(format string, args ...interface{})
+	// Error logs to ERROR log. Arguments are handled in the manner of fmt.Print.
+	Error(args ...interface{})
+	// Errorf logs to ERROR log. Arguments are handled in the manner of fmt.Printf.
+	Errorf(format string, args ...interface{})
+	// Fatal logs to ERROR log. Arguments are handled in the manner of fmt.Print.
+	// Note that all Fatal logs will exit with os.Exit(1).
+	// Implementations may also call os.Exit() with a non-zero exit code.
+	Fatal(args ...interface{})
+	// Fatalf logs to ERROR log. Arguments are handled in the manner of fmt.Printf.
+	Fatalf(format string, args ...interface{})
+
+	// Sync calls the underlying Core's Sync method, flushing any buffered log entries.
+	// Applications should take care to call Sync before exiting
+	Sync() error
+
+	// WithFields sets the user-defined data to the log and returns a new Logger.
+	WithFields(fields ...string) Logger
+}
diff --git a/tubemq-client-twins/tubemq-client-go/log/warpper_zaplog.go b/tubemq-client-twins/tubemq-client-go/log/warpper_zaplog.go
new file mode 100644
index 0000000..881ddb0
--- /dev/null
+++ b/tubemq-client-twins/tubemq-client-go/log/warpper_zaplog.go
@@ -0,0 +1,305 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package log
+
+import (
+	"fmt"
+	"time"
+
+	"go.uber.org/zap"
+	"go.uber.org/zap/zapcore"
+	"gopkg.in/natefinch/lumberjack.v2"
+)
+
+type zapLog struct {
+	logger *zap.Logger
+}
+
+var levelToZapLevel = map[Level]zapcore.Level{
+	LevelTrace: zapcore.DebugLevel,
+	LevelDebug: zapcore.DebugLevel,
+	LevelInfo:  zapcore.InfoLevel,
+	LevelWarn:  zapcore.WarnLevel,
+	LevelError: zapcore.ErrorLevel,
+	LevelFatal: zapcore.FatalLevel,
+}
+
+func newZapLog(c *OutputConfig) Logger {
+	core := newFileCore(c)
+	logger := zap.New(
+		zapcore.NewTee(core),
+		zap.AddCallerSkip(2),
+		zap.AddCaller(),
+	)
+	return &zapLog{logger: logger}
+}
+
+func newFileCore(c *OutputConfig) zapcore.Core {
+	w := zapcore.AddSync(&lumberjack.Logger{
+		Filename:   c.LogPath,
+		MaxSize:    c.MaxSize, // megabytes
+		MaxBackups: c.MaxBackups,
+		MaxAge:     c.MaxAge, // days
+	})
+
+	core := zapcore.NewCore(
+		newDefaultEncoder(),
+		w,
+		levelToZapLevel[LevelNames[c.Level]],
+	)
+	return core
+}
+
+func newDefaultEncoder() zapcore.Encoder {
+	encoderCfg := zapcore.EncoderConfig{
+		TimeKey:        "T",
+		LevelKey:       "L",
+		NameKey:        "N",
+		CallerKey:      "C",
+		MessageKey:     "M",
+		StacktraceKey:  "S",
+		LineEnding:     zapcore.DefaultLineEnding,
+		EncodeLevel:    zapcore.CapitalLevelEncoder,
+		EncodeTime:     newDefaultTimeEncoder(),
+		EncodeDuration: zapcore.StringDurationEncoder,
+		EncodeCaller:   zapcore.ShortCallerEncoder,
+	}
+	encoder := zapcore.NewConsoleEncoder(encoderCfg)
+	return encoder
+}
+
+func newDefaultTimeEncoder() zapcore.TimeEncoder {
+	return func(t time.Time, enc zapcore.PrimitiveArrayEncoder) {
+		enc.AppendByteString(defaultTimeFormat(t))
+	}
+}
+
+func defaultTimeFormat(t time.Time) []byte {
+	t = t.Local()
+	year, month, day := t.Date()
+	hour, minute, second := t.Clock()
+	micros := t.Nanosecond() / 1000
+
+	buf := make([]byte, 23)
+	buf[0] = byte((year/1000)%10) + '0'
+	buf[1] = byte((year/100)%10) + '0'
+	buf[2] = byte((year/10)%10) + '0'
+	buf[3] = byte(year%10) + '0'
+	buf[4] = '-'
+	buf[5] = byte((month)/10) + '0'
+	buf[6] = byte((month)%10) + '0'
+	buf[7] = '-'
+	buf[8] = byte((day)/10) + '0'
+	buf[9] = byte((day)%10) + '0'
+	buf[10] = ' '
+	buf[11] = byte((hour)/10) + '0'
+	buf[12] = byte((hour)%10) + '0'
+	buf[13] = ':'
+	buf[14] = byte((minute)/10) + '0'
+	buf[15] = byte((minute)%10) + '0'
+	buf[16] = ':'
+	buf[17] = byte((second)/10) + '0'
+	buf[18] = byte((second)%10) + '0'
+	buf[19] = '.'
+	buf[20] = byte((micros/100000)%10) + '0'
+	buf[21] = byte((micros/10000)%10) + '0'
+	buf[22] = byte((micros/1000)%10) + '0'
+	return buf
+}
+
+// ZapLogWrapper implements Logger interface
+// based on the underlying zapLog.
+type ZapLogWrapper struct {
+	l *zapLog
+}
+
+// GetLogger returns the wrapped zap logger.
+func (z *ZapLogWrapper) GetLogger() Logger {
+	return z.l
+}
+
+// Trace logs to TRACE log, Arguments are handled in the manner of fmt.Print
+func (z *ZapLogWrapper) Trace(args ...interface{}) {
+	z.l.Trace(args...)
+}
+
+// Tracef logs to TRACE log, Arguments are handled in the manner of fmt.Printf
+func (z *ZapLogWrapper) Tracef(format string, args ...interface{}) {
+	z.l.Tracef(format, args...)
+}
+
+// Debug logs to DEBUG log, Arguments are handled in the manner of fmt.Print
+func (z *ZapLogWrapper) Debug(args ...interface{}) {
+	z.l.Debug(args...)
+}
+
+// Debugf logs to DEBUG log, Arguments are handled in the manner of fmt.Printf
+func (z *ZapLogWrapper) Debugf(format string, args ...interface{}) {
+	z.l.Debugf(format, args...)
+}
+
+// Info logs to INFO log, Arguments are handled in the manner of fmt.Print
+func (z *ZapLogWrapper) Info(args ...interface{}) {
+	z.l.Info(args...)
+}
+
+// Infof logs to INFO log, Arguments are handled in the manner of fmt.Printf
+func (z *ZapLogWrapper) Infof(format string, args ...interface{}) {
+	z.l.Infof(format, args...)
+}
+
+// Warn logs to WARNING log, Arguments are handled in the manner of fmt.Print
+func (z *ZapLogWrapper) Warn(args ...interface{}) {
+	z.l.Warn(args...)
+}
+
+// Warnf logs to WARNING log, Arguments are handled in the manner of fmt.Printf
+func (z *ZapLogWrapper) Warnf(format string, args ...interface{}) {
+	z.l.Warnf(format, args...)
+}
+
+// Error logs to ERROR log, Arguments are handled in the manner of fmt.Print
+func (z *ZapLogWrapper) Error(args ...interface{}) {
+	z.l.Error(args...)
+}
+
+// Errorf logs to ERROR log, Arguments are handled in the manner of fmt.Printf
+func (z *ZapLogWrapper) Errorf(format string, args ...interface{}) {
+	z.l.Errorf(format, args...)
+}
+
+// Fatal logs to FATAL log, Arguments are handled in the manner of fmt.Print
+func (z *ZapLogWrapper) Fatal(args ...interface{}) {
+	z.l.Fatal(args...)
+}
+
+// Fatalf logs to FATAL log, Arguments are handled in the manner of fmt.Printf
+func (z *ZapLogWrapper) Fatalf(format string, args ...interface{}) {
+	z.l.Fatalf(format, args...)
+}
+
+// Sync calls the zap defaultLogger's Sync method, flushing any buffered log entries.
+// Applications should take care to call Sync before exiting.
+func (z *ZapLogWrapper) Sync() error {
+	return z.l.Sync()
+}
+
+// WithFields sets the user-defined data to the log and return a new Logger.
+func (z *ZapLogWrapper) WithFields(fields ...string) Logger {
+	return z.l.WithFields(fields...)
+}
+
+// WithFields sets the user-defined data to the log and return a new Logger.
+func (l *zapLog) WithFields(fields ...string) Logger {
+	zapfields := make([]zap.Field, len(fields)/2)
+	for index := range zapfields {
+		zapfields[index] = zap.String(fields[2*index], fields[2*index+1])
+	}
+
+	return &ZapLogWrapper{l: &zapLog{logger: l.logger.With(zapfields...)}}
+}
+
+// Trace logs to TRACE log, Arguments are handled in the manner of fmt.Print.
+func (l *zapLog) Trace(args ...interface{}) {
+	if l.logger.Core().Enabled(zapcore.DebugLevel) {
+		l.logger.Debug(fmt.Sprint(args...))
+	}
+}
+
+// Tracef logs to TRACE log, Arguments are handled in the manner of fmt.Printf.
+func (l *zapLog) Tracef(format string, args ...interface{}) {
+	if l.logger.Core().Enabled(zapcore.DebugLevel) {
+		l.logger.Debug(fmt.Sprintf(format, args...))
+	}
+}
+
+// Debug logs to DEBUG log, Arguments are handled in the manner of fmt.Print
+func (l *zapLog) Debug(args ...interface{}) {
+	if l.logger.Core().Enabled(zapcore.DebugLevel) {
+		l.logger.Debug(fmt.Sprint(args...))
+	}
+}
+
+// Debugf logs to DEBUG log, Arguments are handled in the manner of fmt.Printf
+func (l *zapLog) Debugf(format string, args ...interface{}) {
+	if l.logger.Core().Enabled(zapcore.DebugLevel) {
+		l.logger.Debug(fmt.Sprintf(format, args...))
+	}
+}
+
+// Info logs to INFO log, Arguments are handled in the manner of fmt.Print
+func (l *zapLog) Info(args ...interface{}) {
+	if l.logger.Core().Enabled(zapcore.InfoLevel) {
+		l.logger.Info(fmt.Sprint(args...))
+	}
+}
+
+// Infof logs to INFO log, Arguments are handled in the manner of fmt.Printf
+func (l *zapLog) Infof(format string, args ...interface{}) {
+	if l.logger.Core().Enabled(zapcore.InfoLevel) {
+		l.logger.Info(fmt.Sprintf(format, args...))
+	}
+}
+
+// Warn logs to WARNING log, Arguments are handled in the manner of fmt.Print
+func (l *zapLog) Warn(args ...interface{}) {
+	if l.logger.Core().Enabled(zapcore.WarnLevel) {
+		l.logger.Warn(fmt.Sprint(args...))
+	}
+}
+
+// Warnf logs to WARNING log, Arguments are handled in the manner of fmt.Printf
+func (l *zapLog) Warnf(format string, args ...interface{}) {
+	if l.logger.Core().Enabled(zapcore.WarnLevel) {
+		l.logger.Warn(fmt.Sprintf(format, args...))
+	}
+}
+
+// Error logs to ERROR log, Arguments are handled in the manner of fmt.Print
+func (l *zapLog) Error(args ...interface{}) {
+	if l.logger.Core().Enabled(zapcore.ErrorLevel) {
+		l.logger.Error(fmt.Sprint(args...))
+	}
+}
+
+// Errorf logs to ERROR log, Arguments are handled in the manner of fmt.Printf
+func (l *zapLog) Errorf(format string, args ...interface{}) {
+	if l.logger.Core().Enabled(zapcore.ErrorLevel) {
+		l.logger.Error(fmt.Sprintf(format, args...))
+	}
+}
+
+// Fatal logs to FATAL log, Arguments are handled in the manner of fmt.Print
+func (l *zapLog) Fatal(args ...interface{}) {
+	if l.logger.Core().Enabled(zapcore.FatalLevel) {
+		l.logger.Fatal(fmt.Sprint(args...))
+	}
+}
+
+// Fatalf logs to FATAL log, Arguments are handled in the manner of fmt.Printf
+func (l *zapLog) Fatalf(format string, args ...interface{}) {
+	if l.logger.Core().Enabled(zapcore.FatalLevel) {
+		l.logger.Fatal(fmt.Sprintf(format, args...))
+	}
+}
+
+// Sync calls the zap defaultLogger's Sync method, flushing any buffered log entries.
+// Applications should take care to call Sync before exiting.
+func (l *zapLog) Sync() error {
+	return l.logger.Sync()
+}