You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@inlong.apache.org by go...@apache.org on 2021/05/10 10:32:01 UTC

[incubator-inlong] 08/12: nitpick for codec and dialopts

This is an automated email from the ASF dual-hosted git repository.

gosonzhang pushed a commit to branch INLONG-25
in repository https://gitbox.apache.org/repos/asf/incubator-inlong.git

commit 459405a5dfd214f448b05bb796baccfd76081891
Author: Zijie Lu <ws...@gmail.com>
AuthorDate: Sun May 2 15:00:59 2021 +0800

    nitpick for codec and dialopts
    
    Signed-off-by: Zijie Lu <ws...@gmail.com>
---
 tubemq-client-twins/tubemq-client-go/codec/codec.go    | 11 ++++-------
 .../tubemq-client-go/multiplexing/multiplexing.go      | 18 +++++++-----------
 .../tubemq-client-go/multiplexing/multlplexing_test.go | 12 ++++++++++--
 3 files changed, 21 insertions(+), 20 deletions(-)

diff --git a/tubemq-client-twins/tubemq-client-go/codec/codec.go b/tubemq-client-twins/tubemq-client-go/codec/codec.go
index b9ee1d7..fb5c945 100644
--- a/tubemq-client-twins/tubemq-client-go/codec/codec.go
+++ b/tubemq-client-twins/tubemq-client-go/codec/codec.go
@@ -30,7 +30,7 @@ import (
 const (
 	RPCProtocolBeginToken uint32 = 0xFF7FF4FE
 	RPCMaxBufferSize      uint32 = 8192
-	frameHeadLen          uint32 = 8
+	frameHeadLen          uint32 = 12
 	maxBufferSize         int    = 128 * 1024
 	defaultMsgSize        int    = 4096
 	dataLen               uint32 = 4
@@ -82,11 +82,8 @@ func (t *TubeMQDecoder) Decode() (TransportResponse, error) {
 	if token != RPCProtocolBeginToken {
 		return nil, errors.New("framer: read framer rpc protocol begin token not match")
 	}
-	num, err = io.ReadFull(t.reader, t.msg[frameHeadLen:frameHeadLen+listSizeLen])
-	if num != int(listSizeLen) {
-		return nil, errors.New("framer: read invalid list size num")
-	}
-	listSize := binary.BigEndian.Uint32(t.msg[frameHeadLen : frameHeadLen+listSizeLen])
+	serialNo := binary.BigEndian.Uint32(t.msg[beginTokenLen : beginTokenLen+serialNoLen])
+	listSize := binary.BigEndian.Uint32(t.msg[beginTokenLen+serialNoLen : beginTokenLen+serialNoLen+listSizeLen])
 	totalLen := int(frameHeadLen)
 	size := make([]byte, 4)
 	for i := 0; i < int(listSize); i++ {
@@ -119,7 +116,7 @@ func (t *TubeMQDecoder) Decode() (TransportResponse, error) {
 	copy(data, t.msg[frameHeadLen:totalLen])
 
 	return TubeMQResponse{
-		serialNo:    binary.BigEndian.Uint32(t.msg[beginTokenLen : beginTokenLen+serialNoLen]),
+		serialNo:    serialNo,
 		responseBuf: data,
 	}, nil
 }
diff --git a/tubemq-client-twins/tubemq-client-go/multiplexing/multiplexing.go b/tubemq-client-twins/tubemq-client-go/multiplexing/multiplexing.go
index d1ee603..783c8e8 100644
--- a/tubemq-client-twins/tubemq-client-go/multiplexing/multiplexing.go
+++ b/tubemq-client-twins/tubemq-client-go/multiplexing/multiplexing.go
@@ -71,7 +71,7 @@ func NewPool() *Pool {
 // Get will return a multiplex connection
 // 1. If no underlying TCP connection has been created, a TCP connection will be created first.
 // 2. A new multiplex connection with the serialNo will be created and returned.
-func (p *Pool) Get(ctx context.Context, address string, serialNo uint32) (*MultiplexConnection, error) {
+func (p *Pool) Get(ctx context.Context, address string, serialNo uint32, opts *DialOptions) (*MultiplexConnection, error) {
 	select {
 	case <-ctx.Done():
 		return nil, ctx.Err()
@@ -98,7 +98,7 @@ func (p *Pool) Get(ctx context.Context, address string, serialNo uint32) (*Multi
 	}
 	p.connections.Store(address, c)
 
-	conn, dialOpts, err := dial(ctx, address)
+	conn, dialOpts, err := dial(ctx, address, opts)
 	c.dialOpts = dialOpts
 	if err != nil {
 		return nil, err
@@ -112,24 +112,20 @@ func (p *Pool) Get(ctx context.Context, address string, serialNo uint32) (*Multi
 	return c.new(ctx, serialNo)
 }
 
-func dial(ctx context.Context, address string) (net.Conn, *DialOptions, error) {
+func dial(ctx context.Context, address string, opts *DialOptions) (net.Conn, *DialOptions, error) {
 	var timeout time.Duration
 	t, ok := ctx.Deadline()
 	if ok {
 		timeout = t.Sub(time.Now())
 	}
-	dialOpts := &DialOptions{
-		Network: "tcp",
-		Address: address,
-		Timeout: timeout,
-	}
+	opts.Timeout = timeout
 	select {
 	case <-ctx.Done():
-		return nil, dialOpts, ctx.Err()
+		return nil, opts, ctx.Err()
 	default:
 	}
-	conn, err := dialWithTimeout(dialOpts)
-	return conn, dialOpts, err
+	conn, err := dialWithTimeout(opts)
+	return conn, opts, err
 }
 
 func dialWithTimeout(opts *DialOptions) (net.Conn, error) {
diff --git a/tubemq-client-twins/tubemq-client-go/multiplexing/multlplexing_test.go b/tubemq-client-twins/tubemq-client-go/multiplexing/multlplexing_test.go
index af1d416..0c6d6b9 100644
--- a/tubemq-client-twins/tubemq-client-go/multiplexing/multlplexing_test.go
+++ b/tubemq-client-twins/tubemq-client-go/multiplexing/multlplexing_test.go
@@ -93,7 +93,11 @@ func TestBasicMultiplexing(t *testing.T) {
 	defer cancel()
 
 	m := NewPool()
-	mc, err := m.Get(ctx, address, serialNo)
+	opts := &DialOptions{
+		Network: "tcp",
+		Address: address,
+	}
+	mc, err := m.Get(ctx, address, serialNo, opts)
 	body := []byte("hello world")
 
 	buf, err := Encode(serialNo, body)
@@ -118,7 +122,11 @@ func TestConcurrentMultiplexing(t *testing.T) {
 			ctx, cancel := context.WithTimeout(context.Background(), time.Second)
 			defer cancel()
 			serialNo := atomic.AddUint32(&serialNo, 1)
-			mc, err := m.Get(ctx, address, serialNo)
+			opts := &DialOptions{
+				Network: "tcp",
+				Address: address,
+			}
+			mc, err := m.Get(ctx, address, serialNo, opts)
 			assert.Nil(t, err)
 
 			body := []byte("hello world" + strconv.Itoa(i))