You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@inlong.apache.org by go...@apache.org on 2021/05/10 10:31:53 UTC

[incubator-inlong] branch INLONG-25 updated (3249de3 -> 2cd11c3)

This is an automated email from the ASF dual-hosted git repository.

gosonzhang pushed a change to branch INLONG-25
in repository https://gitbox.apache.org/repos/asf/incubator-inlong.git.


    from 3249de3  [TUBEMQ-594] Trpc-go tube sdk strongly rely on local config(addendum)
     new 108fd1f  [INLONG-600]Multiplexed connection pool for Go sdk
     new 0033c62  Add license and remove the chinese comment
     new 058ad95  Address review comments
     new 96b5728  nitpick
     new c6d3f75  Rename and add some comments
     new e0fd869  Remove go.sum from git index
     new a1c86bc  nitpick
     new 459405a  nitpick for codec and dialopts
     new 6b64b52  Address review comments
     new e00a5ba  Add missing file
     new a16216d  Address review comments
     new 2cd11c3  nitpick

The 12 revisions listed above as "new" are entirely new to this
repository and will be described in separate emails.  The revisions
listed as "add" were already present in the repository and have only
been added to this reference.


Summary of changes:
 .../tubemq-client-go/codec/codec.go                |  31 +-
 .../tubemq-client-go/codec/tubemq_codec.go         | 132 +++++++
 .../tubemq-client-go/go.mod                        |  24 +-
 .../tubemq-client-go/multiplexing/multiplexing.go  | 419 +++++++++++++++++++++
 .../multiplexing/multlplexing_test.go              | 144 +++++++
 5 files changed, 719 insertions(+), 31 deletions(-)
 copy tubemq-client/src/main/java/org/apache/tubemq/client/consumer/MessageListener.java => tubemq-client-twins/tubemq-client-go/codec/codec.go (56%)
 create mode 100644 tubemq-client-twins/tubemq-client-go/codec/tubemq_codec.go
 copy codestyle/suppressions.xml => tubemq-client-twins/tubemq-client-go/go.mod (54%)
 create mode 100644 tubemq-client-twins/tubemq-client-go/multiplexing/multiplexing.go
 create mode 100644 tubemq-client-twins/tubemq-client-go/multiplexing/multlplexing_test.go

[incubator-inlong] 01/12: [INLONG-600]Multiplexed connection pool for Go sdk

Posted by go...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

gosonzhang pushed a commit to branch INLONG-25
in repository https://gitbox.apache.org/repos/asf/incubator-inlong.git

commit 108fd1fbefedc73fd38aaf699a0cd1e33f3bb03a
Author: Zijie Lu <ws...@gmail.com>
AuthorDate: Fri Apr 30 14:37:41 2021 +0800

    [INLONG-600]Multiplexed connection pool for Go sdk
    
    Signed-off-by: Zijie Lu <ws...@gmail.com>
---
 .../tubemq-client-go/codec/codec.go                | 107 ++++++
 tubemq-client-twins/tubemq-client-go/go.mod        |   5 +
 .../tubemq-client-go/pool/multiplexed.go           | 386 +++++++++++++++++++++
 .../tubemq-client-go/pool/multlplexed_test.go      | 119 +++++++
 4 files changed, 617 insertions(+)

