You are viewing a plain text version of this content. The canonical link for it is here.
Posted to dev@inlong.apache.org by GitBox <gi...@apache.org> on 2021/05/04 14:48:15 UTC

[GitHub] [incubator-inlong] charlely commented on a change in pull request #464: [INLONG-600]Multiplexed connection pool for Go sdk

charlely commented on a change in pull request #464:
URL: https://github.com/apache/incubator-inlong/pull/464#discussion_r625823974



##########
File path: tubemq-client-twins/tubemq-client-go/codec/codec.go
##########
@@ -0,0 +1,144 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+// Package codec defines the encoding and decoding logic between TubeMQ.
+// If the protocol of encoding and decoding is changed, only this package
+// will need to be changed.
+package codec
+
+import (
+	"bufio"
+	"encoding/binary"
+	"errors"
+	"io"
+)
+
+const (
+	RPCProtocolBeginToken uint32 = 0xFF7FF4FE

Review comment:
       Exported constants, variables, and functions require doc comments.

##########
File path: tubemq-client-twins/tubemq-client-go/codec/codec.go
##########
@@ -0,0 +1,144 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+// Package codec defines the encoding and decoding logic between TubeMQ.
+// If the protocol of encoding and decoding is changed, only this package
+// will need to be changed.
+package codec
+
+import (
+	"bufio"
+	"encoding/binary"
+	"errors"
+	"io"
+)
+
+const (
+	RPCProtocolBeginToken uint32 = 0xFF7FF4FE
+	RPCMaxBufferSize      uint32 = 8192
+	frameHeadLen          uint32 = 12
+	maxBufferSize         int    = 128 * 1024
+	defaultMsgSize        int    = 4096
+	dataLen               uint32 = 4
+	listSizeLen           uint32 = 4
+	serialNoLen           uint32 = 4
+	beginTokenLen         uint32 = 4
+)
+
+// TransportResponse is the abstraction of the transport response.
+type TransportResponse interface {

Review comment:
       You can name this type Response, because callers will already refer to it as codec.Response.

##########
File path: tubemq-client-twins/tubemq-client-go/codec/codec.go
##########
@@ -0,0 +1,144 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+// Package codec defines the encoding and decoding logic between TubeMQ.
+// If the protocol of encoding and decoding is changed, only this package
+// will need to be changed.
+package codec
+
+import (
+	"bufio"
+	"encoding/binary"
+	"errors"
+	"io"
+)
+
+const (
+	RPCProtocolBeginToken uint32 = 0xFF7FF4FE
+	RPCMaxBufferSize      uint32 = 8192
+	frameHeadLen          uint32 = 12
+	maxBufferSize         int    = 128 * 1024
+	defaultMsgSize        int    = 4096
+	dataLen               uint32 = 4
+	listSizeLen           uint32 = 4
+	serialNoLen           uint32 = 4
+	beginTokenLen         uint32 = 4
+)
+
+// TransportResponse is the abstraction of the transport response.
+type TransportResponse interface {
+	// GetSerialNo returns the `serialNo` of the corresponding request.
+	GetSerialNo() uint32
+	// GetResponseBuf returns the body of the response.
+	GetResponseBuf() []byte

Review comment:
       You can rename this method to GetBuffer, because the interface itself already represents a response.

##########
File path: tubemq-client-twins/tubemq-client-go/codec/codec.go
##########
@@ -0,0 +1,144 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+// Package codec defines the encoding and decoding logic between TubeMQ.
+// If the protocol of encoding and decoding is changed, only this package
+// will need to be changed.
+package codec
+
+import (
+	"bufio"
+	"encoding/binary"
+	"errors"
+	"io"
+)
+
+const (
+	RPCProtocolBeginToken uint32 = 0xFF7FF4FE
+	RPCMaxBufferSize      uint32 = 8192
+	frameHeadLen          uint32 = 12
+	maxBufferSize         int    = 128 * 1024
+	defaultMsgSize        int    = 4096
+	dataLen               uint32 = 4
+	listSizeLen           uint32 = 4
+	serialNoLen           uint32 = 4
+	beginTokenLen         uint32 = 4
+)
+
+// TransportResponse is the abstraction of the transport response.
+type TransportResponse interface {
+	// GetSerialNo returns the `serialNo` of the corresponding request.
+	GetSerialNo() uint32
+	// GetResponseBuf returns the body of the response.
+	GetResponseBuf() []byte
+}
+
+// Decoder is the abstraction of the decoder which is used to decode the response.
+type Decoder interface {
+	// Decode will decode the response to frame head and body.
+	Decode() (TransportResponse, error)
+}
+
+// TubeMQDecoder is the implementation of the decoder of response from TubeMQ.
+type TubeMQDecoder struct {
+	reader io.Reader
+	msg    []byte
+}
+
+// New will return a default TubeMQDecoder.
+func New(reader io.Reader) *TubeMQDecoder {
+	bufferReader := bufio.NewReaderSize(reader, maxBufferSize)
+	return &TubeMQDecoder{
+		msg:    make([]byte, defaultMsgSize),
+		reader: bufferReader,
+	}
+}
+
+// Decode will decode the response from TubeMQ to TransportResponse according to
+// the RPC protocol of TubeMQ.
+func (t *TubeMQDecoder) Decode() (TransportResponse, error) {
+	num, err := io.ReadFull(t.reader, t.msg[:frameHeadLen])
+	if err != nil {
+		return nil, err
+	}
+	if num != int(frameHeadLen) {
+		return nil, errors.New("framer: read frame header num invalid")
+	}
+	token := binary.BigEndian.Uint32(t.msg[:beginTokenLen])
+	if token != RPCProtocolBeginToken {

Review comment:
       These two statements can be merged into one: if binary.BigEndian.Uint32(t.msg[:beginTokenLen]) != RPCProtocolBeginToken {}

##########
File path: tubemq-client-twins/tubemq-client-go/multiplexing/multiplexing.go
##########
@@ -0,0 +1,419 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+// Package multiplexing defines the multiplex connection pool for sending
+// request and receiving response. After receiving the response, the decoded
+// response will be returned to the client. It is used for the communication
+// with TubeMQ.
+package multiplexing
+
+import (
+	"context"
+	"crypto/tls"
+	"crypto/x509"
+	"errors"
+	"io/ioutil"
+	"net"
+	"sync"
+	"time"
+
+	"github.com/apache/incubator-inlong/tubemq-client-twins/tubemq-client-go/codec"
+)
+
+var (
+	// ErrConnClosed indicates the connection has been closed
+	ErrConnClosed = errors.New("connection has been closed")
+	// ErrChanClosed indicates the recv chan has been closed
+	ErrChanClosed = errors.New("unexpected recv chan closing")
+	// ErrWriteBufferDone indicates write buffer done
+	ErrWriteBufferDone = errors.New("write buffer done")
+	// ErrAssertConnection indicates connection assertion error
+	ErrAssertConnection = errors.New("connection assertion error")
+)
+
+// The state of the connection.
+const (
+	Initial int = iota
+	Connected
+	Closing
+	Closed
+)
+
+const queueSize = 10000
+
+// Pool maintains the multiplex connections of different addresses.
+type Pool struct {
+	connections *sync.Map
+}
+
+// NewPool will construct a default multiplex connections pool.
+func NewPool() *Pool {
+	m := &Pool{
+		connections: new(sync.Map),
+	}
+	return m
+}
+
+// Get will return a multiplex connection
+// 1. If no underlying TCP connection has been created, a TCP connection will be created first.
+// 2. A new multiplex connection with the serialNo will be created and returned.
+func (p *Pool) Get(ctx context.Context, address string, serialNo uint32, opts *DialOptions) (*MultiplexConnection, error) {
+	select {
+	case <-ctx.Done():
+		return nil, ctx.Err()
+	default:
+	}
+
+	if v, ok := p.connections.Load(address); ok {
+		if c, ok := v.(*Connection); ok {
+			return c.new(ctx, serialNo)
+		}
+		return nil, ErrAssertConnection
+	}
+
+	c := &Connection{
+		address:     address,
+		connections: make(map[uint32]*MultiplexConnection),
+		done:        make(chan struct{}),
+		mDone:       make(chan struct{}),
+		state:       Initial,
+	}
+	c.buffer = &writerBuffer{
+		buffer: make(chan []byte, queueSize),
+		done:   c.done,
+	}
+	p.connections.Store(address, c)
+
+	conn, dialOpts, err := dial(ctx, address, opts)
+	c.dialOpts = dialOpts
+	if err != nil {
+		return nil, err
+	}
+	c.decoder = codec.New(conn)
+	c.conn = conn
+	c.state = Connected
+	c.pool = p
+	go c.reader()
+	go c.writer()
+	return c.new(ctx, serialNo)
+}
+
+func dial(ctx context.Context, address string, opts *DialOptions) (net.Conn, *DialOptions, error) {
+	var timeout time.Duration
+	t, ok := ctx.Deadline()
+	if ok {
+		timeout = t.Sub(time.Now())
+	}
+	opts.Timeout = timeout
+	select {
+	case <-ctx.Done():
+		return nil, opts, ctx.Err()
+	default:
+	}
+	conn, err := dialWithTimeout(opts)
+	return conn, opts, err
+}
+
+func dialWithTimeout(opts *DialOptions) (net.Conn, error) {
+	if len(opts.CACertFile) == 0 {
+		return net.DialTimeout(opts.Network, opts.Address, opts.Timeout)
+	}
+
+	tlsConf := &tls.Config{}
+	if opts.CACertFile == "none" {
+		tlsConf.InsecureSkipVerify = true
+	} else {
+		if len(opts.TLSServerName) == 0 {
+			opts.TLSServerName = opts.Address
+		}
+		tlsConf.ServerName = opts.TLSServerName
+		certPool, err := getCertPool(opts.CACertFile)
+		if err != nil {
+			return nil, err
+		}
+
+		tlsConf.RootCAs = certPool
+
+		if len(opts.TLSCertFile) != 0 {
+			cert, err := tls.LoadX509KeyPair(opts.TLSCertFile, opts.TLSKeyFile)
+			if err != nil {
+				return nil, err
+			}
+			tlsConf.Certificates = []tls.Certificate{cert}
+		}
+	}
+	return tls.DialWithDialer(&net.Dialer{Timeout: opts.Timeout}, opts.Network, opts.Address, tlsConf)
+}
+
+func getCertPool(caCertFile string) (*x509.CertPool, error) {
+	if caCertFile != "root" {
+		ca, err := ioutil.ReadFile(caCertFile)
+		if err != nil {
+			return nil, err
+		}
+		certPool := x509.NewCertPool()
+		ok := certPool.AppendCertsFromPEM(ca)
+		if !ok {
+			return nil, err
+		}
+		return certPool, nil
+	}
+	return nil, nil
+}
+
+type recvReader struct {
+	ctx  context.Context
+	recv chan codec.TransportResponse
+}
+
+// MultiplexConnection is used to multiplex a TCP connection.
+// It is distinguished by the serialNo.
+type MultiplexConnection struct {
+	serialNo uint32
+	conn     *Connection
+	reader   *recvReader
+	done     chan struct{}
+}
+
+// Write uses the underlying TCP connection to send the bytes.
+func (mc *MultiplexConnection) Write(b []byte) error {
+	if err := mc.conn.send(b); err != nil {
+		mc.conn.remove(mc.serialNo)
+		return err
+	}
+	return nil
+}
+
+// Read returns the response from the multiplex connection.
+func (mc *MultiplexConnection) Read() (codec.TransportResponse, error) {
+	select {
+	case <-mc.reader.ctx.Done():
+		mc.conn.remove(mc.serialNo)
+		return nil, mc.reader.ctx.Err()
+	case v, ok := <-mc.reader.recv:
+		if ok {
+			return v, nil
+		}
+		if mc.conn.err != nil {
+			return nil, mc.conn.err
+		}
+		return nil, ErrChanClosed
+	case <-mc.done:
+		return nil, mc.conn.err
+	}
+}
+
+func (mc *MultiplexConnection) recv(rsp *codec.TubeMQResponse) {
+	mc.reader.recv <- rsp
+	mc.conn.remove(rsp.GetSerialNo())
+}
+
+// DialOptions represents the dail options of the TCP connection.
+// If TLS is not enabled, the configuration of TLS can be ignored.
+type DialOptions struct {
+	Network       string
+	Address       string
+	Timeout       time.Duration
+	CACertFile    string
+	TLSCertFile   string
+	TLSKeyFile    string
+	TLSServerName string
+}
+
+// Connection represents the underlying TCP connection of the multiplex connections of an address
+// and maintains the multiplex connections.
+type Connection struct {
+	err         error
+	address     string
+	mu          sync.RWMutex
+	connections map[uint32]*MultiplexConnection
+	decoder     codec.Decoder
+	conn        net.Conn
+	done        chan struct{}
+	mDone       chan struct{}
+	buffer      *writerBuffer
+	dialOpts    *DialOptions
+	state       int
+	pool        *Pool
+}
+
+func (c *Connection) new(ctx context.Context, serialNo uint32) (*MultiplexConnection, error) {
+	c.mu.Lock()
+	defer c.mu.Unlock()
+	if c.err != nil {
+		return nil, c.err
+	}
+
+	mc := &MultiplexConnection{
+		serialNo: serialNo,
+		conn:     c,
+		done:     c.mDone,
+		reader: &recvReader{
+			ctx:  ctx,
+			recv: make(chan codec.TransportResponse, 1),
+		},
+	}
+
+	if lastConn, ok := c.connections[serialNo]; ok {
+		close(lastConn.reader.recv)
+	}
+	c.connections[serialNo] = mc
+	return mc, nil
+}
+
+func (c *Connection) close(lastErr error, done chan struct{}) {
+	if lastErr == nil {
+		return
+	}
+	c.mu.Lock()
+	defer c.mu.Unlock()
+
+	if c.state == Closed {
+		return
+	}
+
+	select {
+	case <-done:
+		return
+	default:
+	}
+
+	c.state = Closing
+	c.err = lastErr
+	c.connections = make(map[uint32]*MultiplexConnection)
+	close(c.done)
+	if c.conn != nil {
+		c.conn.Close()
+	}
+	err := c.reconnect()
+	if err != nil {
+		c.state = Closed
+		close(c.mDone)
+		c.pool.connections.Delete(c)
+	}
+}
+
+func (c *Connection) reconnect() error {
+	conn, err := dialWithTimeout(c.dialOpts)
+	if err != nil {
+		return err
+	}
+	c.done = make(chan struct{})
+	c.conn = conn
+	c.decoder = codec.New(conn)
+	c.buffer.done = c.done
+	c.state = Connected
+	c.err = nil
+	go c.reader()
+	go c.writer()
+	return nil
+}
+
+// The response handling logic of the TCP connection.
+// 1. Read from the connection and decode it to the TransportResponse.
+// 2. Send the response to the corresponding multiplex connection based on the serialNo.
+func (c *Connection) reader() {
+	var lastErr error
+	for {
+		select {
+		case <-c.done:
+			return
+		default:
+		}
+		rsp, err := c.decoder.Decode()
+		if err != nil {
+			lastErr = err
+			break
+		}
+		serialNo := rsp.GetSerialNo()
+		c.mu.RLock()
+		mc, ok := c.connections[serialNo]
+		c.mu.RUnlock()
+		if !ok {
+			continue
+		}
+		mc.reader.recv <- rsp
+		mc.conn.remove(rsp.GetSerialNo())

Review comment:
       You can reuse the serialNo variable that was already read above: mc.conn.remove(serialNo).

##########
File path: tubemq-client-twins/tubemq-client-go/codec/codec.go
##########
@@ -0,0 +1,144 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+// Package codec defines the encoding and decoding logic between TubeMQ.
+// If the protocol of encoding and decoding is changed, only this package
+// will need to be changed.
+package codec
+
+import (
+	"bufio"
+	"encoding/binary"
+	"errors"
+	"io"
+)
+
+const (
+	RPCProtocolBeginToken uint32 = 0xFF7FF4FE
+	RPCMaxBufferSize      uint32 = 8192
+	frameHeadLen          uint32 = 12
+	maxBufferSize         int    = 128 * 1024
+	defaultMsgSize        int    = 4096
+	dataLen               uint32 = 4
+	listSizeLen           uint32 = 4
+	serialNoLen           uint32 = 4
+	beginTokenLen         uint32 = 4
+)
+
+// TransportResponse is the abstraction of the transport response.
+type TransportResponse interface {
+	// GetSerialNo returns the `serialNo` of the corresponding request.
+	GetSerialNo() uint32
+	// GetResponseBuf returns the body of the response.
+	GetResponseBuf() []byte
+}
+
+// Decoder is the abstraction of the decoder which is used to decode the response.
+type Decoder interface {
+	// Decode will decode the response to frame head and body.
+	Decode() (TransportResponse, error)
+}
+
+// TubeMQDecoder is the implementation of the decoder of response from TubeMQ.
+type TubeMQDecoder struct {
+	reader io.Reader
+	msg    []byte
+}
+
+// New will return a default TubeMQDecoder.
+func New(reader io.Reader) *TubeMQDecoder {
+	bufferReader := bufio.NewReaderSize(reader, maxBufferSize)
+	return &TubeMQDecoder{
+		msg:    make([]byte, defaultMsgSize),
+		reader: bufferReader,
+	}
+}
+
+// Decode will decode the response from TubeMQ to TransportResponse according to
+// the RPC protocol of TubeMQ.
+func (t *TubeMQDecoder) Decode() (TransportResponse, error) {
+	num, err := io.ReadFull(t.reader, t.msg[:frameHeadLen])
+	if err != nil {
+		return nil, err
+	}
+	if num != int(frameHeadLen) {
+		return nil, errors.New("framer: read frame header num invalid")
+	}
+	token := binary.BigEndian.Uint32(t.msg[:beginTokenLen])
+	if token != RPCProtocolBeginToken {
+		return nil, errors.New("framer: read framer rpc protocol begin token not match")
+	}
+	serialNo := binary.BigEndian.Uint32(t.msg[beginTokenLen : beginTokenLen+serialNoLen])
+	listSize := binary.BigEndian.Uint32(t.msg[beginTokenLen+serialNoLen : beginTokenLen+serialNoLen+listSizeLen])
+	totalLen := int(frameHeadLen)
+	size := make([]byte, 4)
+	for i := 0; i < int(listSize); i++ {
+		n, err := io.ReadFull(t.reader, size)
+		if err != nil {
+			return nil, err
+		}
+		if n != int(dataLen) {
+			return nil, errors.New("framer: read invalid size")
+		}
+
+		s := int(binary.BigEndian.Uint32(size))
+		if totalLen+s > len(t.msg) {
+			data := t.msg[:totalLen]
+			t.msg = make([]byte, totalLen+s)
+			copy(t.msg, data[:])
+		}
+
+		num, err = io.ReadFull(t.reader, t.msg[totalLen:totalLen+s])

Review comment:
       You can combine the call and the error check:
       if num, err = io.ReadFull(t.reader, t.msg[totalLen:totalLen+s]); err != nil {
           return nil, err
       }

##########
File path: tubemq-client-twins/tubemq-client-go/codec/codec.go
##########
@@ -0,0 +1,144 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+// Package codec defines the encoding and decoding logic between TubeMQ.
+// If the protocol of encoding and decoding is changed, only this package
+// will need to be changed.
+package codec
+
+import (
+	"bufio"
+	"encoding/binary"
+	"errors"
+	"io"
+)
+
+const (
+	RPCProtocolBeginToken uint32 = 0xFF7FF4FE
+	RPCMaxBufferSize      uint32 = 8192
+	frameHeadLen          uint32 = 12
+	maxBufferSize         int    = 128 * 1024
+	defaultMsgSize        int    = 4096
+	dataLen               uint32 = 4
+	listSizeLen           uint32 = 4
+	serialNoLen           uint32 = 4
+	beginTokenLen         uint32 = 4
+)
+
+// TransportResponse is the abstraction of the transport response.
+type TransportResponse interface {
+	// GetSerialNo returns the `serialNo` of the corresponding request.
+	GetSerialNo() uint32
+	// GetResponseBuf returns the body of the response.
+	GetResponseBuf() []byte
+}
+
+// Decoder is the abstraction of the decoder which is used to decode the response.
+type Decoder interface {
+	// Decode will decode the response to frame head and body.
+	Decode() (TransportResponse, error)
+}
+
+// TubeMQDecoder is the implementation of the decoder of response from TubeMQ.
+type TubeMQDecoder struct {

Review comment:
       TubeMQDecoder should be written in a separate file.

##########
File path: tubemq-client-twins/tubemq-client-go/codec/codec.go
##########
@@ -0,0 +1,144 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+// Package codec defines the encoding and decoding logic between TubeMQ.
+// If the protocol of encoding and decoding is changed, only this package
+// will need to be changed.
+package codec
+
+import (
+	"bufio"
+	"encoding/binary"
+	"errors"
+	"io"
+)
+
+const (
+	RPCProtocolBeginToken uint32 = 0xFF7FF4FE
+	RPCMaxBufferSize      uint32 = 8192
+	frameHeadLen          uint32 = 12
+	maxBufferSize         int    = 128 * 1024
+	defaultMsgSize        int    = 4096
+	dataLen               uint32 = 4
+	listSizeLen           uint32 = 4
+	serialNoLen           uint32 = 4
+	beginTokenLen         uint32 = 4
+)
+
+// TransportResponse is the abstraction of the transport response.
+type TransportResponse interface {
+	// GetSerialNo returns the `serialNo` of the corresponding request.
+	GetSerialNo() uint32
+	// GetResponseBuf returns the body of the response.
+	GetResponseBuf() []byte
+}
+
+// Decoder is the abstraction of the decoder which is used to decode the response.
+type Decoder interface {
+	// Decode will decode the response to frame head and body.
+	Decode() (TransportResponse, error)
+}
+
+// TubeMQDecoder is the implementation of the decoder of response from TubeMQ.
+type TubeMQDecoder struct {
+	reader io.Reader
+	msg    []byte
+}
+
+// New will return a default TubeMQDecoder.
+func New(reader io.Reader) *TubeMQDecoder {
+	bufferReader := bufio.NewReaderSize(reader, maxBufferSize)
+	return &TubeMQDecoder{
+		msg:    make([]byte, defaultMsgSize),
+		reader: bufferReader,
+	}
+}
+
+// Decode will decode the response from TubeMQ to TransportResponse according to
+// the RPC protocol of TubeMQ.
+func (t *TubeMQDecoder) Decode() (TransportResponse, error) {
+	num, err := io.ReadFull(t.reader, t.msg[:frameHeadLen])
+	if err != nil {
+		return nil, err
+	}
+	if num != int(frameHeadLen) {
+		return nil, errors.New("framer: read frame header num invalid")
+	}
+	token := binary.BigEndian.Uint32(t.msg[:beginTokenLen])
+	if token != RPCProtocolBeginToken {
+		return nil, errors.New("framer: read framer rpc protocol begin token not match")
+	}
+	serialNo := binary.BigEndian.Uint32(t.msg[beginTokenLen : beginTokenLen+serialNoLen])
+	listSize := binary.BigEndian.Uint32(t.msg[beginTokenLen+serialNoLen : beginTokenLen+serialNoLen+listSizeLen])
+	totalLen := int(frameHeadLen)
+	size := make([]byte, 4)
+	for i := 0; i < int(listSize); i++ {
+		n, err := io.ReadFull(t.reader, size)
+		if err != nil {
+			return nil, err
+		}
+		if n != int(dataLen) {
+			return nil, errors.New("framer: read invalid size")
+		}
+
+		s := int(binary.BigEndian.Uint32(size))
+		if totalLen+s > len(t.msg) {
+			data := t.msg[:totalLen]
+			t.msg = make([]byte, totalLen+s)

Review comment:
       More space may be needed in the future, so the buffer should grow by a multiple (for example, doubling its size) rather than being reallocated to exactly the size required.




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org