You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@rocketmq.apache.org by di...@apache.org on 2019/03/14 13:19:31 UTC
[rocketmq-client-go] branch native updated: - remove RemotingClient
interface (#43)
This is an automated email from the ASF dual-hosted git repository.
dinglei pushed a commit to branch native
in repository https://gitbox.apache.org/repos/asf/rocketmq-client-go.git
The following commit(s) were added to refs/heads/native by this push:
new 4bbb156 - remove RemotingClient interface (#43)
4bbb156 is described below
commit 4bbb1569ffa92274fc860c6efc23a8fd000c122b
Author: 高峰 <ga...@foxmail.com>
AuthorDate: Thu Mar 14 21:19:27 2019 +0800
- remove RemotingClient interface (#43)
- expose InvokeSync, InvokeAsync, InvokeOneWay and ScanResponseTable as package-level functions.
- use sync.Map for the response and connection tables
- implement LocalIP method
- fix string format bug in consumer.go
---
common/init.go | 26 -------
consumer.go | 2 +-
kernel/client.go | 2 +-
kernel/message.go | 2 +-
remote/client.go | 186 ++++++++++++++++----------------------------------
remote/client_test.go | 80 +++++-----------------
utils/helper.go | 80 +++++++++++++++++++++-
utils/helper_test.go | 41 +++++++++++
8 files changed, 194 insertions(+), 225 deletions(-)
diff --git a/common/init.go b/common/init.go
deleted file mode 100644
index 1285295..0000000
--- a/common/init.go
+++ /dev/null
@@ -1,26 +0,0 @@
-/*
-Licensed to the Apache Software Foundation (ASF) under one or more
-contributor license agreements. See the NOTICE file distributed with
-this work for additional information regarding copyright ownership.
-The ASF licenses this file to You under the Apache License, Version 2.0
-(the "License"); you may not use this file except in compliance with
-the License. You may obtain a copy of the License at
-
- http://www.apache.org/licenses/LICENSE-2.0
-
-Unless required by applicable law or agreed to in writing, software
-distributed under the License is distributed on an "AS IS" BASIS,
-WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-See the License for the specific language governing permissions and
-limitations under the License.
-*/
-package common
-
-import "github.com/apache/rocketmq-client-go/remote"
-
-var client remote.RemotingClient
-
-
-func init(){
- client = remote.NewDefaultRemotingClient()
-}
\ No newline at end of file
diff --git a/consumer.go b/consumer.go
index 896df74..acb8f1d 100644
--- a/consumer.go
+++ b/consumer.go
@@ -158,7 +158,7 @@ func (c *defaultConsumer) pull(ctx context.Context, mq *kernel.MessageQueue, dat
func (c *defaultConsumer) makeSureStateOK() error {
if c.state != kernel.Running {
- return fmt.Errorf("the consumer state is [%s], not running", c.state)
+ return fmt.Errorf("the consumer state is [%d], not running", c.state)
}
return nil
}
diff --git a/kernel/client.go b/kernel/client.go
index b8b6eee..6e4808a 100644
--- a/kernel/client.go
+++ b/kernel/client.go
@@ -43,7 +43,7 @@ var (
persistConsumerOffsetInterval = 5 * time.Second
unitMode = false
vipChannelEnabled, _ = strconv.ParseBool(os.Getenv("com.rocketmq.sendMessageWithVIPChannel"))
- clientID = clientIP + "@" + instanceName
+ clientID = string(clientIP) + "@" + instanceName
)
var (
diff --git a/kernel/message.go b/kernel/message.go
index d65c48f..843a743 100644
--- a/kernel/message.go
+++ b/kernel/message.go
@@ -110,7 +110,7 @@ func (msgExt *MessageExt) GetTags() string {
}
func (msgExt *MessageExt) String() string {
- return fmt.Sprint("[Message=%s, MsgId=%s, QueueId=%d, StoreSize=%d, QueueOffset=%d, SysFlag=%d, "+
+ return fmt.Sprintf("[Message=%s, MsgId=%s, QueueId=%d, StoreSize=%d, QueueOffset=%d, SysFlag=%d, "+
"BornTimestamp=%d, BornHost=%s, StoreTimestamp=%d, StoreHost=%s, CommitLogOffset=%d, BodyCRC=%d, "+
"ReconsumeTimes=%d, PreparedTransactionOffset=%d]", msgExt.Message.String(), msgExt.MsgId, msgExt.QueueId,
msgExt.StoreSize, msgExt.QueueOffset, msgExt.SysFlag, msgExt.BornTimestamp, msgExt.BornHost,
diff --git a/remote/client.go b/remote/client.go
index 095c304..2cdd6de 100644
--- a/remote/client.go
+++ b/remote/client.go
@@ -19,7 +19,6 @@ package remote
import (
"bufio"
"bytes"
- "context"
"encoding/binary"
"errors"
"io"
@@ -31,6 +30,7 @@ import (
var (
//ErrRequestTimeout for request timeout error
ErrRequestTimeout = errors.New("request timeout")
+ connectionLocker sync.Mutex
)
//ResponseFuture for
@@ -84,87 +84,14 @@ func (r *ResponseFuture) waitResponse() (*RemotingCommand, error) {
}
}
-//RemotingClient includes basic operations for remote
-type RemotingClient interface {
- Start()
- Shutdown()
- InvokeSync(string, *RemotingCommand, time.Duration) (*RemotingCommand, error)
- InvokeAsync(string, *RemotingCommand, time.Duration, func(*ResponseFuture)) error
- InvokeOneWay(string, *RemotingCommand) error
-}
-
-func InvokeSync(addr string, request *RemotingCommand, timeout time.Duration) (*RemotingCommand, error) {
- return nil, nil
-}
-
-func InvokeAsync(addr string, request *RemotingCommand, timeout time.Duration, f func(*RemotingCommand)) error {
- return nil
-}
-
-func InvokeOneWay(addr string, request*RemotingCommand) error {
- return nil
-}
-
-//defaultRemotingClient for default RemotingClient implementation
-type defaultRemotingClient struct {
- responseTable map[int32]*ResponseFuture
- responseLock sync.RWMutex
- connectionsTable map[string]net.Conn
- connectionLock sync.RWMutex
- ctx context.Context
- cancel context.CancelFunc
-}
-
-//NewDefaultRemotingClient for
-func NewDefaultRemotingClient() RemotingClient {
- client := &defaultRemotingClient{
- responseTable: make(map[int32]*ResponseFuture, 0),
- connectionsTable: make(map[string]net.Conn, 0),
- }
- ctx, cancel := context.WithCancel(context.Background())
- client.ctx = ctx
- client.cancel = cancel
- return client
-}
-
-//Start begin sca
-func (client *defaultRemotingClient) Start() {
- ticker := time.NewTicker(1 * time.Second)
- go func() {
- for {
- select {
- case <-ticker.C:
- client.scanResponseTable()
- case <-client.ctx.Done():
- ticker.Stop()
- return
- }
- }
- }()
-}
-
-// Shutdown for call client.cancel
-func (client *defaultRemotingClient) Shutdown() {
- client.cancel()
- client.connectionLock.Lock()
- for addr, conn := range client.connectionsTable {
- conn.Close()
- delete(client.connectionsTable, addr)
- }
- client.connectionLock.Unlock()
-}
-
-// InvokeSync sends request synchronously
-func (client *defaultRemotingClient) InvokeSync(addr string, request *RemotingCommand, timeoutMillis time.Duration) (*RemotingCommand, error) {
- conn, err := client.connect(addr)
+func InvokeSync(addr string, request *RemotingCommand, timeoutMillis time.Duration) (*RemotingCommand, error) {
+ conn, err := connect(addr)
if err != nil {
return nil, err
}
resp := NewResponseFuture(request.Opaque, timeoutMillis, nil)
- client.responseLock.Lock()
- client.responseTable[resp.Opaque] = resp
- client.responseLock.Unlock()
- err = client.sendRequest(conn, request)
+ responseTable.Store(resp.Opaque, resp)
+ err = sendRequest(conn, request)
if err != nil {
return nil, err
}
@@ -172,53 +99,57 @@ func (client *defaultRemotingClient) InvokeSync(addr string, request *RemotingCo
return resp.waitResponse()
}
-//InvokeAsync send request asynchronously
-func (client *defaultRemotingClient) InvokeAsync(addr string, request *RemotingCommand, timeoutMillis time.Duration, callback func(*ResponseFuture)) error {
- conn, err := client.connect(addr)
+func InvokeAsync(addr string, request *RemotingCommand, timeoutMillis time.Duration, callback func(*ResponseFuture)) error {
+ conn, err := connect(addr)
if err != nil {
return err
}
resp := NewResponseFuture(request.Opaque, timeoutMillis, callback)
- client.responseLock.Lock()
- client.responseTable[resp.Opaque] = resp
- client.responseLock.Unlock()
- err = client.sendRequest(conn, request)
+ responseTable.Store(resp.Opaque, resp)
+ err = sendRequest(conn, request)
if err != nil {
return err
}
resp.SendRequestOK = true
return nil
+
}
-//InvokeOneWay send one-way request
-func (client *defaultRemotingClient) InvokeOneWay(addr string, request *RemotingCommand) error {
- conn, err := client.connect(addr)
+func InvokeOneWay(addr string, request *RemotingCommand) error {
+ conn, err := connect(addr)
if err != nil {
return err
}
- return client.sendRequest(conn, request)
+ return sendRequest(conn, request)
}
-func (client *defaultRemotingClient) scanResponseTable() {
+var (
+ responseTable sync.Map
+ connectionTable sync.Map
+)
+
+func ScanResponseTable() {
rfs := make([]*ResponseFuture, 0)
- client.responseLock.Lock()
- for opaque, resp := range client.responseTable {
- if (resp.BeginTimestamp + int64(resp.TimeoutMillis) + 1000) <= time.Now().Unix()*1000 {
- delete(client.responseTable, opaque)
- rfs = append(rfs, resp)
+ responseTable.Range(func(key, value interface{}) bool {
+ if resp, ok := value.(*ResponseFuture); ok {
+ if (resp.BeginTimestamp + int64(resp.TimeoutMillis) + 1000) <= time.Now().Unix()*1000 {
+ rfs = append(rfs, resp)
+ responseTable.Delete(key)
+ }
}
- }
- client.responseLock.Unlock()
+ return true
+ })
for _, rf := range rfs {
rf.Err = ErrRequestTimeout
rf.executeInvokeCallback()
}
}
-func (client *defaultRemotingClient) connect(addr string) (net.Conn, error) {
- client.connectionLock.Lock()
- defer client.connectionLock.Unlock()
- conn, ok := client.connectionsTable[addr]
+func connect(addr string) (net.Conn, error) {
+ //it needs additional locker.
+ connectionLocker.Lock()
+ defer connectionLocker.Unlock()
+ conn, ok := connectionTable.Load(addr)
if ok {
return conn.(net.Conn), nil
}
@@ -226,30 +157,31 @@ func (client *defaultRemotingClient) connect(addr string) (net.Conn, error) {
if err != nil {
return nil, err
}
- client.connectionsTable[addr] = tcpConn
- go client.receiveResponse(tcpConn)
+ connectionTable.Store(addr, tcpConn)
+ go receiveResponse(tcpConn)
return tcpConn, nil
+
}
-func (client *defaultRemotingClient) receiveResponse(conn net.Conn) {
- scanner := createScanner(conn)
+func receiveResponse(r net.Conn) {
+ scanner := createScanner(r)
for scanner.Scan() {
receivedRemotingCommand, err := decode(scanner.Bytes())
if err != nil {
- client.closeConnection(conn)
+ closeConnection(r)
break
}
if receivedRemotingCommand.isResponseType() {
- client.responseLock.Lock()
- if resp, ok := client.responseTable[receivedRemotingCommand.Opaque]; ok {
- delete(client.responseTable, receivedRemotingCommand.Opaque)
- resp.ResponseCommand = receivedRemotingCommand
- resp.executeInvokeCallback()
- if resp.Done != nil {
- resp.Done <- true
+ resp, ok := responseTable.Load(receivedRemotingCommand.Opaque)
+ if ok {
+ responseTable.Delete(receivedRemotingCommand.Opaque)
+ responseFuture := resp.(*ResponseFuture)
+ responseFuture.ResponseCommand = receivedRemotingCommand
+ responseFuture.executeInvokeCallback()
+ if responseFuture.Done != nil {
+ responseFuture.Done <- true
}
}
- client.responseLock.Unlock()
} else {
// todo handler request from peer
}
@@ -273,31 +205,27 @@ func createScanner(r io.Reader) *bufio.Scanner {
return scanner
}
-func (client *defaultRemotingClient) sendRequest(conn net.Conn, request *RemotingCommand) error {
+func sendRequest(conn net.Conn, request *RemotingCommand) error {
content, err := encode(request)
if err != nil {
return err
}
_, err = conn.Write(content)
if err != nil {
- client.closeConnection(conn)
+ closeConnection(conn)
return err
}
return nil
}
-func (client *defaultRemotingClient) closeConnection(toCloseConn net.Conn) {
- client.connectionLock.Lock()
- var toCloseAddr string
- for addr, con := range client.connectionsTable {
- if con == toCloseConn {
- toCloseAddr = addr
- break
+func closeConnection(toCloseConn net.Conn) {
+ connectionTable.Range(func(key, value interface{}) bool {
+ if value == toCloseConn {
+ connectionTable.Delete(key)
+ return false
+ } else {
+ return true
}
- }
- if conn, ok := client.connectionsTable[toCloseAddr]; ok {
- delete(client.connectionsTable, toCloseAddr)
- conn.Close()
- }
- client.connectionLock.Unlock()
+
+ })
}
diff --git a/remote/client_test.go b/remote/client_test.go
index 40f6e50..0a05953 100644
--- a/remote/client_test.go
+++ b/remote/client_test.go
@@ -123,43 +123,6 @@ func TestResponseFutureWaitResponse(t *testing.T) {
}
}
-func TestNewDefaultRemotingClient(t *testing.T) {
- r := NewDefaultRemotingClient()
- d, ok := r.(*defaultRemotingClient)
- if !ok {
- t.Errorf("defaultRemotingClient does not implement RemotingClient interface")
- }
- if len(d.responseTable) != 0 {
- t.Errorf("wrong responseTable size. want=%d, got=%d",
- 0, len(d.responseTable))
- }
- if len(d.connectionsTable) != 0 {
- t.Errorf("wrong connectionsTable size. want=%d, got=%d",
- 0, len(d.connectionsTable))
- }
- if d.ctx == nil {
- t.Errorf("wrong ctx. want=%v, got=<nil>", d.ctx)
- }
- if d.cancel == nil {
- t.Errorf("wrong cancel. want=%v, got=<nil>", d.cancel)
- }
-}
-
-func TestDefaultRemotingClient_Start_ShutDown(t *testing.T) {
- r := NewDefaultRemotingClient()
- d, ok := r.(*defaultRemotingClient)
- if !ok {
- t.Errorf("defaultRemotingClient does not implement RemotingClient interface")
- }
- d.Start()
- time.Sleep(2 * time.Second)
- d.Shutdown()
- if len(d.connectionsTable) != 0 {
- t.Errorf("wrong connectionTable. want=%d, got=%d",
- 0, len(d.connectionsTable))
- }
-}
-
func TestCreateScanner(t *testing.T) {
r := randomNewRemotingCommand()
content, err := encode(r)
@@ -179,19 +142,15 @@ func TestCreateScanner(t *testing.T) {
}
}
-func TestDefaultRemotingClient_InvokeSync(t *testing.T) {
+func TestInvokeSync(t *testing.T) {
clientSendRemtingCommand := NewRemotingCommand(10, nil, []byte("Hello RocketMQ"))
serverSendRemotingCommand := NewRemotingCommand(20, nil, []byte("Welcome native"))
serverSendRemotingCommand.Opaque = clientSendRemtingCommand.Opaque
serverSendRemotingCommand.Flag = ResponseType
-
- client := NewDefaultRemotingClient()
- client.Start()
- defer client.Shutdown()
var wg sync.WaitGroup
wg.Add(1)
go func() {
- receiveCommand, err := client.InvokeSync(":3000",
+ receiveCommand, err := InvokeSync(":3000",
clientSendRemtingCommand, time.Duration(1000))
if err != nil {
t.Fatalf("failed to invoke synchronous. %s", err)
@@ -221,7 +180,7 @@ func TestDefaultRemotingClient_InvokeSync(t *testing.T) {
t.Errorf("failed to decode RemotingCommnad. %s", err)
}
if clientSendRemtingCommand.Code != receivedRemotingCommand.Code {
- t.Errorf("wrong code. want=%d, got=%d",receivedRemotingCommand.Code,
+ t.Errorf("wrong code. want=%d, got=%d", receivedRemotingCommand.Code,
clientSendRemtingCommand.Code)
}
body, err := encode(serverSendRemotingCommand)
@@ -238,26 +197,23 @@ func TestDefaultRemotingClient_InvokeSync(t *testing.T) {
wg.Done()
}
-func TestDefaultRemotingClient_InvokeAsync(t *testing.T) {
+func TestInvokeAsync(t *testing.T) {
clientSendRemtingCommand := NewRemotingCommand(10, nil, []byte("Hello RocketMQ"))
serverSendRemotingCommand := NewRemotingCommand(20, nil, []byte("Welcome native"))
serverSendRemotingCommand.Opaque = clientSendRemtingCommand.Opaque
serverSendRemotingCommand.Flag = ResponseType
- client := NewDefaultRemotingClient()
- client.Start()
- defer client.Shutdown()
var wg sync.WaitGroup
wg.Add(1)
go func() {
- err := client.InvokeAsync(":3000", clientSendRemtingCommand,
+ err := InvokeAsync(":3000", clientSendRemtingCommand,
time.Duration(1000), func(r *ResponseFuture) {
- if string(r.ResponseCommand.Body) != "Welcome native" {
- t.Errorf("wrong responseCommand.Body. want=%s, got=%s",
- "Welcome native", string(r.ResponseCommand.Body))
- }
- wg.Done()
- })
+ if string(r.ResponseCommand.Body) != "Welcome native" {
+ t.Errorf("wrong responseCommand.Body. want=%s, got=%s",
+ "Welcome native", string(r.ResponseCommand.Body))
+ }
+ wg.Done()
+ })
if err != nil {
t.Errorf("failed to invokeSync. %s", err)
}
@@ -282,7 +238,7 @@ func TestDefaultRemotingClient_InvokeAsync(t *testing.T) {
t.Errorf("failed to decode RemotingCommnad. %s", err)
}
if clientSendRemtingCommand.Code != receivedRemotingCommand.Code {
- t.Errorf("wrong code. want=%d, got=%d",receivedRemotingCommand.Code,
+ t.Errorf("wrong code. want=%d, got=%d", receivedRemotingCommand.Code,
clientSendRemtingCommand.Code)
}
body, err := encode(serverSendRemotingCommand)
@@ -299,17 +255,13 @@ func TestDefaultRemotingClient_InvokeAsync(t *testing.T) {
wg.Done()
}
-
-func TestDefaultRemotingClient_InvokeOneWay(t *testing.T) {
+func TestInvokeOneWay(t *testing.T) {
clientSendRemtingCommand := NewRemotingCommand(10, nil, []byte("Hello RocketMQ"))
- client := NewDefaultRemotingClient()
- client.Start()
- defer client.Shutdown()
var wg sync.WaitGroup
wg.Add(1)
go func() {
- err := client.InvokeOneWay(":3000", clientSendRemtingCommand)
+ err := InvokeOneWay(":3000", clientSendRemtingCommand)
if err != nil {
t.Fatalf("failed to invoke synchronous. %s", err)
}
@@ -334,11 +286,11 @@ func TestDefaultRemotingClient_InvokeOneWay(t *testing.T) {
t.Errorf("failed to decode RemotingCommnad. %s", err)
}
if clientSendRemtingCommand.Code != receivedRemotingCommand.Code {
- t.Errorf("wrong code. want=%d, got=%d",receivedRemotingCommand.Code,
+ t.Errorf("wrong code. want=%d, got=%d", receivedRemotingCommand.Code,
clientSendRemtingCommand.Code)
}
return
}
}
wg.Done()
-}
\ No newline at end of file
+}
diff --git a/utils/helper.go b/utils/helper.go
index fe3dd62..bc18c26 100644
--- a/utils/helper.go
+++ b/utils/helper.go
@@ -17,10 +17,84 @@ limitations under the License.
package utils
-import "hash/crc32"
+import (
+ "bytes"
+ "encoding/binary"
+ "errors"
+ "fmt"
+ "hash/crc32"
+ "net"
+ "os"
+ "sync"
+ "time"
+)
-func LocalIP() string {
- return ""
+var (
+ counter int16 = 0
+ startTimestamp int64 = 0
+ nextTimestamp int64 = 0
+ prefix string
+ locker sync.Mutex
+)
+
+func MessageClientID() string {
+ locker.Lock()
+ defer locker.Unlock()
+ if prefix == "" {
+ buf := new(bytes.Buffer)
+ binary.Write(buf, binary.BigEndian, LocalIP())
+ binary.Write(buf, binary.BigEndian, Pid())
+ binary.Write(buf, binary.BigEndian, ClassLoaderID())
+ prefix = fmt.Sprintf("%x", buf.Bytes())
+ }
+ if time.Now().Unix() > nextTimestamp {
+ updateTimestamp()
+ }
+ counter++
+ buf := new(bytes.Buffer)
+ binary.Write(buf, binary.BigEndian, int32((time.Now().Unix()-startTimestamp)*1000))
+ binary.Write(buf, binary.BigEndian, counter)
+ return prefix + fmt.Sprintf("%x", buf.Bytes())
+
+}
+
+func updateTimestamp() {
+ year, month := time.Now().Year(), time.Now().Month()
+ startTimestamp = int64(time.Date(year, month, 1, 0, 0, 0, 0, time.Local).Unix())
+ nextTimestamp = int64(time.Date(year, month, 1, 0, 0, 0, 0, time.Local).AddDate(0, 1, 0).Unix())
+}
+
+func LocalIP() []byte {
+ ip, err := clientIP4()
+ if err != nil {
+ return []byte{0, 0, 0, 0}
+ }
+ return ip
+}
+
+func clientIP4() ([]byte, error) {
+ addrs, err := net.InterfaceAddrs()
+ if err != nil {
+ return nil, errors.New("unexpected IP address")
+ }
+ for _, addr := range addrs {
+ if ipnet, ok := addr.(*net.IPNet); ok && !ipnet.IP.IsLoopback() {
+ if ip4 := ipnet.IP.To4(); ip4!=nil{
+ return ip4, nil
+ }
+ }
+ }
+ return nil, errors.New("unknown IP address")
+}
+
+
+
+func Pid() int16 {
+ return int16(os.Getpid())
+}
+
+func ClassLoaderID() int32 {
+ return 0
}
// HashString hashes a string to a unique hashcode.
diff --git a/utils/helper_test.go b/utils/helper_test.go
new file mode 100644
index 0000000..113e92c
--- /dev/null
+++ b/utils/helper_test.go
@@ -0,0 +1,41 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package utils
+
+import "testing"
+
+func TestClassLoaderID(t *testing.T) {
+ if ClassLoaderID() != 0 {
+ t.Errorf("wrong ClassLoaderID, want=%d, got=%d", 0, ClassLoaderID())
+ }
+}
+
+func TestLocalIP(t *testing.T) {
+ ip := LocalIP()
+ if ip[0] == 0 && ip[1] == 0 && ip[2] == 0 && ip[3] == 0 {
+ t.Errorf("failed to get host public ip4 address")
+ }else{
+ t.Logf("ip4 address: %v", ip)
+ }
+}
+
+
+func BenchmarkMessageClientID(b *testing.B) {
+ for i:= 0; i< b.N;i++{
+ MessageClientID()
+ }
+}
\ No newline at end of file