You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@inlong.apache.org by go...@apache.org on 2021/05/10 10:32:02 UTC

[incubator-inlong] 09/12: Address review comments

This is an automated email from the ASF dual-hosted git repository.

gosonzhang pushed a commit to branch INLONG-25
in repository https://gitbox.apache.org/repos/asf/incubator-inlong.git

commit 6b64b52905cba23c4f6b7fe892f69f81b7cc6032
Author: Zijie Lu <ws...@gmail.com>
AuthorDate: Thu May 6 10:44:51 2021 +0800

    Address review comments
    
    Signed-off-by: Zijie Lu <ws...@gmail.com>
---
 .../tubemq-client-go/codec/codec.go                | 119 +--------------------
 .../tubemq-client-go/multiplexing/multiplexing.go  |  16 +--
 .../multiplexing/multlplexing_test.go              |   4 +-
 3 files changed, 15 insertions(+), 124 deletions(-)

diff --git a/tubemq-client-twins/tubemq-client-go/codec/codec.go b/tubemq-client-twins/tubemq-client-go/codec/codec.go
index fb5c945..4af8b5d 100644
--- a/tubemq-client-twins/tubemq-client-go/codec/codec.go
+++ b/tubemq-client-twins/tubemq-client-go/codec/codec.go
@@ -20,125 +20,16 @@
 // will need to be changed.
 package codec
 
