You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@rocketmq.apache.org by di...@apache.org on 2019/10/21 14:54:06 UTC
[rocketmq-client-go] branch native updated: feat: reduce receive
memory usage (#255)
This is an automated email from the ASF dual-hosted git repository.
dinglei pushed a commit to branch native
in repository https://gitbox.apache.org/repos/asf/rocketmq-client-go.git
The following commit(s) were added to refs/heads/native by this push:
new bd44991 feat: reduce receive memory usage (#255)
bd44991 is described below
commit bd44991de7932c94f682c9d46ac82cbe6183e0ee
Author: xujianhai666 <52...@users.noreply.github.com>
AuthorDate: Mon Oct 21 22:54:00 2019 +0800
feat: reduce receive memory usage (#255)
- use io.reader instead of scanner
Closes #252
---
go.mod | 2 +
internal/remote/remote_client.go | 99 +++++++++++++++++++++++++---------------
2 files changed, 64 insertions(+), 37 deletions(-)
diff --git a/go.mod b/go.mod
index 8436b35..716c4d2 100644
--- a/go.mod
+++ b/go.mod
@@ -14,3 +14,5 @@ require (
)
replace stathat.com/c/consistent v1.0.0 => github.com/stathat/consistent v1.0.0
+
+go 1.13
diff --git a/internal/remote/remote_client.go b/internal/remote/remote_client.go
index 96f67a6..703e958 100644
--- a/internal/remote/remote_client.go
+++ b/internal/remote/remote_client.go
@@ -136,50 +136,75 @@ func (c *remotingClient) connect(ctx context.Context, addr string) (net.Conn, er
}
func (c *remotingClient) receiveResponse(r net.Conn) {
- scanner := c.createScanner(r)
- for scanner.Scan() {
- cmd, err := decode(scanner.Bytes())
+ var err error
+ header := make([]byte, 4)
+ for {
if err != nil {
+ rlog.Errorf("conn err: %s so close", err.Error())
c.closeConnection(r)
- rlog.Errorf("decode RemotingCommand error: %s", err.Error())
break
}
- if cmd.isResponseType() {
- resp, exist := c.responseTable.Load(cmd.Opaque)
- if exist {
- c.responseTable.Delete(cmd.Opaque)
- responseFuture := resp.(*ResponseFuture)
- go func() {
- responseFuture.ResponseCommand = cmd
- responseFuture.executeInvokeCallback()
- if responseFuture.Done != nil {
- responseFuture.Done <- true
- }
- }()
- }
- } else {
- f := c.processors[cmd.Code]
- if f != nil {
- go func() { // 单个goroutine会造成死锁
- res := f(cmd, r.RemoteAddr())
- if res != nil {
- res.Opaque = cmd.Opaque
- res.Flag |= 1 << 0
- err := c.sendRequest(r, res)
- if err != nil {
- rlog.Warnf("send response to broker error: %s, type is: %d", err, res.Code)
- }
- }
- }()
- } else {
- rlog.Warnf("receive broker's requests, but no func to handle, code is: %d", cmd.Code)
- }
+
+ _, err = io.ReadFull(r, header)
+ if err != nil {
+ rlog.Errorf("io readfull error: %s", err.Error())
+ continue
}
+
+ var length int32
+ err = binary.Read(bytes.NewReader(header), binary.BigEndian, &length)
+ if err != nil {
+ rlog.Errorf("binary decode header: %s", err.Error())
+ continue
+ }
+
+ buf := make([]byte, length)
+ _, err = io.ReadFull(r, buf)
+ if err != nil {
+ rlog.Errorf("io readfull error: %s", err.Error())
+ continue
+ }
+
+ cmd, err := decode(buf)
+ if err != nil {
+ rlog.Errorf("decode RemotingCommand error: %s", err.Error())
+ continue
+ }
+ c.processCMD(cmd, r)
}
- if scanner.Err() != nil {
- rlog.Errorf("net: %s scanner exit, Err: %s.", r.RemoteAddr().String(), scanner.Err())
+}
+
+func (c *remotingClient) processCMD(cmd *RemotingCommand, r net.Conn) {
+ if cmd.isResponseType() {
+ resp, exist := c.responseTable.Load(cmd.Opaque)
+ if exist {
+ c.responseTable.Delete(cmd.Opaque)
+ responseFuture := resp.(*ResponseFuture)
+ go func() {
+ responseFuture.ResponseCommand = cmd
+ responseFuture.executeInvokeCallback()
+ if responseFuture.Done != nil {
+ responseFuture.Done <- true
+ }
+ }()
+ }
} else {
- rlog.Infof("net: %s scanner exit.", r.RemoteAddr().String())
+ f := c.processors[cmd.Code]
+ if f != nil {
+ go func() { // 单个goroutine会造成死锁
+ res := f(cmd, r.RemoteAddr())
+ if res != nil {
+ res.Opaque = cmd.Opaque
+ res.Flag |= 1 << 0
+ err := c.sendRequest(r, res)
+ if err != nil {
+ rlog.Warnf("send response to broker error: %s, type is: %d", err, res.Code)
+ }
+ }
+ }()
+ } else {
+ rlog.Warnf("receive broker's requests, but no func to handle, code is: %d", cmd.Code)
+ }
}
}