You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@inlong.apache.org by go...@apache.org on 2021/05/10 10:32:02 UTC
[incubator-inlong] 09/12: Address review comments
This is an automated email from the ASF dual-hosted git repository.
gosonzhang pushed a commit to branch INLONG-25
in repository https://gitbox.apache.org/repos/asf/incubator-inlong.git
commit 6b64b52905cba23c4f6b7fe892f69f81b7cc6032
Author: Zijie Lu <ws...@gmail.com>
AuthorDate: Thu May 6 10:44:51 2021 +0800
Address review comments
Signed-off-by: Zijie Lu <ws...@gmail.com>
---
.../tubemq-client-go/codec/codec.go | 119 +--------------------
.../tubemq-client-go/multiplexing/multiplexing.go | 16 +--
.../multiplexing/multlplexing_test.go | 4 +-
3 files changed, 15 insertions(+), 124 deletions(-)
diff --git a/tubemq-client-twins/tubemq-client-go/codec/codec.go b/tubemq-client-twins/tubemq-client-go/codec/codec.go
index fb5c945..4af8b5d 100644
--- a/tubemq-client-twins/tubemq-client-go/codec/codec.go
+++ b/tubemq-client-twins/tubemq-client-go/codec/codec.go
@@ -20,125 +20,16 @@
// will need to be changed.
package codec
-import (
- "bufio"
- "encoding/binary"
- "errors"
- "io"
-)
-
-const (
- RPCProtocolBeginToken uint32 = 0xFF7FF4FE
- RPCMaxBufferSize uint32 = 8192
- frameHeadLen uint32 = 12
- maxBufferSize int = 128 * 1024
- defaultMsgSize int = 4096
- dataLen uint32 = 4
- listSizeLen uint32 = 4
- serialNoLen uint32 = 4
- beginTokenLen uint32 = 4
-)
-
-// TransportResponse is the abstraction of the transport response.
-type TransportResponse interface {
+// Response is the abstraction of the transport response.
+type Response interface {
// GetSerialNo returns the `serialNo` of the corresponding request.
GetSerialNo() uint32
- // GetResponseBuf returns the body of the response.
- GetResponseBuf() []byte
+ // GetBuffer returns the body of the response.
+ GetBuffer() []byte
}
// Decoder is the abstraction of the decoder which is used to decode the response.
type Decoder interface {
// Decode will decode the response to frame head and body.
- Decode() (TransportResponse, error)
-}
-
-// TubeMQDecoder is the implementation of the decoder of response from TubeMQ.
-type TubeMQDecoder struct {
- reader io.Reader
- msg []byte
-}
-
-// New will return a default TubeMQDecoder.
-func New(reader io.Reader) *TubeMQDecoder {
- bufferReader := bufio.NewReaderSize(reader, maxBufferSize)
- return &TubeMQDecoder{
- msg: make([]byte, defaultMsgSize),
- reader: bufferReader,
- }
-}
-
-// Decode will decode the response from TubeMQ to TransportResponse according to
-// the RPC protocol of TubeMQ.
-func (t *TubeMQDecoder) Decode() (TransportResponse, error) {
- num, err := io.ReadFull(t.reader, t.msg[:frameHeadLen])
- if err != nil {
- return nil, err
- }
- if num != int(frameHeadLen) {
- return nil, errors.New("framer: read frame header num invalid")
- }
- token := binary.BigEndian.Uint32(t.msg[:beginTokenLen])
- if token != RPCProtocolBeginToken {
- return nil, errors.New("framer: read framer rpc protocol begin token not match")
- }
- serialNo := binary.BigEndian.Uint32(t.msg[beginTokenLen : beginTokenLen+serialNoLen])
- listSize := binary.BigEndian.Uint32(t.msg[beginTokenLen+serialNoLen : beginTokenLen+serialNoLen+listSizeLen])
- totalLen := int(frameHeadLen)
- size := make([]byte, 4)
- for i := 0; i < int(listSize); i++ {
- n, err := io.ReadFull(t.reader, size)
- if err != nil {
- return nil, err
- }
- if n != int(dataLen) {
- return nil, errors.New("framer: read invalid size")
- }
-
- s := int(binary.BigEndian.Uint32(size))
- if totalLen+s > len(t.msg) {
- data := t.msg[:totalLen]
- t.msg = make([]byte, totalLen+s)
- copy(t.msg, data[:])
- }
-
- num, err = io.ReadFull(t.reader, t.msg[totalLen:totalLen+s])
- if err != nil {
- return nil, err
- }
- if num != s {
- return nil, errors.New("framer: read invalid data")
- }
- totalLen += s
- }
-
- data := make([]byte, totalLen-int(frameHeadLen))
- copy(data, t.msg[frameHeadLen:totalLen])
-
- return TubeMQResponse{
- serialNo: serialNo,
- responseBuf: data,
- }, nil
-}
-
-// TubeMQRequest is the implementation of TubeMQ request.
-type TubeMQRequest struct {
- serialNo uint32
- req []byte
-}
-
-// TubeMQResponse is the TubeMQ implementation of TransportResponse.
-type TubeMQResponse struct {
- serialNo uint32
- responseBuf []byte
-}
-
-// GetSerialNo will return the SerialNo of TubeMQResponse.
-func (t TubeMQResponse) GetSerialNo() uint32 {
- return t.serialNo
-}
-
-// GetResponseBuf will return the body of TubeMQResponse.
-func (t TubeMQResponse) GetResponseBuf() []byte {
- return t.responseBuf
+ Decode() (Response, error)
}
diff --git a/tubemq-client-twins/tubemq-client-go/multiplexing/multiplexing.go b/tubemq-client-twins/tubemq-client-go/multiplexing/multiplexing.go
index 783c8e8..825636a 100644
--- a/tubemq-client-twins/tubemq-client-go/multiplexing/multiplexing.go
+++ b/tubemq-client-twins/tubemq-client-go/multiplexing/multiplexing.go
@@ -98,7 +98,7 @@ func (p *Pool) Get(ctx context.Context, address string, serialNo uint32, opts *D
}
p.connections.Store(address, c)
- conn, dialOpts, err := dial(ctx, address, opts)
+ conn, dialOpts, err := dial(ctx, opts)
c.dialOpts = dialOpts
if err != nil {
return nil, err
@@ -112,7 +112,7 @@ func (p *Pool) Get(ctx context.Context, address string, serialNo uint32, opts *D
return c.new(ctx, serialNo)
}
-func dial(ctx context.Context, address string, opts *DialOptions) (net.Conn, *DialOptions, error) {
+func dial(ctx context.Context, opts *DialOptions) (net.Conn, *DialOptions, error) {
var timeout time.Duration
t, ok := ctx.Deadline()
if ok {
@@ -177,7 +177,7 @@ func getCertPool(caCertFile string) (*x509.CertPool, error) {
type recvReader struct {
ctx context.Context
- recv chan codec.TransportResponse
+ recv chan codec.Response
}
// MultiplexConnection is used to multiplex a TCP connection.
@@ -199,7 +199,7 @@ func (mc *MultiplexConnection) Write(b []byte) error {
}
// Read returns the response from the multiplex connection.
-func (mc *MultiplexConnection) Read() (codec.TransportResponse, error) {
+func (mc *MultiplexConnection) Read() (codec.Response, error) {
select {
case <-mc.reader.ctx.Done():
mc.conn.remove(mc.serialNo)
@@ -217,7 +217,7 @@ func (mc *MultiplexConnection) Read() (codec.TransportResponse, error) {
}
}
-func (mc *MultiplexConnection) recv(rsp *codec.TubeMQResponse) {
+func (mc *MultiplexConnection) recv(rsp codec.Response) {
mc.reader.recv <- rsp
mc.conn.remove(rsp.GetSerialNo())
}
@@ -264,7 +264,7 @@ func (c *Connection) new(ctx context.Context, serialNo uint32) (*MultiplexConnec
done: c.mDone,
reader: &recvReader{
ctx: ctx,
- recv: make(chan codec.TransportResponse, 1),
+ recv: make(chan codec.Response, 1),
},
}
@@ -324,7 +324,7 @@ func (c *Connection) reconnect() error {
}
// The response handling logic of the TCP connection.
-// 1. Read from the connection and decode it to the TransportResponse.
+// 1. Read from the connection and decode it to the Response.
// 2. Send the response to the corresponding multiplex connection based on the serialNo.
func (c *Connection) reader() {
var lastErr error
@@ -347,7 +347,7 @@ func (c *Connection) reader() {
continue
}
mc.reader.recv <- rsp
- mc.conn.remove(rsp.GetSerialNo())
+ mc.conn.remove(serialNo)
}
c.close(lastErr, c.done)
}
diff --git a/tubemq-client-twins/tubemq-client-go/multiplexing/multlplexing_test.go b/tubemq-client-twins/tubemq-client-go/multiplexing/multlplexing_test.go
index 0c6d6b9..fbeb8c4 100644
--- a/tubemq-client-twins/tubemq-client-go/multiplexing/multlplexing_test.go
+++ b/tubemq-client-twins/tubemq-client-go/multiplexing/multlplexing_test.go
@@ -107,7 +107,7 @@ func TestBasicMultiplexing(t *testing.T) {
rsp, err := mc.Read()
assert.Nil(t, err)
assert.Equal(t, serialNo, rsp.GetSerialNo())
- assert.Equal(t, body, rsp.GetResponseBuf())
+ assert.Equal(t, body, rsp.GetBuffer())
assert.Equal(t, mc.Write(nil), nil)
}
@@ -137,7 +137,7 @@ func TestConcurrentMultiplexing(t *testing.T) {
rsp, err := mc.Read()
assert.Nil(t, err)
assert.Equal(t, serialNo, rsp.GetSerialNo())
- assert.Equal(t, body, rsp.GetResponseBuf())
+ assert.Equal(t, body, rsp.GetBuffer())
}(i)
}
wg.Wait()