-import (
-	"bufio"
-	"encoding/binary"
-	"errors"
-	"io"
-)
-
-const (
-	RPCProtocolBeginToken uint32 = 0xFF7FF4FE
-	RPCMaxBufferSize      uint32 = 8192
-	frameHeadLen          uint32 = 12
-	maxBufferSize         int    = 128 * 1024
-	defaultMsgSize        int    = 4096
-	dataLen               uint32 = 4
-	listSizeLen           uint32 = 4
-	serialNoLen           uint32 = 4
-	beginTokenLen         uint32 = 4
-)
-
-// TransportResponse is the abstraction of the transport response.
-type TransportResponse interface {
+// Response is the abstraction of the transport response.
+type Response interface {
 	// GetSerialNo returns the `serialNo` of the corresponding request.
 	GetSerialNo() uint32
-	// GetResponseBuf returns the body of the response.
-	GetResponseBuf() []byte
+	// GetBuffer returns the body of the response.
+	GetBuffer() []byte
 }
 
 // Decoder is the abstraction of the decoder which is used to decode the response.
 type Decoder interface {
 	// Decode will decode the response to frame head and body.
-	Decode() (TransportResponse, error)
-}
-
-// TubeMQDecoder is the implementation of the decoder of response from TubeMQ.
-type TubeMQDecoder struct {
-	reader io.Reader
-	msg    []byte
-}
-
-// New will return a default TubeMQDecoder.
-func New(reader io.Reader) *TubeMQDecoder {
-	bufferReader := bufio.NewReaderSize(reader, maxBufferSize)
-	return &TubeMQDecoder{
-		msg:    make([]byte, defaultMsgSize),
-		reader: bufferReader,
-	}
-}
-
-// Decode will decode the response from TubeMQ to TransportResponse according to
-// the RPC protocol of TubeMQ.
-func (t *TubeMQDecoder) Decode() (TransportResponse, error) {
-	num, err := io.ReadFull(t.reader, t.msg[:frameHeadLen])
-	if err != nil {
-		return nil, err
-	}
-	if num != int(frameHeadLen) {
-		return nil, errors.New("framer: read frame header num invalid")
-	}
-	token := binary.BigEndian.Uint32(t.msg[:beginTokenLen])
-	if token != RPCProtocolBeginToken {
-		return nil, errors.New("framer: read framer rpc protocol begin token not match")
-	}
-	serialNo := binary.BigEndian.Uint32(t.msg[beginTokenLen : beginTokenLen+serialNoLen])
-	listSize := binary.BigEndian.Uint32(t.msg[beginTokenLen+serialNoLen : beginTokenLen+serialNoLen+listSizeLen])
-	totalLen := int(frameHeadLen)
-	size := make([]byte, 4)
-	for i := 0; i < int(listSize); i++ {
-		n, err := io.ReadFull(t.reader, size)
-		if err != nil {
-			return nil, err
-		}
-		if n != int(dataLen) {
-			return nil, errors.New("framer: read invalid size")
-		}
-
-		s := int(binary.BigEndian.Uint32(size))
-		if totalLen+s > len(t.msg) {
-			data := t.msg[:totalLen]
-			t.msg = make([]byte, totalLen+s)
-			copy(t.msg, data[:])
-		}
-
-		num, err = io.ReadFull(t.reader, t.msg[totalLen:totalLen+s])
-		if err != nil {
-			return nil, err
-		}
-		if num != s {
-			return nil, errors.New("framer: read invalid data")
-		}
-		totalLen += s
-	}
-
-	data := make([]byte, totalLen-int(frameHeadLen))
-	copy(data, t.msg[frameHeadLen:totalLen])
-
-	return TubeMQResponse{
-		serialNo:    serialNo,
-		responseBuf: data,
-	}, nil
-}
-
-// TubeMQRequest is the implementation of TubeMQ request.
-type TubeMQRequest struct {
-	serialNo uint32
-	req      []byte
-}
-
-// TubeMQResponse is the TubeMQ implementation of TransportResponse.
-type TubeMQResponse struct {
-	serialNo    uint32
-	responseBuf []byte
-}
-
-// GetSerialNo will return the SerialNo of TubeMQResponse.
-func (t TubeMQResponse) GetSerialNo() uint32 {
-	return t.serialNo
-}
-
-// GetResponseBuf will return the body of TubeMQResponse.
-func (t TubeMQResponse) GetResponseBuf() []byte {
-	return t.responseBuf
+	Decode() (Response, error)
 }
diff --git a/tubemq-client-twins/tubemq-client-go/multiplexing/multiplexing.go b/tubemq-client-twins/tubemq-client-go/multiplexing/multiplexing.go
index 783c8e8..825636a 100644
--- a/tubemq-client-twins/tubemq-client-go/multiplexing/multiplexing.go
+++ b/tubemq-client-twins/tubemq-client-go/multiplexing/multiplexing.go
@@ -98,7 +98,7 @@ func (p *Pool) Get(ctx context.Context, address string, serialNo uint32, opts *D
 	}
 	p.connections.Store(address, c)
 
-	conn, dialOpts, err := dial(ctx, address, opts)
+	conn, dialOpts, err := dial(ctx, opts)
 	c.dialOpts = dialOpts
 	if err != nil {
 		return nil, err
@@ -112,7 +112,7 @@ func (p *Pool) Get(ctx context.Context, address string, serialNo uint32, opts *D
 	return c.new(ctx, serialNo)
 }
 
-func dial(ctx context.Context, address string, opts *DialOptions) (net.Conn, *DialOptions, error) {
+func dial(ctx context.Context, opts *DialOptions) (net.Conn, *DialOptions, error) {
 	var timeout time.Duration
 	t, ok := ctx.Deadline()
 	if ok {
@@ -177,7 +177,7 @@ func getCertPool(caCertFile string) (*x509.CertPool, error) {
 
 type recvReader struct {
 	ctx  context.Context
-	recv chan codec.TransportResponse
+	recv chan codec.Response
 }
 
 // MultiplexConnection is used to multiplex a TCP connection.
@@ -199,7 +199,7 @@ func (mc *MultiplexConnection) Write(b []byte) error {
 }
 
 // Read returns the response from the multiplex connection.
-func (mc *MultiplexConnection) Read() (codec.TransportResponse, error) {
+func (mc *MultiplexConnection) Read() (codec.Response, error) {
 	select {
 	case <-mc.reader.ctx.Done():
 		mc.conn.remove(mc.serialNo)
@@ -217,7 +217,7 @@ func (mc *MultiplexConnection) Read() (codec.TransportResponse, error) {
 	}
 }
 
-func (mc *MultiplexConnection) recv(rsp *codec.TubeMQResponse) {
+func (mc *MultiplexConnection) recv(rsp codec.Response) {
 	mc.reader.recv <- rsp
 	mc.conn.remove(rsp.GetSerialNo())
 }
@@ -264,7 +264,7 @@ func (c *Connection) new(ctx context.Context, serialNo uint32) (*MultiplexConnec
 		done:     c.mDone,
 		reader: &recvReader{
 			ctx:  ctx,
-			recv: make(chan codec.TransportResponse, 1),
+			recv: make(chan codec.Response, 1),
 		},
 	}
 
@@ -324,7 +324,7 @@ func (c *Connection) reconnect() error {
 }
 
 // The response handling logic of the TCP connection.
-// 1. Read from the connection and decode it to the TransportResponse.
+// 1. Read from the connection and decode it to the Response.
 // 2. Send the response to the corresponding multiplex connection based on the serialNo.
 func (c *Connection) reader() {
 	var lastErr error
@@ -347,7 +347,7 @@ func (c *Connection) reader() {
 			continue
 		}
 		mc.reader.recv <- rsp
-		mc.conn.remove(rsp.GetSerialNo())
+		mc.conn.remove(serialNo)
 	}
 	c.close(lastErr, c.done)
 }
diff --git a/tubemq-client-twins/tubemq-client-go/multiplexing/multlplexing_test.go b/tubemq-client-twins/tubemq-client-go/multiplexing/multlplexing_test.go
index 0c6d6b9..fbeb8c4 100644
--- a/tubemq-client-twins/tubemq-client-go/multiplexing/multlplexing_test.go
+++ b/tubemq-client-twins/tubemq-client-go/multiplexing/multlplexing_test.go
@@ -107,7 +107,7 @@ func TestBasicMultiplexing(t *testing.T) {
 	rsp, err := mc.Read()
 	assert.Nil(t, err)
 	assert.Equal(t, serialNo, rsp.GetSerialNo())
-	assert.Equal(t, body, rsp.GetResponseBuf())
+	assert.Equal(t, body, rsp.GetBuffer())
 	assert.Equal(t, mc.Write(nil), nil)
 }
 
@@ -137,7 +137,7 @@ func TestConcurrentMultiplexing(t *testing.T) {
 			rsp, err := mc.Read()
 			assert.Nil(t, err)
 			assert.Equal(t, serialNo, rsp.GetSerialNo())
-			assert.Equal(t, body, rsp.GetResponseBuf())
+			assert.Equal(t, body, rsp.GetBuffer())
 		}(i)
 	}
 	wg.Wait()