diff --git a/tubemq-client-twins/tubemq-client-go/codec/codec.go b/tubemq-client-twins/tubemq-client-go/codec/codec.go
new file mode 100644
index 0000000..ee27e96
--- /dev/null
+++ b/tubemq-client-twins/tubemq-client-go/codec/codec.go
@@ -0,0 +1,107 @@
+package codec
+
+import (
+	"bufio"
+	"encoding/binary"
+	"errors"
+	"io"
+)
+
+const (
+	RPCProtocolBeginToken uint32 = 0xFF7FF4FE
+	RPCMaxBufferSize      uint32 = 8192
+	frameHeadLen          uint32 = 8
+	maxBufferSize         int    = 128 * 1024
+	defaultMsgSize        int    = 4096
+	dataLen               uint32 = 4
+	listSizeLen           uint32 = 4
+	serialNoLen           uint32 = 4
+	beginTokenLen         uint32 = 4
+)
+
+type Framer struct {
+	reader io.Reader
+	msg    []byte
+}
+
+func New(reader io.Reader) *Framer {
+	bufferReader := bufio.NewReaderSize(reader, maxBufferSize)
+	return &Framer{
+		msg:    make([]byte, defaultMsgSize),
+		reader: bufferReader,
+	}
+}
+
+func (f *Framer) Decode() (*FrameResponse, error) {
+	num, err := io.ReadFull(f.reader, f.msg[:frameHeadLen])
+	if err != nil {
+		return nil, err
+	}
+	if num != int(frameHeadLen) {
+		return nil, errors.New("framer: read frame header num invalid")
+	}
+	token := binary.BigEndian.Uint32(f.msg[:beginTokenLen])
+	if token != RPCProtocolBeginToken {
+		return nil, errors.New("framer: read framer rpc protocol begin token not match")
+	}
+	num, err = io.ReadFull(f.reader, f.msg[frameHeadLen:frameHeadLen+listSizeLen])
+	if num != int(listSizeLen) {
+		return nil, errors.New("framer: read invalid list size num")
+	}
+	listSize := binary.BigEndian.Uint32(f.msg[frameHeadLen : frameHeadLen+listSizeLen])
+	totalLen := int(frameHeadLen)
+	size := make([]byte, 4)
+	for i := 0; i < int(listSize); i++ {
+		n, err := io.ReadFull(f.reader, size)
+		if err != nil {
+			return nil, err
+		}
+		if n != int(dataLen) {
+			return nil, errors.New("framer: read invalid size")
+		}
+
+		s := int(binary.BigEndian.Uint32(size))
+		if totalLen+s > len(f.msg) {
+			data := f.msg[:totalLen]
+			f.msg = make([]byte, totalLen+s)
+			copy(f.msg, data[:])
+		}
+
+		num, err = io.ReadFull(f.reader, f.msg[totalLen:totalLen+s])
+		if err != nil {
+			return nil, err
+		}
+		if num != s {
+			return nil, errors.New("framer: read invalid data")
+		}
+		totalLen += s
+	}
+
+	data := make([]byte, totalLen - int(frameHeadLen))
+	copy(data, f.msg[frameHeadLen:totalLen])
+
+	return &FrameResponse{
+		serialNo:    binary.BigEndian.Uint32(f.msg[beginTokenLen : beginTokenLen+serialNoLen]),
+		responseBuf: data,
+	}, nil
+}
+
+type FrameRequest struct {
+	requestID uint32
+	req       []byte
+}
+
+type FrameResponse struct {
+	serialNo    uint32
+	responseBuf []byte
+}
+
+func (f *FrameResponse) GetSerialNo() uint32 {
+	return f.serialNo
+}
+
+func (f *FrameResponse) GetResponseBuf() []byte {
+	return f.responseBuf
+}
+
+type Codec struct{}
diff --git a/tubemq-client-twins/tubemq-client-go/go.mod b/tubemq-client-twins/tubemq-client-go/go.mod
new file mode 100644
index 0000000..7c1a676
--- /dev/null
+++ b/tubemq-client-twins/tubemq-client-go/go.mod
@@ -0,0 +1,5 @@
+module github.com/apache/incubator-inlong/tubemq-client-twins/tubemq-client-go
+
+go 1.14
+
+require github.com/stretchr/testify v1.7.0
diff --git a/tubemq-client-twins/tubemq-client-go/pool/multiplexed.go b/tubemq-client-twins/tubemq-client-go/pool/multiplexed.go
new file mode 100644
index 0000000..5d38a14
--- /dev/null
+++ b/tubemq-client-twins/tubemq-client-go/pool/multiplexed.go
@@ -0,0 +1,386 @@
+package pool
+
+import (
+	"context"
+	"crypto/tls"
+	"crypto/x509"
+	"errors"
+	"io/ioutil"
+	"net"
+	"sync"
+	"time"
+
+	"github.com/apache/incubator-inlong/tubemq-client-twins/tubemq-client-go/codec"
+)
+
+var DefaultMultiplexedPool = New()
+
+var (
+	// ErrConnClosed indicates that the connection is closed
+	ErrConnClosed = errors.New("connection is closed")
+	// ErrChanClose indicates the recv chan is closed
+	ErrChanClose = errors.New("unexpected recv chan close")
+	// ErrWriteBufferDone indicates write buffer done
+	ErrWriteBufferDone = errors.New("write buffer done")
+	// ErrAssertConnectionFail indicates connection assertion error
+	ErrAssertConnectionFail = errors.New("assert connection slice fail")
+)
+
+const (
+	Initial int = iota
+	Connected
+	Closing
+	Closed
+)
+
+var queueSize = 10000
+
+func New() *Multiplexed {
+	m := &Multiplexed{
+		connections: new(sync.Map),
+	}
+	return m
+}
+
+type writerBuffer struct {
+	buffer chan []byte
+	done   <-chan struct{}
+}
+
+func (w *writerBuffer) get() ([]byte, error) {
+	select {
+	case req := <-w.buffer:
+		return req, nil
+	case <-w.done:
+		return nil, ErrWriteBufferDone
+	}
+}
+
+type recvReader struct {
+	ctx  context.Context
+	recv chan *codec.FrameResponse
+}
+
+type MultiplexedConnection struct {
+	serialNo uint32
+	conn     *Connection
+	reader   *recvReader
+	done     chan struct{}
+}
+
+func (mc *MultiplexedConnection) Write(b []byte) error {
+	if err := mc.conn.send(b); err != nil {
+		mc.conn.remove(mc.serialNo)
+		return err
+	}
+	return nil
+}
+
+func (mc *MultiplexedConnection) Read() (*codec.FrameResponse, error) {
+	select {
+	case <-mc.reader.ctx.Done():
+		mc.conn.remove(mc.serialNo)
+		return nil, mc.reader.ctx.Err()
+	case v, ok := <-mc.reader.recv:
+		if ok {
+			return v, nil
+		}
+		if mc.conn.err != nil {
+			return nil, mc.conn.err
+		}
+		return nil, ErrChanClose
+	case <-mc.done:
+		return nil, mc.conn.err
+	}
+}
+
+func (mc *MultiplexedConnection) recv(rsp *codec.FrameResponse) {
+	mc.reader.recv <- rsp
+	mc.conn.remove(rsp.GetSerialNo())
+}
+
+type DialOptions struct {
+	Network       string
+	Address       string
+	Timeout       time.Duration
+	CACertFile    string
+	TLSCertFile   string
+	TLSKeyFile    string
+	TLSServerName string
+}
+
+type Connection struct {
+	err         error
+	address     string
+	mu          sync.RWMutex
+	connections map[uint32]*MultiplexedConnection
+	framer      *codec.Framer
+	conn        net.Conn
+	done        chan struct{}
+	mDone       chan struct{}
+	buffer      *writerBuffer
+	dialOpts    *DialOptions
+	state       int
+	multiplexed *Multiplexed
+}
+
+func (c *Connection) new(ctx context.Context, serialNo uint32) (*MultiplexedConnection, error) {
+	c.mu.Lock()
+	defer c.mu.Unlock()
+	if c.err != nil {
+		return nil, c.err
+	}
+
+	vc := &MultiplexedConnection{
+		serialNo: serialNo,
+		conn:     c,
+		done:     c.mDone,
+		reader: &recvReader{
+			ctx:  ctx,
+			recv: make(chan *codec.FrameResponse, 1),
+		},
+	}
+
+	if prevConn, ok := c.connections[serialNo]; ok {
+		close(prevConn.reader.recv)
+	}
+	c.connections[serialNo] = vc
+	return vc, nil
+}
+
+func (c *Connection) close(lastErr error, done chan struct{}) {
+	if lastErr == nil {
+		return
+	}
+	c.mu.Lock()
+	defer c.mu.Unlock()
+
+	if c.state == Closed {
+		return
+	}
+
+	select {
+	case <-done:
+		return
+	default:
+	}
+
+	c.state = Closing
+	c.err = lastErr
+	c.connections = make(map[uint32]*MultiplexedConnection)
+	close(c.done)
+	if c.conn != nil {
+		c.conn.Close()
+	}
+	err := c.reconnect()
+	if err != nil {
+		c.state = Closed
+		close(c.mDone)
+		c.multiplexed.connections.Delete(c)
+	}
+}
+
+func (c *Connection) reconnect() error {
+	conn, err := dialWithTimeout(c.dialOpts)
+	if err != nil {
+		return err
+	}
+	c.done = make(chan struct{})
+	c.conn = conn
+	c.framer = codec.New(conn)
+	c.buffer.done = c.done
+	c.state = Connected
+	c.err = nil
+	go c.reader()
+	go c.writer()
+	return nil
+}
+
+func (c *Connection) writer() {
+	var lastErr error
+	for {
+		select {
+		case <-c.done:
+			return
+		default:
+		}
+		req, err := c.buffer.get()
+		if err != nil {
+			lastErr = err
+			break
+		}
+		if err := c.write(req); err != nil {
+			lastErr = err
+			break
+		}
+	}
+	c.close(lastErr, c.done)
+}
+
+func (c *Connection) send(b []byte) error {
+	if c.state == Closed {
+		return ErrConnClosed
+	}
+
+	select {
+	case c.buffer.buffer <- b:
+		return nil
+	case <-c.mDone:
+		return c.err
+	}
+}
+
+func (c *Connection) remove(id uint32) {
+	c.mu.Lock()
+	delete(c.connections, id)
+	c.mu.Unlock()
+}
+
+func (c *Connection) write(b []byte) error {
+	sent := 0
+	for sent < len(b) {
+		n, err := c.conn.Write(b[sent:])
+		if err != nil {
+			return err
+		}
+		sent += n
+	}
+	return nil
+}
+
+func (c *Connection) reader() {
+	var lastErr error
+	for {
+		select {
+		case <-c.done:
+			return
+		default:
+		}
+		rsp, err := c.framer.Decode()
+		if err != nil {
+			lastErr = err
+			break
+		}
+		serialNo := rsp.GetSerialNo()
+		c.mu.RLock()
+		mc, ok := c.connections[serialNo]
+		c.mu.RUnlock()
+		if !ok {
+			continue
+		}
+		mc.reader.recv <- rsp
+		mc.conn.remove(rsp.GetSerialNo())
+	}
+	c.close(lastErr, c.done)
+}
+
+type Multiplexed struct {
+	connections *sync.Map
+}
+
+func (p *Multiplexed) Get(ctx context.Context, address string, serialNo uint32) (*MultiplexedConnection, error) {
+	select {
+	case <-ctx.Done():
+		return nil, ctx.Err()
+	default:
+	}
+
+	if v, ok := p.connections.Load(address); ok {
+		if c, ok := v.(*Connection); ok {
+			return c.new(ctx, serialNo)
+		}
+		return nil, ErrAssertConnectionFail
+	}
+
+	c := &Connection{
+		address:     address,
+		connections: make(map[uint32]*MultiplexedConnection),
+		done:        make(chan struct{}),
+		mDone:       make(chan struct{}),
+		state:       Initial,
+	}
+	c.buffer = &writerBuffer{
+		buffer: make(chan []byte, queueSize),
+		done:   c.done,
+	}
+	p.connections.Store(address, c)
+
+	conn, dialOpts, err := dial(ctx, address)
+	c.dialOpts = dialOpts
+	if err != nil {
+		return nil, err
+	}
+	c.framer = codec.New(conn)
+	c.conn = conn
+	c.state = Connected
+	go c.reader()
+	go c.writer()
+	return c.new(ctx, serialNo)
+}
+
+func dial(ctx context.Context, address string) (net.Conn, *DialOptions, error) {
+	var timeout time.Duration
+	t, ok := ctx.Deadline()
+	if ok {
+		timeout = t.Sub(time.Now())
+	}
+	dialOpts := &DialOptions{
+		Network: "tcp",
+		Address: address,
+		Timeout: timeout,
+	}
+	select {
+	case <-ctx.Done():
+		return nil, dialOpts, ctx.Err()
+	default:
+	}
+	conn, err := dialWithTimeout(dialOpts)
+	return conn, dialOpts, err
+}
+
+func dialWithTimeout(opts *DialOptions) (net.Conn, error) {
+	if len(opts.CACertFile) == 0 {
+		return net.DialTimeout(opts.Network, opts.Address, opts.Timeout)
+	}
+
+	tlsConf := &tls.Config{}
+	if opts.CACertFile == "none" { // 不需要检验服务证书
+		tlsConf.InsecureSkipVerify = true
+	} else {
+		if len(opts.TLSServerName) == 0 {
+			opts.TLSServerName = opts.Address
+		}
+		tlsConf.ServerName = opts.TLSServerName
+		certPool, err := getCertPool(opts.CACertFile)
+		if err != nil {
+			return nil, err
+		}
+
+		tlsConf.RootCAs = certPool
+
+		if len(opts.TLSCertFile) != 0 {
+			cert, err := tls.LoadX509KeyPair(opts.TLSCertFile, opts.TLSKeyFile)
+			if err != nil {
+				return nil, err
+			}
+			tlsConf.Certificates = []tls.Certificate{cert}
+		}
+	}
+	return tls.DialWithDialer(&net.Dialer{Timeout: opts.Timeout}, opts.Network, opts.Address, tlsConf)
+}
+
+func getCertPool(caCertFile string) (*x509.CertPool, error) {
+	if caCertFile != "root" {
+		ca, err := ioutil.ReadFile(caCertFile)
+		if err != nil {
+			return nil, err
+		}
+		certPool := x509.NewCertPool()
+		ok := certPool.AppendCertsFromPEM(ca)
+		if !ok {
+			return nil, err
+		}
+		return certPool, nil
+	}
+	return nil, nil
+}
diff --git a/tubemq-client-twins/tubemq-client-go/pool/multlplexed_test.go b/tubemq-client-twins/tubemq-client-go/pool/multlplexed_test.go
new file mode 100644
index 0000000..6377032
--- /dev/null
+++ b/tubemq-client-twins/tubemq-client-go/pool/multlplexed_test.go
@@ -0,0 +1,119 @@
+package pool
+
+import (
+	"bytes"
+	"context"
+	"encoding/binary"
+	"io"
+	"log"
+	"net"
+	"strconv"
+	"sync"
+	"sync/atomic"
+	"testing"
+	"time"
+
+	"github.com/stretchr/testify/assert"
+
+	"github.com/apache/incubator-inlong/tubemq-client-twins/tubemq-client-go/codec"
+)
+
+var (
+	address         = "127.0.0.1:0"
+	ch              = make(chan struct{})
+	serialNo uint32 = 1
+)
+
+func init() {
+	go simpleForwardTCPServer(ch)
+	<-ch
+}
+
+func simpleForwardTCPServer(ch chan struct{}) {
+	l, err := net.Listen("tcp", address)
+	if err != nil {
+		log.Fatal(err)
+	}
+	defer l.Close()
+	address = l.Addr().String()
+
+	ch <- struct{}{}
+
+	for {
+		conn, err := l.Accept()
+		if err != nil {
+			log.Fatal(err)
+		}
+
+		go func() {
+			io.Copy(conn, conn)
+		}()
+	}
+}
+
+func Encode(serialNo uint32, body []byte) ([]byte, error) {
+	l := len(body)
+	buf := bytes.NewBuffer(make([]byte, 0, 16+l))
+	if err := binary.Write(buf, binary.BigEndian, codec.RPCProtocolBeginToken); err != nil {
+		return nil, err
+	}
+	if err := binary.Write(buf, binary.BigEndian, serialNo); err != nil {
+		return nil, err
+	}
+	if err := binary.Write(buf, binary.BigEndian, uint32(1)); err != nil {
+		return nil, err
+	}
+	if err := binary.Write(buf, binary.BigEndian, uint32(len(body))); err != nil {
+		return nil, err
+	}
+	buf.Write(body)
+	return buf.Bytes(), nil
+}
+
+func TestBasicMultiplexed(t *testing.T) {
+	serialNo := atomic.AddUint32(&serialNo, 1)
+	ctx, cancel := context.WithTimeout(context.Background(), time.Second)
+	defer cancel()
+
+	m := New()
+	mc, err := m.Get(ctx, address, serialNo)
+	body := []byte("hello world")
+
+	buf, err := Encode(serialNo, body)
+	assert.Nil(t, err)
+	assert.Nil(t, mc.Write(buf))
+
+	rsp, err := mc.Read()
+	assert.Nil(t, err)
+	assert.Equal(t, serialNo, rsp.GetSerialNo())
+	assert.Equal(t, body, rsp.GetResponseBuf())
+	assert.Equal(t, mc.Write(nil), nil)
+}
+
+func TestConcurrentMultiplexed(t *testing.T) {
+	count := 1000
+	m := New()
+	wg := sync.WaitGroup{}
+	wg.Add(count)
+	for i := 0; i < count; i++ {
+		go func(i int) {
+			defer wg.Done()
+			ctx, cancel := context.WithTimeout(context.Background(), time.Second)
+			defer cancel()
+			serialNo := atomic.AddUint32(&serialNo, 1)
+			mc, err := m.Get(ctx, address, serialNo)
+			assert.Nil(t, err)
+
+			body := []byte("hello world" + strconv.Itoa(i))
+			buf, err := Encode(serialNo, body)
+			assert.Nil(t, err)
+			assert.Nil(t, mc.Write(buf))
+
+			rsp, err := mc.Read()
+			assert.Nil(t, err)
+			assert.Equal(t, serialNo, rsp.GetSerialNo())
+			assert.Equal(t, body, rsp.GetResponseBuf())
+		}(i)
+	}
+	wg.Wait()
+}

[incubator-inlong] 11/12: Address review comments

Posted by go...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

gosonzhang pushed a commit to branch INLONG-25
in repository https://gitbox.apache.org/repos/asf/incubator-inlong.git

commit a16216d6eebb16cecaafc9d96d03ca58919ca611
Author: Zijie Lu <ws...@gmail.com>
AuthorDate: Fri May 7 10:43:11 2021 +0800

    Address review comments
    
    Signed-off-by: Zijie Lu <ws...@gmail.com>
---
 tubemq-client-twins/tubemq-client-go/codec/tubemq_codec.go | 10 ++--------
 1 file changed, 2 insertions(+), 8 deletions(-)

