You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@inlong.apache.org by go...@apache.org on 2021/07/06 08:19:32 UTC
[incubator-inlong] branch INLONG-25 updated: [INLONG-637]Go SDK log
(#496)
This is an automated email from the ASF dual-hosted git repository.
gosonzhang pushed a commit to branch INLONG-25
in repository https://gitbox.apache.org/repos/asf/incubator-inlong.git
The following commit(s) were added to refs/heads/INLONG-25 by this push:
new 7b94fb8 [INLONG-637]Go SDK log (#496)
7b94fb8 is described below
commit 7b94fb831a402a86c1f9a874abea10bf5adb042d
Author: Zijie Lu <ws...@gmail.com>
AuthorDate: Tue Jul 6 16:19:22 2021 +0800
[INLONG-637]Go SDK log (#496)
---
.../tubemq-client-go/client/consumer_impl.go | 33 ++-
.../tubemq-client-go/client/heartbeat.go | 8 +
.../tubemq-client-go/flowctrl/handler.go | 11 +-
tubemq-client-twins/tubemq-client-go/go.mod | 2 +
tubemq-client-twins/tubemq-client-go/log/config.go | 41 +++
tubemq-client-twins/tubemq-client-go/log/log.go | 113 ++++++++
.../tubemq-client-go/log/log_test.go | 57 ++++
tubemq-client-twins/tubemq-client-go/log/logger.go | 78 ++++++
.../tubemq-client-go/log/warpper_zaplog.go | 305 +++++++++++++++++++++
9 files changed, 639 insertions(+), 9 deletions(-)
diff --git a/tubemq-client-twins/tubemq-client-go/client/consumer_impl.go b/tubemq-client-twins/tubemq-client-go/client/consumer_impl.go
index 8163de4..51295f8 100644
--- a/tubemq-client-twins/tubemq-client-go/client/consumer_impl.go
+++ b/tubemq-client-twins/tubemq-client-go/client/consumer_impl.go
@@ -32,6 +32,7 @@ import (
"github.com/apache/incubator-inlong/tubemq-client-twins/tubemq-client-go/config"
"github.com/apache/incubator-inlong/tubemq-client-twins/tubemq-client-go/errs"
+ "github.com/apache/incubator-inlong/tubemq-client-twins/tubemq-client-go/log"
"github.com/apache/incubator-inlong/tubemq-client-twins/tubemq-client-go/metadata"
"github.com/apache/incubator-inlong/tubemq-client-twins/tubemq-client-go/multiplexing"
"github.com/apache/incubator-inlong/tubemq-client-twins/tubemq-client-go/protocol"
@@ -107,6 +108,7 @@ func NewConsumer(config *config.Config) (Consumer, error) {
}
c.heartbeatManager.registerMaster(c.master.Address)
go c.processRebalanceEvent()
+ log.Infof("[CONSUMER] start consumer success, client=%s", clientID)
return c, nil
}
@@ -121,13 +123,18 @@ func (c *consumer) register2Master(needChange bool) error {
for c.master.HasNext {
rsp, err := c.sendRegRequest2Master()
if err != nil {
+ log.Infof("[CONSUMER]register2Master error %s", err.Error())
return err
}
+
+ log.Info("register2Master response %s", rsp.String())
if !rsp.GetSuccess() {
if rsp.GetErrCode() == errs.RetConsumeGroupForbidden || rsp.GetErrCode() == errs.RetConsumeContentForbidden {
+ log.Warnf("[CONSUMER] register2master(%s) failure exist register, client=%s, error: %s", c.master.Address, c.clientID, rsp.ErrMsg)
return errs.New(rsp.GetErrCode(), rsp.GetErrMsg())
}
+ log.Warnf("[CONSUMER] register2master(%s) failure, client=%s, error: %s", c.master.Address, c.clientID, rsp.ErrMsg)
if c.master, err = c.selector.Select(c.config.Consumer.Masters); err != nil {
return err
}
@@ -222,6 +229,7 @@ func (c *consumer) GetMessage() (*ConsumerResult, error) {
defer cancel()
rsp, err := c.client.GetMessageRequestC2B(ctx, m, c.subInfo, c.rmtDataCache)
if err != nil {
+ log.Infof("[CONSUMER]GetMessage error %s", err.Error())
return nil, err
}
cs := &ConsumerResult{
@@ -261,6 +269,7 @@ func (c *consumer) Confirm(confirmContext string, consumed bool) (*ConsumerResul
rsp, err := c.sendConfirmReq2Broker(partition)
if err != nil {
+ log.Infof("[CONSUMER]Confirm error %s", err.Error())
return nil, err
}
@@ -336,6 +345,7 @@ func (c *consumer) GetClientID() string {
// Close implementation of TubeMQ consumer.
func (c *consumer) Close() error {
+ log.Infof("[CONSUMER]Begin to close consumer, client=%s", c.clientID)
close(c.done)
err := c.close2Master()
if err != nil {
@@ -344,10 +354,12 @@ func (c *consumer) Close() error {
c.closeAllBrokers()
c.heartbeatManager.close()
c.client.Close()
+ log.Infof("[CONSUMER]Consumer has been closed successfully, client=%s", c.clientID)
return nil
}
func (c *consumer) processRebalanceEvent() {
+ log.Info("[CONSUMER]Rebalance event Handler starts!")
for {
select {
case event, ok := <-c.rmtDataCache.EventCh:
@@ -363,12 +375,15 @@ func (c *consumer) processRebalanceEvent() {
}
}
case <-c.done:
+ log.Infof("[CONSUMER]Rebalance done, client=%s", c.clientID)
break
}
}
+ log.Info("[CONSUMER] Rebalance event Handler stopped!")
}
func (c *consumer) disconnect2Broker(event *metadata.ConsumerEvent) {
+ log.Tracef("[disconnect2Broker] connect event begin, client id=%s", c.clientID)
subscribeInfo := event.GetSubscribeInfo()
if len(subscribeInfo) > 0 {
removedPartitions := make(map[*metadata.Node][]*metadata.Partition)
@@ -377,7 +392,8 @@ func (c *consumer) disconnect2Broker(event *metadata.ConsumerEvent) {
c.unregister2Broker(removedPartitions)
}
}
- event.SetEventStatus(2)
+ event.SetEventStatus(metadata.Disconnect)
+ log.Tracef("[disconnect2Broker] connect event finished, client id=%s", c.clientID)
}
func (c *consumer) unregister2Broker(unRegPartitions map[*metadata.Node][]*metadata.Partition) {
@@ -387,6 +403,7 @@ func (c *consumer) unregister2Broker(unRegPartitions map[*metadata.Node][]*metad
for _, partitions := range unRegPartitions {
for _, partition := range partitions {
+ log.Tracef("unregister2Brokers, partition key=%s", partition.GetPartitionKey())
c.sendUnregisterReq2Broker(partition)
}
}
@@ -412,6 +429,7 @@ func (c *consumer) sendUnregisterReq2Broker(partition *metadata.Partition) {
}
func (c *consumer) connect2Broker(event *metadata.ConsumerEvent) {
+ log.Tracef("[connect2Broker] connect event begin, client id=%s", c.clientID)
if len(event.GetSubscribeInfo()) > 0 {
unsubPartitions := c.rmtDataCache.FilterPartitions(event.GetSubscribeInfo())
if len(unsubPartitions) > 0 {
@@ -422,10 +440,11 @@ func (c *consumer) connect2Broker(event *metadata.ConsumerEvent) {
rsp, err := c.sendRegisterReq2Broker(partition, node)
if err != nil {
- //todo add log
+ log.Warnf("[connect2Broker] error %s", err.Error())
+ continue
}
if !rsp.GetSuccess() {
- //todo add log
+ log.Warnf("[connect2Broker] err code:%d, err msg: %s", rsp.ErrCode, rsp.ErrMsg)
return
}
@@ -436,6 +455,7 @@ func (c *consumer) connect2Broker(event *metadata.ConsumerEvent) {
}
c.subInfo.FirstRegistered()
event.SetEventStatus(metadata.Disconnect)
+ log.Tracef("[connect2Broker] connect event finished, client id=%s", c.clientID)
}
func (c *consumer) sendRegisterReq2Broker(partition *metadata.Partition, node *metadata.Node) (*protocol.RegisterResponseB2C, error) {
@@ -508,8 +528,10 @@ func (c *consumer) getConsumeReadStatus(isFirstReg bool) int32 {
if isFirstReg {
if c.config.Consumer.ConsumePosition == 0 {
readStatus = consumeStatusFromMax
+ log.Infof("[Consumer From Max Offset], client id=", c.clientID)
} else if c.config.Consumer.ConsumePosition > 0 {
readStatus = consumeStatusFromMaxAlways
+ log.Infof("[Consumer From Max Offset Always], client id=", c.clientID)
}
}
return int32(readStatus)
@@ -556,6 +578,7 @@ func (c *consumer) processGetMessageRspB2C(pi *PeerInfo, filtered bool, partitio
cd := metadata.NewConsumeData(now, 200, escLimit, int32(msgSize), 0, dataDleVal, rsp.GetRequireSlow())
c.rmtDataCache.BookConsumeData(partition.GetPartitionKey(), cd)
pi.currOffset = currOffset
+ log.Tracef("[CONSUMER] getMessage count=%ld, from %s, client=%s", len(msgs), partition.GetPartitionKey(), c.clientID)
return msgs, nil
case errs.RetErrHBNoNode, errs.RetCertificateFailure, errs.RetErrDuplicatePartition:
partitionKey, _, err := util.ParseConfirmContext(confirmContext)
@@ -646,6 +669,7 @@ func (c *consumer) convertMessages(filtered bool, topic string, rsp *protocol.Ge
}
func (c *consumer) close2Master() error {
+ log.Infof("[CONSUMER] close2Master begin, client id=%s", c.clientID)
ctx, cancel := context.WithTimeout(context.Background(), c.config.Net.ReadTimeout)
defer cancel()
@@ -670,12 +694,15 @@ func (c *consumer) close2Master() error {
if !rsp.GetSuccess() {
return errs.New(rsp.GetErrCode(), rsp.GetErrMsg())
}
+ log.Infof("[CONSUMER] close2Master finished, client id=%s", c.clientID)
return nil
}
func (c *consumer) closeAllBrokers() {
+ log.Infof("[CONSUMER] closeAllBrokers begin, client id=%s", c.clientID)
partitions := c.rmtDataCache.GetAllClosedBrokerParts()
if len(partitions) > 0 {
c.unregister2Broker(partitions)
}
+ log.Infof("[CONSUMER] closeAllBrokers end, client id=%s", c.clientID)
}
diff --git a/tubemq-client-twins/tubemq-client-go/client/heartbeat.go b/tubemq-client-twins/tubemq-client-go/client/heartbeat.go
index 88ac12c..b450734 100644
--- a/tubemq-client-twins/tubemq-client-go/client/heartbeat.go
+++ b/tubemq-client-twins/tubemq-client-go/client/heartbeat.go
@@ -25,6 +25,7 @@ import (
"time"
"github.com/apache/incubator-inlong/tubemq-client-twins/tubemq-client-go/errs"
+ "github.com/apache/incubator-inlong/tubemq-client-twins/tubemq-client-go/log"
"github.com/apache/incubator-inlong/tubemq-client-twins/tubemq-client-go/metadata"
"github.com/apache/incubator-inlong/tubemq-client-twins/tubemq-client-go/protocol"
"github.com/apache/incubator-inlong/tubemq-client-twins/tubemq-client-go/util"
@@ -103,6 +104,7 @@ func (h *heartbeatManager) consumerHB2Master() {
if !rsp.GetSuccess() {
h.consumer.masterHBRetry++
if rsp.GetErrCode() == errs.RetErrHBNoNode || strings.Index(rsp.GetErrMsg(), "StandbyException") != -1 {
+ log.Warnf("[CONSUMER] hb2master found no-node or standby, re-register, client=%s", h.consumer.clientID)
address := h.consumer.master.Address
go h.consumer.register2Master(rsp.GetErrCode() != errs.RetErrHBNoNode)
if rsp.GetErrCode() != errs.RetErrHBNoNode {
@@ -113,7 +115,9 @@ func (h *heartbeatManager) consumerHB2Master() {
delete(h.heartbeats, address)
h.mu.Unlock()
}
+ return
}
+ log.Warnf("[CONSUMER] heartBeat2Master failure to (%s) : %s, client=%s", h.consumer.master.Address, rsp.ErrMsg, h.consumer.clientID)
return
}
}
@@ -188,6 +192,7 @@ func (h *heartbeatManager) consumerHB2Broker(broker *metadata.Node) {
rsp, err := h.sendHeartbeatC2B(broker)
if err != nil {
+ log.Warnf("[Heartbeat2Broker] request network to failure %s", err.Error())
return
}
partitionKeys := make([]string, 0, len(partitions))
@@ -195,6 +200,7 @@ func (h *heartbeatManager) consumerHB2Broker(broker *metadata.Node) {
for _, partition := range partitions {
partitionKeys = append(partitionKeys, partition.GetPartitionKey())
}
+ log.Warnf("[Heartbeat2Broker] request (%s) CertificateFailure", broker.GetAddress())
h.consumer.rmtDataCache.RemovePartition(partitionKeys)
}
if rsp.GetSuccess() && rsp.GetHasPartFailure() {
@@ -207,10 +213,12 @@ func (h *heartbeatManager) consumerHB2Broker(broker *metadata.Node) {
if err != nil {
continue
}
+ log.Tracef("[Heartbeat2Broker] found partition(%s) hb failure!", partition.GetPartitionKey())
partitionKeys = append(partitionKeys, partition.GetPartitionKey())
}
h.consumer.rmtDataCache.RemovePartition(partitionKeys)
}
+ log.Tracef("[Heartbeat2Broker] out hb response process, add broker(%s) timer!", broker.GetAddress())
h.resetBrokerTimer(broker)
}
diff --git a/tubemq-client-twins/tubemq-client-go/flowctrl/handler.go b/tubemq-client-twins/tubemq-client-go/flowctrl/handler.go
index d9fd9bc..52bb732 100644
--- a/tubemq-client-twins/tubemq-client-go/flowctrl/handler.go
+++ b/tubemq-client-twins/tubemq-client-go/flowctrl/handler.go
@@ -29,6 +29,7 @@ import (
"sync/atomic"
"time"
+ "github.com/apache/incubator-inlong/tubemq-client-twins/tubemq-client-go/log"
"github.com/apache/incubator-inlong/tubemq-client-twins/tubemq-client-go/util"
)
@@ -103,13 +104,11 @@ func (h *RuleHandler) UpdateDefFlowCtrlInfo(isDefault bool, qrypriorityID int64,
}
h.lastUpdate = time.Now().UnixNano() / int64(time.Millisecond)
if isDefault {
- // todo log
- // LOG_INFO("[Flow Ctrl] Default FlowCtrl's flowctrl_id from %ld to %ld\n", curr_flowctrl_id,
- // flowctrl_id)
+ log.Infof("[Flow Ctrl] Default FlowCtrl's flow ctrl id from %d to %d", atomic.LoadInt64(&h.flowCtrlID),
+ flowCtrlID)
} else {
- // todo log
- // LOG_INFO("[Flow Ctrl] Group FlowCtrl's flowctrl_id from %ld to %ld\n", curr_flowctrl_id,
- // flowctrl_id)
+ log.Infof("[Flow Ctrl] Group FlowCtrl's flow ctrl id from %d to %d", atomic.LoadInt64(&h.flowCtrlID),
+ flowCtrlID)
}
return nil
}
diff --git a/tubemq-client-twins/tubemq-client-go/go.mod b/tubemq-client-twins/tubemq-client-go/go.mod
index 62698f0..4df5077 100644
--- a/tubemq-client-twins/tubemq-client-go/go.mod
+++ b/tubemq-client-twins/tubemq-client-go/go.mod
@@ -20,5 +20,7 @@ go 1.14
require (
github.com/golang/protobuf v1.4.3
github.com/stretchr/testify v1.7.0
+ go.uber.org/zap v1.16.0
google.golang.org/protobuf v1.23.0
+ gopkg.in/natefinch/lumberjack.v2 v2.0.0
)
diff --git a/tubemq-client-twins/tubemq-client-go/log/config.go b/tubemq-client-twins/tubemq-client-go/log/config.go
new file mode 100644
index 0000000..b71cf9d
--- /dev/null
+++ b/tubemq-client-twins/tubemq-client-go/log/config.go
@@ -0,0 +1,41 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package log
+
+// OutputConfig defines the output config which can be reconfigured by user.
+type OutputConfig struct {
+ // LogPath is the path for the log.
+ LogPath string
+ // Level is the log level.
+ Level string
+ // MaxSize is the max size for a rolling log.
+ MaxSize int
+ // MaxBackups is the maximum number of old log files to retain.
+ MaxBackups int
+ // MaxAge is the maximum number of days to retain old log files based on the
+ // timestamp encoded in their filename.
+ MaxAge int
+}
+
+var defaultConfig = &OutputConfig{
+ LogPath: "../log/tubemq",
+ MaxSize: 100,
+ MaxBackups: 5,
+ MaxAge: 3,
+ Level: "error",
+}
diff --git a/tubemq-client-twins/tubemq-client-go/log/log.go b/tubemq-client-twins/tubemq-client-go/log/log.go
new file mode 100644
index 0000000..2a4de81
--- /dev/null
+++ b/tubemq-client-twins/tubemq-client-go/log/log.go
@@ -0,0 +1,113 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+// package log defines the logger for the sdk.
+package log
+
+var traceEnabled = false
+
+// EnableTrace enables trace level log.
+func EnableTrace() {
+ traceEnabled = true
+}
+
+var defaultLogger = newDefaultLogger()
+
+func newDefaultLogger() Logger {
+ return newZapLog(defaultConfig)
+}
+
+// NewLogger will return a configured zap logger.
+func NewLogger(config *OutputConfig) Logger {
+ defaultLogger = newZapLog(config)
+ return defaultLogger
+}
+
+// SetLogger sets the default logger to the given logger.
+func SetLogger(logger Logger) {
+ defaultLogger = logger
+}
+
+// WithFields is the proxy to call the WithFields of defaultLogger.
+func WithFields(fields ...string) Logger {
+ return defaultLogger.WithFields(fields...)
+}
+
+// Trace logs to TRACE level log. Arguments are handled in the manner of fmt.Print.
+func Trace(args ...interface{}) {
+ if traceEnabled {
+ defaultLogger.Trace(args...)
+ }
+}
+
+// Tracef logs to TRACE level log. Arguments are handled in the manner of fmt.Printf.
+func Tracef(format string, args ...interface{}) {
+ if traceEnabled {
+ defaultLogger.Tracef(format, args...)
+ }
+}
+
+// Debug logs to DEBUG level log. Arguments are handled in the manner of fmt.Print.
+func Debug(args ...interface{}) {
+ defaultLogger.Debug(args...)
+}
+
+// Debug logs to DEBUG level log. Arguments are handled in the manner of fmt.Printf.
+func Debugf(format string, args ...interface{}) {
+ defaultLogger.Debugf(format, args...)
+}
+
+// Info logs to INFO level log. Arguments are handled in the manner of fmt.Print.
+func Info(args ...interface{}) {
+ defaultLogger.Info(args...)
+}
+
+// Infof logs to INFO level log. Arguments are handled in the manner of fmt.Print.
+func Infof(format string, args ...interface{}) {
+ defaultLogger.Infof(format, args...)
+}
+
+// Warn logs to WARNING level log. Arguments are handled in the manner of fmt.Print.
+func Warn(args ...interface{}) {
+ defaultLogger.Warn(args...)
+}
+
+// Warnf logs to WARNING level log. Arguments are handled in the manner of fmt.Printf.
+func Warnf(format string, args ...interface{}) {
+ defaultLogger.Warnf(format, args...)
+}
+
+// Error logs to ERROR level log. Arguments are handled in the manner of fmt.Print.
+func Error(args ...interface{}) {
+ defaultLogger.Error(args...)
+}
+
+// Errorf logs to ERROR level log. Arguments are handled in the manner of fmt.Printf.
+func Errorf(format string, args ...interface{}) {
+ defaultLogger.Errorf(format, args...)
+}
+
+// Fatal logs to ERROR level log. Arguments are handled in the manner of fmt.Print.
+// that all Fatal logs will exit with os.Exit(1).
+func Fatal(args ...interface{}) {
+ defaultLogger.Fatal(args...)
+}
+
+// Fatalf logs to ERROR level log. Arguments are handled in the manner of fmt.Printf.
+func Fatalf(format string, args ...interface{}) {
+ defaultLogger.Fatalf(format, args...)
+}
diff --git a/tubemq-client-twins/tubemq-client-go/log/log_test.go b/tubemq-client-twins/tubemq-client-go/log/log_test.go
new file mode 100644
index 0000000..d855907
--- /dev/null
+++ b/tubemq-client-twins/tubemq-client-go/log/log_test.go
@@ -0,0 +1,57 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package log
+
+import (
+ "bufio"
+ "io"
+ "os"
+ "regexp"
+ "strings"
+ "testing"
+
+ "github.com/stretchr/testify/assert"
+)
+
+func TestDefaultLog(t *testing.T) {
+ config := defaultConfig
+ Errorf("HelloWorld")
+ f, err := os.Open(config.LogPath)
+ defer f.Close()
+ assert.Nil(t, err)
+ r := bufio.NewReader(f)
+ lines := 0
+ for {
+ content, _, err := r.ReadLine()
+ if err == io.EOF {
+ break
+ }
+ assert.Nil(t, err)
+ words := strings.Fields(string(content))
+ assert.Equal(t, len(words), 5)
+ assert.Regexp(t, regexp.MustCompile(`[\d]{4}-[\d]{2}-[\d]{2}`), words[0])
+ assert.Regexp(t, regexp.MustCompile(`[\d]{2}:[\d]{2}:[\d]{2}`), words[1])
+ assert.Equal(t, "ERROR", words[2])
+ assert.Regexp(t, regexp.MustCompile(`[\w]+/[\w]+.go:[\d]+`), words[3])
+ assert.Equal(t, "HelloWorld", words[4])
+ lines++
+ }
+ assert.Equal(t, 1, lines)
+ err = os.Remove(config.LogPath)
+ assert.Nil(t, err)
+}
diff --git a/tubemq-client-twins/tubemq-client-go/log/logger.go b/tubemq-client-twins/tubemq-client-go/log/logger.go
new file mode 100644
index 0000000..fd70ef1
--- /dev/null
+++ b/tubemq-client-twins/tubemq-client-go/log/logger.go
@@ -0,0 +1,78 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package log
+
+// Level log level
+type Level int
+
+// log level const
+const (
+ LevelNil Level = iota
+ LevelTrace
+ LevelDebug
+ LevelInfo
+ LevelWarn
+ LevelError
+ LevelFatal
+)
+
+// LevelNames log level name map
+var LevelNames = map[string]Level{
+ "trace": LevelTrace,
+ "debug": LevelDebug,
+ "info": LevelInfo,
+ "warn": LevelWarn,
+ "error": LevelError,
+ "fatal": LevelFatal,
+}
+
+type Logger interface {
+ // Trace logs to TRACE log. Arguments are handled in the manner of fmt.Print.
+ Trace(args ...interface{})
+ // Tracef logs to TRACE log. Arguments are handled in the manner of fmt.Printf.
+ Tracef(format string, args ...interface{})
+ // Debug logs to DEBUG log. Arguments are handled in the manner of fmt.Print.
+ Debug(args ...interface{})
+ // Debugf logs to DEBUG log. Arguments are handled in the manner of fmt.Printf.
+ Debugf(format string, args ...interface{})
+ // Info logs to INFO log. Arguments are handled in the manner of fmt.Print.
+ Info(args ...interface{})
+ // Infof logs to INFO log. Arguments are handled in the manner of fmt.Printf.
+ Infof(format string, args ...interface{})
+ // Warn logs to WARNING log. Arguments are handled in the manner of fmt.Print.
+ Warn(args ...interface{})
+ // Warnf logs to WARNING log. Arguments are handled in the manner of fmt.Printf.
+ Warnf(format string, args ...interface{})
+ // Error logs to ERROR log. Arguments are handled in the manner of fmt.Print.
+ Error(args ...interface{})
+ // Errorf logs to ERROR log. Arguments are handled in the manner of fmt.Printf.
+ Errorf(format string, args ...interface{})
+ // Fatal logs to ERROR log. Arguments are handled in the manner of fmt.Print.
+ // that all Fatal logs will exit with os.Exit(1).
+ // Implementations may also call os.Exit() with a non-zero exit code.
+ Fatal(args ...interface{})
+ // Fatalf logs to ERROR log. Arguments are handled in the manner of fmt.Printf.
+ Fatalf(format string, args ...interface{})
+
+ // Sync calls the underlying Core's Sync method, flushing any buffered log entries.
+ // Applications should take care to call Sync before exiting
+ Sync() error
+
+ // // WithFields sets the user-defined data to the log and return a new Logger.
+ WithFields(fields ...string) Logger
+}
diff --git a/tubemq-client-twins/tubemq-client-go/log/warpper_zaplog.go b/tubemq-client-twins/tubemq-client-go/log/warpper_zaplog.go
new file mode 100644
index 0000000..881ddb0
--- /dev/null
+++ b/tubemq-client-twins/tubemq-client-go/log/warpper_zaplog.go
@@ -0,0 +1,305 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package log
+
+import (
+ "fmt"
+ "time"
+
+ "go.uber.org/zap"
+ "go.uber.org/zap/zapcore"
+ "gopkg.in/natefinch/lumberjack.v2"
+)
+
+type zapLog struct {
+ logger *zap.Logger
+}
+
+var levelToZapLevel = map[Level]zapcore.Level{
+ LevelTrace: zapcore.DebugLevel,
+ LevelDebug: zapcore.DebugLevel,
+ LevelInfo: zapcore.InfoLevel,
+ LevelWarn: zapcore.WarnLevel,
+ LevelError: zapcore.ErrorLevel,
+ LevelFatal: zapcore.FatalLevel,
+}
+
+func newZapLog(c *OutputConfig) Logger {
+ core := newFileCore(c)
+ logger := zap.New(
+ zapcore.NewTee(core),
+ zap.AddCallerSkip(2),
+ zap.AddCaller(),
+ )
+ return &zapLog{logger: logger}
+}
+
+func newFileCore(c *OutputConfig) zapcore.Core {
+ w := zapcore.AddSync(&lumberjack.Logger{
+ Filename: c.LogPath,
+ MaxSize: c.MaxSize, // megabytes
+ MaxBackups: c.MaxBackups,
+ MaxAge: c.MaxAge, // days
+ })
+
+ core := zapcore.NewCore(
+ newDefaultEncoder(),
+ w,
+ levelToZapLevel[LevelNames[c.Level]],
+ )
+ return core
+}
+
+func newDefaultEncoder() zapcore.Encoder {
+ encoderCfg := zapcore.EncoderConfig{
+ TimeKey: "T",
+ LevelKey: "L",
+ NameKey: "N",
+ CallerKey: "C",
+ MessageKey: "M",
+ StacktraceKey: "S",
+ LineEnding: zapcore.DefaultLineEnding,
+ EncodeLevel: zapcore.CapitalLevelEncoder,
+ EncodeTime: newDefaultTimeEncoder(),
+ EncodeDuration: zapcore.StringDurationEncoder,
+ EncodeCaller: zapcore.ShortCallerEncoder,
+ }
+ encoder := zapcore.NewConsoleEncoder(encoderCfg)
+ return encoder
+}
+
+func newDefaultTimeEncoder() zapcore.TimeEncoder {
+ return func(t time.Time, enc zapcore.PrimitiveArrayEncoder) {
+ enc.AppendByteString(defaultTimeFormat(t))
+ }
+}
+
+func defaultTimeFormat(t time.Time) []byte {
+ t = t.Local()
+ year, month, day := t.Date()
+ hour, minute, second := t.Clock()
+ micros := t.Nanosecond() / 1000
+
+ buf := make([]byte, 23)
+ buf[0] = byte((year/1000)%10) + '0'
+ buf[1] = byte((year/100)%10) + '0'
+ buf[2] = byte((year/10)%10) + '0'
+ buf[3] = byte(year%10) + '0'
+ buf[4] = '-'
+ buf[5] = byte((month)/10) + '0'
+ buf[6] = byte((month)%10) + '0'
+ buf[7] = '-'
+ buf[8] = byte((day)/10) + '0'
+ buf[9] = byte((day)%10) + '0'
+ buf[10] = ' '
+ buf[11] = byte((hour)/10) + '0'
+ buf[12] = byte((hour)%10) + '0'
+ buf[13] = ':'
+ buf[14] = byte((minute)/10) + '0'
+ buf[15] = byte((minute)%10) + '0'
+ buf[16] = ':'
+ buf[17] = byte((second)/10) + '0'
+ buf[18] = byte((second)%10) + '0'
+ buf[19] = '.'
+ buf[20] = byte((micros/100000)%10) + '0'
+ buf[21] = byte((micros/10000)%10) + '0'
+ buf[22] = byte((micros/1000)%10) + '0'
+ return buf
+}
+
+// ZapLogWrapper implements Logger interface
+// based on the underlying zapLog.
+type ZapLogWrapper struct {
+ l *zapLog
+}
+
+// GetLogger returns the wrapped zap looger.
+func (z *ZapLogWrapper) GetLogger() Logger {
+ return z.l
+}
+
+// Trace logs to TRACE log, Arguments are handled in the manner of fmt.Print
+func (z *ZapLogWrapper) Trace(args ...interface{}) {
+ z.l.Trace(args...)
+}
+
+// Tracef logs to TRACE log, Arguments are handled in the manner of fmt.Printf
+func (z *ZapLogWrapper) Tracef(format string, args ...interface{}) {
+ z.l.Tracef(format, args...)
+}
+
+// Debug logs to DEBUG log, Arguments are handled in the manner of fmt.Print
+func (z *ZapLogWrapper) Debug(args ...interface{}) {
+ z.l.Debug(args...)
+}
+
+// Debugf logs to DEBUG log, Arguments are handled in the manner of fmt.Printf
+func (z *ZapLogWrapper) Debugf(format string, args ...interface{}) {
+ z.l.Debugf(format, args...)
+}
+
+// Info logs to INFO log, Arguments are handled in the manner of fmt.Print
+func (z *ZapLogWrapper) Info(args ...interface{}) {
+ z.l.Info(args...)
+}
+
+// Infof logs to INFO log, Arguments are handled in the manner of fmt.Printf
+func (z *ZapLogWrapper) Infof(format string, args ...interface{}) {
+ z.l.Infof(format, args...)
+}
+
+// Warn logs to WARNING log, Arguments are handled in the manner of fmt.Print
+func (z *ZapLogWrapper) Warn(args ...interface{}) {
+ z.l.Warn(args...)
+}
+
+// Warnf logs to WARNING log, Arguments are handled in the manner of fmt.Printf
+func (z *ZapLogWrapper) Warnf(format string, args ...interface{}) {
+ z.l.Warnf(format, args...)
+}
+
+// Error logs to ERROR log, Arguments are handled in the manner of fmt.Print
+func (z *ZapLogWrapper) Error(args ...interface{}) {
+ z.l.Error(args...)
+}
+
+// Errorf logs to ERROR log, Arguments are handled in the manner of fmt.Printf
+func (z *ZapLogWrapper) Errorf(format string, args ...interface{}) {
+ z.l.Errorf(format, args...)
+}
+
+// Fatal logs to FATAL log, Arguments are handled in the manner of fmt.Print
+func (z *ZapLogWrapper) Fatal(args ...interface{}) {
+ z.l.Fatal(args...)
+}
+
+// Fatalf logs to FATAL log, Arguments are handled in the manner of fmt.Printf
+func (z *ZapLogWrapper) Fatalf(format string, args ...interface{}) {
+ z.l.Fatalf(format, args...)
+}
+
+// Sync calls the zap defaultLogger's Sync method, flushing any buffered log entries.
+// Applications should take care to call Sync before exiting.
+func (z *ZapLogWrapper) Sync() error {
+ return z.l.Sync()
+}
+
+// WithFields sets the user-defined data to the log and return a new Logger.
+func (z *ZapLogWrapper) WithFields(fields ...string) Logger {
+ return z.l.WithFields(fields...)
+}
+
+// WithFields sets the user-defined data to the log and return a new Logger.
+func (l *zapLog) WithFields(fields ...string) Logger {
+ zapfields := make([]zap.Field, len(fields)/2)
+ for index := range zapfields {
+ zapfields[index] = zap.String(fields[2*index], fields[2*index+1])
+ }
+
+ return &ZapLogWrapper{l: &zapLog{logger: l.logger.With(zapfields...)}}
+}
+
+// Trace logs to TRACE log, Arguments are handled in the manner of fmt.Print.
+func (l *zapLog) Trace(args ...interface{}) {
+ if l.logger.Core().Enabled(zapcore.DebugLevel) {
+ l.logger.Debug(fmt.Sprint(args...))
+ }
+}
+
+// Tracef logs to TRACE log, Arguments are handled in the manner of fmt.Printf.
+func (l *zapLog) Tracef(format string, args ...interface{}) {
+ if l.logger.Core().Enabled(zapcore.DebugLevel) {
+ l.logger.Debug(fmt.Sprintf(format, args...))
+ }
+}
+
+// Info logs to Debug log, Arguments are handled in the manner of fmt.Print
+func (l *zapLog) Debug(args ...interface{}) {
+ if l.logger.Core().Enabled(zapcore.DebugLevel) {
+ l.logger.Debug(fmt.Sprint(args...))
+ }
+}
+
+// Info logs to Debug log, Arguments are handled in the manner of fmt.Printf
+func (l *zapLog) Debugf(format string, args ...interface{}) {
+ if l.logger.Core().Enabled(zapcore.DebugLevel) {
+ l.logger.Debug(fmt.Sprintf(format, args...))
+ }
+}
+
+// Info logs to INFO log, Arguments are handled in the manner of fmt.Print
+func (l *zapLog) Info(args ...interface{}) {
+ if l.logger.Core().Enabled(zapcore.InfoLevel) {
+ l.logger.Info(fmt.Sprint(args...))
+ }
+}
+
+// Infof logs to INFO log, Arguments are handled in the manner of fmt.Printf
+func (l *zapLog) Infof(format string, args ...interface{}) {
+ if l.logger.Core().Enabled(zapcore.InfoLevel) {
+ l.logger.Info(fmt.Sprintf(format, args...))
+ }
+}
+
+// Warn logs to WARNING log, Arguments are handled in the manner of fmt.Print
+func (l *zapLog) Warn(args ...interface{}) {
+ if l.logger.Core().Enabled(zapcore.WarnLevel) {
+ l.logger.Warn(fmt.Sprint(args...))
+ }
+}
+
+// Warnf logs to WARNING log, Arguments are handled in the manner of fmt.Printf
+func (l *zapLog) Warnf(format string, args ...interface{}) {
+ if l.logger.Core().Enabled(zapcore.WarnLevel) {
+ l.logger.Warn(fmt.Sprintf(format, args...))
+ }
+}
+
+// Error logs to ERROR log, Arguments are handled in the manner of fmt.Print
+func (l *zapLog) Error(args ...interface{}) {
+ if l.logger.Core().Enabled(zapcore.ErrorLevel) {
+ l.logger.Error(fmt.Sprint(args...))
+ }
+}
+
+// Errorf logs to ERROR log, Arguments are handled in the manner of fmt.Printf
+func (l *zapLog) Errorf(format string, args ...interface{}) {
+ if l.logger.Core().Enabled(zapcore.ErrorLevel) {
+ l.logger.Error(fmt.Sprintf(format, args...))
+ }
+}
+
+// Fatal logs to FATAL log, Arguments are handled in the manner of fmt.Print
+func (l *zapLog) Fatal(args ...interface{}) {
+ if l.logger.Core().Enabled(zapcore.FatalLevel) {
+ l.logger.Fatal(fmt.Sprint(args...))
+ }
+}
+
+// Fatalf logs to FATAL log, Arguments are handled in the manner of fmt.Printf
+func (l *zapLog) Fatalf(format string, args ...interface{}) {
+ if l.logger.Core().Enabled(zapcore.FatalLevel) {
+ l.logger.Fatal(fmt.Sprintf(format, args...))
+ }
+}
+
+// Sync calls the zap defaultLogger's Sync method, flushing any buffered log entries.
+// Applications should take care to call Sync before exiting.
+func (l *zapLog) Sync() error {
+ return l.logger.Sync()
+}