You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@rocketmq.apache.org by di...@apache.org on 2019/10/21 14:54:06 UTC

[rocketmq-client-go] branch native updated: feat: reduce receive memory usage (#255)

This is an automated email from the ASF dual-hosted git repository.

dinglei pushed a commit to branch native
in repository https://gitbox.apache.org/repos/asf/rocketmq-client-go.git


The following commit(s) were added to refs/heads/native by this push:
     new bd44991  feat: reduce receive memory usage (#255)
bd44991 is described below

commit bd44991de7932c94f682c9d46ac82cbe6183e0ee
Author: xujianhai666 <52...@users.noreply.github.com>
AuthorDate: Mon Oct 21 22:54:00 2019 +0800

    feat: reduce receive memory usage (#255)
    
    - use io.Reader instead of scanner
    
    Closes #252
---
 go.mod                           |  2 +
 internal/remote/remote_client.go | 99 +++++++++++++++++++++++++---------------
 2 files changed, 64 insertions(+), 37 deletions(-)

diff --git a/go.mod b/go.mod
index 8436b35..716c4d2 100644
--- a/go.mod
+++ b/go.mod
@@ -14,3 +14,5 @@ require (
 )
 
 replace stathat.com/c/consistent v1.0.0 => github.com/stathat/consistent v1.0.0
+
+go 1.13
diff --git a/internal/remote/remote_client.go b/internal/remote/remote_client.go
index 96f67a6..703e958 100644
--- a/internal/remote/remote_client.go
+++ b/internal/remote/remote_client.go
@@ -136,50 +136,75 @@ func (c *remotingClient) connect(ctx context.Context, addr string) (net.Conn, er
 }
 
 func (c *remotingClient) receiveResponse(r net.Conn) {
-	scanner := c.createScanner(r)
-	for scanner.Scan() {
-		cmd, err := decode(scanner.Bytes())
+	var err error
+	header := make([]byte, 4)
+	for {
 		if err != nil {
+			rlog.Errorf("conn err: %s so close", err.Error())
 			c.closeConnection(r)
-			rlog.Errorf("decode RemotingCommand error: %s", err.Error())
 			break
 		}
-		if cmd.isResponseType() {
-			resp, exist := c.responseTable.Load(cmd.Opaque)
-			if exist {
-				c.responseTable.Delete(cmd.Opaque)
-				responseFuture := resp.(*ResponseFuture)
-				go func() {
-					responseFuture.ResponseCommand = cmd
-					responseFuture.executeInvokeCallback()
-					if responseFuture.Done != nil {
-						responseFuture.Done <- true
-					}
-				}()
-			}
-		} else {
-			f := c.processors[cmd.Code]
-			if f != nil {
-				go func() { // 单个goroutine会造成死锁
-					res := f(cmd, r.RemoteAddr())
-					if res != nil {
-						res.Opaque = cmd.Opaque
-						res.Flag |= 1 << 0
-						err := c.sendRequest(r, res)
-						if err != nil {
-							rlog.Warnf("send response to broker error: %s, type is: %d", err, res.Code)
-						}
-					}
-				}()
-			} else {
-				rlog.Warnf("receive broker's requests, but no func to handle, code is: %d", cmd.Code)
-			}
+
+		_, err = io.ReadFull(r, header)
+		if err != nil {
+			rlog.Errorf("io readfull error: %s", err.Error())
+			continue
 		}
+
+		var length int32
+		err = binary.Read(bytes.NewReader(header), binary.BigEndian, &length)
+		if err != nil {
+			rlog.Errorf("binary decode header: %s", err.Error())
+			continue
+		}
+
+		buf := make([]byte, length)
+		_, err = io.ReadFull(r, buf)
+		if err != nil {
+			rlog.Errorf("io readfull error: %s", err.Error())
+			continue
+		}
+
+		cmd, err := decode(buf)
+		if err != nil {
+			rlog.Errorf("decode RemotingCommand error: %s", err.Error())
+			continue
+		}
+		c.processCMD(cmd, r)
 	}
-	if scanner.Err() != nil {
-		rlog.Errorf("net: %s scanner exit, Err: %s.", r.RemoteAddr().String(), scanner.Err())
+}
+
+func (c *remotingClient) processCMD(cmd *RemotingCommand, r net.Conn) {
+	if cmd.isResponseType() {
+		resp, exist := c.responseTable.Load(cmd.Opaque)
+		if exist {
+			c.responseTable.Delete(cmd.Opaque)
+			responseFuture := resp.(*ResponseFuture)
+			go func() {
+				responseFuture.ResponseCommand = cmd
+				responseFuture.executeInvokeCallback()
+				if responseFuture.Done != nil {
+					responseFuture.Done <- true
+				}
+			}()
+		}
 	} else {
-		rlog.Infof("net: %s scanner exit.", r.RemoteAddr().String())
+		f := c.processors[cmd.Code]
+		if f != nil {
+			go func() { // 单个goroutine会造成死锁
+				res := f(cmd, r.RemoteAddr())
+				if res != nil {
+					res.Opaque = cmd.Opaque
+					res.Flag |= 1 << 0
+					err := c.sendRequest(r, res)
+					if err != nil {
+						rlog.Warnf("send response to broker error: %s, type is: %d", err, res.Code)
+					}
+				}
+			}()
+		} else {
+			rlog.Warnf("receive broker's requests, but no func to handle, code is: %d", cmd.Code)
+		}
 	}
 }