diff --git a/tubemq-client-twins/tubemq-client-go/codec/tubemq_codec.go b/tubemq-client-twins/tubemq-client-go/codec/tubemq_codec.go
index 96ebfa9..a8706da 100644
--- a/tubemq-client-twins/tubemq-client-go/codec/tubemq_codec.go
+++ b/tubemq-client-twins/tubemq-client-go/codec/tubemq_codec.go
@@ -25,6 +25,7 @@ import (
 	"encoding/binary"
 	"errors"
 	"io"
+	"math"
 )
 
 const (
@@ -86,7 +87,7 @@ func (t *TubeMQDecoder) Decode() (Response, error) {
 		s := int(binary.BigEndian.Uint32(size))
 		if totalLen+s > len(t.msg) {
 			data := t.msg[:totalLen]
-			t.msg = make([]byte, 0, max(2*len(t.msg), totalLen+s))
+			t.msg = make([]byte, 0, int(math.Max(float64(2*len(t.msg)), float64(totalLen+s))))
 			copy(t.msg, data[:])
 		}
 
@@ -129,10 +130,3 @@ func (t TubeMQResponse) GetSerialNo() uint32 {
 func (t TubeMQResponse) GetBuffer() []byte {
 	return t.Buffer
 }
-
-func max(x, y int) int {
-	if x < y {
-		return y
-	}
-	return x
-}

[incubator-inlong] 07/12: nitpick

Posted by go...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

gosonzhang pushed a commit to branch INLONG-25
in repository https://gitbox.apache.org/repos/asf/incubator-inlong.git

commit a1c86bccac13bb68d58eebb360b3610fa46eb96b
Author: Zijie Lu <ws...@gmail.com>
AuthorDate: Sat May 1 11:15:35 2021 +0800

    nitpick
    
    Signed-off-by: Zijie Lu <ws...@gmail.com>
---
 .../tubemq-client-go/multiplexing/multiplexing.go  | 216 ++++++++++-----------
 1 file changed, 108 insertions(+), 108 deletions(-)

diff --git a/tubemq-client-twins/tubemq-client-go/multiplexing/multiplexing.go b/tubemq-client-twins/tubemq-client-go/multiplexing/multiplexing.go
index 06e09d2..d1ee603 100644
--- a/tubemq-client-twins/tubemq-client-go/multiplexing/multiplexing.go
+++ b/tubemq-client-twins/tubemq-client-go/multiplexing/multiplexing.go
@@ -112,6 +112,73 @@ func (p *Pool) Get(ctx context.Context, address string, serialNo uint32) (*Multi
 	return c.new(ctx, serialNo)
 }
 
+func dial(ctx context.Context, address string) (net.Conn, *DialOptions, error) {
+	var timeout time.Duration
+	t, ok := ctx.Deadline()
+	if ok {
+		timeout = t.Sub(time.Now())
+	}
+	dialOpts := &DialOptions{
+		Network: "tcp",
+		Address: address,
+		Timeout: timeout,
+	}
+	select {
+	case <-ctx.Done():
+		return nil, dialOpts, ctx.Err()
+	default:
+	}
+	conn, err := dialWithTimeout(dialOpts)
+	return conn, dialOpts, err
+}
+
+func dialWithTimeout(opts *DialOptions) (net.Conn, error) {
+	if len(opts.CACertFile) == 0 {
+		return net.DialTimeout(opts.Network, opts.Address, opts.Timeout)
+	}
+
+	tlsConf := &tls.Config{}
+	if opts.CACertFile == "none" {
+		tlsConf.InsecureSkipVerify = true
+	} else {
+		if len(opts.TLSServerName) == 0 {
+			opts.TLSServerName = opts.Address
+		}
+		tlsConf.ServerName = opts.TLSServerName
+		certPool, err := getCertPool(opts.CACertFile)
+		if err != nil {
+			return nil, err
+		}
+
+		tlsConf.RootCAs = certPool
+
+		if len(opts.TLSCertFile) != 0 {
+			cert, err := tls.LoadX509KeyPair(opts.TLSCertFile, opts.TLSKeyFile)
+			if err != nil {
+				return nil, err
+			}
+			tlsConf.Certificates = []tls.Certificate{cert}
+		}
+	}
+	return tls.DialWithDialer(&net.Dialer{Timeout: opts.Timeout}, opts.Network, opts.Address, tlsConf)
+}
+
+func getCertPool(caCertFile string) (*x509.CertPool, error) {
+	if caCertFile != "root" {
+		ca, err := ioutil.ReadFile(caCertFile)
+		if err != nil {
+			return nil, err
+		}
+		certPool := x509.NewCertPool()
+		ok := certPool.AppendCertsFromPEM(ca)
+		if !ok {
+			return nil, err
+		}
+		return certPool, nil
+	}
+	return nil, nil
+}
+
 type recvReader struct {
 	ctx  context.Context
 	recv chan codec.TransportResponse
@@ -260,6 +327,35 @@ func (c *Connection) reconnect() error {
 	return nil
 }
 
+// The response handling logic of the TCP connection.
+// 1. Read from the connection and decode it to the TransportResponse.
+// 2. Send the response to the corresponding multiplex connection based on the serialNo.
+func (c *Connection) reader() {
+	var lastErr error
+	for {
+		select {
+		case <-c.done:
+			return
+		default:
+		}
+		rsp, err := c.decoder.Decode()
+		if err != nil {
+			lastErr = err
+			break
+		}
+		serialNo := rsp.GetSerialNo()
+		c.mu.RLock()
+		mc, ok := c.connections[serialNo]
+		c.mu.RUnlock()
+		if !ok {
+			continue
+		}
+		mc.reader.recv <- rsp
+		mc.conn.remove(rsp.GetSerialNo())
+	}
+	c.close(lastErr, c.done)
+}
+
 func (c *Connection) writer() {
 	var lastErr error
 	for {
@@ -281,6 +377,18 @@ func (c *Connection) writer() {
 	c.close(lastErr, c.done)
 }
 
+func (c *Connection) write(b []byte) error {
+	sent := 0
+	for sent < len(b) {
+		n, err := c.conn.Write(b[sent:])
+		if err != nil {
+			return err
+		}
+		sent += n
+	}
+	return nil
+}
+
 func (c *Connection) send(b []byte) error {
 	if c.state == Closed {
 		return ErrConnClosed
@@ -300,47 +408,6 @@ func (c *Connection) remove(id uint32) {
 	c.mu.Unlock()
 }
 
-func (c *Connection) write(b []byte) error {
-	sent := 0
-	for sent < len(b) {
-		n, err := c.conn.Write(b[sent:])
-		if err != nil {
-			return err
-		}
-		sent += n
-	}
-	return nil
-}
-
-// The response handling logic of the TCP connection.
-// 1. Read from the connection and decode it to the TransportResponse.
-// 2. Send the response to the corresponding multiplex connection based on the serialNo.
-func (c *Connection) reader() {
-	var lastErr error
-	for {
-		select {
-		case <-c.done:
-			return
-		default:
-		}
-		rsp, err := c.decoder.Decode()
-		if err != nil {
-			lastErr = err
-			break
-		}
-		serialNo := rsp.GetSerialNo()
-		c.mu.RLock()
-		mc, ok := c.connections[serialNo]
-		c.mu.RUnlock()
-		if !ok {
-			continue
-		}
-		mc.reader.recv <- rsp
-		mc.conn.remove(rsp.GetSerialNo())
-	}
-	c.close(lastErr, c.done)
-}
-
 type writerBuffer struct {
 	buffer chan []byte
 	done   <-chan struct{}
@@ -354,70 +421,3 @@ func (w *writerBuffer) get() ([]byte, error) {
 		return nil, ErrWriteBufferDone
 	}
 }
-
-func dial(ctx context.Context, address string) (net.Conn, *DialOptions, error) {
-	var timeout time.Duration
-	t, ok := ctx.Deadline()
-	if ok {
-		timeout = t.Sub(time.Now())
-	}
-	dialOpts := &DialOptions{
-		Network: "tcp",
-		Address: address,
-		Timeout: timeout,
-	}
-	select {
-	case <-ctx.Done():
-		return nil, dialOpts, ctx.Err()
-	default:
-	}
-	conn, err := dialWithTimeout(dialOpts)
-	return conn, dialOpts, err
-}
-
-func dialWithTimeout(opts *DialOptions) (net.Conn, error) {
-	if len(opts.CACertFile) == 0 {
-		return net.DialTimeout(opts.Network, opts.Address, opts.Timeout)
-	}
-
-	tlsConf := &tls.Config{}
-	if opts.CACertFile == "none" {
-		tlsConf.InsecureSkipVerify = true
-	} else {
-		if len(opts.TLSServerName) == 0 {
-			opts.TLSServerName = opts.Address
-		}
-		tlsConf.ServerName = opts.TLSServerName
-		certPool, err := getCertPool(opts.CACertFile)
-		if err != nil {
-			return nil, err
-		}
-
-		tlsConf.RootCAs = certPool
-
-		if len(opts.TLSCertFile) != 0 {
-			cert, err := tls.LoadX509KeyPair(opts.TLSCertFile, opts.TLSKeyFile)
-			if err != nil {
-				return nil, err
-			}
-			tlsConf.Certificates = []tls.Certificate{cert}
-		}
-	}
-	return tls.DialWithDialer(&net.Dialer{Timeout: opts.Timeout}, opts.Network, opts.Address, tlsConf)
-}
-
-func getCertPool(caCertFile string) (*x509.CertPool, error) {
-	if caCertFile != "root" {
-		ca, err := ioutil.ReadFile(caCertFile)
-		if err != nil {
-			return nil, err
-		}
-		certPool := x509.NewCertPool()
-		ok := certPool.AppendCertsFromPEM(ca)
-		if !ok {
-			return nil, err
-		}
-		return certPool, nil
-	}
-	return nil, nil
-}

[incubator-inlong] 04/12: nitpick

Posted by go...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

gosonzhang pushed a commit to branch INLONG-25
in repository https://gitbox.apache.org/repos/asf/incubator-inlong.git

commit 96b5728c33e7d6e840502456eb5ae750df8f062b
Author: Zijie Lu <ws...@gmail.com>
AuthorDate: Fri Apr 30 17:07:21 2021 +0800

    nitpick
    
    Signed-off-by: Zijie Lu <ws...@gmail.com>
---
 tubemq-client-twins/tubemq-client-go/multiplexed/multiplexed.go | 5 ++---
 1 file changed, 2 insertions(+), 3 deletions(-)

diff --git a/tubemq-client-twins/tubemq-client-go/multiplexed/multiplexed.go b/tubemq-client-twins/tubemq-client-go/multiplexed/multiplexed.go
index 050b608..3792035 100644
--- a/tubemq-client-twins/tubemq-client-go/multiplexed/multiplexed.go
+++ b/tubemq-client-twins/tubemq-client-go/multiplexed/multiplexed.go
@@ -65,7 +65,6 @@ func NewPool() *Pool {
 	return m
 }
 
-
 func (p *Pool) Get(ctx context.Context, address string, serialNo uint32) (*MultiplexedConnection, error) {
 	select {
 	case <-ctx.Done():
@@ -185,7 +184,7 @@ type Connection struct {
 	buffer      *writerBuffer
 	dialOpts    *DialOptions
 	state       int
-	multiplexed *Pool
+	pool        *Pool
 }
 
 func (c *Connection) new(ctx context.Context, serialNo uint32) (*MultiplexedConnection, error) {
@@ -240,7 +239,7 @@ func (c *Connection) close(lastErr error, done chan struct{}) {
 	if err != nil {
 		c.state = Closed
 		close(c.mDone)
-		c.multiplexed.connections.Delete(c)
+		c.pool.connections.Delete(c)
 	}
 }
 

[incubator-inlong] 03/12: Address review comments

Posted by go...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

gosonzhang pushed a commit to branch INLONG-25
in repository https://gitbox.apache.org/repos/asf/incubator-inlong.git

commit 058ad9504843b49ce2852f420ad890e1aad8b78f
Author: Zijie Lu <ws...@gmail.com>
AuthorDate: Fri Apr 30 17:00:52 2021 +0800

    Address review comments
    
    Signed-off-by: Zijie Lu <ws...@gmail.com>
---
 .../tubemq-client-go/codec/codec.go                |  63 ++++++-----
 .../{pool => multiplexed}/multiplexed.go           | 119 +++++++++++----------
 .../{pool => multiplexed}/multlplexed_test.go      |   6 +-
 3 files changed, 99 insertions(+), 89 deletions(-)

diff --git a/tubemq-client-twins/tubemq-client-go/codec/codec.go b/tubemq-client-twins/tubemq-client-go/codec/codec.go
index f66fac9..a6f3fee 100644
--- a/tubemq-client-twins/tubemq-client-go/codec/codec.go
+++ b/tubemq-client-twins/tubemq-client-go/codec/codec.go
@@ -36,40 +36,49 @@ const (
 	beginTokenLen         uint32 = 4
 )
 
-type Framer struct {
+type TransportResponse interface {
+	GetSerialNo() uint32
+	GetResponseBuf() []byte
+}
+
+type Decoder interface {
+	Decode() (TransportResponse, error)
+}
+
+type TubeMQDecoder struct {
 	reader io.Reader
 	msg    []byte
 }
 
-func New(reader io.Reader) *Framer {
+func New(reader io.Reader) *TubeMQDecoder {
 	bufferReader := bufio.NewReaderSize(reader, maxBufferSize)
-	return &Framer{
+	return &TubeMQDecoder{
 		msg:    make([]byte, defaultMsgSize),
 		reader: bufferReader,
 	}
 }
 
-func (f *Framer) Decode() (*FrameResponse, error) {
-	num, err := io.ReadFull(f.reader, f.msg[:frameHeadLen])
+func (t *TubeMQDecoder) Decode() (TransportResponse, error) {
+	num, err := io.ReadFull(t.reader, t.msg[:frameHeadLen])
 	if err != nil {
 		return nil, err
 	}
 	if num != int(frameHeadLen) {
 		return nil, errors.New("framer: read frame header num invalid")
 	}
-	token := binary.BigEndian.Uint32(f.msg[:beginTokenLen])
+	token := binary.BigEndian.Uint32(t.msg[:beginTokenLen])
 	if token != RPCProtocolBeginToken {
 		return nil, errors.New("framer: read framer rpc protocol begin token not match")
 	}
-	num, err = io.ReadFull(f.reader, f.msg[frameHeadLen:frameHeadLen+listSizeLen])
+	num, err = io.ReadFull(t.reader, t.msg[frameHeadLen:frameHeadLen+listSizeLen])
 	if num != int(listSizeLen) {
 		return nil, errors.New("framer: read invalid list size num")
 	}
-	listSize := binary.BigEndian.Uint32(f.msg[frameHeadLen : frameHeadLen+listSizeLen])
+	listSize := binary.BigEndian.Uint32(t.msg[frameHeadLen : frameHeadLen+listSizeLen])
 	totalLen := int(frameHeadLen)
 	size := make([]byte, 4)
 	for i := 0; i < int(listSize); i++ {
-		n, err := io.ReadFull(f.reader, size)
+		n, err := io.ReadFull(t.reader, size)
 		if err != nil {
 			return nil, err
 		}
@@ -78,13 +87,13 @@ func (f *Framer) Decode() (*FrameResponse, error) {
 		}
 
 		s := int(binary.BigEndian.Uint32(size))
-		if totalLen+s > len(f.msg) {
-			data := f.msg[:totalLen]
-			f.msg = make([]byte, totalLen+s)
-			copy(f.msg, data[:])
+		if totalLen+s > len(t.msg) {
+			data := t.msg[:totalLen]
+			t.msg = make([]byte, totalLen+s)
+			copy(t.msg, data[:])
 		}
 
-		num, err = io.ReadFull(f.reader, f.msg[totalLen:totalLen+s])
+		num, err = io.ReadFull(t.reader, t.msg[totalLen:totalLen+s])
 		if err != nil {
 			return nil, err
 		}
@@ -94,31 +103,29 @@ func (f *Framer) Decode() (*FrameResponse, error) {
 		totalLen += s
 	}
 
-	data := make([]byte, totalLen - int(frameHeadLen))
-	copy(data, f.msg[frameHeadLen:totalLen])
+	data := make([]byte, totalLen-int(frameHeadLen))
+	copy(data, t.msg[frameHeadLen:totalLen])
 
-	return &FrameResponse{
-		serialNo:    binary.BigEndian.Uint32(f.msg[beginTokenLen : beginTokenLen+serialNoLen]),
+	return TubeMQResponse{
+		serialNo:    binary.BigEndian.Uint32(t.msg[beginTokenLen : beginTokenLen+serialNoLen]),
 		responseBuf: data,
 	}, nil
 }
 
-type FrameRequest struct {
-	requestID uint32
-	req       []byte
+type TubeMQRequest struct {
+	serialNo uint32
+	req      []byte
 }
 
-type FrameResponse struct {
+type TubeMQResponse struct {
 	serialNo    uint32
 	responseBuf []byte
 }
 
-func (f *FrameResponse) GetSerialNo() uint32 {
-	return f.serialNo
+func (t TubeMQResponse) GetSerialNo() uint32 {
+	return t.serialNo
 }
 
-func (f *FrameResponse) GetResponseBuf() []byte {
-	return f.responseBuf
+func (t TubeMQResponse) GetResponseBuf() []byte {
+	return t.responseBuf
 }
-
-type Codec struct{}
diff --git a/tubemq-client-twins/tubemq-client-go/pool/multiplexed.go b/tubemq-client-twins/tubemq-client-go/multiplexed/multiplexed.go
similarity index 90%
rename from tubemq-client-twins/tubemq-client-go/pool/multiplexed.go
rename to tubemq-client-twins/tubemq-client-go/multiplexed/multiplexed.go
index d5a60b1..050b608 100644
--- a/tubemq-client-twins/tubemq-client-go/pool/multiplexed.go
+++ b/tubemq-client-twins/tubemq-client-go/multiplexed/multiplexed.go
@@ -15,7 +15,11 @@
  * limitations under the License.
  */
 
-package pool
+// Package multiplexed defines the multiplexed connection pool for sending
+// request and receiving response. After receiving the response, it will
+// be decoded and returned to the client. It is used for the underlying communication
+// with TubeMQ.
+package multiplexed
 
 import (
 	"context"
@@ -30,8 +34,6 @@ import (
 	"github.com/apache/incubator-inlong/tubemq-client-twins/tubemq-client-go/codec"
 )
 
-var DefaultMultiplexedPool = New()
-
 var (
 	// ErrConnClosed indicates that the connection is closed
 	ErrConnClosed = errors.New("connection is closed")
@@ -50,15 +52,60 @@ const (
 	Closed
 )
 
-var queueSize = 10000
+const queueSize = 10000
+
+type Pool struct {
+	connections *sync.Map
+}
 
-func New() *Multiplexed {
-	m := &Multiplexed{
+func NewPool() *Pool {
+	m := &Pool{
 		connections: new(sync.Map),
 	}
 	return m
 }
 
+
+func (p *Pool) Get(ctx context.Context, address string, serialNo uint32) (*MultiplexedConnection, error) {
+	select {
+	case <-ctx.Done():
+		return nil, ctx.Err()
+	default:
+	}
+
+	if v, ok := p.connections.Load(address); ok {
+		if c, ok := v.(*Connection); ok {
+			return c.new(ctx, serialNo)
+		}
+		return nil, ErrAssertConnectionFail
+	}
+
+	c := &Connection{
+		address:     address,
+		connections: make(map[uint32]*MultiplexedConnection),
+		done:        make(chan struct{}),
+		mDone:       make(chan struct{}),
+		state:       Initial,
+	}
+	c.buffer = &writerBuffer{
+		buffer: make(chan []byte, queueSize),
+		done:   c.done,
+	}
+	p.connections.Store(address, c)
+
+	conn, dialOpts, err := dial(ctx, address)
+	c.dialOpts = dialOpts
+	if err != nil {
+		return nil, err
+	}
+	c.decoder = codec.New(conn)
+	c.conn = conn
+	c.state = Connected
+	go c.reader()
+	go c.writer()
+	return c.new(ctx, serialNo)
+}
+
 type writerBuffer struct {
 	buffer chan []byte
 	done   <-chan struct{}
@@ -75,7 +122,7 @@ func (w *writerBuffer) get() ([]byte, error) {
 
 type recvReader struct {
 	ctx  context.Context
-	recv chan *codec.FrameResponse
+	recv chan codec.TransportResponse
 }
 
 type MultiplexedConnection struct {
@@ -93,7 +140,7 @@ func (mc *MultiplexedConnection) Write(b []byte) error {
 	return nil
 }
 
-func (mc *MultiplexedConnection) Read() (*codec.FrameResponse, error) {
+func (mc *MultiplexedConnection) Read() (codec.TransportResponse, error) {
 	select {
 	case <-mc.reader.ctx.Done():
 		mc.conn.remove(mc.serialNo)
@@ -111,7 +158,7 @@ func (mc *MultiplexedConnection) Read() (*codec.FrameResponse, error) {
 	}
 }
 
-func (mc *MultiplexedConnection) recv(rsp *codec.FrameResponse) {
+func (mc *MultiplexedConnection) recv(rsp *codec.TubeMQResponse) {
 	mc.reader.recv <- rsp
 	mc.conn.remove(rsp.GetSerialNo())
 }
@@ -131,14 +178,14 @@ type Connection struct {
 	address     string
 	mu          sync.RWMutex
 	connections map[uint32]*MultiplexedConnection
-	framer      *codec.Framer
+	decoder     codec.Decoder
 	conn        net.Conn
 	done        chan struct{}
 	mDone       chan struct{}
 	buffer      *writerBuffer
 	dialOpts    *DialOptions
 	state       int
-	multiplexed *Multiplexed
+	multiplexed *Pool
 }
 
 func (c *Connection) new(ctx context.Context, serialNo uint32) (*MultiplexedConnection, error) {
@@ -154,7 +201,7 @@ func (c *Connection) new(ctx context.Context, serialNo uint32) (*MultiplexedConn
 		done:     c.mDone,
 		reader: &recvReader{
 			ctx:  ctx,
-			recv: make(chan *codec.FrameResponse, 1),
+			recv: make(chan codec.TransportResponse, 1),
 		},
 	}
 
@@ -204,7 +251,7 @@ func (c *Connection) reconnect() error {
 	}
 	c.done = make(chan struct{})
 	c.conn = conn
-	c.framer = codec.New(conn)
+	c.decoder = codec.New(conn)
 	c.buffer.done = c.done
 	c.state = Connected
 	c.err = nil
@@ -273,7 +320,7 @@ func (c *Connection) reader() {
 			return
 		default:
 		}
-		rsp, err := c.framer.Decode()
+		rsp, err := c.decoder.Decode()
 		if err != nil {
 			lastErr = err
 			break
@@ -291,50 +338,6 @@ func (c *Connection) reader() {
 	c.close(lastErr, c.done)
 }
 
-type Multiplexed struct {
-	connections *sync.Map
-}
-
-func (p *Multiplexed) Get(ctx context.Context, address string, serialNo uint32) (*MultiplexedConnection, error) {
-	select {
-	case <-ctx.Done():
-		return nil, ctx.Err()
-	default:
-	}
-
-	if v, ok := p.connections.Load(address); ok {
-		if c, ok := v.(*Connection); ok {
-			return c.new(ctx, serialNo)
-		}
-		return nil, ErrAssertConnectionFail
-	}
-
-	c := &Connection{
-		address:     address,
-		connections: make(map[uint32]*MultiplexedConnection),
-		done:        make(chan struct{}),
-		mDone:       make(chan struct{}),
-		state:       Initial,
-	}
-	c.buffer = &writerBuffer{
-		buffer: make(chan []byte, queueSize),
-		done:   c.done,
-	}
-	p.connections.Store(address, c)
-
-	conn, dialOpts, err := dial(ctx, address)
-	c.dialOpts = dialOpts
-	if err != nil {
-		return nil, err
-	}
-	c.framer = codec.New(conn)
-	c.conn = conn
-	c.state = Connected
-	go c.reader()
-	go c.writer()
-	return c.new(ctx, serialNo)
-}
-
 func dial(ctx context.Context, address string) (net.Conn, *DialOptions, error) {
 	var timeout time.Duration
 	t, ok := ctx.Deadline()
diff --git a/tubemq-client-twins/tubemq-client-go/pool/multlplexed_test.go b/tubemq-client-twins/tubemq-client-go/multiplexed/multlplexed_test.go
similarity index 98%
rename from tubemq-client-twins/tubemq-client-go/pool/multlplexed_test.go
rename to tubemq-client-twins/tubemq-client-go/multiplexed/multlplexed_test.go
index e136ebd..4584607 100644
--- a/tubemq-client-twins/tubemq-client-go/pool/multlplexed_test.go
+++ b/tubemq-client-twins/tubemq-client-go/multiplexed/multlplexed_test.go
@@ -15,7 +15,7 @@
  * limitations under the License.
  */
 
-package pool
+package multiplexed
 
 import (
 	"bytes"
@@ -92,7 +92,7 @@ func TestBasicMultiplexed(t *testing.T) {
 	ctx, cancel := context.WithTimeout(context.Background(), time.Second)
 	defer cancel()
 
-	m := New()
+	m := NewPool()
 	mc, err := m.Get(ctx, address, serialNo)
 	body := []byte("hello world")
 
@@ -109,7 +109,7 @@ func TestBasicMultiplexed(t *testing.T) {
 
 func TestConcurrentMultiplexed(t *testing.T) {
 	count := 1000
-	m := New()
+	m := NewPool()
 	wg := sync.WaitGroup{}
 	wg.Add(count)
 	for i := 0; i < count; i++ {

[incubator-inlong] 08/12: nitpick for codec and dialopts

Posted by go...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

gosonzhang pushed a commit to branch INLONG-25
in repository https://gitbox.apache.org/repos/asf/incubator-inlong.git

commit 459405a5dfd214f448b05bb796baccfd76081891
Author: Zijie Lu <ws...@gmail.com>
AuthorDate: Sun May 2 15:00:59 2021 +0800

    nitpick for codec and dialopts
    
    Signed-off-by: Zijie Lu <ws...@gmail.com>
---
 tubemq-client-twins/tubemq-client-go/codec/codec.go    | 11 ++++-------
 .../tubemq-client-go/multiplexing/multiplexing.go      | 18 +++++++-----------
 .../tubemq-client-go/multiplexing/multlplexing_test.go | 12 ++++++++++--
 3 files changed, 21 insertions(+), 20 deletions(-)

diff --git a/tubemq-client-twins/tubemq-client-go/codec/codec.go b/tubemq-client-twins/tubemq-client-go/codec/codec.go
index b9ee1d7..fb5c945 100644
--- a/tubemq-client-twins/tubemq-client-go/codec/codec.go
+++ b/tubemq-client-twins/tubemq-client-go/codec/codec.go
@@ -30,7 +30,7 @@ import (
 const (
 	RPCProtocolBeginToken uint32 = 0xFF7FF4FE
 	RPCMaxBufferSize      uint32 = 8192
-	frameHeadLen          uint32 = 8
+	frameHeadLen          uint32 = 12
 	maxBufferSize         int    = 128 * 1024
 	defaultMsgSize        int    = 4096
 	dataLen               uint32 = 4
@@ -82,11 +82,8 @@ func (t *TubeMQDecoder) Decode() (TransportResponse, error) {
 	if token != RPCProtocolBeginToken {
 		return nil, errors.New("framer: read framer rpc protocol begin token not match")
 	}
-	num, err = io.ReadFull(t.reader, t.msg[frameHeadLen:frameHeadLen+listSizeLen])
-	if num != int(listSizeLen) {
-		return nil, errors.New("framer: read invalid list size num")
-	}
-	listSize := binary.BigEndian.Uint32(t.msg[frameHeadLen : frameHeadLen+listSizeLen])
+	serialNo := binary.BigEndian.Uint32(t.msg[beginTokenLen : beginTokenLen+serialNoLen])
+	listSize := binary.BigEndian.Uint32(t.msg[beginTokenLen+serialNoLen : beginTokenLen+serialNoLen+listSizeLen])
 	totalLen := int(frameHeadLen)
 	size := make([]byte, 4)
 	for i := 0; i < int(listSize); i++ {
@@ -119,7 +116,7 @@ func (t *TubeMQDecoder) Decode() (TransportResponse, error) {
 	copy(data, t.msg[frameHeadLen:totalLen])
 
 	return TubeMQResponse{
-		serialNo:    binary.BigEndian.Uint32(t.msg[beginTokenLen : beginTokenLen+serialNoLen]),
+		serialNo:    serialNo,
 		responseBuf: data,
 	}, nil
 }
diff --git a/tubemq-client-twins/tubemq-client-go/multiplexing/multiplexing.go b/tubemq-client-twins/tubemq-client-go/multiplexing/multiplexing.go
index d1ee603..783c8e8 100644
--- a/tubemq-client-twins/tubemq-client-go/multiplexing/multiplexing.go
+++ b/tubemq-client-twins/tubemq-client-go/multiplexing/multiplexing.go
@@ -71,7 +71,7 @@ func NewPool() *Pool {
 // Get will return a multiplex connection
 // 1. If no underlying TCP connection has been created, a TCP connection will be created first.
 // 2. A new multiplex connection with the serialNo will be created and returned.
-func (p *Pool) Get(ctx context.Context, address string, serialNo uint32) (*MultiplexConnection, error) {
+func (p *Pool) Get(ctx context.Context, address string, serialNo uint32, opts *DialOptions) (*MultiplexConnection, error) {
 	select {
 	case <-ctx.Done():
 		return nil, ctx.Err()
@@ -98,7 +98,7 @@ func (p *Pool) Get(ctx context.Context, address string, serialNo uint32) (*Multi
 	}
 	p.connections.Store(address, c)
 
-	conn, dialOpts, err := dial(ctx, address)
+	conn, dialOpts, err := dial(ctx, address, opts)
 	c.dialOpts = dialOpts
 	if err != nil {
 		return nil, err
@@ -112,24 +112,20 @@ func (p *Pool) Get(ctx context.Context, address string, serialNo uint32) (*Multi
 	return c.new(ctx, serialNo)
 }
 
-func dial(ctx context.Context, address string) (net.Conn, *DialOptions, error) {
+func dial(ctx context.Context, address string, opts *DialOptions) (net.Conn, *DialOptions, error) {
 	var timeout time.Duration
 	t, ok := ctx.Deadline()
 	if ok {
 		timeout = t.Sub(time.Now())
 	}
-	dialOpts := &DialOptions{
-		Network: "tcp",
-		Address: address,
-		Timeout: timeout,
-	}
+	opts.Timeout = timeout
 	select {
 	case <-ctx.Done():
-		return nil, dialOpts, ctx.Err()
+		return nil, opts, ctx.Err()
 	default:
 	}
-	conn, err := dialWithTimeout(dialOpts)
-	return conn, dialOpts, err
+	conn, err := dialWithTimeout(opts)
+	return conn, opts, err
 }
 
 func dialWithTimeout(opts *DialOptions) (net.Conn, error) {
diff --git a/tubemq-client-twins/tubemq-client-go/multiplexing/multlplexing_test.go b/tubemq-client-twins/tubemq-client-go/multiplexing/multlplexing_test.go
index af1d416..0c6d6b9 100644
--- a/tubemq-client-twins/tubemq-client-go/multiplexing/multlplexing_test.go
+++ b/tubemq-client-twins/tubemq-client-go/multiplexing/multlplexing_test.go
@@ -93,7 +93,11 @@ func TestBasicMultiplexing(t *testing.T) {
 	defer cancel()
 
 	m := NewPool()
-	mc, err := m.Get(ctx, address, serialNo)
+	opts := &DialOptions{
+		Network: "tcp",
+		Address: address,
+	}
+	mc, err := m.Get(ctx, address, serialNo, opts)
 	body := []byte("hello world")
 
 	buf, err := Encode(serialNo, body)
@@ -118,7 +122,11 @@ func TestConcurrentMultiplexing(t *testing.T) {
 			ctx, cancel := context.WithTimeout(context.Background(), time.Second)
 			defer cancel()
 			serialNo := atomic.AddUint32(&serialNo, 1)
-			mc, err := m.Get(ctx, address, serialNo)
+			opts := &DialOptions{
+				Network: "tcp",
+				Address: address,
+			}
+			mc, err := m.Get(ctx, address, serialNo, opts)
 			assert.Nil(t, err)
 
 			body := []byte("hello world" + strconv.Itoa(i))

[incubator-inlong] 09/12: Address review comments

Posted by go...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

gosonzhang pushed a commit to branch INLONG-25
in repository https://gitbox.apache.org/repos/asf/incubator-inlong.git

commit 6b64b52905cba23c4f6b7fe892f69f81b7cc6032
Author: Zijie Lu <ws...@gmail.com>
AuthorDate: Thu May 6 10:44:51 2021 +0800

    Address review comments
    
    Signed-off-by: Zijie Lu <ws...@gmail.com>
---
 .../tubemq-client-go/codec/codec.go                | 119 +--------------------
 .../tubemq-client-go/multiplexing/multiplexing.go  |  16 +--
 .../multiplexing/multlplexing_test.go              |   4 +-
 3 files changed, 15 insertions(+), 124 deletions(-)

diff --git a/tubemq-client-twins/tubemq-client-go/codec/codec.go b/tubemq-client-twins/tubemq-client-go/codec/codec.go
index fb5c945..4af8b5d 100644
--- a/tubemq-client-twins/tubemq-client-go/codec/codec.go
+++ b/tubemq-client-twins/tubemq-client-go/codec/codec.go
@@ -20,125 +20,16 @@
 // will need to be changed.
 package codec
 
-import (
-	"bufio"
-	"encoding/binary"
-	"errors"
-	"io"
-)
-
-const (
-	RPCProtocolBeginToken uint32 = 0xFF7FF4FE
-	RPCMaxBufferSize      uint32 = 8192
-	frameHeadLen          uint32 = 12
-	maxBufferSize         int    = 128 * 1024
-	defaultMsgSize        int    = 4096
-	dataLen               uint32 = 4
-	listSizeLen           uint32 = 4
-	serialNoLen           uint32 = 4
-	beginTokenLen         uint32 = 4
-)
-
-// TransportResponse is the abstraction of the transport response.
-type TransportResponse interface {
+// Response is the abstraction of the transport response.
+type Response interface {
 	// GetSerialNo returns the `serialNo` of the corresponding request.
 	GetSerialNo() uint32
-	// GetResponseBuf returns the body of the response.
-	GetResponseBuf() []byte
+	// GetBuffer returns the body of the response.
+	GetBuffer() []byte
 }
 
 // Decoder is the abstraction of the decoder which is used to decode the response.
 type Decoder interface {
 	// Decode will decode the response to frame head and body.
-	Decode() (TransportResponse, error)
-}
-
-// TubeMQDecoder is the implementation of the decoder of response from TubeMQ.
-type TubeMQDecoder struct {
-	reader io.Reader
-	msg    []byte
-}
-
-// New will return a default TubeMQDecoder.
-func New(reader io.Reader) *TubeMQDecoder {
-	bufferReader := bufio.NewReaderSize(reader, maxBufferSize)
-	return &TubeMQDecoder{
-		msg:    make([]byte, defaultMsgSize),
-		reader: bufferReader,
-	}
-}
-
-// Decode will decode the response from TubeMQ to TransportResponse according to
-// the RPC protocol of TubeMQ.
-func (t *TubeMQDecoder) Decode() (TransportResponse, error) {
-	num, err := io.ReadFull(t.reader, t.msg[:frameHeadLen])
-	if err != nil {
-		return nil, err
-	}
-	if num != int(frameHeadLen) {
-		return nil, errors.New("framer: read frame header num invalid")
-	}
-	token := binary.BigEndian.Uint32(t.msg[:beginTokenLen])
-	if token != RPCProtocolBeginToken {
-		return nil, errors.New("framer: read framer rpc protocol begin token not match")
-	}
-	serialNo := binary.BigEndian.Uint32(t.msg[beginTokenLen : beginTokenLen+serialNoLen])
-	listSize := binary.BigEndian.Uint32(t.msg[beginTokenLen+serialNoLen : beginTokenLen+serialNoLen+listSizeLen])
-	totalLen := int(frameHeadLen)
-	size := make([]byte, 4)
-	for i := 0; i < int(listSize); i++ {
-		n, err := io.ReadFull(t.reader, size)
-		if err != nil {
-			return nil, err
-		}
-		if n != int(dataLen) {
-			return nil, errors.New("framer: read invalid size")
-		}
-
-		s := int(binary.BigEndian.Uint32(size))
-		if totalLen+s > len(t.msg) {
-			data := t.msg[:totalLen]
-			t.msg = make([]byte, totalLen+s)
-			copy(t.msg, data[:])
-		}
-
-		num, err = io.ReadFull(t.reader, t.msg[totalLen:totalLen+s])
-		if err != nil {
-			return nil, err
-		}
-		if num != s {
-			return nil, errors.New("framer: read invalid data")
-		}
-		totalLen += s
-	}
-
-	data := make([]byte, totalLen-int(frameHeadLen))
-	copy(data, t.msg[frameHeadLen:totalLen])
-
-	return TubeMQResponse{
-		serialNo:    serialNo,
-		responseBuf: data,
-	}, nil
-}
-
-// TubeMQRequest is the implementation of TubeMQ request.
-type TubeMQRequest struct {
-	serialNo uint32
-	req      []byte
-}
-
-// TubeMQResponse is the TubeMQ implementation of TransportResponse.
-type TubeMQResponse struct {
-	serialNo    uint32
-	responseBuf []byte
-}
-
-// GetSerialNo will return the SerialNo of TubeMQResponse.
-func (t TubeMQResponse) GetSerialNo() uint32 {
-	return t.serialNo
-}
-
-// GetResponseBuf will return the body of TubeMQResponse.
-func (t TubeMQResponse) GetResponseBuf() []byte {
-	return t.responseBuf
+	Decode() (Response, error)
 }
diff --git a/tubemq-client-twins/tubemq-client-go/multiplexing/multiplexing.go b/tubemq-client-twins/tubemq-client-go/multiplexing/multiplexing.go
index 783c8e8..825636a 100644
--- a/tubemq-client-twins/tubemq-client-go/multiplexing/multiplexing.go
+++ b/tubemq-client-twins/tubemq-client-go/multiplexing/multiplexing.go
@@ -98,7 +98,7 @@ func (p *Pool) Get(ctx context.Context, address string, serialNo uint32, opts *D
 	}
 	p.connections.Store(address, c)
 
-	conn, dialOpts, err := dial(ctx, address, opts)
+	conn, dialOpts, err := dial(ctx, opts)
 	c.dialOpts = dialOpts
 	if err != nil {
 		return nil, err
@@ -112,7 +112,7 @@ func (p *Pool) Get(ctx context.Context, address string, serialNo uint32, opts *D
 	return c.new(ctx, serialNo)
 }
 
-func dial(ctx context.Context, address string, opts *DialOptions) (net.Conn, *DialOptions, error) {
+func dial(ctx context.Context, opts *DialOptions) (net.Conn, *DialOptions, error) {
 	var timeout time.Duration
 	t, ok := ctx.Deadline()
 	if ok {
@@ -177,7 +177,7 @@ func getCertPool(caCertFile string) (*x509.CertPool, error) {
 
 type recvReader struct {
 	ctx  context.Context
-	recv chan codec.TransportResponse
+	recv chan codec.Response
 }
 
 // MultiplexConnection is used to multiplex a TCP connection.
@@ -199,7 +199,7 @@ func (mc *MultiplexConnection) Write(b []byte) error {
 }
 
 // Read returns the response from the multiplex connection.
-func (mc *MultiplexConnection) Read() (codec.TransportResponse, error) {
+func (mc *MultiplexConnection) Read() (codec.Response, error) {
 	select {
 	case <-mc.reader.ctx.Done():
 		mc.conn.remove(mc.serialNo)
@@ -217,7 +217,7 @@ func (mc *MultiplexConnection) Read() (codec.TransportResponse, error) {
 	}
 }
 
-func (mc *MultiplexConnection) recv(rsp *codec.TubeMQResponse) {
+func (mc *MultiplexConnection) recv(rsp codec.Response) {
 	mc.reader.recv <- rsp
 	mc.conn.remove(rsp.GetSerialNo())
 }
@@ -264,7 +264,7 @@ func (c *Connection) new(ctx context.Context, serialNo uint32) (*MultiplexConnec
 		done:     c.mDone,
 		reader: &recvReader{
 			ctx:  ctx,
-			recv: make(chan codec.TransportResponse, 1),
+			recv: make(chan codec.Response, 1),
 		},
 	}
 
@@ -324,7 +324,7 @@ func (c *Connection) reconnect() error {
 }
 
 // The response handling logic of the TCP connection.
-// 1. Read from the connection and decode it to the TransportResponse.
+// 1. Read from the connection and decode it to the Response.
 // 2. Send the response to the corresponding multiplex connection based on the serialNo.
 func (c *Connection) reader() {
 	var lastErr error
@@ -347,7 +347,7 @@ func (c *Connection) reader() {
 			continue
 		}
 		mc.reader.recv <- rsp
-		mc.conn.remove(rsp.GetSerialNo())
+		mc.conn.remove(serialNo)
 	}
 	c.close(lastErr, c.done)
 }
diff --git a/tubemq-client-twins/tubemq-client-go/multiplexing/multlplexing_test.go b/tubemq-client-twins/tubemq-client-go/multiplexing/multlplexing_test.go
index 0c6d6b9..fbeb8c4 100644
--- a/tubemq-client-twins/tubemq-client-go/multiplexing/multlplexing_test.go
+++ b/tubemq-client-twins/tubemq-client-go/multiplexing/multlplexing_test.go
@@ -107,7 +107,7 @@ func TestBasicMultiplexing(t *testing.T) {
 	rsp, err := mc.Read()
 	assert.Nil(t, err)
 	assert.Equal(t, serialNo, rsp.GetSerialNo())
-	assert.Equal(t, body, rsp.GetResponseBuf())
+	assert.Equal(t, body, rsp.GetBuffer())
 	assert.Equal(t, mc.Write(nil), nil)
 }
 
@@ -137,7 +137,7 @@ func TestConcurrentMultiplexing(t *testing.T) {
 			rsp, err := mc.Read()
 			assert.Nil(t, err)
 			assert.Equal(t, serialNo, rsp.GetSerialNo())
-			assert.Equal(t, body, rsp.GetResponseBuf())
+			assert.Equal(t, body, rsp.GetBuffer())
 		}(i)
 	}
 	wg.Wait()

[incubator-inlong] 10/12: Add missing file

Posted by go...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

gosonzhang pushed a commit to branch INLONG-25
in repository https://gitbox.apache.org/repos/asf/incubator-inlong.git

commit e00a5ba1bfaf95360ae0648e140732b3a4229003
Author: Zijie Lu <ws...@gmail.com>
AuthorDate: Thu May 6 10:45:16 2021 +0800

    Add missing file
    
    Signed-off-by: Zijie Lu <ws...@gmail.com>
---
 .../tubemq-client-go/codec/tubemq_codec.go         | 138 +++++++++++++++++++++
 1 file changed, 138 insertions(+)

diff --git a/tubemq-client-twins/tubemq-client-go/codec/tubemq_codec.go b/tubemq-client-twins/tubemq-client-go/codec/tubemq_codec.go
new file mode 100644
index 0000000..96ebfa9
--- /dev/null
+++ b/tubemq-client-twins/tubemq-client-go/codec/tubemq_codec.go
@@ -0,0 +1,138 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+// Package codec defines the encoding and decoding logic between TubeMQ.
+// If the protocol of encoding and decoding is changed, only this package
+// will need to be changed.
+package codec
+
+import (
+	"bufio"
+	"encoding/binary"
+	"errors"
+	"io"
+)
+
+const (
+	// The default begin token of TubeMQ RPC protocol.
+	RPCProtocolBeginToken uint32 = 0xFF7FF4FE
+	// The default max buffer size the RPC response.
+	RPCMaxBufferSize uint32 = 8192
+	frameHeadLen     uint32 = 12
+	maxBufferSize    int    = 128 * 1024
+	defaultMsgSize   int    = 4096
+	dataLen          uint32 = 4
+	listSizeLen      uint32 = 4
+	serialNoLen      uint32 = 4
+	beginTokenLen    uint32 = 4
+)
+
+// TubeMQDecoder is the implementation of the decoder of response from TubeMQ.
+type TubeMQDecoder struct {
+	reader io.Reader
+	msg    []byte
+}
+
+// New will return a default TubeMQDecoder.
+func New(reader io.Reader) *TubeMQDecoder {
+	bufferReader := bufio.NewReaderSize(reader, maxBufferSize)
+	return &TubeMQDecoder{
+		msg:    make([]byte, defaultMsgSize),
+		reader: bufferReader,
+	}
+}
+
+// Decode will decode the response from TubeMQ to Response according to
+// the RPC protocol of TubeMQ.
+func (t *TubeMQDecoder) Decode() (Response, error) {
+	var num int
+	var err error
+	if num, err = io.ReadFull(t.reader, t.msg[:frameHeadLen]); err != nil {
+		return nil, err
+	}
+	if num != int(frameHeadLen) {
+		return nil, errors.New("framer: read frame header num invalid")
+	}
+	if binary.BigEndian.Uint32(t.msg[:beginTokenLen]) != RPCProtocolBeginToken {
+		return nil, errors.New("framer: read framer rpc protocol begin token not match")
+	}
+	serialNo := binary.BigEndian.Uint32(t.msg[beginTokenLen : beginTokenLen+serialNoLen])
+	listSize := binary.BigEndian.Uint32(t.msg[beginTokenLen+serialNoLen : beginTokenLen+serialNoLen+listSizeLen])
+	totalLen := int(frameHeadLen)
+	for i := 0; i < int(listSize); i++ {
+		size := make([]byte, 4)
+		n, err := io.ReadFull(t.reader, size)
+		if err != nil {
+			return nil, err
+		}
+		if n != int(dataLen) {
+			return nil, errors.New("framer: read invalid size")
+		}
+
+		s := int(binary.BigEndian.Uint32(size))
+		if totalLen+s > len(t.msg) {
+			data := t.msg[:totalLen]
+			t.msg = make([]byte, 0, max(2*len(t.msg), totalLen+s))
+			copy(t.msg, data[:])
+		}
+
+		if num, err = io.ReadFull(t.reader, t.msg[totalLen:totalLen+s]); err != nil {
+			return nil, err
+		}
+		if num != s {
+			return nil, errors.New("framer: read invalid data")
+		}
+		totalLen += s
+	}
+
+	data := make([]byte, totalLen-int(frameHeadLen))
+	copy(data, t.msg[frameHeadLen:totalLen])
+
+	return &TubeMQResponse{
+		serialNo: serialNo,
+		Buffer:   data,
+	}, nil
+}
+
+// TubeMQRequest is the implementation of TubeMQ request.
+type TubeMQRequest struct {
+	serialNo uint32
+	req      []byte
+}
+
+// TubeMQResponse is the TubeMQ implementation of Response.
+type TubeMQResponse struct {
+	serialNo uint32
+	Buffer   []byte
+}
+
+// GetSerialNo will return the SerialNo of Response.
+func (t TubeMQResponse) GetSerialNo() uint32 {
+	return t.serialNo
+}
+
+// GetResponseBuf will return the body of Response.
+func (t TubeMQResponse) GetBuffer() []byte {
+	return t.Buffer
+}
+
+func max(x, y int) int {
+	if x < y {
+		return y
+	}
+	return x
+}

[incubator-inlong] 02/12: Add license and remove the chinese comment

Posted by go...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

gosonzhang pushed a commit to branch INLONG-25
in repository https://gitbox.apache.org/repos/asf/incubator-inlong.git

commit 0033c625af6c5e2e04805010c73d79615b7f7c2d
Author: Zijie Lu <ws...@gmail.com>
AuthorDate: Fri Apr 30 15:03:51 2021 +0800

    Add license and remove the chinese comment
    
    Signed-off-by: Zijie Lu <ws...@gmail.com>
---
 tubemq-client-twins/tubemq-client-go/codec/codec.go   | 17 +++++++++++++++++
 tubemq-client-twins/tubemq-client-go/go.mod           | 15 +++++++++++++++
 .../tubemq-client-go/pool/multiplexed.go              | 19 ++++++++++++++++++-
 .../tubemq-client-go/pool/multlplexed_test.go         | 19 ++++++++++++++++++-
 4 files changed, 68 insertions(+), 2 deletions(-)

diff --git a/tubemq-client-twins/tubemq-client-go/codec/codec.go b/tubemq-client-twins/tubemq-client-go/codec/codec.go
index ee27e96..f66fac9 100644
--- a/tubemq-client-twins/tubemq-client-go/codec/codec.go
+++ b/tubemq-client-twins/tubemq-client-go/codec/codec.go
@@ -1,3 +1,20 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
 package codec
 
 import (
diff --git a/tubemq-client-twins/tubemq-client-go/go.mod b/tubemq-client-twins/tubemq-client-go/go.mod
index 7c1a676..4d0a345 100644
--- a/tubemq-client-twins/tubemq-client-go/go.mod
+++ b/tubemq-client-twins/tubemq-client-go/go.mod
@@ -1,3 +1,18 @@
+// Licensed to the Apache Software Foundation (ASF) under one or more
+// contributor license agreements.  See the NOTICE file distributed with
+// this work for additional information regarding copyright ownership.
+// The ASF licenses this file to You under the Apache License, Version 2.0
+// (the "License"); you may not use this file except in compliance with
+// the License.  You may obtain a copy of the License at
+// <p>
+// http://www.apache.org/licenses/LICENSE-2.0
+// <p>
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
 module github.com/apache/incubator-inlong/tubemq-client-twins/tubemq-client-go
 
 go 1.14
diff --git a/tubemq-client-twins/tubemq-client-go/pool/multiplexed.go b/tubemq-client-twins/tubemq-client-go/pool/multiplexed.go
index 5d38a14..d5a60b1 100644
--- a/tubemq-client-twins/tubemq-client-go/pool/multiplexed.go
+++ b/tubemq-client-twins/tubemq-client-go/pool/multiplexed.go
@@ -1,3 +1,20 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
 package pool
 
 import (
@@ -344,7 +361,7 @@ func dialWithTimeout(opts *DialOptions) (net.Conn, error) {
 	}
 
 	tlsConf := &tls.Config{}
-	if opts.CACertFile == "none" { // 不需要检验服务证书
+	if opts.CACertFile == "none" {
 		tlsConf.InsecureSkipVerify = true
 	} else {
 		if len(opts.TLSServerName) == 0 {
diff --git a/tubemq-client-twins/tubemq-client-go/pool/multlplexed_test.go b/tubemq-client-twins/tubemq-client-go/pool/multlplexed_test.go
index 6377032..e136ebd 100644
--- a/tubemq-client-twins/tubemq-client-go/pool/multlplexed_test.go
+++ b/tubemq-client-twins/tubemq-client-go/pool/multlplexed_test.go
@@ -1,3 +1,20 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
 package pool
 
 import (
@@ -19,7 +36,7 @@ import (
 )
 
 var (
-	address         = "127.0.0.1:0"
+	address         = "127.0.0.1:8888"
 	ch              = make(chan struct{})
 	serialNo uint32 = 1
 )

[incubator-inlong] 12/12: nitpick

Posted by go...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

gosonzhang pushed a commit to branch INLONG-25
in repository https://gitbox.apache.org/repos/asf/incubator-inlong.git

commit 2cd11c324dba8f1f8749782bd1f071f6fb62f3c1
Author: Zijie Lu <ws...@gmail.com>
AuthorDate: Fri May 7 20:07:27 2021 +0800

    nitpick
    
    Signed-off-by: Zijie Lu <ws...@gmail.com>
---
 tubemq-client-twins/tubemq-client-go/multiplexing/multiplexing.go | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/tubemq-client-twins/tubemq-client-go/multiplexing/multiplexing.go b/tubemq-client-twins/tubemq-client-go/multiplexing/multiplexing.go
index 825636a..0c4239c 100644
--- a/tubemq-client-twins/tubemq-client-go/multiplexing/multiplexing.go
+++ b/tubemq-client-twins/tubemq-client-go/multiplexing/multiplexing.go
@@ -134,7 +134,7 @@ func dialWithTimeout(opts *DialOptions) (net.Conn, error) {
 	}
 
 	tlsConf := &tls.Config{}
-	if opts.CACertFile == "none" {
+	if opts.CACertFile == "" {
 		tlsConf.InsecureSkipVerify = true
 	} else {
 		if len(opts.TLSServerName) == 0 {

[incubator-inlong] 06/12: Remove go.sum from git index

Posted by go...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

gosonzhang pushed a commit to branch INLONG-25
in repository https://gitbox.apache.org/repos/asf/incubator-inlong.git

commit e0fd869f205e7115eae192ae9d65ef4a4f9ec37b
Author: Zijie Lu <ws...@gmail.com>
AuthorDate: Sat May 1 10:46:45 2021 +0800

    Remove go.sum from git index
    
    Signed-off-by: Zijie Lu <ws...@gmail.com>
---
 tubemq-client-twins/tubemq-client-go/go.sum | 11 -----------
 1 file changed, 11 deletions(-)

diff --git a/tubemq-client-twins/tubemq-client-go/go.sum b/tubemq-client-twins/tubemq-client-go/go.sum
deleted file mode 100644
index acb88a4..0000000
--- a/tubemq-client-twins/tubemq-client-go/go.sum
+++ /dev/null
@@ -1,11 +0,0 @@
-github.com/davecgh/go-spew v1.1.0 h1:ZDRjVQ15GmhC3fiQ8ni8+OwkZQO4DARzQgrnXU1Liz8=
-github.com/davecgh/go-spew v1.1.0/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38=
-github.com/pmezard/go-difflib v1.0.0 h1:4DBwDE0NGyQoBHbLQYPwSUPoCMWR5BEzIk/f1lZbAQM=
-github.com/pmezard/go-difflib v1.0.0/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4=
-github.com/stretchr/objx v0.1.0/go.mod h1:HFkY916IF+rwdDfMAkV7OtwuqBVzrE8GR6GFx+wExME=
-github.com/stretchr/testify v1.7.0 h1:nwc3DEeHmmLAfoZucVR881uASk0Mfjw8xYJ99tb5CcY=
-github.com/stretchr/testify v1.7.0/go.mod h1:6Fq8oRcR53rry900zMqJjRRixrwX3KX962/h/Wwjteg=
-gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405 h1:yhCVgyC4o1eVCa2tZl7eS0r+SDo693bJlVdllGtEeKM=
-gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0=
-gopkg.in/yaml.v3 v3.0.0-20200313102051-9f266ea9e77c h1:dUUwHk2QECo/6vqA44rthZ8ie2QXMNeKRTHCNY2nXvo=
-gopkg.in/yaml.v3 v3.0.0-20200313102051-9f266ea9e77c/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM=

[incubator-inlong] 05/12: Rename and add some comments

Posted by go...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

gosonzhang pushed a commit to branch INLONG-25
in repository https://gitbox.apache.org/repos/asf/incubator-inlong.git

commit c6d3f751be782e5f62eb1f46b64366f169be8e38
Author: Zijie Lu <ws...@gmail.com>
AuthorDate: Sat May 1 10:04:27 2021 +0800

    Rename and add some comments
    
    Signed-off-by: Zijie Lu <ws...@gmail.com>
---
 .../tubemq-client-go/codec/codec.go                | 16 ++++
 tubemq-client-twins/tubemq-client-go/go.sum        | 11 +++
 .../multiplexing.go}                               | 98 +++++++++++++---------
 .../multlplexing_test.go}                          |  6 +-
 4 files changed, 88 insertions(+), 43 deletions(-)

diff --git a/tubemq-client-twins/tubemq-client-go/codec/codec.go b/tubemq-client-twins/tubemq-client-go/codec/codec.go
index a6f3fee..b9ee1d7 100644
--- a/tubemq-client-twins/tubemq-client-go/codec/codec.go
+++ b/tubemq-client-twins/tubemq-client-go/codec/codec.go
@@ -15,6 +15,9 @@
  * limitations under the License.
  */
 
+// Package codec defines the encoding and decoding logic between TubeMQ.
+// If the protocol of encoding and decoding is changed, only this package
+// will need to be changed.
 package codec
 
 import (
@@ -36,20 +39,27 @@ const (
 	beginTokenLen         uint32 = 4
 )
 
+// TransportResponse is the abstraction of the transport response.
 type TransportResponse interface {
+	// GetSerialNo returns the `serialNo` of the corresponding request.
 	GetSerialNo() uint32
+	// GetResponseBuf returns the body of the response.
 	GetResponseBuf() []byte
 }
 
+// Decoder is the abstraction of the decoder which is used to decode the response.
 type Decoder interface {
+	// Decode will decode the response to frame head and body.
 	Decode() (TransportResponse, error)
 }
 
+// TubeMQDecoder is the implementation of the decoder of response from TubeMQ.
 type TubeMQDecoder struct {
 	reader io.Reader
 	msg    []byte
 }
 
+// New will return a default TubeMQDecoder.
 func New(reader io.Reader) *TubeMQDecoder {
 	bufferReader := bufio.NewReaderSize(reader, maxBufferSize)
 	return &TubeMQDecoder{
@@ -58,6 +68,8 @@ func New(reader io.Reader) *TubeMQDecoder {
 	}
 }
 
+// Decode will decode the response from TubeMQ to TransportResponse according to
+// the RPC protocol of TubeMQ.
 func (t *TubeMQDecoder) Decode() (TransportResponse, error) {
 	num, err := io.ReadFull(t.reader, t.msg[:frameHeadLen])
 	if err != nil {
@@ -112,20 +124,24 @@ func (t *TubeMQDecoder) Decode() (TransportResponse, error) {
 	}, nil
 }
 
+// TubeMQRequest is the implementation of TubeMQ request.
 type TubeMQRequest struct {
 	serialNo uint32
 	req      []byte
 }
 
+// TubeMQResponse is the TubeMQ implementation of TransportResponse.
 type TubeMQResponse struct {
 	serialNo    uint32
 	responseBuf []byte
 }
 
+// GetSerialNo will return the SerialNo of TubeMQResponse.
 func (t TubeMQResponse) GetSerialNo() uint32 {
 	return t.serialNo
 }
 
+// GetResponseBuf will return the body of TubeMQResponse.
 func (t TubeMQResponse) GetResponseBuf() []byte {
 	return t.responseBuf
 }
diff --git a/tubemq-client-twins/tubemq-client-go/go.sum b/tubemq-client-twins/tubemq-client-go/go.sum
new file mode 100644
index 0000000..acb88a4
--- /dev/null
+++ b/tubemq-client-twins/tubemq-client-go/go.sum
@@ -0,0 +1,11 @@
+github.com/davecgh/go-spew v1.1.0 h1:ZDRjVQ15GmhC3fiQ8ni8+OwkZQO4DARzQgrnXU1Liz8=
+github.com/davecgh/go-spew v1.1.0/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38=
+github.com/pmezard/go-difflib v1.0.0 h1:4DBwDE0NGyQoBHbLQYPwSUPoCMWR5BEzIk/f1lZbAQM=
+github.com/pmezard/go-difflib v1.0.0/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4=
+github.com/stretchr/objx v0.1.0/go.mod h1:HFkY916IF+rwdDfMAkV7OtwuqBVzrE8GR6GFx+wExME=
+github.com/stretchr/testify v1.7.0 h1:nwc3DEeHmmLAfoZucVR881uASk0Mfjw8xYJ99tb5CcY=
+github.com/stretchr/testify v1.7.0/go.mod h1:6Fq8oRcR53rry900zMqJjRRixrwX3KX962/h/Wwjteg=
+gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405 h1:yhCVgyC4o1eVCa2tZl7eS0r+SDo693bJlVdllGtEeKM=
+gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0=
+gopkg.in/yaml.v3 v3.0.0-20200313102051-9f266ea9e77c h1:dUUwHk2QECo/6vqA44rthZ8ie2QXMNeKRTHCNY2nXvo=
+gopkg.in/yaml.v3 v3.0.0-20200313102051-9f266ea9e77c/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM=
diff --git a/tubemq-client-twins/tubemq-client-go/multiplexed/multiplexed.go b/tubemq-client-twins/tubemq-client-go/multiplexing/multiplexing.go
similarity index 75%
rename from tubemq-client-twins/tubemq-client-go/multiplexed/multiplexed.go
rename to tubemq-client-twins/tubemq-client-go/multiplexing/multiplexing.go
index 3792035..06e09d2 100644
--- a/tubemq-client-twins/tubemq-client-go/multiplexed/multiplexed.go
+++ b/tubemq-client-twins/tubemq-client-go/multiplexing/multiplexing.go
@@ -15,11 +15,11 @@
  * limitations under the License.
  */
 
-// Package multiplexed defines the multiplexed connection pool for sending
-// request and receiving response. After receiving the response, it will
-// be decoded and returned to the client. It is used for the underlying communication
+// Package multiplexing defines the multiplex connection pool for sending
+// request and receiving response. After receiving the response, the decoded
+// response will be returned to the client. It is used for the communication
 // with TubeMQ.
-package multiplexed
+package multiplexing
 
 import (
 	"context"
@@ -35,16 +35,17 @@ import (
 )
 
 var (
-	// ErrConnClosed indicates that the connection is closed
-	ErrConnClosed = errors.New("connection is closed")
-	// ErrChanClose indicates the recv chan is closed
-	ErrChanClose = errors.New("unexpected recv chan close")
+	// ErrConnClosed indicates the connection has been closed
+	ErrConnClosed = errors.New("connection has been closed")
+	// ErrChanClosed indicates the recv chan has been closed
+	ErrChanClosed = errors.New("unexpected recv chan closing")
 	// ErrWriteBufferDone indicates write buffer done
 	ErrWriteBufferDone = errors.New("write buffer done")
-	// ErrAssertConnectionFail indicates connection assertion error
-	ErrAssertConnectionFail = errors.New("assert connection slice fail")
+	// ErrAssertConnection indicates connection assertion error
+	ErrAssertConnection = errors.New("connection assertion error")
 )
 
+// The state of the connection.
 const (
 	Initial int = iota
 	Connected
@@ -54,10 +55,12 @@ const (
 
 const queueSize = 10000
 
+// Pool maintains the multiplex connections of different addresses.
 type Pool struct {
 	connections *sync.Map
 }
 
+// NewPool will construct a default multiplex connections pool.
 func NewPool() *Pool {
 	m := &Pool{
 		connections: new(sync.Map),
@@ -65,7 +68,10 @@ func NewPool() *Pool {
 	return m
 }
 
-func (p *Pool) Get(ctx context.Context, address string, serialNo uint32) (*MultiplexedConnection, error) {
+// Get will return a multiplex connection
+// 1. If no underlying TCP connection has been created, a TCP connection will be created first.
+// 2. A new multiplex connection with the serialNo will be created and returned.
+func (p *Pool) Get(ctx context.Context, address string, serialNo uint32) (*MultiplexConnection, error) {
 	select {
 	case <-ctx.Done():
 		return nil, ctx.Err()
@@ -76,12 +82,12 @@ func (p *Pool) Get(ctx context.Context, address string, serialNo uint32) (*Multi
 		if c, ok := v.(*Connection); ok {
 			return c.new(ctx, serialNo)
 		}
-		return nil, ErrAssertConnectionFail
+		return nil, ErrAssertConnection
 	}
 
 	c := &Connection{
 		address:     address,
-		connections: make(map[uint32]*MultiplexedConnection),
+		connections: make(map[uint32]*MultiplexConnection),
 		done:        make(chan struct{}),
 		mDone:       make(chan struct{}),
 		state:       Initial,
@@ -100,38 +106,28 @@ func (p *Pool) Get(ctx context.Context, address string, serialNo uint32) (*Multi
 	c.decoder = codec.New(conn)
 	c.conn = conn
 	c.state = Connected
+	c.pool = p
 	go c.reader()
 	go c.writer()
 	return c.new(ctx, serialNo)
 }
 
-type writerBuffer struct {
-	buffer chan []byte
-	done   <-chan struct{}
-}
-
-func (w *writerBuffer) get() ([]byte, error) {
-	select {
-	case req := <-w.buffer:
-		return req, nil
-	case <-w.done:
-		return nil, ErrWriteBufferDone
-	}
-}
-
 type recvReader struct {
 	ctx  context.Context
 	recv chan codec.TransportResponse
 }
 
-type MultiplexedConnection struct {
+// MultiplexConnection is used to multiplex a TCP connection.
+// It is distinguished by the serialNo.
+type MultiplexConnection struct {
 	serialNo uint32
 	conn     *Connection
 	reader   *recvReader
 	done     chan struct{}
 }
 
-func (mc *MultiplexedConnection) Write(b []byte) error {
+// Write uses the underlying TCP connection to send the bytes.
+func (mc *MultiplexConnection) Write(b []byte) error {
 	if err := mc.conn.send(b); err != nil {
 		mc.conn.remove(mc.serialNo)
 		return err
@@ -139,7 +135,8 @@ func (mc *MultiplexedConnection) Write(b []byte) error {
 	return nil
 }
 
-func (mc *MultiplexedConnection) Read() (codec.TransportResponse, error) {
+// Read returns the response from the multiplex connection.
+func (mc *MultiplexConnection) Read() (codec.TransportResponse, error) {
 	select {
 	case <-mc.reader.ctx.Done():
 		mc.conn.remove(mc.serialNo)
@@ -151,17 +148,19 @@ func (mc *MultiplexedConnection) Read() (codec.TransportResponse, error) {
 		if mc.conn.err != nil {
 			return nil, mc.conn.err
 		}
-		return nil, ErrChanClose
+		return nil, ErrChanClosed
 	case <-mc.done:
 		return nil, mc.conn.err
 	}
 }
 
-func (mc *MultiplexedConnection) recv(rsp *codec.TubeMQResponse) {
+func (mc *MultiplexConnection) recv(rsp *codec.TubeMQResponse) {
 	mc.reader.recv <- rsp
 	mc.conn.remove(rsp.GetSerialNo())
 }
 
+// DialOptions represents the dail options of the TCP connection.
+// If TLS is not enabled, the configuration of TLS can be ignored.
 type DialOptions struct {
 	Network       string
 	Address       string
@@ -172,11 +171,13 @@ type DialOptions struct {
 	TLSServerName string
 }
 
+// Connection represents the underlying TCP connection of the multiplex connections of an address
+// and maintains the multiplex connections.
 type Connection struct {
 	err         error
 	address     string
 	mu          sync.RWMutex
-	connections map[uint32]*MultiplexedConnection
+	connections map[uint32]*MultiplexConnection
 	decoder     codec.Decoder
 	conn        net.Conn
 	done        chan struct{}
@@ -187,14 +188,14 @@ type Connection struct {
 	pool        *Pool
 }
 
-func (c *Connection) new(ctx context.Context, serialNo uint32) (*MultiplexedConnection, error) {
+func (c *Connection) new(ctx context.Context, serialNo uint32) (*MultiplexConnection, error) {
 	c.mu.Lock()
 	defer c.mu.Unlock()
 	if c.err != nil {
 		return nil, c.err
 	}
 
-	vc := &MultiplexedConnection{
+	mc := &MultiplexConnection{
 		serialNo: serialNo,
 		conn:     c,
 		done:     c.mDone,
@@ -204,11 +205,11 @@ func (c *Connection) new(ctx context.Context, serialNo uint32) (*MultiplexedConn
 		},
 	}
 
-	if prevConn, ok := c.connections[serialNo]; ok {
-		close(prevConn.reader.recv)
+	if lastConn, ok := c.connections[serialNo]; ok {
+		close(lastConn.reader.recv)
 	}
-	c.connections[serialNo] = vc
-	return vc, nil
+	c.connections[serialNo] = mc
+	return mc, nil
 }
 
 func (c *Connection) close(lastErr error, done chan struct{}) {
@@ -230,7 +231,7 @@ func (c *Connection) close(lastErr error, done chan struct{}) {
 
 	c.state = Closing
 	c.err = lastErr
-	c.connections = make(map[uint32]*MultiplexedConnection)
+	c.connections = make(map[uint32]*MultiplexConnection)
 	close(c.done)
 	if c.conn != nil {
 		c.conn.Close()
@@ -311,6 +312,9 @@ func (c *Connection) write(b []byte) error {
 	return nil
 }
 
+// The response handling logic of the TCP connection.
+// 1. Read from the connection and decode it to the TransportResponse.
+// 2. Send the response to the corresponding multiplex connection based on the serialNo.
 func (c *Connection) reader() {
 	var lastErr error
 	for {
@@ -337,6 +341,20 @@ func (c *Connection) reader() {
 	c.close(lastErr, c.done)
 }
 
+type writerBuffer struct {
+	buffer chan []byte
+	done   <-chan struct{}
+}
+
+func (w *writerBuffer) get() ([]byte, error) {
+	select {
+	case req := <-w.buffer:
+		return req, nil
+	case <-w.done:
+		return nil, ErrWriteBufferDone
+	}
+}
+
 func dial(ctx context.Context, address string) (net.Conn, *DialOptions, error) {
 	var timeout time.Duration
 	t, ok := ctx.Deadline()
diff --git a/tubemq-client-twins/tubemq-client-go/multiplexed/multlplexed_test.go b/tubemq-client-twins/tubemq-client-go/multiplexing/multlplexing_test.go
similarity index 96%
rename from tubemq-client-twins/tubemq-client-go/multiplexed/multlplexed_test.go
rename to tubemq-client-twins/tubemq-client-go/multiplexing/multlplexing_test.go
index 4584607..af1d416 100644
--- a/tubemq-client-twins/tubemq-client-go/multiplexed/multlplexed_test.go
+++ b/tubemq-client-twins/tubemq-client-go/multiplexing/multlplexing_test.go
@@ -15,7 +15,7 @@
  * limitations under the License.
  */
 
-package multiplexed
+package multiplexing
 
 import (
 	"bytes"
@@ -87,7 +87,7 @@ func Encode(serialNo uint32, body []byte) ([]byte, error) {
 	return buf.Bytes(), nil
 }
 
-func TestBasicMultiplexed(t *testing.T) {
+func TestBasicMultiplexing(t *testing.T) {
 	serialNo := atomic.AddUint32(&serialNo, 1)
 	ctx, cancel := context.WithTimeout(context.Background(), time.Second)
 	defer cancel()
@@ -107,7 +107,7 @@ func TestBasicMultiplexed(t *testing.T) {
 	assert.Equal(t, mc.Write(nil), nil)
 }
 
-func TestConcurrentMultiplexed(t *testing.T) {
+func TestConcurrentMultiplexing(t *testing.T) {
 	count := 1000
 	m := NewPool()
 	wg := sync.WaitGroup{}