You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@rocketmq.apache.org by di...@apache.org on 2019/03/14 13:19:31 UTC

[rocketmq-client-go] branch native updated: - remove RemotingClient interface (#43)

This is an automated email from the ASF dual-hosted git repository.

dinglei pushed a commit to branch native
in repository https://gitbox.apache.org/repos/asf/rocketmq-client-go.git


The following commit(s) were added to refs/heads/native by this push:
     new 4bbb156  - remove RemotingClient interface (#43)
4bbb156 is described below

commit 4bbb1569ffa92274fc860c6efc23a8fd000c122b
Author: 高峰 <ga...@foxmail.com>
AuthorDate: Thu Mar 14 21:19:27 2019 +0800

    - remove RemotingClient interface (#43)
    
    - expose InvokeSync, InvokeAsync, InvokeOneWay and ScanResponseTable as package-level functions.
    - use sync.Map data structure
    - implement LocalIP method
    - fix string format bug in consumer.go
---
 common/init.go        |  26 -------
 consumer.go           |   2 +-
 kernel/client.go      |   2 +-
 kernel/message.go     |   2 +-
 remote/client.go      | 186 ++++++++++++++++----------------------------------
 remote/client_test.go |  80 +++++-----------------
 utils/helper.go       |  80 +++++++++++++++++++++-
 utils/helper_test.go  |  41 +++++++++++
 8 files changed, 194 insertions(+), 225 deletions(-)

diff --git a/common/init.go b/common/init.go
deleted file mode 100644
index 1285295..0000000
--- a/common/init.go
+++ /dev/null
@@ -1,26 +0,0 @@
-/*
-Licensed to the Apache Software Foundation (ASF) under one or more
-contributor license agreements.  See the NOTICE file distributed with
-this work for additional information regarding copyright ownership.
-The ASF licenses this file to You under the Apache License, Version 2.0
-(the "License"); you may not use this file except in compliance with
-the License.  You may obtain a copy of the License at
-
-    http://www.apache.org/licenses/LICENSE-2.0
-
-Unless required by applicable law or agreed to in writing, software
-distributed under the License is distributed on an "AS IS" BASIS,
-WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-See the License for the specific language governing permissions and
-limitations under the License.
-*/
-package common
-
-import "github.com/apache/rocketmq-client-go/remote"
-
-var client remote.RemotingClient
-
-
-func init(){
-	client = remote.NewDefaultRemotingClient()
-}
\ No newline at end of file
diff --git a/consumer.go b/consumer.go
index 896df74..acb8f1d 100644
--- a/consumer.go
+++ b/consumer.go
@@ -158,7 +158,7 @@ func (c *defaultConsumer) pull(ctx context.Context, mq *kernel.MessageQueue, dat
 
 func (c *defaultConsumer) makeSureStateOK() error {
 	if c.state != kernel.Running {
-		return fmt.Errorf("the consumer state is [%s], not running", c.state)
+		return fmt.Errorf("the consumer state is [%d], not running", c.state)
 	}
 	return nil
 }
diff --git a/kernel/client.go b/kernel/client.go
index b8b6eee..6e4808a 100644
--- a/kernel/client.go
+++ b/kernel/client.go
@@ -43,7 +43,7 @@ var (
 	persistConsumerOffsetInterval = 5 * time.Second
 	unitMode                      = false
 	vipChannelEnabled, _          = strconv.ParseBool(os.Getenv("com.rocketmq.sendMessageWithVIPChannel"))
-	clientID                      = clientIP + "@" + instanceName
+	clientID                      = string(clientIP) + "@" + instanceName
 )
 
 var (
diff --git a/kernel/message.go b/kernel/message.go
index d65c48f..843a743 100644
--- a/kernel/message.go
+++ b/kernel/message.go
@@ -110,7 +110,7 @@ func (msgExt *MessageExt) GetTags() string {
 }
 
 func (msgExt *MessageExt) String() string {
-	return fmt.Sprint("[Message=%s, MsgId=%s, QueueId=%d, StoreSize=%d, QueueOffset=%d, SysFlag=%d, "+
+	return fmt.Sprintf("[Message=%s, MsgId=%s, QueueId=%d, StoreSize=%d, QueueOffset=%d, SysFlag=%d, "+
 		"BornTimestamp=%d, BornHost=%s, StoreTimestamp=%d, StoreHost=%s, CommitLogOffset=%d, BodyCRC=%d, "+
 		"ReconsumeTimes=%d, PreparedTransactionOffset=%d]", msgExt.Message.String(), msgExt.MsgId, msgExt.QueueId,
 		msgExt.StoreSize, msgExt.QueueOffset, msgExt.SysFlag, msgExt.BornTimestamp, msgExt.BornHost,
diff --git a/remote/client.go b/remote/client.go
index 095c304..2cdd6de 100644
--- a/remote/client.go
+++ b/remote/client.go
@@ -19,7 +19,6 @@ package remote
 import (
 	"bufio"
 	"bytes"
-	"context"
 	"encoding/binary"
 	"errors"
 	"io"
@@ -31,6 +30,7 @@ import (
 var (
 	//ErrRequestTimeout for request timeout error
 	ErrRequestTimeout = errors.New("request timeout")
+	connectionLocker sync.Mutex
 )
 
 //ResponseFuture for
@@ -84,87 +84,14 @@ func (r *ResponseFuture) waitResponse() (*RemotingCommand, error) {
 	}
 }
 
-//RemotingClient includes basic operations for remote
-type RemotingClient interface {
-	Start()
-	Shutdown()
-	InvokeSync(string, *RemotingCommand, time.Duration) (*RemotingCommand, error)
-	InvokeAsync(string, *RemotingCommand, time.Duration, func(*ResponseFuture)) error
-	InvokeOneWay(string, *RemotingCommand) error
-}
-
-func InvokeSync(addr string, request *RemotingCommand, timeout time.Duration) (*RemotingCommand, error) {
-	return nil, nil
-}
-
-func InvokeAsync(addr string, request *RemotingCommand, timeout time.Duration, f func(*RemotingCommand)) error {
-	return nil
-}
-
-func InvokeOneWay(addr string, request*RemotingCommand) error {
-	return nil
-}
-
-//defaultRemotingClient for default RemotingClient implementation
-type defaultRemotingClient struct {
-	responseTable    map[int32]*ResponseFuture
-	responseLock     sync.RWMutex
-	connectionsTable map[string]net.Conn
-	connectionLock   sync.RWMutex
-	ctx              context.Context
-	cancel           context.CancelFunc
-}
-
-//NewDefaultRemotingClient for
-func NewDefaultRemotingClient() RemotingClient {
-	client := &defaultRemotingClient{
-		responseTable:    make(map[int32]*ResponseFuture, 0),
-		connectionsTable: make(map[string]net.Conn, 0),
-	}
-	ctx, cancel := context.WithCancel(context.Background())
-	client.ctx = ctx
-	client.cancel = cancel
-	return client
-}
-
-//Start begin sca
-func (client *defaultRemotingClient) Start() {
-	ticker := time.NewTicker(1 * time.Second)
-	go func() {
-		for {
-			select {
-			case <-ticker.C:
-				client.scanResponseTable()
-			case <-client.ctx.Done():
-				ticker.Stop()
-				return
-			}
-		}
-	}()
-}
-
-// Shutdown for call client.cancel
-func (client *defaultRemotingClient) Shutdown() {
-	client.cancel()
-	client.connectionLock.Lock()
-	for addr, conn := range client.connectionsTable {
-		conn.Close()
-		delete(client.connectionsTable, addr)
-	}
-	client.connectionLock.Unlock()
-}
-
-// InvokeSync sends request synchronously
-func (client *defaultRemotingClient) InvokeSync(addr string, request *RemotingCommand, timeoutMillis time.Duration) (*RemotingCommand, error) {
-	conn, err := client.connect(addr)
+func InvokeSync(addr string, request *RemotingCommand, timeoutMillis time.Duration) (*RemotingCommand, error) {
+	conn, err := connect(addr)
 	if err != nil {
 		return nil, err
 	}
 	resp := NewResponseFuture(request.Opaque, timeoutMillis, nil)
-	client.responseLock.Lock()
-	client.responseTable[resp.Opaque] = resp
-	client.responseLock.Unlock()
-	err = client.sendRequest(conn, request)
+	responseTable.Store(resp.Opaque, resp)
+	err = sendRequest(conn, request)
 	if err != nil {
 		return nil, err
 	}
@@ -172,53 +99,57 @@ func (client *defaultRemotingClient) InvokeSync(addr string, request *RemotingCo
 	return resp.waitResponse()
 }
 
-//InvokeAsync send request asynchronously
-func (client *defaultRemotingClient) InvokeAsync(addr string, request *RemotingCommand, timeoutMillis time.Duration, callback func(*ResponseFuture)) error {
-	conn, err := client.connect(addr)
+func InvokeAsync(addr string, request *RemotingCommand, timeoutMillis time.Duration, callback func(*ResponseFuture)) error {
+	conn, err := connect(addr)
 	if err != nil {
 		return err
 	}
 	resp := NewResponseFuture(request.Opaque, timeoutMillis, callback)
-	client.responseLock.Lock()
-	client.responseTable[resp.Opaque] = resp
-	client.responseLock.Unlock()
-	err = client.sendRequest(conn, request)
+	responseTable.Store(resp.Opaque, resp)
+	err = sendRequest(conn, request)
 	if err != nil {
 		return err
 	}
 	resp.SendRequestOK = true
 	return nil
+
 }
 
-//InvokeOneWay send one-way request
-func (client *defaultRemotingClient) InvokeOneWay(addr string, request *RemotingCommand) error {
-	conn, err := client.connect(addr)
+func InvokeOneWay(addr string, request *RemotingCommand) error {
+	conn, err := connect(addr)
 	if err != nil {
 		return err
 	}
-	return client.sendRequest(conn, request)
+	return sendRequest(conn, request)
 }
 
-func (client *defaultRemotingClient) scanResponseTable() {
+var (
+	responseTable   sync.Map
+	connectionTable sync.Map
+)
+
+func ScanResponseTable() {
 	rfs := make([]*ResponseFuture, 0)
-	client.responseLock.Lock()
-	for opaque, resp := range client.responseTable {
-		if (resp.BeginTimestamp + int64(resp.TimeoutMillis) + 1000) <= time.Now().Unix()*1000 {
-			delete(client.responseTable, opaque)
-			rfs = append(rfs, resp)
+	responseTable.Range(func(key, value interface{}) bool {
+		if resp, ok := value.(*ResponseFuture); ok {
+			if (resp.BeginTimestamp + int64(resp.TimeoutMillis) + 1000) <= time.Now().Unix()*1000 {
+				rfs = append(rfs, resp)
+				responseTable.Delete(key)
+			}
 		}
-	}
-	client.responseLock.Unlock()
+		return true
+	})
 	for _, rf := range rfs {
 		rf.Err = ErrRequestTimeout
 		rf.executeInvokeCallback()
 	}
 }
 
-func (client *defaultRemotingClient) connect(addr string) (net.Conn, error) {
-	client.connectionLock.Lock()
-	defer client.connectionLock.Unlock()
-	conn, ok := client.connectionsTable[addr]
+func connect(addr string) (net.Conn, error) {
+	//it needs additional locker.
+	connectionLocker.Lock()
+	defer connectionLocker.Unlock()
+	conn, ok := connectionTable.Load(addr)
 	if ok {
 		return conn.(net.Conn), nil
 	}
@@ -226,30 +157,31 @@ func (client *defaultRemotingClient) connect(addr string) (net.Conn, error) {
 	if err != nil {
 		return nil, err
 	}
-	client.connectionsTable[addr] = tcpConn
-	go client.receiveResponse(tcpConn)
+	connectionTable.Store(addr, tcpConn)
+	go receiveResponse(tcpConn)
 	return tcpConn, nil
+
 }
 
-func (client *defaultRemotingClient) receiveResponse(conn net.Conn) {
-	scanner := createScanner(conn)
+func receiveResponse(r net.Conn) {
+	scanner := createScanner(r)
 	for scanner.Scan() {
 		receivedRemotingCommand, err := decode(scanner.Bytes())
 		if err != nil {
-			client.closeConnection(conn)
+			closeConnection(r)
 			break
 		}
 		if receivedRemotingCommand.isResponseType() {
-			client.responseLock.Lock()
-			if resp, ok := client.responseTable[receivedRemotingCommand.Opaque]; ok {
-				delete(client.responseTable, receivedRemotingCommand.Opaque)
-				resp.ResponseCommand = receivedRemotingCommand
-				resp.executeInvokeCallback()
-				if resp.Done != nil {
-					resp.Done <- true
+			resp, ok := responseTable.Load(receivedRemotingCommand.Opaque)
+			if ok {
+				responseTable.Delete(receivedRemotingCommand.Opaque)
+				responseFuture := resp.(*ResponseFuture)
+				responseFuture.ResponseCommand = receivedRemotingCommand
+				responseFuture.executeInvokeCallback()
+				if responseFuture.Done != nil {
+					responseFuture.Done <- true
 				}
 			}
-			client.responseLock.Unlock()
 		} else {
 			// todo handler request from peer
 		}
@@ -273,31 +205,27 @@ func createScanner(r io.Reader) *bufio.Scanner {
 	return scanner
 }
 
-func (client *defaultRemotingClient) sendRequest(conn net.Conn, request *RemotingCommand) error {
+func sendRequest(conn net.Conn, request *RemotingCommand) error {
 	content, err := encode(request)
 	if err != nil {
 		return err
 	}
 	_, err = conn.Write(content)
 	if err != nil {
-		client.closeConnection(conn)
+		closeConnection(conn)
 		return err
 	}
 	return nil
 }
 
-func (client *defaultRemotingClient) closeConnection(toCloseConn net.Conn) {
-	client.connectionLock.Lock()
-	var toCloseAddr string
-	for addr, con := range client.connectionsTable {
-		if con == toCloseConn {
-			toCloseAddr = addr
-			break
+func closeConnection(toCloseConn net.Conn) {
+	connectionTable.Range(func(key, value interface{}) bool {
+		if value == toCloseConn {
+			connectionTable.Delete(key)
+			return false
+		} else {
+			return true
 		}
-	}
-	if conn, ok := client.connectionsTable[toCloseAddr]; ok {
-		delete(client.connectionsTable, toCloseAddr)
-		conn.Close()
-	}
-	client.connectionLock.Unlock()
+
+	})
 }
diff --git a/remote/client_test.go b/remote/client_test.go
index 40f6e50..0a05953 100644
--- a/remote/client_test.go
+++ b/remote/client_test.go
@@ -123,43 +123,6 @@ func TestResponseFutureWaitResponse(t *testing.T) {
 	}
 }
 
-func TestNewDefaultRemotingClient(t *testing.T) {
-	r := NewDefaultRemotingClient()
-	d, ok := r.(*defaultRemotingClient)
-	if !ok {
-		t.Errorf("defaultRemotingClient does not implement RemotingClient interface")
-	}
-	if len(d.responseTable) != 0 {
-		t.Errorf("wrong responseTable size. want=%d, got=%d",
-			0, len(d.responseTable))
-	}
-	if len(d.connectionsTable) != 0 {
-		t.Errorf("wrong connectionsTable size. want=%d, got=%d",
-			0, len(d.connectionsTable))
-	}
-	if d.ctx == nil {
-		t.Errorf("wrong ctx. want=%v, got=<nil>", d.ctx)
-	}
-	if d.cancel == nil {
-		t.Errorf("wrong cancel. want=%v, got=<nil>", d.cancel)
-	}
-}
-
-func TestDefaultRemotingClient_Start_ShutDown(t *testing.T) {
-	r := NewDefaultRemotingClient()
-	d, ok := r.(*defaultRemotingClient)
-	if !ok {
-		t.Errorf("defaultRemotingClient does not implement RemotingClient interface")
-	}
-	d.Start()
-	time.Sleep(2 * time.Second)
-	d.Shutdown()
-	if len(d.connectionsTable) != 0 {
-		t.Errorf("wrong connectionTable. want=%d, got=%d",
-			0, len(d.connectionsTable))
-	}
-}
-
 func TestCreateScanner(t *testing.T) {
 	r := randomNewRemotingCommand()
 	content, err := encode(r)
@@ -179,19 +142,15 @@ func TestCreateScanner(t *testing.T) {
 	}
 }
 
-func TestDefaultRemotingClient_InvokeSync(t *testing.T) {
+func TestInvokeSync(t *testing.T) {
 	clientSendRemtingCommand := NewRemotingCommand(10, nil, []byte("Hello RocketMQ"))
 	serverSendRemotingCommand := NewRemotingCommand(20, nil, []byte("Welcome native"))
 	serverSendRemotingCommand.Opaque = clientSendRemtingCommand.Opaque
 	serverSendRemotingCommand.Flag = ResponseType
-
-	client := NewDefaultRemotingClient()
-	client.Start()
-	defer client.Shutdown()
 	var wg sync.WaitGroup
 	wg.Add(1)
 	go func() {
-		receiveCommand, err := client.InvokeSync(":3000",
+		receiveCommand, err := InvokeSync(":3000",
 			clientSendRemtingCommand, time.Duration(1000))
 		if err != nil {
 			t.Fatalf("failed to invoke synchronous. %s", err)
@@ -221,7 +180,7 @@ func TestDefaultRemotingClient_InvokeSync(t *testing.T) {
 				t.Errorf("failed to decode RemotingCommnad. %s", err)
 			}
 			if clientSendRemtingCommand.Code != receivedRemotingCommand.Code {
-				t.Errorf("wrong code. want=%d, got=%d",receivedRemotingCommand.Code,
+				t.Errorf("wrong code. want=%d, got=%d", receivedRemotingCommand.Code,
 					clientSendRemtingCommand.Code)
 			}
 			body, err := encode(serverSendRemotingCommand)
@@ -238,26 +197,23 @@ func TestDefaultRemotingClient_InvokeSync(t *testing.T) {
 	wg.Done()
 }
 
-func TestDefaultRemotingClient_InvokeAsync(t *testing.T) {
+func TestInvokeAsync(t *testing.T) {
 	clientSendRemtingCommand := NewRemotingCommand(10, nil, []byte("Hello RocketMQ"))
 	serverSendRemotingCommand := NewRemotingCommand(20, nil, []byte("Welcome native"))
 	serverSendRemotingCommand.Opaque = clientSendRemtingCommand.Opaque
 	serverSendRemotingCommand.Flag = ResponseType
 
-	client := NewDefaultRemotingClient()
-	client.Start()
-	defer client.Shutdown()
 	var wg sync.WaitGroup
 	wg.Add(1)
 	go func() {
-		err := client.InvokeAsync(":3000", clientSendRemtingCommand,
+		err := InvokeAsync(":3000", clientSendRemtingCommand,
 			time.Duration(1000), func(r *ResponseFuture) {
-			if string(r.ResponseCommand.Body) != "Welcome native" {
-				t.Errorf("wrong responseCommand.Body. want=%s, got=%s",
-					"Welcome native",  string(r.ResponseCommand.Body))
-			}
-			wg.Done()
-		})
+				if string(r.ResponseCommand.Body) != "Welcome native" {
+					t.Errorf("wrong responseCommand.Body. want=%s, got=%s",
+						"Welcome native", string(r.ResponseCommand.Body))
+				}
+				wg.Done()
+			})
 		if err != nil {
 			t.Errorf("failed to invokeSync. %s", err)
 		}
@@ -282,7 +238,7 @@ func TestDefaultRemotingClient_InvokeAsync(t *testing.T) {
 				t.Errorf("failed to decode RemotingCommnad. %s", err)
 			}
 			if clientSendRemtingCommand.Code != receivedRemotingCommand.Code {
-				t.Errorf("wrong code. want=%d, got=%d",receivedRemotingCommand.Code,
+				t.Errorf("wrong code. want=%d, got=%d", receivedRemotingCommand.Code,
 					clientSendRemtingCommand.Code)
 			}
 			body, err := encode(serverSendRemotingCommand)
@@ -299,17 +255,13 @@ func TestDefaultRemotingClient_InvokeAsync(t *testing.T) {
 	wg.Done()
 }
 
-
-func TestDefaultRemotingClient_InvokeOneWay(t *testing.T) {
+func TestInvokeOneWay(t *testing.T) {
 	clientSendRemtingCommand := NewRemotingCommand(10, nil, []byte("Hello RocketMQ"))
 
-	client := NewDefaultRemotingClient()
-	client.Start()
-	defer client.Shutdown()
 	var wg sync.WaitGroup
 	wg.Add(1)
 	go func() {
-		err := client.InvokeOneWay(":3000", clientSendRemtingCommand)
+		err := InvokeOneWay(":3000", clientSendRemtingCommand)
 		if err != nil {
 			t.Fatalf("failed to invoke synchronous. %s", err)
 		}
@@ -334,11 +286,11 @@ func TestDefaultRemotingClient_InvokeOneWay(t *testing.T) {
 				t.Errorf("failed to decode RemotingCommnad. %s", err)
 			}
 			if clientSendRemtingCommand.Code != receivedRemotingCommand.Code {
-				t.Errorf("wrong code. want=%d, got=%d",receivedRemotingCommand.Code,
+				t.Errorf("wrong code. want=%d, got=%d", receivedRemotingCommand.Code,
 					clientSendRemtingCommand.Code)
 			}
 			return
 		}
 	}
 	wg.Done()
-}
\ No newline at end of file
+}
diff --git a/utils/helper.go b/utils/helper.go
index fe3dd62..bc18c26 100644
--- a/utils/helper.go
+++ b/utils/helper.go
@@ -17,10 +17,84 @@ limitations under the License.
 
 package utils
 
-import "hash/crc32"
+import (
+	"bytes"
+	"encoding/binary"
+	"errors"
+	"fmt"
+	"hash/crc32"
+	"net"
+	"os"
+	"sync"
+	"time"
+)
 
-func LocalIP() string {
-	return ""
+var (
+	counter        int16 = 0
+	startTimestamp int64 = 0
+	nextTimestamp  int64 = 0
+	prefix  string
+	locker         sync.Mutex
+)
+
+func MessageClientID() string {
+	locker.Lock()
+	defer locker.Unlock()
+	if prefix == "" {
+		buf := new(bytes.Buffer)
+		binary.Write(buf, binary.BigEndian, LocalIP())
+		binary.Write(buf, binary.BigEndian, Pid())
+		binary.Write(buf, binary.BigEndian, ClassLoaderID())
+		prefix = fmt.Sprintf("%x", buf.Bytes())
+	}
+	if time.Now().Unix() > nextTimestamp {
+		updateTimestamp()
+	}
+	counter++
+	buf := new(bytes.Buffer)
+	binary.Write(buf, binary.BigEndian, int32((time.Now().Unix()-startTimestamp)*1000))
+	binary.Write(buf, binary.BigEndian, counter)
+	return prefix + fmt.Sprintf("%x", buf.Bytes())
+
+}
+
+func updateTimestamp() {
+	year, month := time.Now().Year(), time.Now().Month()
+	startTimestamp = int64(time.Date(year, month, 1, 0, 0, 0, 0, time.Local).Unix())
+	nextTimestamp = int64(time.Date(year, month, 1, 0, 0, 0, 0, time.Local).AddDate(0, 1, 0).Unix())
+}
+
+func LocalIP() []byte {
+	ip, err := clientIP4()
+	if err != nil {
+		return []byte{0, 0, 0, 0}
+	}
+	return ip
+}
+
+func clientIP4() ([]byte, error) {
+	addrs, err := net.InterfaceAddrs()
+	if err != nil {
+		return nil, errors.New("unexpected IP address")
+	}
+	for _, addr := range addrs {
+		if ipnet, ok := addr.(*net.IPNet); ok && !ipnet.IP.IsLoopback() {
+			if ip4 := ipnet.IP.To4(); ip4!=nil{
+				return ip4, nil
+			}
+		}
+	}
+	return nil, errors.New("unknown IP address")
+}
+
+
+
+func Pid() int16 {
+	return int16(os.Getpid())
+}
+
+func ClassLoaderID() int32 {
+	return 0
 }
 
 // HashString hashes a string to a unique hashcode.
diff --git a/utils/helper_test.go b/utils/helper_test.go
new file mode 100644
index 0000000..113e92c
--- /dev/null
+++ b/utils/helper_test.go
@@ -0,0 +1,41 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing, software
+ *  distributed under the License is distributed on an "AS IS" BASIS,
+ *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *  See the License for the specific language governing permissions and
+ *  limitations under the License.
+ */
+package utils
+
+import "testing"
+
+func TestClassLoaderID(t *testing.T) {
+	if ClassLoaderID() != 0 {
+		t.Errorf("wrong ClassLoaderID, want=%d, got=%d", 0, ClassLoaderID())
+	}
+}
+
+func TestLocalIP(t *testing.T) {
+	ip := LocalIP()
+	if ip[0] == 0 && ip[1] == 0 && ip[2] == 0 && ip[3] == 0 {
+		t.Errorf("failed to get host public ip4 address")
+	}else{
+		t.Logf("ip4 address: %v", ip)
+	}
+}
+
+
+func BenchmarkMessageClientID(b *testing.B) {
+	for i:= 0; i< b.N;i++{
+		 MessageClientID()
+	}
+}
\ No newline at end of file