You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@rocketmq.apache.org by we...@apache.org on 2021/12/22 08:11:49 UTC

[rocketmq-client-go] branch master updated: [ISSUE #726] feat: replace fmt to rlog (#756)

This is an automated email from the ASF dual-hosted git repository.

wenfeng pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/rocketmq-client-go.git


The following commit(s) were added to refs/heads/master by this push:
     new de5f561  [ISSUE #726] feat: replace fmt to rlog (#756)
de5f561 is described below

commit de5f56172c82d3f6633575c7904c5ab4f1c8a977
Author: yuanmoon <zh...@163.com>
AuthorDate: Wed Dec 22 16:09:52 2021 +0800

    [ISSUE #726] feat: replace fmt to rlog (#756)
    
    * Replace fmt to rlog
---
 admin/admin.go                 | 34 ++++++++++++++++--------------
 benchmark/consumer.go          | 26 +++++++++++++----------
 benchmark/main.go              | 11 +++++++---
 benchmark/producer.go          | 47 +++++++++++++++++++++++++++---------------
 benchmark/stable.go            | 24 ++++++++++++++-------
 consumer/push_consumer_test.go | 10 ++++++---
 consumer/strategy_test.go      |  8 +++++--
 docs/Introduction.md           |  4 +++-
 errors/errors.go               |  2 +-
 internal/model_test.go         | 30 ++++++++++++++++++++-------
 internal/namesrv_test.go       |  5 ++++-
 internal/remote/codec_test.go  | 20 +++++++++++++-----
 primitive/ctx.go               |  5 +++--
 primitive/nsresolver_test.go   |  5 ++++-
 rlog/log.go                    |  2 +-
 15 files changed, 154 insertions(+), 79 deletions(-)

diff --git a/admin/admin.go b/admin/admin.go
index f45f39a..1957a06 100644
--- a/admin/admin.go
+++ b/admin/admin.go
@@ -158,38 +158,40 @@ func (a *admin) DeleteTopic(ctx context.Context, opts ...OptionDelete) error {
 	}
 
 	if _, err := a.deleteTopicInBroker(ctx, cfg.Topic, cfg.BrokerAddr); err != nil {
-		if err != nil {
-			rlog.Error("delete topic in broker error", map[string]interface{}{
-				rlog.LogKeyTopic:         cfg.Topic,
-				rlog.LogKeyBroker:        cfg.BrokerAddr,
-				rlog.LogKeyUnderlayError: err,
-			})
-		}
+		rlog.Error("delete topic in broker error", map[string]interface{}{
+			rlog.LogKeyTopic:         cfg.Topic,
+			rlog.LogKeyBroker:        cfg.BrokerAddr,
+			rlog.LogKeyUnderlayError: err,
+		})
 		return err
 	}
 
 	//delete topic in nameserver
 	if len(cfg.NameSrvAddr) == 0 {
-		a.namesrv.UpdateTopicRouteInfo(cfg.Topic)
+		_, _, err := a.namesrv.UpdateTopicRouteInfo(cfg.Topic)
+		if err != nil {
+			rlog.Error("delete topic in nameserver error", map[string]interface{}{
+				rlog.LogKeyTopic: cfg.Topic,
+				rlog.LogKeyUnderlayError: err,
+			})
+		}
 		cfg.NameSrvAddr = a.namesrv.AddrList()
 	}
 
 	for _, nameSrvAddr := range cfg.NameSrvAddr {
 		if _, err := a.deleteTopicInNameServer(ctx, cfg.Topic, nameSrvAddr); err != nil {
-			if err != nil {
-				rlog.Error("delete topic in name server error", map[string]interface{}{
-					rlog.LogKeyTopic:         cfg.Topic,
-					"nameServer":             nameSrvAddr,
-					rlog.LogKeyUnderlayError: err,
-				})
-			}
+			rlog.Error("delete topic in nameserver error", map[string]interface{}{
+				"nameServer":             nameSrvAddr,
+				rlog.LogKeyTopic:         cfg.Topic,
+				rlog.LogKeyUnderlayError: err,
+			})
 			return err
 		}
 	}
 	rlog.Info("delete topic success", map[string]interface{}{
+		"nameServer":      cfg.NameSrvAddr,
 		rlog.LogKeyTopic:  cfg.Topic,
 		rlog.LogKeyBroker: cfg.BrokerAddr,
-		"nameServer":      cfg.NameSrvAddr,
 	})
 	return nil
 }
diff --git a/benchmark/consumer.go b/benchmark/consumer.go
index cada933..907a1e7 100644
--- a/benchmark/consumer.go
+++ b/benchmark/consumer.go
@@ -24,6 +24,7 @@ import (
 	"github.com/apache/rocketmq-client-go/v2"
 	"github.com/apache/rocketmq-client-go/v2/consumer"
 	"github.com/apache/rocketmq-client-go/v2/primitive"
+	"github.com/apache/rocketmq-client-go/v2/rlog"
 	"os"
 	"os/signal"
 	"sync"
@@ -89,10 +90,13 @@ func (s *consumeSnapshots) printStati() {
 	avgS2CRT := float64(l.store2ConsumerTotalRT-f.store2ConsumerTotalRT) / respSucCount
 	s.RUnlock()
 
-	fmt.Printf(
-		"Consume TPS: %d Average(B2C) RT: %7.3f Average(S2C) RT: %7.3f MAX(B2C) RT: %d MAX(S2C) RT: %d\n",
-		int64(consumeTps), avgB2CRT, avgS2CRT, l.born2ConsumerMaxRT, l.store2ConsumerMaxRT,
-	)
+	rlog.Info("Benchmark Consumer Snapshot", map[string]interface{}{
+		"consumeTPS": int64(consumeTps),
+		"average(B2C)RT": avgB2CRT,
+		"average(S2C)RT": avgS2CRT,
+		"max(B2C)RT": l.born2ConsumerMaxRT,
+		"max(S2C)RT": l.store2ConsumerMaxRT,
+	})
 }
 
 type consumerBenchmark struct {
@@ -164,7 +168,7 @@ func (bc *consumerBenchmark) consumeMsg(stati *statiBenchmarkConsumerSnapshot, e
 		return consumer.ConsumeSuccess, nil
 	})
 
-	println("Start")
+	rlog.Info("Test Start", nil)
 	c.Start()
 	select {
 	case <-exit:
@@ -176,31 +180,31 @@ func (bc *consumerBenchmark) consumeMsg(stati *statiBenchmarkConsumerSnapshot, e
 func (bc *consumerBenchmark) run(args []string) {
 	bc.flags.Parse(args)
 	if bc.topic == "" {
-		println("empty topic")
+		rlog.Error("Empty Topic", nil)
 		bc.usage()
 		return
 	}
 
 	if bc.groupPrefix == "" {
-		println("empty group prefix")
+		rlog.Error("Empty Group Prefix", nil)
 		bc.usage()
 		return
 	}
 
 	if bc.nameSrv == "" {
-		println("empty name server")
+		rlog.Error("Empty Nameserver", nil)
 		bc.usage()
 		return
 	}
 
 	if bc.testMinutes <= 0 {
-		println("test time must be positive integer")
+		rlog.Error("Test Time Must Be Positive Integer", nil)
 		bc.usage()
 		return
 	}
 
 	if bc.instanceCount <= 0 {
-		println("thread count must be positive integer")
+		rlog.Error("Thread Count Must Be Positive Integer", nil)
 		bc.usage()
 		return
 	}
@@ -261,11 +265,11 @@ func (bc *consumerBenchmark) run(args []string) {
 	case <-signalChan:
 	}
 
-	println("Closed")
 	close(exitChan)
 	wg.Wait()
 	snapshots.takeSnapshot()
 	snapshots.printStati()
+	rlog.Info("Test Done", nil)
 }
 
 func (bc *consumerBenchmark) usage() {
diff --git a/benchmark/main.go b/benchmark/main.go
index 080a948..79eca8f 100644
--- a/benchmark/main.go
+++ b/benchmark/main.go
@@ -19,6 +19,7 @@ package main
 
 import (
 	"fmt"
+	"github.com/apache/rocketmq-client-go/v2/rlog"
 	"os"
 )
 
@@ -45,7 +46,9 @@ func registerCommand(name string, cmd command) {
 }
 
 func usage() {
-	println(os.Args[0] + " commandName [...]")
+	rlog.Info("Command", map[string]interface{}{
+		"name": os.Args[0],
+	})
 	for _, cmd := range cmds {
 		cmd.usage()
 	}
@@ -54,7 +57,7 @@ func usage() {
 // go run *.go [command name] [command args]
 func main() {
 	if len(os.Args) < 2 {
-		println("error:lack cmd name\n")
+		rlog.Error("Lack Command Name", nil)
 		usage()
 		return
 	}
@@ -62,7 +65,9 @@ func main() {
 	name := os.Args[1]
 	cmd, ok := cmds[name]
 	if !ok {
-		fmt.Printf("command %s is not supported\n", name)
+		rlog.Error("Command Isn't Supported", map[string]interface{}{
+			"command": name,
+		})
 		usage()
 		return
 	}
diff --git a/benchmark/producer.go b/benchmark/producer.go
index 537ffbe..7516352 100644
--- a/benchmark/producer.go
+++ b/benchmark/producer.go
@@ -20,10 +20,10 @@ package main
 import (
 	"context"
 	"flag"
-	"fmt"
 	"github.com/apache/rocketmq-client-go/v2"
 	"github.com/apache/rocketmq-client-go/v2/primitive"
 	"github.com/apache/rocketmq-client-go/v2/producer"
+	"github.com/apache/rocketmq-client-go/v2/rlog"
 	"os"
 	"os/signal"
 	"sync"
@@ -91,10 +91,14 @@ func (s *produceSnapshots) printStati() {
 	maxRT := atomic.LoadInt64(&s.cur.sendMessageMaxRT)
 	s.RUnlock()
 
-	fmt.Printf(
-		"Send TPS: %d Max RT: %d Average RT: %7.3f Send Failed: %d Response Failed: %d Total:%d\n",
-		int64(sendTps), maxRT, avgRT, l.sendRequestFailedCount, l.receiveResponseFailedCount, l.receiveResponseSuccessCount,
-	)
+	rlog.Info("Benchmark Producer Snapshot", map[string]interface{}{
+		"sendTps": int64(sendTps),
+		"maxRt": maxRT,
+		"averageRt": avgRT,
+		"sendFailed": l.sendRequestFailedCount,
+		"responseFailed": l.receiveResponseFailedCount,
+		"total": l.receiveResponseSuccessCount,
+	})
 }
 
 type producerBenchmark struct {
@@ -130,7 +134,9 @@ func (bp *producerBenchmark) produceMsg(stati *statiBenchmarkProducerSnapshot, e
 	)
 
 	if err != nil {
-		fmt.Printf("new producer error: %s\n", err)
+		rlog.Error("New Producer Error", map[string]interface{}{
+			rlog.LogKeyUnderlayError: err.Error(),
+		})
 		return
 	}
 
@@ -152,7 +158,9 @@ AGAIN:
 	r, err := p.SendSync(context.Background(), primitive.NewMessage(topic, []byte(msgStr)))
 
 	if err != nil {
-		fmt.Printf("send message sync error:%s", err)
+		rlog.Error("Send Message Error", map[string]interface{}{
+			rlog.LogKeyUnderlayError: err.Error(),
+		})
 		goto AGAIN
 	}
 
@@ -170,8 +178,11 @@ AGAIN:
 		}
 		goto AGAIN
 	}
-
-	fmt.Printf("%v send message %s:%s error:%s\n", time.Now(), topic, tag, err.Error())
+	rlog.Error("Send Message Error", map[string]interface{}{
+		"topic": topic,
+		"tag": tag,
+		rlog.LogKeyUnderlayError: err.Error(),
+	})
 	goto AGAIN
 }
 
@@ -179,34 +190,34 @@ func (bp *producerBenchmark) run(args []string) {
 	bp.flags.Parse(args)
 
 	if bp.topic == "" {
-		println("empty topic")
+		rlog.Error("Empty Topic", nil)
 		bp.flags.Usage()
 		return
 	}
 
 	if bp.groupID == "" {
-		println("empty group id")
+		rlog.Error("Empty Group Id", nil)
 		bp.flags.Usage()
 		return
 	}
 
 	if bp.nameSrv == "" {
-		println("empty namesrv")
+		rlog.Error("Empty Nameserver", nil)
 		bp.flags.Usage()
 		return
 	}
 	if bp.instanceCount <= 0 {
-		println("instance count must be positive integer")
+		rlog.Error("Instance Count Must Be Positive Integer", nil)
 		bp.flags.Usage()
 		return
 	}
 	if bp.testMinutes <= 0 {
-		println("test time must be positive integer")
+		rlog.Error("Test Time Must Be Positive Integer", nil)
 		bp.flags.Usage()
 		return
 	}
 	if bp.bodySize <= 0 {
-		println("body size must be positive integer")
+		rlog.Error("Body Size Must Be Positive Integer", nil)
 		bp.flags.Usage()
 		return
 	}
@@ -221,7 +232,9 @@ func (bp *producerBenchmark) run(args []string) {
 		go func() {
 			wg.Add(1)
 			bp.produceMsg(&stati, exitChan)
-			fmt.Printf("exit of produce %d\n", i)
+			rlog.Info("Producer Done and Exit", map[string]interface{}{
+				"id": i,
+			})
 			wg.Done()
 		}()
 	}
@@ -269,7 +282,7 @@ func (bp *producerBenchmark) run(args []string) {
 	wg.Wait()
 	snapshots.takeSnapshot()
 	snapshots.printStati()
-	fmt.Println("TEST DONE")
+	rlog.Info("Test Done", nil)
 }
 
 func (bp *producerBenchmark) usage() {
diff --git a/benchmark/stable.go b/benchmark/stable.go
index cd5fb9b..2659bc5 100644
--- a/benchmark/stable.go
+++ b/benchmark/stable.go
@@ -19,8 +19,8 @@ package main
 
 import (
 	"flag"
-	"fmt"
 	"github.com/apache/rocketmq-client-go/v2/errors"
+	"github.com/apache/rocketmq-client-go/v2/rlog"
 	"os"
 	"os/signal"
 	"syscall"
@@ -84,11 +84,11 @@ func (st *stableTest) run() {
 		select {
 		case <-signalChan:
 			opTicker.Stop()
-			fmt.Println("test over")
+			rlog.Info("Test Done", nil)
 			return
 		case <-closeChan:
 			opTicker.Stop()
-			fmt.Println("test over")
+			rlog.Info("Test Done", nil)
 			return
 		case <-opTicker.C:
 			st.op()
@@ -127,14 +127,19 @@ func (stp *stableTestProducer) usage() {
 func (stp *stableTestProducer) run(args []string) {
 	err := stp.flags.Parse(args)
 	if err != nil {
-		fmt.Printf("parse args:%v, error:%s\n", args, err)
+		rlog.Info("Parse Args Error", map[string]interface{}{
+			"args": args,
+			rlog.LogKeyUnderlayError: err.Error(),
+		})
 		stp.usage()
 		return
 	}
 
 	err = stp.checkFlag()
 	if err != nil {
-		fmt.Println(err)
+		rlog.Error("Check Flag Error", map[string]interface{}{
+			rlog.LogKeyUnderlayError: err.Error(),
+		})
 		stp.usage()
 		return
 	}
@@ -199,15 +204,20 @@ func (stc *stableTestConsumer) usage() {
 func (stc *stableTestConsumer) run(args []string) {
 	err := stc.flags.Parse(args)
 	if err != nil {
-		fmt.Printf("parse args:%v, error:%s\n", args, err)
+		rlog.Error("Parse Args Error", map[string]interface{}{
+			"args": args,
+			rlog.LogKeyUnderlayError: err.Error(),
+		})
 		stc.usage()
 		return
 	}
 
 	err = stc.checkFlag()
 	if err != nil {
+		rlog.Error("Check Flag Error", map[string]interface{}{
+			rlog.LogKeyUnderlayError: err.Error(),
+		})
 		stc.usage()
-		fmt.Printf("%s\n", err)
 		return
 	}
 	//
diff --git a/consumer/push_consumer_test.go b/consumer/push_consumer_test.go
index e67b2db..78bc1f7 100644
--- a/consumer/push_consumer_test.go
+++ b/consumer/push_consumer_test.go
@@ -19,7 +19,7 @@ package consumer
 
 import (
 	"context"
-	"fmt"
+	"github.com/apache/rocketmq-client-go/v2/rlog"
 	"testing"
 
 	"github.com/apache/rocketmq-client-go/v2/internal"
@@ -48,7 +48,9 @@ func TestStart(t *testing.T) {
 
 		err := c.Subscribe("TopicTest", MessageSelector{}, func(ctx context.Context,
 			msgs ...*primitive.MessageExt) (ConsumeResult, error) {
-			fmt.Printf("subscribe callback: %v \n", msgs)
+			rlog.Info("Subscribe Callback", map[string]interface{}{
+				"msgs": msgs,
+			})
 			return ConsumeSuccess, nil
 		})
 
@@ -62,7 +64,9 @@ func TestStart(t *testing.T) {
 
 		err = c.Subscribe("TopicTest", MessageSelector{}, func(ctx context.Context,
 			msgs ...*primitive.MessageExt) (ConsumeResult, error) {
-			fmt.Printf("subscribe callback: %v \n", msgs)
+			rlog.Info("Subscribe Callback", map[string]interface{}{
+				"msgs": msgs,
+			})
 			return ConsumeSuccess, nil
 		})
 
diff --git a/consumer/strategy_test.go b/consumer/strategy_test.go
index e66b15c..d521b4b 100644
--- a/consumer/strategy_test.go
+++ b/consumer/strategy_test.go
@@ -18,7 +18,7 @@ limitations under the License.
 package consumer
 
 import (
-	"fmt"
+	"github.com/apache/rocketmq-client-go/v2/rlog"
 	"testing"
 
 	"github.com/apache/rocketmq-client-go/v2/primitive"
@@ -476,7 +476,11 @@ func TestAllocateByConsistentHash(t *testing.T) {
 		Convey("observe the result of AllocateByMachineRoom", func() {
 			for _, value := range cases {
 				result := strategy("testGroup", value.currentCid, value.mqAll, value.cidAll)
-				fmt.Printf("\n\n currentCid:%s, cidAll:%s, \n allocateResult:%+v \n", value.currentCid, value.cidAll, result)
+				rlog.Info("Result Of AllocateByMachineRoom", map[string]interface{}{
+					"currentCid": value.currentCid,
+					"cidAll": value.cidAll,
+					"allocateResult": result,
+				})
 			}
 		})
 	})
diff --git a/docs/Introduction.md b/docs/Introduction.md
index a4e954e..011603a 100644
--- a/docs/Introduction.md
+++ b/docs/Introduction.md
@@ -91,7 +91,9 @@ c, err := rocketmq.NewPushConsumer(
 ```
 err := c.Subscribe("test", consumer.MessageSelector{}, func(ctx *consumer.ConsumeMessageContext,
     msgs []*primitive.MessageExt) (consumer.ConsumeResult, error) {
-    fmt.Printf("subscribe callback: %v \n", msgs)
+    rlog.Info("Subscribe Callback", map[string]interface{}{
+        "msgs": msgs,
+    })
     return consumer.ConsumeSuccess, nil
 })
 ```
diff --git a/errors/errors.go b/errors/errors.go
index 43b49ca..195984e 100644
--- a/errors/errors.go
+++ b/errors/errors.go
@@ -20,7 +20,7 @@ package errors
 import "errors"
 
 var (
-	ErrRequestTimeout    = errors.New("equest timeout")
+	ErrRequestTimeout    = errors.New("request timeout")
 	ErrMQEmpty           = errors.New("MessageQueue is nil")
 	ErrOffset            = errors.New("offset < 0")
 	ErrNumbers           = errors.New("numbers < 0")
diff --git a/internal/model_test.go b/internal/model_test.go
index 56eeb24..a505f3e 100644
--- a/internal/model_test.go
+++ b/internal/model_test.go
@@ -19,7 +19,7 @@ package internal
 
 import (
 	"encoding/json"
-	"fmt"
+	"github.com/apache/rocketmq-client-go/v2/rlog"
 	"strings"
 	"testing"
 
@@ -46,7 +46,9 @@ func TestHeartbeatData(t *testing.T) {
 
 			v, err := json.Marshal(set)
 			So(err, ShouldBeNil)
-			fmt.Printf("json producer set: %s", string(v))
+			rlog.Info("Json Producer", map[string]interface{}{
+				"result": string(v),
+			})
 		})
 
 		Convey("producer heatbeat", func() {
@@ -64,7 +66,9 @@ func TestHeartbeatData(t *testing.T) {
 
 			v, err := json.Marshal(hbt)
 			So(err, ShouldBeNil)
-			fmt.Printf("json producer: %s\n", string(v))
+			rlog.Info("Json Producer", map[string]interface{}{
+				"result": string(v),
+			})
 		})
 
 		Convey("consumer heartbeat", func() {
@@ -81,7 +85,9 @@ func TestHeartbeatData(t *testing.T) {
 
 			v, err := json.Marshal(hbt)
 			So(err, ShouldBeNil)
-			fmt.Printf("json consumer: %s\n", string(v))
+			rlog.Info("Json Consumer", map[string]interface{}{
+				"result": string(v),
+			})
 		})
 
 		Convey("producer & consumer heartbeat", func() {
@@ -109,7 +115,9 @@ func TestHeartbeatData(t *testing.T) {
 
 			v, err := json.Marshal(hbt)
 			So(err, ShouldBeNil)
-			fmt.Printf("json producer & consumer: %s\n", string(v))
+			rlog.Info("Json Producer and Consumer", map[string]interface{}{
+				"result": string(v),
+			})
 		})
 	})
 
@@ -374,7 +382,9 @@ func TestConsumeMessageDirectlyResult_MarshalJSON(t *testing.T) {
 			consumeMessageDirectlyResult.ConsumeResult = ConsumeSuccess
 			data, err := consumeMessageDirectlyResult.Encode()
 			So(err, ShouldBeNil)
-			fmt.Printf("json consumeMessageDirectlyResult: %s\n", string(data))
+			rlog.Info("Json consumeMessageDirectlyResult", map[string]interface{}{
+				"result": string(data),
+			})
 		})
 
 		Convey("test consume timeout", func() {
@@ -386,7 +396,9 @@ func TestConsumeMessageDirectlyResult_MarshalJSON(t *testing.T) {
 			consumeResult.ConsumeResult = ReturnNull
 			data, err := consumeResult.Encode()
 			So(err, ShouldBeNil)
-			fmt.Printf("json consumeMessageDirectlyResult: %s\n", string(data))
+			rlog.Info("Json consumeMessageDirectlyResult", map[string]interface{}{
+				"result": string(data),
+			})
 		})
 
 		Convey("test consume exception", func() {
@@ -399,7 +411,9 @@ func TestConsumeMessageDirectlyResult_MarshalJSON(t *testing.T) {
 			consumeResult.Remark = "Unknown Exception"
 			data, err := consumeResult.Encode()
 			So(err, ShouldBeNil)
-			fmt.Printf("json consumeMessageDirectlyResult: %s\n", string(data))
+			rlog.Info("Json consumeMessageDirectlyResult", map[string]interface{}{
+				"result": string(data),
+			})
 		})
 	})
 }
diff --git a/internal/namesrv_test.go b/internal/namesrv_test.go
index e58dc29..b047a07 100644
--- a/internal/namesrv_test.go
+++ b/internal/namesrv_test.go
@@ -19,6 +19,7 @@ package internal
 
 import (
 	"fmt"
+	"github.com/apache/rocketmq-client-go/v2/rlog"
 	"net"
 	"net/http"
 	"os"
@@ -91,7 +92,9 @@ func TestUpdateNameServerAddress(t *testing.T) {
 
 		port := listener.Addr().(*net.TCPAddr).Port
 		nameServerDommain := fmt.Sprintf("http://127.0.0.1:%d/nameserver/addrs", port)
-		fmt.Println("temporary name server domain: ", nameServerDommain)
+		rlog.Info("Temporary Nameserver", map[string]interface{}{
+			"domain": nameServerDommain,
+		})
 
 		resolver := primitive.NewHttpResolver("DEFAULT", nameServerDommain)
 		ns := &namesrvs{
diff --git a/internal/remote/codec_test.go b/internal/remote/codec_test.go
index 8fb8a60..0717451 100644
--- a/internal/remote/codec_test.go
+++ b/internal/remote/codec_test.go
@@ -18,7 +18,7 @@ package remote
 
 import (
 	"encoding/json"
-	"fmt"
+	"github.com/apache/rocketmq-client-go/v2/rlog"
 	"math/rand"
 	"reflect"
 	"testing"
@@ -350,19 +350,29 @@ func TestCommandJsonIter(t *testing.T) {
 	cmd := NewRemotingCommand(192, h, []byte("Hello RocketMQCodecs"))
 	cmdData, err := json.Marshal(cmd)
 	assert.Nil(t, err)
-	fmt.Printf("cmd data from json: %v\n", *(*string)(unsafe.Pointer(&cmdData)))
+	rlog.Info("Command Data From Json", map[string]interface{}{
+		"data": *(*string)(unsafe.Pointer(&cmdData)),
+	})
 
 	data, err := jsoniter.Marshal(cmd)
 	assert.Nil(t, err)
-	fmt.Printf("cmd data from jsoniter: %v\n", *(*string)(unsafe.Pointer(&data)))
+	rlog.Info("Command Data From Jsoniter", map[string]interface{}{
+		"data": *(*string)(unsafe.Pointer(&data)),
+	})
 
 	var cmdResp RemotingCommand
 	err = json.Unmarshal(cmdData, &cmdResp)
 	assert.Nil(t, err)
-	fmt.Printf("cmd: %#v language: %v\n", cmdResp, cmdResp.Language)
+	rlog.Info("Json Decode Success", map[string]interface{}{
+		"cmd": cmdResp,
+		"language": cmdResp.Language,
+	})
 
 	var cmdResp2 RemotingCommand
 	err = json.Unmarshal(data, &cmdResp2)
 	assert.Nil(t, err)
-	fmt.Printf("cmd: %#v language: %v\n", cmdResp2, cmdResp2.Language)
+	rlog.Info("Json Decode Success", map[string]interface{}{
+		"cmd": cmdResp2,
+		"language": cmdResp2.Language,
+	})
 }
diff --git a/primitive/ctx.go b/primitive/ctx.go
index 4481dcd..936b54c 100644
--- a/primitive/ctx.go
+++ b/primitive/ctx.go
@@ -22,7 +22,6 @@ package primitive
 
 import (
 	"context"
-	"fmt"
 	"math"
 
 	"github.com/apache/rocketmq-client-go/v2/rlog"
@@ -47,7 +46,9 @@ func (c ConsumeReturnType) Ordinal() int {
 	case FailedReturn:
 		return 4
 	default:
-		rlog.Error(fmt.Sprintf("illegal ConsumeReturnType: %v", c), nil)
+		rlog.Error("Illegal Consumer Return Type", map[string]interface{}{
+			"type": c,
+		})
 		return 0
 	}
 }
diff --git a/primitive/nsresolver_test.go b/primitive/nsresolver_test.go
index 98d839a..d42d2c6 100644
--- a/primitive/nsresolver_test.go
+++ b/primitive/nsresolver_test.go
@@ -18,6 +18,7 @@ package primitive
 
 import (
 	"fmt"
+	"github.com/apache/rocketmq-client-go/v2/rlog"
 	"io/ioutil"
 	"net"
 	"net/http"
@@ -65,7 +66,9 @@ func TestHttpResolverWithGet(t *testing.T) {
 
 		port := listener.Addr().(*net.TCPAddr).Port
 		nameServerDommain := fmt.Sprintf("http://127.0.0.1:%d/nameserver/addrs2", port)
-		fmt.Println("temporary name server domain: ", nameServerDommain)
+		rlog.Info("Temporary Nameserver", map[string]interface{}{
+			"domain": nameServerDommain,
+		})
 
 		resolver := NewHttpResolver("DEFAULT", nameServerDommain)
 		resolver.Resolve()
diff --git a/rlog/log.go b/rlog/log.go
index 382f5aa..037cfcf 100644
--- a/rlog/log.go
+++ b/rlog/log.go
@@ -97,7 +97,7 @@ func (l *defaultLogger) Error(msg string, fields map[string]interface{}) {
 	if msg == "" && len(fields) == 0 {
 		return
 	}
-	l.logger.WithFields(fields).WithFields(fields).Error(msg)
+	l.logger.WithFields(fields).Error(msg)
 }
 
 func (l *defaultLogger) Fatal(msg string, fields map[string]interface{}) {