You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@inlong.apache.org by go...@apache.org on 2021/05/10 10:31:53 UTC
[incubator-inlong] branch INLONG-25 updated (3249de3 -> 2cd11c3)
This is an automated email from the ASF dual-hosted git repository.
gosonzhang pushed a change to branch INLONG-25
in repository https://gitbox.apache.org/repos/asf/incubator-inlong.git.
from 3249de3 [TUBEMQ-594] Trpc-go tube sdk strongly rely on local config(addendum)
new 108fd1f [INLONG-600]Multiplexed connection pool for Go sdk
new 0033c62 Add license and remove the chinese comment
new 058ad95 Address review comments
new 96b5728 nitpick
new c6d3f75 Rename and add some comments
new e0fd869 Remove go.sum from git index
new a1c86bc nitpick
new 459405a nitpick for codec and dialopts
new 6b64b52 Address review comments
new e00a5ba Add missing file
new a16216d Address review comments
new 2cd11c3 nitpick
The 12 revisions listed above as "new" are entirely new to this
repository and will be described in separate emails. The revisions
listed as "add" were already present in the repository and have only
been added to this reference.
Summary of changes:
.../tubemq-client-go/codec/codec.go | 31 +-
.../tubemq-client-go/codec/tubemq_codec.go | 132 +++++++
.../tubemq-client-go/go.mod | 24 +-
.../tubemq-client-go/multiplexing/multiplexing.go | 419 +++++++++++++++++++++
.../multiplexing/multlplexing_test.go | 144 +++++++
5 files changed, 719 insertions(+), 31 deletions(-)
copy tubemq-client/src/main/java/org/apache/tubemq/client/consumer/MessageListener.java => tubemq-client-twins/tubemq-client-go/codec/codec.go (56%)
create mode 100644 tubemq-client-twins/tubemq-client-go/codec/tubemq_codec.go
copy codestyle/suppressions.xml => tubemq-client-twins/tubemq-client-go/go.mod (54%)
create mode 100644 tubemq-client-twins/tubemq-client-go/multiplexing/multiplexing.go
create mode 100644 tubemq-client-twins/tubemq-client-go/multiplexing/multlplexing_test.go
[incubator-inlong] 01/12: [INLONG-600]Multiplexed connection pool
for Go sdk
Posted by go...@apache.org.
This is an automated email from the ASF dual-hosted git repository.
gosonzhang pushed a commit to branch INLONG-25
in repository https://gitbox.apache.org/repos/asf/incubator-inlong.git
commit 108fd1fbefedc73fd38aaf699a0cd1e33f3bb03a
Author: Zijie Lu <ws...@gmail.com>
AuthorDate: Fri Apr 30 14:37:41 2021 +0800
[INLONG-600]Multiplexed connection pool for Go sdk
Signed-off-by: Zijie Lu <ws...@gmail.com>
---
.../tubemq-client-go/codec/codec.go | 107 ++++++
tubemq-client-twins/tubemq-client-go/go.mod | 5 +
.../tubemq-client-go/pool/multiplexed.go | 386 +++++++++++++++++++++
.../tubemq-client-go/pool/multlplexed_test.go | 119 +++++++
4 files changed, 617 insertions(+)
diff --git a/tubemq-client-twins/tubemq-client-go/codec/codec.go b/tubemq-client-twins/tubemq-client-go/codec/codec.go
new file mode 100644
index 0000000..ee27e96
--- /dev/null
+++ b/tubemq-client-twins/tubemq-client-go/codec/codec.go
@@ -0,0 +1,107 @@
+package codec
+
+import (
+ "bufio"
+ "encoding/binary"
+ "errors"
+ "io"
+)
+
+// Framing constants used by Framer.Decode.
+const (
+ // RPCProtocolBeginToken is the value Decode requires in the first
+ // beginTokenLen bytes of every frame.
+ RPCProtocolBeginToken uint32 = 0xFF7FF4FE
+ // RPCMaxBufferSize is exported but not referenced by the code in this file.
+ // NOTE(review): presumably a protocol-level size limit - confirm before relying on it.
+ RPCMaxBufferSize uint32 = 8192
+ // frameHeadLen is the length of the fixed header Decode reads first:
+ // the begin token followed by the serial number.
+ frameHeadLen uint32 = 8
+ // maxBufferSize is the size of the bufio.Reader that New wraps around the source.
+ maxBufferSize int = 128 * 1024
+ // defaultMsgSize is the initial length of the Framer scratch buffer; Decode
+ // reallocates the buffer when a frame does not fit.
+ defaultMsgSize int = 4096
+ // dataLen is the length of the size prefix in front of each list entry.
+ dataLen uint32 = 4
+ // listSizeLen is the length of the entry-count field that follows the header.
+ listSizeLen uint32 = 4
+ // serialNoLen is the length of the serial number field in the header.
+ serialNoLen uint32 = 4
+ // beginTokenLen is the length of the begin token field in the header.
+ beginTokenLen uint32 = 4
+)
+
+// Framer decodes frames from a reader.
+type Framer struct {
+ // reader is the source Decode reads from; New sets it to a bufio.Reader.
+ reader io.Reader
+ // msg is a scratch buffer reused across Decode calls. Decode writes into it
+ // and may replace it with a larger slice, without any locking.
+ msg []byte
+}
+
+// New builds a Framer that reads from reader through a bufio.Reader of
+// maxBufferSize bytes and starts with a scratch buffer of defaultMsgSize bytes.
+func New(reader io.Reader) *Framer {
+	return &Framer{
+		reader: bufio.NewReaderSize(reader, maxBufferSize),
+		msg:    make([]byte, defaultMsgSize),
+	}
+}
+
+// Decode reads one complete frame from the underlying reader.
+//
+// Layout as consumed here (all integers big-endian):
+//
+//	begin token (4 bytes) | serial number (4 bytes) | list size (4 bytes)
+//	then, list size times: entry length (4 bytes) | entry payload
+//
+// The payloads of all entries are concatenated in order and returned as the
+// response buffer, together with the serial number from the header.
+//
+// Any read error, including io.EOF and io.ErrUnexpectedEOF, is returned
+// unchanged. A begin token other than RPCProtocolBeginToken yields an error.
+func (f *Framer) Decode() (*FrameResponse, error) {
+	// io.ReadFull returns a nil error only when the slice was filled completely,
+	// so no separate byte-count checks are needed after any of the reads below.
+	if _, err := io.ReadFull(f.reader, f.msg[:frameHeadLen]); err != nil {
+		return nil, err
+	}
+	if token := binary.BigEndian.Uint32(f.msg[:beginTokenLen]); token != RPCProtocolBeginToken {
+		return nil, errors.New("framer: read framer rpc protocol begin token not match")
+	}
+	serialNo := binary.BigEndian.Uint32(f.msg[beginTokenLen : beginTokenLen+serialNoLen])
+
+	// The list size is parked right behind the header; the first payload later
+	// overwrites it, which is fine because it is decoded immediately.
+	if _, err := io.ReadFull(f.reader, f.msg[frameHeadLen:frameHeadLen+listSizeLen]); err != nil {
+		return nil, err
+	}
+	listSize := binary.BigEndian.Uint32(f.msg[frameHeadLen : frameHeadLen+listSizeLen])
+
+	totalLen := int(frameHeadLen)
+	size := make([]byte, dataLen)
+	// Iterate in uint32 so a large list size cannot turn negative on 32-bit platforms.
+	for i := uint32(0); i < listSize; i++ {
+		if _, err := io.ReadFull(f.reader, size); err != nil {
+			return nil, err
+		}
+
+		s := int(binary.BigEndian.Uint32(size))
+		if need := totalLen + s; need > len(f.msg) {
+			// Grow the scratch buffer, keeping everything decoded so far.
+			grown := make([]byte, need)
+			copy(grown, f.msg[:totalLen])
+			f.msg = grown
+		}
+
+		if _, err := io.ReadFull(f.reader, f.msg[totalLen:totalLen+s]); err != nil {
+			return nil, err
+		}
+		totalLen += s
+	}
+
+	// Copy the payload out so the caller does not alias the reusable scratch buffer.
+	data := make([]byte, totalLen-int(frameHeadLen))
+	copy(data, f.msg[frameHeadLen:totalLen])
+
+	return &FrameResponse{
+		serialNo:    serialNo,
+		responseBuf: data,
+	}, nil
+}
+
+// FrameRequest pairs a request ID with raw request bytes.
+// It is not referenced by the code in this file.
+type FrameRequest struct {
+ requestID uint32
+ req []byte
+}
+
+// FrameResponse is the result of Framer.Decode: the serial number taken from
+// the frame header and the concatenated payload of the frame's entries.
+type FrameResponse struct {
+ serialNo uint32
+ responseBuf []byte
+}
+
+// GetSerialNo returns the serial number decoded from the frame header.
+func (f *FrameResponse) GetSerialNo() uint32 {
+ return f.serialNo
+}
+
+// GetResponseBuf returns the decoded payload. The slice is returned as is,
+// not copied.
+func (f *FrameResponse) GetResponseBuf() []byte {
+ return f.responseBuf
+}
+
+// Codec is an empty placeholder type; it has no fields or methods in this file.
+type Codec struct{}
diff --git a/tubemq-client-twins/tubemq-client-go/go.mod b/tubemq-client-twins/tubemq-client-go/go.mod
new file mode 100644
index 0000000..7c1a676
--- /dev/null
+++ b/tubemq-client-twins/tubemq-client-go/go.mod
@@ -0,0 +1,5 @@
+module github.com/apache/incubator-inlong/tubemq-client-twins/tubemq-client-go
+
+go 1.14
+
+require github.com/stretchr/testify v1.7.0
diff --git a/tubemq-client-twins/tubemq-client-go/pool/multiplexed.go b/tubemq-client-twins/tubemq-client-go/pool/multiplexed.go
new file mode 100644
index 0000000..5d38a14
--- /dev/null
+++ b/tubemq-client-twins/tubemq-client-go/pool/multiplexed.go
@@ -0,0 +1,386 @@
+package pool
+
+import (
+ "context"
+ "crypto/tls"
+ "crypto/x509"
+ "errors"
+ "io/ioutil"
+ "net"
+ "sync"
+ "time"
+
+ "github.com/apache/incubator-inlong/tubemq-client-twins/tubemq-client-go/codec"
+)
+
+// DefaultMultiplexedPool is a package-level pool created with New at
+// package initialization.
+var DefaultMultiplexedPool = New()
+
+var (
+ // ErrConnClosed is returned by Connection.send when the connection state is Closed.
+ ErrConnClosed = errors.New("connection is closed")
+ // ErrChanClose is returned by MultiplexedConnection.Read when its recv
+ // channel was closed and the connection carries no error.
+ ErrChanClose = errors.New("unexpected recv chan close")
+ // ErrWriteBufferDone is returned by writerBuffer.get when the done channel
+ // fires before a request is available.
+ ErrWriteBufferDone = errors.New("write buffer done")
+ // ErrAssertConnectionFail is returned by Multiplexed.Get when the value
+ // stored for an address is not a *Connection.
+ ErrAssertConnectionFail = errors.New("assert connection slice fail")
+)
+
+// Connection states stored in Connection.state.
+const (
+ // Initial is the state of a Connection created by Get before it is dialed.
+ Initial int = iota
+ // Connected is set after a successful dial or reconnect.
+ Connected
+ // Closing is set by Connection.close while it tears down and reconnects.
+ Closing
+ // Closed is set by Connection.close when the reconnect attempt fails.
+ Closed
+)
+
+// queueSize is the capacity of each connection's outbound request channel.
+var queueSize = 10000
+
+// New returns an empty Multiplexed pool.
+func New() *Multiplexed {
+	return &Multiplexed{connections: new(sync.Map)}
+}
+
+// writerBuffer is the outbound queue of a Connection.
+type writerBuffer struct {
+	// buffer holds encoded requests waiting to be written.
+	buffer chan []byte
+	// done unblocks get once it is closed.
+	done <-chan struct{}
+}
+
+// get blocks until a queued request is available or done fires, in which case
+// it returns ErrWriteBufferDone.
+func (w *writerBuffer) get() ([]byte, error) {
+	select {
+	case <-w.done:
+		return nil, ErrWriteBufferDone
+	case payload := <-w.buffer:
+		return payload, nil
+	}
+}
+
+// recvReader is the receiving side of a MultiplexedConnection.
+type recvReader struct {
+ // ctx is the context passed to Get; Read returns its error once it is done.
+ ctx context.Context
+ // recv delivers the decoded response; Connection.new creates it with capacity 1.
+ recv chan *codec.FrameResponse
+}
+
+// MultiplexedConnection is one request/response exchange, identified by a
+// serial number, carried over a shared Connection.
+type MultiplexedConnection struct {
+ // serialNo is the key under which this value is registered in conn.connections.
+ serialNo uint32
+ // conn is the shared connection requests are sent on.
+ conn *Connection
+ // reader receives the response for serialNo.
+ reader *recvReader
+ // done is the parent connection's mDone channel (assigned in Connection.new).
+ done chan struct{}
+}
+
+// Write queues b on the shared connection. When queuing fails, the serial
+// number is unregistered from the connection and the error is returned.
+func (mc *MultiplexedConnection) Write(b []byte) error {
+	err := mc.conn.send(b)
+	if err != nil {
+		mc.conn.remove(mc.serialNo)
+	}
+	return err
+}
+
+// Read waits for the response belonging to this multiplexed connection.
+//
+// It returns the response when one arrives; the connection error (or
+// ErrChanClose if there is none) when the recv channel was closed; the
+// connection error when the done channel fires; and the context error, after
+// unregistering the serial number, when the context ends first.
+func (mc *MultiplexedConnection) Read() (*codec.FrameResponse, error) {
+	select {
+	case rsp, open := <-mc.reader.recv:
+		if open {
+			return rsp, nil
+		}
+		if connErr := mc.conn.err; connErr != nil {
+			return nil, connErr
+		}
+		return nil, ErrChanClose
+	case <-mc.done:
+		return nil, mc.conn.err
+	case <-mc.reader.ctx.Done():
+		mc.conn.remove(mc.serialNo)
+		return nil, mc.reader.ctx.Err()
+	}
+}
+
+// recv hands rsp to the waiting Read through the recv channel and then
+// unregisters the serial number carried by rsp from the parent connection.
+// It has no callers in the code shown here; Connection.reader performs the
+// same two steps inline.
+func (mc *MultiplexedConnection) recv(rsp *codec.FrameResponse) {
+ mc.reader.recv <- rsp
+ mc.conn.remove(rsp.GetSerialNo())
+}
+
+// DialOptions describes how dialWithTimeout establishes a connection.
+type DialOptions struct {
+ // Network and Address are passed to the dialer as given.
+ Network string
+ Address string
+ // Timeout is the dial timeout handed to net.DialTimeout or net.Dialer.
+ Timeout time.Duration
+ // CACertFile selects the transport: empty means plain (non-TLS) dialing,
+ // "none" means TLS without server certificate verification, "root" means
+ // TLS with RootCAs left nil, and any other value is read as a PEM file
+ // providing the root CAs.
+ CACertFile string
+ // TLSCertFile and TLSKeyFile, when TLSCertFile is non-empty, are loaded as
+ // the client certificate key pair.
+ TLSCertFile string
+ TLSKeyFile string
+ // TLSServerName is used as tls.Config.ServerName; dialWithTimeout fills it
+ // in from Address when it is empty.
+ TLSServerName string
+}
+
+// Connection is one network connection shared by many MultiplexedConnections.
+type Connection struct {
+ // err is the error recorded by close; reconnect resets it to nil.
+ err error
+ // address is the address this connection was requested for in Get.
+ address string
+ // mu is held by new, close and remove while they touch connections.
+ mu sync.RWMutex
+ // connections maps a serial number to the multiplexed connection waiting for it.
+ connections map[uint32]*MultiplexedConnection
+ // framer decodes responses read from conn.
+ framer *codec.Framer
+ // conn is the underlying network connection.
+ conn net.Conn
+ // done is closed by close to stop the current reader/writer goroutines;
+ // reconnect replaces it with a fresh channel.
+ done chan struct{}
+ // mDone is closed by close when reconnecting fails; it is handed to every
+ // MultiplexedConnection as its done channel.
+ mDone chan struct{}
+ // buffer queues outbound requests for the writer goroutine.
+ buffer *writerBuffer
+ // dialOpts are the options of the initial dial, reused by reconnect.
+ dialOpts *DialOptions
+ // state is one of Initial, Connected, Closing or Closed.
+ state int
+ // multiplexed is the owning pool; close uses it to drop this connection
+ // from the pool once reconnecting has failed.
+ multiplexed *Multiplexed
+}
+
+// new registers a MultiplexedConnection for serialNo on this connection and
+// returns it. If the connection currently carries an error, that error is
+// returned instead. A MultiplexedConnection already registered under the same
+// serial number has its recv channel closed and is replaced.
+func (c *Connection) new(ctx context.Context, serialNo uint32) (*MultiplexedConnection, error) {
+	c.mu.Lock()
+	defer c.mu.Unlock()
+
+	if connErr := c.err; connErr != nil {
+		return nil, connErr
+	}
+
+	// Closing the channel lets a Read blocked on the replaced entry return.
+	if stale, exists := c.connections[serialNo]; exists {
+		close(stale.reader.recv)
+	}
+
+	mc := &MultiplexedConnection{
+		serialNo: serialNo,
+		conn:     c,
+		done:     c.mDone,
+		reader: &recvReader{
+			ctx:  ctx,
+			recv: make(chan *codec.FrameResponse, 1),
+		},
+	}
+	c.connections[serialNo] = mc
+	return mc, nil
+}
+
+// close tears down the current network connection after lastErr and makes a
+// single reconnect attempt.
+//
+// It does nothing when lastErr is nil, when the connection is already Closed,
+// or when done (the done channel the caller was running under) has already
+// been closed, which means the teardown has already been performed.
+//
+// Otherwise lastErr is recorded, all registered multiplexed connections are
+// dropped, the reader/writer goroutines are signalled to stop and the socket
+// is closed. If reconnecting fails, the connection becomes Closed, mDone is
+// closed to wake pending callers, and the connection is removed from its pool.
+func (c *Connection) close(lastErr error, done chan struct{}) {
+	if lastErr == nil {
+		return
+	}
+	c.mu.Lock()
+	defer c.mu.Unlock()
+
+	if c.state == Closed {
+		return
+	}
+
+	select {
+	case <-done:
+		return
+	default:
+	}
+
+	c.state = Closing
+	c.err = lastErr
+	c.connections = make(map[uint32]*MultiplexedConnection)
+	close(c.done)
+	if c.conn != nil {
+		// Best effort: the connection is being abandoned either way.
+		_ = c.conn.Close()
+	}
+	if err := c.reconnect(); err != nil {
+		c.state = Closed
+		close(c.mDone)
+		// The pool stores connections keyed by address, so the address (not the
+		// connection itself) is the key to delete. The pool pointer may be unset.
+		if c.multiplexed != nil {
+			c.multiplexed.connections.Delete(c.address)
+		}
+	}
+}
+
+// reconnect dials again with the stored dial options. On success it installs
+// the new socket, framer and done channel, marks the connection Connected,
+// clears the recorded error and starts fresh reader and writer goroutines.
+// On failure it returns the dial error and leaves the connection untouched.
+func (c *Connection) reconnect() error {
+	conn, err := dialWithTimeout(c.dialOpts)
+	if err != nil {
+		return err
+	}
+
+	done := make(chan struct{})
+	c.done, c.buffer.done = done, done
+	c.conn, c.framer = conn, codec.New(conn)
+	c.state, c.err = Connected, nil
+
+	go c.reader()
+	go c.writer()
+	return nil
+}
+
+// writer is the sending loop of the connection. It takes queued requests from
+// the write buffer and writes them to the socket until done is closed. The
+// first error from the buffer or the socket ends the loop and is passed to close.
+func (c *Connection) writer() {
+	for {
+		select {
+		case <-c.done:
+			return
+		default:
+		}
+
+		req, err := c.buffer.get()
+		if err == nil {
+			err = c.write(req)
+		}
+		if err != nil {
+			c.close(err, c.done)
+			return
+		}
+	}
+}
+
+// send queues b for the writer goroutine. It returns ErrConnClosed when the
+// connection state is Closed, and the recorded connection error when mDone is
+// closed before the request could be queued. The state is read without
+// holding mu.
+func (c *Connection) send(b []byte) error {
+	if c.state == Closed {
+		return ErrConnClosed
+	}
+
+	select {
+	case <-c.mDone:
+		return c.err
+	case c.buffer.buffer <- b:
+		return nil
+	}
+}
+
+// remove unregisters the multiplexed connection stored under id, if any.
+func (c *Connection) remove(id uint32) {
+	c.mu.Lock()
+	defer c.mu.Unlock()
+	delete(c.connections, id)
+}
+
+// write sends all of b on the socket, issuing further Write calls for any
+// remainder after a short write, and returns the first write error.
+func (c *Connection) write(b []byte) error {
+	for remaining := b; len(remaining) > 0; {
+		n, err := c.conn.Write(remaining)
+		if err != nil {
+			return err
+		}
+		remaining = remaining[n:]
+	}
+	return nil
+}
+
+// reader is the receiving loop of the connection. It decodes responses from
+// the socket until done is closed and hands each one to the multiplexed
+// connection registered under its serial number, unregistering that entry
+// afterwards. Responses with no registered receiver are dropped. The first
+// decode error ends the loop and is passed to close.
+func (c *Connection) reader() {
+	for {
+		select {
+		case <-c.done:
+			return
+		default:
+		}
+
+		rsp, err := c.framer.Decode()
+		if err != nil {
+			c.close(err, c.done)
+			return
+		}
+
+		id := rsp.GetSerialNo()
+		c.mu.RLock()
+		mc, registered := c.connections[id]
+		c.mu.RUnlock()
+		if registered {
+			mc.reader.recv <- rsp
+			mc.conn.remove(id)
+		}
+	}
+}
+
+// Multiplexed is a pool of shared connections.
+type Multiplexed struct {
+ // connections maps an address string to its *Connection (see Get).
+ connections *sync.Map
+}
+
+// Get returns a MultiplexedConnection for serialNo on the shared connection
+// to address, dialing that connection first if the pool has none.
+//
+// It returns the context error when ctx is already done,
+// ErrAssertConnectionFail when the pool holds a value of an unexpected type,
+// and the dial error when a new connection cannot be established.
+//
+// The new connection is published in the pool before it is dialed, so
+// concurrent callers may already be attached to it. When dialing fails the
+// connection is therefore marked Closed with the dial error, mDone is closed
+// to wake those callers, and the entry is removed from the pool so that the
+// next Get dials again instead of returning a connection that never worked.
+func (p *Multiplexed) Get(ctx context.Context, address string, serialNo uint32) (*MultiplexedConnection, error) {
+	select {
+	case <-ctx.Done():
+		return nil, ctx.Err()
+	default:
+	}
+
+	if v, ok := p.connections.Load(address); ok {
+		if c, ok := v.(*Connection); ok {
+			return c.new(ctx, serialNo)
+		}
+		return nil, ErrAssertConnectionFail
+	}
+
+	c := &Connection{
+		address:     address,
+		connections: make(map[uint32]*MultiplexedConnection),
+		done:        make(chan struct{}),
+		mDone:       make(chan struct{}),
+		state:       Initial,
+		// Back-reference used by Connection.close to leave the pool.
+		multiplexed: p,
+	}
+	c.buffer = &writerBuffer{
+		buffer: make(chan []byte, queueSize),
+		done:   c.done,
+	}
+	p.connections.Store(address, c)
+
+	conn, dialOpts, err := dial(ctx, address)
+	c.dialOpts = dialOpts
+	if err != nil {
+		c.mu.Lock()
+		c.err = err
+		c.state = Closed
+		c.mu.Unlock()
+		close(c.mDone)
+		// Only drop the entry if it is still ours; another caller may have
+		// replaced it in the meantime.
+		if cur, ok := p.connections.Load(address); ok && cur == c {
+			p.connections.Delete(address)
+		}
+		return nil, err
+	}
+	c.framer = codec.New(conn)
+	c.conn = conn
+	c.state = Connected
+	go c.reader()
+	go c.writer()
+	return c.new(ctx, serialNo)
+}
+
+// dial opens a TCP connection to address. The dial timeout is the time left
+// until the context deadline, or zero when ctx has no deadline. The
+// DialOptions that were built are returned in every case, including when ctx
+// is already done or the dial fails.
+func dial(ctx context.Context, address string) (net.Conn, *DialOptions, error) {
+	dialOpts := &DialOptions{
+		Network: "tcp",
+		Address: address,
+	}
+	if deadline, hasDeadline := ctx.Deadline(); hasDeadline {
+		dialOpts.Timeout = time.Until(deadline)
+	}
+
+	if ctxErr := ctx.Err(); ctxErr != nil {
+		return nil, dialOpts, ctxErr
+	}
+
+	conn, err := dialWithTimeout(dialOpts)
+	return conn, dialOpts, err
+}
+
+// dialWithTimeout connects according to opts.
+//
+// With an empty CACertFile it dials a plain connection. Otherwise it dials
+// TLS: CACertFile "none" disables server certificate verification; any other
+// value configures the server name and root CAs (via getCertPool) and, when
+// TLSCertFile is set, a client certificate loaded from TLSCertFile/TLSKeyFile.
+//
+// When TLSServerName is empty it is filled in on opts with the host part of
+// Address. Errors from loading certificates or dialing are returned as is.
+func dialWithTimeout(opts *DialOptions) (net.Conn, error) {
+	if len(opts.CACertFile) == 0 {
+		return net.DialTimeout(opts.Network, opts.Address, opts.Timeout)
+	}
+
+	tlsConf := &tls.Config{}
+	if opts.CACertFile == "none" {
+		// The server certificate does not need to be verified.
+		tlsConf.InsecureSkipVerify = true
+	} else {
+		if len(opts.TLSServerName) == 0 {
+			// ServerName is matched against the certificate's host names, so it
+			// must not include the port of a "host:port" address.
+			host, _, err := net.SplitHostPort(opts.Address)
+			if err != nil {
+				// Address carries no port; use it unchanged.
+				host = opts.Address
+			}
+			opts.TLSServerName = host
+		}
+		tlsConf.ServerName = opts.TLSServerName
+		certPool, err := getCertPool(opts.CACertFile)
+		if err != nil {
+			return nil, err
+		}
+
+		tlsConf.RootCAs = certPool
+
+		if len(opts.TLSCertFile) != 0 {
+			cert, err := tls.LoadX509KeyPair(opts.TLSCertFile, opts.TLSKeyFile)
+			if err != nil {
+				return nil, err
+			}
+			tlsConf.Certificates = []tls.Certificate{cert}
+		}
+	}
+	return tls.DialWithDialer(&net.Dialer{Timeout: opts.Timeout}, opts.Network, opts.Address, tlsConf)
+}
+
+// getCertPool builds the root CA pool for a TLS connection.
+//
+// The special value "root" returns a nil pool and a nil error. Any other
+// value is read as a PEM file; an error is returned when the file cannot be
+// read or when no certificate could be parsed from it.
+func getCertPool(caCertFile string) (*x509.CertPool, error) {
+	if caCertFile == "root" {
+		return nil, nil
+	}
+
+	ca, err := ioutil.ReadFile(caCertFile)
+	if err != nil {
+		return nil, err
+	}
+	certPool := x509.NewCertPool()
+	if !certPool.AppendCertsFromPEM(ca) {
+		// err is nil at this point, so an explicit error is needed; returning
+		// it would silently report success with a nil pool.
+		return nil, errors.New("no valid PEM certificate found in CA cert file " + caCertFile)
+	}
+	return certPool, nil
+}
diff --git a/tubemq-client-twins/tubemq-client-go/pool/multlplexed_test.go b/tubemq-client-twins/tubemq-client-go/pool/multlplexed_test.go
new file mode 100644
index 0000000..6377032
--- /dev/null
+++ b/tubemq-client-twins/tubemq-client-go/pool/multlplexed_test.go
@@ -0,0 +1,119 @@
+package pool
+
+import (
+ "bytes"
+ "context"
+ "encoding/binary"
+ "io"
+ "log"
+ "net"
+ "strconv"
+ "sync"
+ "sync/atomic"
+ "testing"
+ "time"
+
+ "github.com/stretchr/testify/assert"
+
+ "github.com/apache/incubator-inlong/tubemq-client-twins/tubemq-client-go/codec"
+)
+
+var (
+ // address is the listen address of the test server; port 0 is replaced by
+ // the actual listener address in simpleForwardTCPServer.
+ address = "127.0.0.1:0"
+ // ch signals init that the test server is listening.
+ ch = make(chan struct{})
+ // serialNo is the shared counter the tests increment atomically to obtain
+ // a serial number per request.
+ serialNo uint32 = 1
+)
+
+// init starts the echo server in the background and waits until it signals
+// on ch that it is listening.
+func init() {
+ go simpleForwardTCPServer(ch)
+ <-ch
+}
+
+// simpleForwardTCPServer listens on address, stores the actual listener
+// address back into address, signals readiness on ch and then echoes every
+// accepted connection back to its sender. Listen and accept errors terminate
+// the process through log.Fatal.
+func simpleForwardTCPServer(ch chan struct{}) {
+	listener, listenErr := net.Listen("tcp", address)
+	if listenErr != nil {
+		log.Fatal(listenErr)
+	}
+	defer listener.Close()
+
+	address = listener.Addr().String()
+	ch <- struct{}{}
+
+	for {
+		conn, acceptErr := listener.Accept()
+		if acceptErr != nil {
+			log.Fatal(acceptErr)
+		}
+
+		go func(echo net.Conn) {
+			io.Copy(echo, echo)
+		}(conn)
+	}
+}
+
+// Encode builds a frame with a single list entry: begin token, serialNo,
+// list size 1 and the length of body as big-endian uint32 values, followed by
+// body itself.
+func Encode(serialNo uint32, body []byte) ([]byte, error) {
+	buf := bytes.NewBuffer(make([]byte, 0, 16+len(body)))
+
+	header := []uint32{
+		codec.RPCProtocolBeginToken,
+		serialNo,
+		uint32(1),
+		uint32(len(body)),
+	}
+	for _, field := range header {
+		if err := binary.Write(buf, binary.BigEndian, field); err != nil {
+			return nil, err
+		}
+	}
+
+	buf.Write(body)
+	return buf.Bytes(), nil
+}
+
+// TestBasicMultiplexed sends one frame through a fresh pool to the echo
+// server and checks that the response carries the same serial number and body.
+func TestBasicMultiplexed(t *testing.T) {
+	id := atomic.AddUint32(&serialNo, 1)
+	ctx, cancel := context.WithTimeout(context.Background(), time.Second)
+	defer cancel()
+
+	m := New()
+	mc, err := m.Get(ctx, address, id)
+	if err != nil {
+		// mc is nil here; continuing would panic instead of failing the test.
+		t.Fatalf("get multiplexed connection: %v", err)
+	}
+
+	body := []byte("hello world")
+	buf, err := Encode(id, body)
+	assert.Nil(t, err)
+	assert.Nil(t, mc.Write(buf))
+
+	rsp, err := mc.Read()
+	if err != nil {
+		t.Fatalf("read response: %v", err)
+	}
+	assert.Equal(t, id, rsp.GetSerialNo())
+	assert.Equal(t, body, rsp.GetResponseBuf())
+	assert.Nil(t, mc.Write(nil))
+}
+
+// TestConcurrentMultiplexed runs 1000 concurrent request/response exchanges
+// through one pool against the echo server and checks that every caller gets
+// back the response for its own serial number.
+func TestConcurrentMultiplexed(t *testing.T) {
+	count := 1000
+	m := New()
+	wg := sync.WaitGroup{}
+	wg.Add(count)
+	for i := 0; i < count; i++ {
+		go func(i int) {
+			defer wg.Done()
+			ctx, cancel := context.WithTimeout(context.Background(), time.Second)
+			defer cancel()
+			id := atomic.AddUint32(&serialNo, 1)
+			mc, err := m.Get(ctx, address, id)
+			// Stop on failure: mc is nil and using it would panic the whole
+			// test binary. t.Fatal must not be called outside the test goroutine.
+			if !assert.Nil(t, err) {
+				return
+			}
+
+			body := []byte("hello world" + strconv.Itoa(i))
+			buf, err := Encode(id, body)
+			assert.Nil(t, err)
+			assert.Nil(t, mc.Write(buf))
+
+			rsp, err := mc.Read()
+			if !assert.Nil(t, err) {
+				return
+			}
+			assert.Equal(t, id, rsp.GetSerialNo())
+			assert.Equal(t, body, rsp.GetResponseBuf())
+		}(i)
+	}
+	wg.Wait()
+}
[incubator-inlong] 11/12: Address review comments
Posted by go...@apache.org.
This is an automated email from the ASF dual-hosted git repository.
gosonzhang pushed a commit to branch INLONG-25
in repository https://gitbox.apache.org/repos/asf/incubator-inlong.git
commit a16216d6eebb16cecaafc9d96d03ca58919ca611
Author: Zijie Lu <ws...@gmail.com>
AuthorDate: Fri May 7 10:43:11 2021 +0800
Address review comments
Signed-off-by: Zijie Lu <ws...@gmail.com>
---
tubemq-client-twins/tubemq-client-go/codec/tubemq_codec.go | 10 ++--------
1 file changed, 2 insertions(+), 8 deletions(-)
diff --git a/tubemq-client-twins/tubemq-client-go/codec/tubemq_codec.go b/tubemq-client-twins/tubemq-client-go/codec/tubemq_codec.go
index 96ebfa9..a8706da 100644
--- a/tubemq-client-twins/tubemq-client-go/codec/tubemq_codec.go
+++ b/tubemq-client-twins/tubemq-client-go/codec/tubemq_codec.go
@@ -25,6 +25,7 @@ import (
"encoding/binary"
"errors"
"io"
+ "math"
)
const (
@@ -86,7 +87,7 @@ func (t *TubeMQDecoder) Decode() (Response, error) {
s := int(binary.BigEndian.Uint32(size))
if totalLen+s > len(t.msg) {
data := t.msg[:totalLen]
- t.msg = make([]byte, 0, max(2*len(t.msg), totalLen+s))
+ t.msg = make([]byte, 0, int(math.Max(float64(2*len(t.msg)), float64(totalLen+s))))
copy(t.msg, data[:])
}
@@ -129,10 +130,3 @@ func (t TubeMQResponse) GetSerialNo() uint32 {
func (t TubeMQResponse) GetBuffer() []byte {
return t.Buffer
}
-
-func max(x, y int) int {
- if x < y {
- return y
- }
- return x
-}
[incubator-inlong] 07/12: nitpick
Posted by go...@apache.org.
This is an automated email from the ASF dual-hosted git repository.
gosonzhang pushed a commit to branch INLONG-25
in repository https://gitbox.apache.org/repos/asf/incubator-inlong.git
commit a1c86bccac13bb68d58eebb360b3610fa46eb96b
Author: Zijie Lu <ws...@gmail.com>
AuthorDate: Sat May 1 11:15:35 2021 +0800
nitpick
Signed-off-by: Zijie Lu <ws...@gmail.com>
---
.../tubemq-client-go/multiplexing/multiplexing.go | 216 ++++++++++-----------
1 file changed, 108 insertions(+), 108 deletions(-)
diff --git a/tubemq-client-twins/tubemq-client-go/multiplexing/multiplexing.go b/tubemq-client-twins/tubemq-client-go/multiplexing/multiplexing.go
index 06e09d2..d1ee603 100644
--- a/tubemq-client-twins/tubemq-client-go/multiplexing/multiplexing.go
+++ b/tubemq-client-twins/tubemq-client-go/multiplexing/multiplexing.go
@@ -112,6 +112,73 @@ func (p *Pool) Get(ctx context.Context, address string, serialNo uint32) (*Multi
return c.new(ctx, serialNo)
}
+func dial(ctx context.Context, address string) (net.Conn, *DialOptions, error) {
+ var timeout time.Duration
+ t, ok := ctx.Deadline()
+ if ok {
+ timeout = t.Sub(time.Now())
+ }
+ dialOpts := &DialOptions{
+ Network: "tcp",
+ Address: address,
+ Timeout: timeout,
+ }
+ select {
+ case <-ctx.Done():
+ return nil, dialOpts, ctx.Err()
+ default:
+ }
+ conn, err := dialWithTimeout(dialOpts)
+ return conn, dialOpts, err
+}
+
+func dialWithTimeout(opts *DialOptions) (net.Conn, error) {
+ if len(opts.CACertFile) == 0 {
+ return net.DialTimeout(opts.Network, opts.Address, opts.Timeout)
+ }
+
+ tlsConf := &tls.Config{}
+ if opts.CACertFile == "none" {
+ tlsConf.InsecureSkipVerify = true
+ } else {
+ if len(opts.TLSServerName) == 0 {
+ opts.TLSServerName = opts.Address
+ }
+ tlsConf.ServerName = opts.TLSServerName
+ certPool, err := getCertPool(opts.CACertFile)
+ if err != nil {
+ return nil, err
+ }
+
+ tlsConf.RootCAs = certPool
+
+ if len(opts.TLSCertFile) != 0 {
+ cert, err := tls.LoadX509KeyPair(opts.TLSCertFile, opts.TLSKeyFile)
+ if err != nil {
+ return nil, err
+ }
+ tlsConf.Certificates = []tls.Certificate{cert}
+ }
+ }
+ return tls.DialWithDialer(&net.Dialer{Timeout: opts.Timeout}, opts.Network, opts.Address, tlsConf)
+}
+
+func getCertPool(caCertFile string) (*x509.CertPool, error) {
+ if caCertFile != "root" {
+ ca, err := ioutil.ReadFile(caCertFile)
+ if err != nil {
+ return nil, err
+ }
+ certPool := x509.NewCertPool()
+ ok := certPool.AppendCertsFromPEM(ca)
+ if !ok {
+ return nil, err
+ }
+ return certPool, nil
+ }
+ return nil, nil
+}
+
type recvReader struct {
ctx context.Context
recv chan codec.TransportResponse
@@ -260,6 +327,35 @@ func (c *Connection) reconnect() error {
return nil
}
+// The response handling logic of the TCP connection.
+// 1. Read from the connection and decode it to the TransportResponse.
+// 2. Send the response to the corresponding multiplex connection based on the serialNo.
+func (c *Connection) reader() {
+ var lastErr error
+ for {
+ select {
+ case <-c.done:
+ return
+ default:
+ }
+ rsp, err := c.decoder.Decode()
+ if err != nil {
+ lastErr = err
+ break
+ }
+ serialNo := rsp.GetSerialNo()
+ c.mu.RLock()
+ mc, ok := c.connections[serialNo]
+ c.mu.RUnlock()
+ if !ok {
+ continue
+ }
+ mc.reader.recv <- rsp
+ mc.conn.remove(rsp.GetSerialNo())
+ }
+ c.close(lastErr, c.done)
+}
+
func (c *Connection) writer() {
var lastErr error
for {
@@ -281,6 +377,18 @@ func (c *Connection) writer() {
c.close(lastErr, c.done)
}
+func (c *Connection) write(b []byte) error {
+ sent := 0
+ for sent < len(b) {
+ n, err := c.conn.Write(b[sent:])
+ if err != nil {
+ return err
+ }
+ sent += n
+ }
+ return nil
+}
+
func (c *Connection) send(b []byte) error {
if c.state == Closed {
return ErrConnClosed
@@ -300,47 +408,6 @@ func (c *Connection) remove(id uint32) {
c.mu.Unlock()
}
-func (c *Connection) write(b []byte) error {
- sent := 0
- for sent < len(b) {
- n, err := c.conn.Write(b[sent:])
- if err != nil {
- return err
- }
- sent += n
- }
- return nil
-}
-
-// The response handling logic of the TCP connection.
-// 1. Read from the connection and decode it to the TransportResponse.
-// 2. Send the response to the corresponding multiplex connection based on the serialNo.
-func (c *Connection) reader() {
- var lastErr error
- for {
- select {
- case <-c.done:
- return
- default:
- }
- rsp, err := c.decoder.Decode()
- if err != nil {
- lastErr = err
- break
- }
- serialNo := rsp.GetSerialNo()
- c.mu.RLock()
- mc, ok := c.connections[serialNo]
- c.mu.RUnlock()
- if !ok {
- continue
- }
- mc.reader.recv <- rsp
- mc.conn.remove(rsp.GetSerialNo())
- }
- c.close(lastErr, c.done)
-}
-
type writerBuffer struct {
buffer chan []byte
done <-chan struct{}
@@ -354,70 +421,3 @@ func (w *writerBuffer) get() ([]byte, error) {
return nil, ErrWriteBufferDone
}
}
-
-func dial(ctx context.Context, address string) (net.Conn, *DialOptions, error) {
- var timeout time.Duration
- t, ok := ctx.Deadline()
- if ok {
- timeout = t.Sub(time.Now())
- }
- dialOpts := &DialOptions{
- Network: "tcp",
- Address: address,
- Timeout: timeout,
- }
- select {
- case <-ctx.Done():
- return nil, dialOpts, ctx.Err()
- default:
- }
- conn, err := dialWithTimeout(dialOpts)
- return conn, dialOpts, err
-}
-
-func dialWithTimeout(opts *DialOptions) (net.Conn, error) {
- if len(opts.CACertFile) == 0 {
- return net.DialTimeout(opts.Network, opts.Address, opts.Timeout)
- }
-
- tlsConf := &tls.Config{}
- if opts.CACertFile == "none" {
- tlsConf.InsecureSkipVerify = true
- } else {
- if len(opts.TLSServerName) == 0 {
- opts.TLSServerName = opts.Address
- }
- tlsConf.ServerName = opts.TLSServerName
- certPool, err := getCertPool(opts.CACertFile)
- if err != nil {
- return nil, err
- }
-
- tlsConf.RootCAs = certPool
-
- if len(opts.TLSCertFile) != 0 {
- cert, err := tls.LoadX509KeyPair(opts.TLSCertFile, opts.TLSKeyFile)
- if err != nil {
- return nil, err
- }
- tlsConf.Certificates = []tls.Certificate{cert}
- }
- }
- return tls.DialWithDialer(&net.Dialer{Timeout: opts.Timeout}, opts.Network, opts.Address, tlsConf)
-}
-
-func getCertPool(caCertFile string) (*x509.CertPool, error) {
- if caCertFile != "root" {
- ca, err := ioutil.ReadFile(caCertFile)
- if err != nil {
- return nil, err
- }
- certPool := x509.NewCertPool()
- ok := certPool.AppendCertsFromPEM(ca)
- if !ok {
- return nil, err
- }
- return certPool, nil
- }
- return nil, nil
-}
[incubator-inlong] 04/12: nitpick
Posted by go...@apache.org.
This is an automated email from the ASF dual-hosted git repository.
gosonzhang pushed a commit to branch INLONG-25
in repository https://gitbox.apache.org/repos/asf/incubator-inlong.git
commit 96b5728c33e7d6e840502456eb5ae750df8f062b
Author: Zijie Lu <ws...@gmail.com>
AuthorDate: Fri Apr 30 17:07:21 2021 +0800
nitpick
Signed-off-by: Zijie Lu <ws...@gmail.com>
---
tubemq-client-twins/tubemq-client-go/multiplexed/multiplexed.go | 5 ++---
1 file changed, 2 insertions(+), 3 deletions(-)
diff --git a/tubemq-client-twins/tubemq-client-go/multiplexed/multiplexed.go b/tubemq-client-twins/tubemq-client-go/multiplexed/multiplexed.go
index 050b608..3792035 100644
--- a/tubemq-client-twins/tubemq-client-go/multiplexed/multiplexed.go
+++ b/tubemq-client-twins/tubemq-client-go/multiplexed/multiplexed.go
@@ -65,7 +65,6 @@ func NewPool() *Pool {
return m
}
-
func (p *Pool) Get(ctx context.Context, address string, serialNo uint32) (*MultiplexedConnection, error) {
select {
case <-ctx.Done():
@@ -185,7 +184,7 @@ type Connection struct {
buffer *writerBuffer
dialOpts *DialOptions
state int
- multiplexed *Pool
+ pool *Pool
}
func (c *Connection) new(ctx context.Context, serialNo uint32) (*MultiplexedConnection, error) {
@@ -240,7 +239,7 @@ func (c *Connection) close(lastErr error, done chan struct{}) {
if err != nil {
c.state = Closed
close(c.mDone)
- c.multiplexed.connections.Delete(c)
+ c.pool.connections.Delete(c)
}
}
[incubator-inlong] 03/12: Address review comments
Posted by go...@apache.org.
This is an automated email from the ASF dual-hosted git repository.
gosonzhang pushed a commit to branch INLONG-25
in repository https://gitbox.apache.org/repos/asf/incubator-inlong.git
commit 058ad9504843b49ce2852f420ad890e1aad8b78f
Author: Zijie Lu <ws...@gmail.com>
AuthorDate: Fri Apr 30 17:00:52 2021 +0800
Address review comments
Signed-off-by: Zijie Lu <ws...@gmail.com>
---
.../tubemq-client-go/codec/codec.go | 63 ++++++-----
.../{pool => multiplexed}/multiplexed.go | 119 +++++++++++----------
.../{pool => multiplexed}/multlplexed_test.go | 6 +-
3 files changed, 99 insertions(+), 89 deletions(-)
diff --git a/tubemq-client-twins/tubemq-client-go/codec/codec.go b/tubemq-client-twins/tubemq-client-go/codec/codec.go
index f66fac9..a6f3fee 100644
--- a/tubemq-client-twins/tubemq-client-go/codec/codec.go
+++ b/tubemq-client-twins/tubemq-client-go/codec/codec.go
@@ -36,40 +36,49 @@ const (
beginTokenLen uint32 = 4
)
-type Framer struct {
+type TransportResponse interface {
+ GetSerialNo() uint32
+ GetResponseBuf() []byte
+}
+
+type Decoder interface {
+ Decode() (TransportResponse, error)
+}
+
+type TubeMQDecoder struct {
reader io.Reader
msg []byte
}
-func New(reader io.Reader) *Framer {
+func New(reader io.Reader) *TubeMQDecoder {
bufferReader := bufio.NewReaderSize(reader, maxBufferSize)
- return &Framer{
+ return &TubeMQDecoder{
msg: make([]byte, defaultMsgSize),
reader: bufferReader,
}
}
-func (f *Framer) Decode() (*FrameResponse, error) {
- num, err := io.ReadFull(f.reader, f.msg[:frameHeadLen])
+func (t *TubeMQDecoder) Decode() (TransportResponse, error) {
+ num, err := io.ReadFull(t.reader, t.msg[:frameHeadLen])
if err != nil {
return nil, err
}
if num != int(frameHeadLen) {
return nil, errors.New("framer: read frame header num invalid")
}
- token := binary.BigEndian.Uint32(f.msg[:beginTokenLen])
+ token := binary.BigEndian.Uint32(t.msg[:beginTokenLen])
if token != RPCProtocolBeginToken {
return nil, errors.New("framer: read framer rpc protocol begin token not match")
}
- num, err = io.ReadFull(f.reader, f.msg[frameHeadLen:frameHeadLen+listSizeLen])
+ num, err = io.ReadFull(t.reader, t.msg[frameHeadLen:frameHeadLen+listSizeLen])
if num != int(listSizeLen) {
return nil, errors.New("framer: read invalid list size num")
}
- listSize := binary.BigEndian.Uint32(f.msg[frameHeadLen : frameHeadLen+listSizeLen])
+ listSize := binary.BigEndian.Uint32(t.msg[frameHeadLen : frameHeadLen+listSizeLen])
totalLen := int(frameHeadLen)
size := make([]byte, 4)
for i := 0; i < int(listSize); i++ {
- n, err := io.ReadFull(f.reader, size)
+ n, err := io.ReadFull(t.reader, size)
if err != nil {
return nil, err
}
@@ -78,13 +87,13 @@ func (f *Framer) Decode() (*FrameResponse, error) {
}
s := int(binary.BigEndian.Uint32(size))
- if totalLen+s > len(f.msg) {
- data := f.msg[:totalLen]
- f.msg = make([]byte, totalLen+s)
- copy(f.msg, data[:])
+ if totalLen+s > len(t.msg) {
+ data := t.msg[:totalLen]
+ t.msg = make([]byte, totalLen+s)
+ copy(t.msg, data[:])
}
- num, err = io.ReadFull(f.reader, f.msg[totalLen:totalLen+s])
+ num, err = io.ReadFull(t.reader, t.msg[totalLen:totalLen+s])
if err != nil {
return nil, err
}
@@ -94,31 +103,29 @@ func (f *Framer) Decode() (*FrameResponse, error) {
totalLen += s
}
- data := make([]byte, totalLen - int(frameHeadLen))
- copy(data, f.msg[frameHeadLen:totalLen])
+ data := make([]byte, totalLen-int(frameHeadLen))
+ copy(data, t.msg[frameHeadLen:totalLen])
- return &FrameResponse{
- serialNo: binary.BigEndian.Uint32(f.msg[beginTokenLen : beginTokenLen+serialNoLen]),
+ return TubeMQResponse{
+ serialNo: binary.BigEndian.Uint32(t.msg[beginTokenLen : beginTokenLen+serialNoLen]),
responseBuf: data,
}, nil
}
-type FrameRequest struct {
- requestID uint32
- req []byte
+type TubeMQRequest struct {
+ serialNo uint32
+ req []byte
}
-type FrameResponse struct {
+type TubeMQResponse struct {
serialNo uint32
responseBuf []byte
}
-func (f *FrameResponse) GetSerialNo() uint32 {
- return f.serialNo
+func (t TubeMQResponse) GetSerialNo() uint32 {
+ return t.serialNo
}
-func (f *FrameResponse) GetResponseBuf() []byte {
- return f.responseBuf
+func (t TubeMQResponse) GetResponseBuf() []byte {
+ return t.responseBuf
}
-
-type Codec struct{}
diff --git a/tubemq-client-twins/tubemq-client-go/pool/multiplexed.go b/tubemq-client-twins/tubemq-client-go/multiplexed/multiplexed.go
similarity index 90%
rename from tubemq-client-twins/tubemq-client-go/pool/multiplexed.go
rename to tubemq-client-twins/tubemq-client-go/multiplexed/multiplexed.go
index d5a60b1..050b608 100644
--- a/tubemq-client-twins/tubemq-client-go/pool/multiplexed.go
+++ b/tubemq-client-twins/tubemq-client-go/multiplexed/multiplexed.go
@@ -15,7 +15,11 @@
* limitations under the License.
*/
-package pool
+// Package multiplexed defines the multiplexed connection pool for sending
+// request and receiving response. After receiving the response, it will
+// be decoded and returned to the client. It is used for the underlying communication
+// with TubeMQ.
+package multiplexed
import (
"context"
@@ -30,8 +34,6 @@ import (
"github.com/apache/incubator-inlong/tubemq-client-twins/tubemq-client-go/codec"
)
-var DefaultMultiplexedPool = New()
-
var (
// ErrConnClosed indicates that the connection is closed
ErrConnClosed = errors.New("connection is closed")
@@ -50,15 +52,60 @@ const (
Closed
)
-var queueSize = 10000
+const queueSize = 10000
+
+type Pool struct {
+ connections *sync.Map
+}
-func New() *Multiplexed {
- m := &Multiplexed{
+func NewPool() *Pool {
+ m := &Pool{
connections: new(sync.Map),
}
return m
}
+
+func (p *Pool) Get(ctx context.Context, address string, serialNo uint32) (*MultiplexedConnection, error) {
+ select {
+ case <-ctx.Done():
+ return nil, ctx.Err()
+ default:
+ }
+
+ if v, ok := p.connections.Load(address); ok {
+ if c, ok := v.(*Connection); ok {
+ return c.new(ctx, serialNo)
+ }
+ return nil, ErrAssertConnectionFail
+ }
+
+ c := &Connection{
+ address: address,
+ connections: make(map[uint32]*MultiplexedConnection),
+ done: make(chan struct{}),
+ mDone: make(chan struct{}),
+ state: Initial,
+ }
+ c.buffer = &writerBuffer{
+ buffer: make(chan []byte, queueSize),
+ done: c.done,
+ }
+ p.connections.Store(address, c)
+
+ conn, dialOpts, err := dial(ctx, address)
+ c.dialOpts = dialOpts
+ if err != nil {
+ return nil, err
+ }
+ c.decoder = codec.New(conn)
+ c.conn = conn
+ c.state = Connected
+ go c.reader()
+ go c.writer()
+ return c.new(ctx, serialNo)
+}
+
type writerBuffer struct {
buffer chan []byte
done <-chan struct{}
@@ -75,7 +122,7 @@ func (w *writerBuffer) get() ([]byte, error) {
type recvReader struct {
ctx context.Context
- recv chan *codec.FrameResponse
+ recv chan codec.TransportResponse
}
type MultiplexedConnection struct {
@@ -93,7 +140,7 @@ func (mc *MultiplexedConnection) Write(b []byte) error {
return nil
}
-func (mc *MultiplexedConnection) Read() (*codec.FrameResponse, error) {
+func (mc *MultiplexedConnection) Read() (codec.TransportResponse, error) {
select {
case <-mc.reader.ctx.Done():
mc.conn.remove(mc.serialNo)
@@ -111,7 +158,7 @@ func (mc *MultiplexedConnection) Read() (*codec.FrameResponse, error) {
}
}
-func (mc *MultiplexedConnection) recv(rsp *codec.FrameResponse) {
+func (mc *MultiplexedConnection) recv(rsp *codec.TubeMQResponse) {
mc.reader.recv <- rsp
mc.conn.remove(rsp.GetSerialNo())
}
@@ -131,14 +178,14 @@ type Connection struct {
address string
mu sync.RWMutex
connections map[uint32]*MultiplexedConnection
- framer *codec.Framer
+ decoder codec.Decoder
conn net.Conn
done chan struct{}
mDone chan struct{}
buffer *writerBuffer
dialOpts *DialOptions
state int
- multiplexed *Multiplexed
+ multiplexed *Pool
}
func (c *Connection) new(ctx context.Context, serialNo uint32) (*MultiplexedConnection, error) {
@@ -154,7 +201,7 @@ func (c *Connection) new(ctx context.Context, serialNo uint32) (*MultiplexedConn
done: c.mDone,
reader: &recvReader{
ctx: ctx,
- recv: make(chan *codec.FrameResponse, 1),
+ recv: make(chan codec.TransportResponse, 1),
},
}
@@ -204,7 +251,7 @@ func (c *Connection) reconnect() error {
}
c.done = make(chan struct{})
c.conn = conn
- c.framer = codec.New(conn)
+ c.decoder = codec.New(conn)
c.buffer.done = c.done
c.state = Connected
c.err = nil
@@ -273,7 +320,7 @@ func (c *Connection) reader() {
return
default:
}
- rsp, err := c.framer.Decode()
+ rsp, err := c.decoder.Decode()
if err != nil {
lastErr = err
break
@@ -291,50 +338,6 @@ func (c *Connection) reader() {
c.close(lastErr, c.done)
}
-type Multiplexed struct {
- connections *sync.Map
-}
-
-func (p *Multiplexed) Get(ctx context.Context, address string, serialNo uint32) (*MultiplexedConnection, error) {
- select {
- case <-ctx.Done():
- return nil, ctx.Err()
- default:
- }
-
- if v, ok := p.connections.Load(address); ok {
- if c, ok := v.(*Connection); ok {
- return c.new(ctx, serialNo)
- }
- return nil, ErrAssertConnectionFail
- }
-
- c := &Connection{
- address: address,
- connections: make(map[uint32]*MultiplexedConnection),
- done: make(chan struct{}),
- mDone: make(chan struct{}),
- state: Initial,
- }
- c.buffer = &writerBuffer{
- buffer: make(chan []byte, queueSize),
- done: c.done,
- }
- p.connections.Store(address, c)
-
- conn, dialOpts, err := dial(ctx, address)
- c.dialOpts = dialOpts
- if err != nil {
- return nil, err
- }
- c.framer = codec.New(conn)
- c.conn = conn
- c.state = Connected
- go c.reader()
- go c.writer()
- return c.new(ctx, serialNo)
-}
-
func dial(ctx context.Context, address string) (net.Conn, *DialOptions, error) {
var timeout time.Duration
t, ok := ctx.Deadline()
diff --git a/tubemq-client-twins/tubemq-client-go/pool/multlplexed_test.go b/tubemq-client-twins/tubemq-client-go/multiplexed/multlplexed_test.go
similarity index 98%
rename from tubemq-client-twins/tubemq-client-go/pool/multlplexed_test.go
rename to tubemq-client-twins/tubemq-client-go/multiplexed/multlplexed_test.go
index e136ebd..4584607 100644
--- a/tubemq-client-twins/tubemq-client-go/pool/multlplexed_test.go
+++ b/tubemq-client-twins/tubemq-client-go/multiplexed/multlplexed_test.go
@@ -15,7 +15,7 @@
* limitations under the License.
*/
-package pool
+package multiplexed
import (
"bytes"
@@ -92,7 +92,7 @@ func TestBasicMultiplexed(t *testing.T) {
ctx, cancel := context.WithTimeout(context.Background(), time.Second)
defer cancel()
- m := New()
+ m := NewPool()
mc, err := m.Get(ctx, address, serialNo)
body := []byte("hello world")
@@ -109,7 +109,7 @@ func TestBasicMultiplexed(t *testing.T) {
func TestConcurrentMultiplexed(t *testing.T) {
count := 1000
- m := New()
+ m := NewPool()
wg := sync.WaitGroup{}
wg.Add(count)
for i := 0; i < count; i++ {
[incubator-inlong] 08/12: nitpick for codec and dialopts
Posted by go...@apache.org.
This is an automated email from the ASF dual-hosted git repository.
gosonzhang pushed a commit to branch INLONG-25
in repository https://gitbox.apache.org/repos/asf/incubator-inlong.git
commit 459405a5dfd214f448b05bb796baccfd76081891
Author: Zijie Lu <ws...@gmail.com>
AuthorDate: Sun May 2 15:00:59 2021 +0800
nitpick for codec and dialopts
Signed-off-by: Zijie Lu <ws...@gmail.com>
---
tubemq-client-twins/tubemq-client-go/codec/codec.go | 11 ++++-------
.../tubemq-client-go/multiplexing/multiplexing.go | 18 +++++++-----------
.../tubemq-client-go/multiplexing/multlplexing_test.go | 12 ++++++++++--
3 files changed, 21 insertions(+), 20 deletions(-)
diff --git a/tubemq-client-twins/tubemq-client-go/codec/codec.go b/tubemq-client-twins/tubemq-client-go/codec/codec.go
index b9ee1d7..fb5c945 100644
--- a/tubemq-client-twins/tubemq-client-go/codec/codec.go
+++ b/tubemq-client-twins/tubemq-client-go/codec/codec.go
@@ -30,7 +30,7 @@ import (
const (
RPCProtocolBeginToken uint32 = 0xFF7FF4FE
RPCMaxBufferSize uint32 = 8192
- frameHeadLen uint32 = 8
+ frameHeadLen uint32 = 12
maxBufferSize int = 128 * 1024
defaultMsgSize int = 4096
dataLen uint32 = 4
@@ -82,11 +82,8 @@ func (t *TubeMQDecoder) Decode() (TransportResponse, error) {
if token != RPCProtocolBeginToken {
return nil, errors.New("framer: read framer rpc protocol begin token not match")
}
- num, err = io.ReadFull(t.reader, t.msg[frameHeadLen:frameHeadLen+listSizeLen])
- if num != int(listSizeLen) {
- return nil, errors.New("framer: read invalid list size num")
- }
- listSize := binary.BigEndian.Uint32(t.msg[frameHeadLen : frameHeadLen+listSizeLen])
+ serialNo := binary.BigEndian.Uint32(t.msg[beginTokenLen : beginTokenLen+serialNoLen])
+ listSize := binary.BigEndian.Uint32(t.msg[beginTokenLen+serialNoLen : beginTokenLen+serialNoLen+listSizeLen])
totalLen := int(frameHeadLen)
size := make([]byte, 4)
for i := 0; i < int(listSize); i++ {
@@ -119,7 +116,7 @@ func (t *TubeMQDecoder) Decode() (TransportResponse, error) {
copy(data, t.msg[frameHeadLen:totalLen])
return TubeMQResponse{
- serialNo: binary.BigEndian.Uint32(t.msg[beginTokenLen : beginTokenLen+serialNoLen]),
+ serialNo: serialNo,
responseBuf: data,
}, nil
}
diff --git a/tubemq-client-twins/tubemq-client-go/multiplexing/multiplexing.go b/tubemq-client-twins/tubemq-client-go/multiplexing/multiplexing.go
index d1ee603..783c8e8 100644
--- a/tubemq-client-twins/tubemq-client-go/multiplexing/multiplexing.go
+++ b/tubemq-client-twins/tubemq-client-go/multiplexing/multiplexing.go
@@ -71,7 +71,7 @@ func NewPool() *Pool {
// Get will return a multiplex connection
// 1. If no underlying TCP connection has been created, a TCP connection will be created first.
// 2. A new multiplex connection with the serialNo will be created and returned.
-func (p *Pool) Get(ctx context.Context, address string, serialNo uint32) (*MultiplexConnection, error) {
+func (p *Pool) Get(ctx context.Context, address string, serialNo uint32, opts *DialOptions) (*MultiplexConnection, error) {
select {
case <-ctx.Done():
return nil, ctx.Err()
@@ -98,7 +98,7 @@ func (p *Pool) Get(ctx context.Context, address string, serialNo uint32) (*Multi
}
p.connections.Store(address, c)
- conn, dialOpts, err := dial(ctx, address)
+ conn, dialOpts, err := dial(ctx, address, opts)
c.dialOpts = dialOpts
if err != nil {
return nil, err
@@ -112,24 +112,20 @@ func (p *Pool) Get(ctx context.Context, address string, serialNo uint32) (*Multi
return c.new(ctx, serialNo)
}
-func dial(ctx context.Context, address string) (net.Conn, *DialOptions, error) {
+func dial(ctx context.Context, address string, opts *DialOptions) (net.Conn, *DialOptions, error) {
var timeout time.Duration
t, ok := ctx.Deadline()
if ok {
timeout = t.Sub(time.Now())
}
- dialOpts := &DialOptions{
- Network: "tcp",
- Address: address,
- Timeout: timeout,
- }
+ opts.Timeout = timeout
select {
case <-ctx.Done():
- return nil, dialOpts, ctx.Err()
+ return nil, opts, ctx.Err()
default:
}
- conn, err := dialWithTimeout(dialOpts)
- return conn, dialOpts, err
+ conn, err := dialWithTimeout(opts)
+ return conn, opts, err
}
func dialWithTimeout(opts *DialOptions) (net.Conn, error) {
diff --git a/tubemq-client-twins/tubemq-client-go/multiplexing/multlplexing_test.go b/tubemq-client-twins/tubemq-client-go/multiplexing/multlplexing_test.go
index af1d416..0c6d6b9 100644
--- a/tubemq-client-twins/tubemq-client-go/multiplexing/multlplexing_test.go
+++ b/tubemq-client-twins/tubemq-client-go/multiplexing/multlplexing_test.go
@@ -93,7 +93,11 @@ func TestBasicMultiplexing(t *testing.T) {
defer cancel()
m := NewPool()
- mc, err := m.Get(ctx, address, serialNo)
+ opts := &DialOptions{
+ Network: "tcp",
+ Address: address,
+ }
+ mc, err := m.Get(ctx, address, serialNo, opts)
body := []byte("hello world")
buf, err := Encode(serialNo, body)
@@ -118,7 +122,11 @@ func TestConcurrentMultiplexing(t *testing.T) {
ctx, cancel := context.WithTimeout(context.Background(), time.Second)
defer cancel()
serialNo := atomic.AddUint32(&serialNo, 1)
- mc, err := m.Get(ctx, address, serialNo)
+ opts := &DialOptions{
+ Network: "tcp",
+ Address: address,
+ }
+ mc, err := m.Get(ctx, address, serialNo, opts)
assert.Nil(t, err)
body := []byte("hello world" + strconv.Itoa(i))
[incubator-inlong] 09/12: Address review comments
Posted by go...@apache.org.
This is an automated email from the ASF dual-hosted git repository.
gosonzhang pushed a commit to branch INLONG-25
in repository https://gitbox.apache.org/repos/asf/incubator-inlong.git
commit 6b64b52905cba23c4f6b7fe892f69f81b7cc6032
Author: Zijie Lu <ws...@gmail.com>
AuthorDate: Thu May 6 10:44:51 2021 +0800
Address review comments
Signed-off-by: Zijie Lu <ws...@gmail.com>
---
.../tubemq-client-go/codec/codec.go | 119 +--------------------
.../tubemq-client-go/multiplexing/multiplexing.go | 16 +--
.../multiplexing/multlplexing_test.go | 4 +-
3 files changed, 15 insertions(+), 124 deletions(-)
diff --git a/tubemq-client-twins/tubemq-client-go/codec/codec.go b/tubemq-client-twins/tubemq-client-go/codec/codec.go
index fb5c945..4af8b5d 100644
--- a/tubemq-client-twins/tubemq-client-go/codec/codec.go
+++ b/tubemq-client-twins/tubemq-client-go/codec/codec.go
@@ -20,125 +20,16 @@
// will need to be changed.
package codec
-import (
- "bufio"
- "encoding/binary"
- "errors"
- "io"
-)
-
-const (
- RPCProtocolBeginToken uint32 = 0xFF7FF4FE
- RPCMaxBufferSize uint32 = 8192
- frameHeadLen uint32 = 12
- maxBufferSize int = 128 * 1024
- defaultMsgSize int = 4096
- dataLen uint32 = 4
- listSizeLen uint32 = 4
- serialNoLen uint32 = 4
- beginTokenLen uint32 = 4
-)
-
-// TransportResponse is the abstraction of the transport response.
-type TransportResponse interface {
+// Response is the abstraction of the transport response.
+type Response interface {
// GetSerialNo returns the `serialNo` of the corresponding request.
GetSerialNo() uint32
- // GetResponseBuf returns the body of the response.
- GetResponseBuf() []byte
+ // GetBuffer returns the body of the response.
+ GetBuffer() []byte
}
// Decoder is the abstraction of the decoder which is used to decode the response.
type Decoder interface {
// Decode will decode the response to frame head and body.
- Decode() (TransportResponse, error)
-}
-
-// TubeMQDecoder is the implementation of the decoder of response from TubeMQ.
-type TubeMQDecoder struct {
- reader io.Reader
- msg []byte
-}
-
-// New will return a default TubeMQDecoder.
-func New(reader io.Reader) *TubeMQDecoder {
- bufferReader := bufio.NewReaderSize(reader, maxBufferSize)
- return &TubeMQDecoder{
- msg: make([]byte, defaultMsgSize),
- reader: bufferReader,
- }
-}
-
-// Decode will decode the response from TubeMQ to TransportResponse according to
-// the RPC protocol of TubeMQ.
-func (t *TubeMQDecoder) Decode() (TransportResponse, error) {
- num, err := io.ReadFull(t.reader, t.msg[:frameHeadLen])
- if err != nil {
- return nil, err
- }
- if num != int(frameHeadLen) {
- return nil, errors.New("framer: read frame header num invalid")
- }
- token := binary.BigEndian.Uint32(t.msg[:beginTokenLen])
- if token != RPCProtocolBeginToken {
- return nil, errors.New("framer: read framer rpc protocol begin token not match")
- }
- serialNo := binary.BigEndian.Uint32(t.msg[beginTokenLen : beginTokenLen+serialNoLen])
- listSize := binary.BigEndian.Uint32(t.msg[beginTokenLen+serialNoLen : beginTokenLen+serialNoLen+listSizeLen])
- totalLen := int(frameHeadLen)
- size := make([]byte, 4)
- for i := 0; i < int(listSize); i++ {
- n, err := io.ReadFull(t.reader, size)
- if err != nil {
- return nil, err
- }
- if n != int(dataLen) {
- return nil, errors.New("framer: read invalid size")
- }
-
- s := int(binary.BigEndian.Uint32(size))
- if totalLen+s > len(t.msg) {
- data := t.msg[:totalLen]
- t.msg = make([]byte, totalLen+s)
- copy(t.msg, data[:])
- }
-
- num, err = io.ReadFull(t.reader, t.msg[totalLen:totalLen+s])
- if err != nil {
- return nil, err
- }
- if num != s {
- return nil, errors.New("framer: read invalid data")
- }
- totalLen += s
- }
-
- data := make([]byte, totalLen-int(frameHeadLen))
- copy(data, t.msg[frameHeadLen:totalLen])
-
- return TubeMQResponse{
- serialNo: serialNo,
- responseBuf: data,
- }, nil
-}
-
-// TubeMQRequest is the implementation of TubeMQ request.
-type TubeMQRequest struct {
- serialNo uint32
- req []byte
-}
-
-// TubeMQResponse is the TubeMQ implementation of TransportResponse.
-type TubeMQResponse struct {
- serialNo uint32
- responseBuf []byte
-}
-
-// GetSerialNo will return the SerialNo of TubeMQResponse.
-func (t TubeMQResponse) GetSerialNo() uint32 {
- return t.serialNo
-}
-
-// GetResponseBuf will return the body of TubeMQResponse.
-func (t TubeMQResponse) GetResponseBuf() []byte {
- return t.responseBuf
+ Decode() (Response, error)
}
diff --git a/tubemq-client-twins/tubemq-client-go/multiplexing/multiplexing.go b/tubemq-client-twins/tubemq-client-go/multiplexing/multiplexing.go
index 783c8e8..825636a 100644
--- a/tubemq-client-twins/tubemq-client-go/multiplexing/multiplexing.go
+++ b/tubemq-client-twins/tubemq-client-go/multiplexing/multiplexing.go
@@ -98,7 +98,7 @@ func (p *Pool) Get(ctx context.Context, address string, serialNo uint32, opts *D
}
p.connections.Store(address, c)
- conn, dialOpts, err := dial(ctx, address, opts)
+ conn, dialOpts, err := dial(ctx, opts)
c.dialOpts = dialOpts
if err != nil {
return nil, err
@@ -112,7 +112,7 @@ func (p *Pool) Get(ctx context.Context, address string, serialNo uint32, opts *D
return c.new(ctx, serialNo)
}
-func dial(ctx context.Context, address string, opts *DialOptions) (net.Conn, *DialOptions, error) {
+func dial(ctx context.Context, opts *DialOptions) (net.Conn, *DialOptions, error) {
var timeout time.Duration
t, ok := ctx.Deadline()
if ok {
@@ -177,7 +177,7 @@ func getCertPool(caCertFile string) (*x509.CertPool, error) {
type recvReader struct {
ctx context.Context
- recv chan codec.TransportResponse
+ recv chan codec.Response
}
// MultiplexConnection is used to multiplex a TCP connection.
@@ -199,7 +199,7 @@ func (mc *MultiplexConnection) Write(b []byte) error {
}
// Read returns the response from the multiplex connection.
-func (mc *MultiplexConnection) Read() (codec.TransportResponse, error) {
+func (mc *MultiplexConnection) Read() (codec.Response, error) {
select {
case <-mc.reader.ctx.Done():
mc.conn.remove(mc.serialNo)
@@ -217,7 +217,7 @@ func (mc *MultiplexConnection) Read() (codec.TransportResponse, error) {
}
}
-func (mc *MultiplexConnection) recv(rsp *codec.TubeMQResponse) {
+func (mc *MultiplexConnection) recv(rsp codec.Response) {
mc.reader.recv <- rsp
mc.conn.remove(rsp.GetSerialNo())
}
@@ -264,7 +264,7 @@ func (c *Connection) new(ctx context.Context, serialNo uint32) (*MultiplexConnec
done: c.mDone,
reader: &recvReader{
ctx: ctx,
- recv: make(chan codec.TransportResponse, 1),
+ recv: make(chan codec.Response, 1),
},
}
@@ -324,7 +324,7 @@ func (c *Connection) reconnect() error {
}
// The response handling logic of the TCP connection.
-// 1. Read from the connection and decode it to the TransportResponse.
+// 1. Read from the connection and decode it to the Response.
// 2. Send the response to the corresponding multiplex connection based on the serialNo.
func (c *Connection) reader() {
var lastErr error
@@ -347,7 +347,7 @@ func (c *Connection) reader() {
continue
}
mc.reader.recv <- rsp
- mc.conn.remove(rsp.GetSerialNo())
+ mc.conn.remove(serialNo)
}
c.close(lastErr, c.done)
}
diff --git a/tubemq-client-twins/tubemq-client-go/multiplexing/multlplexing_test.go b/tubemq-client-twins/tubemq-client-go/multiplexing/multlplexing_test.go
index 0c6d6b9..fbeb8c4 100644
--- a/tubemq-client-twins/tubemq-client-go/multiplexing/multlplexing_test.go
+++ b/tubemq-client-twins/tubemq-client-go/multiplexing/multlplexing_test.go
@@ -107,7 +107,7 @@ func TestBasicMultiplexing(t *testing.T) {
rsp, err := mc.Read()
assert.Nil(t, err)
assert.Equal(t, serialNo, rsp.GetSerialNo())
- assert.Equal(t, body, rsp.GetResponseBuf())
+ assert.Equal(t, body, rsp.GetBuffer())
assert.Equal(t, mc.Write(nil), nil)
}
@@ -137,7 +137,7 @@ func TestConcurrentMultiplexing(t *testing.T) {
rsp, err := mc.Read()
assert.Nil(t, err)
assert.Equal(t, serialNo, rsp.GetSerialNo())
- assert.Equal(t, body, rsp.GetResponseBuf())
+ assert.Equal(t, body, rsp.GetBuffer())
}(i)
}
wg.Wait()
[incubator-inlong] 10/12: Add missing file
Posted by go...@apache.org.
This is an automated email from the ASF dual-hosted git repository.
gosonzhang pushed a commit to branch INLONG-25
in repository https://gitbox.apache.org/repos/asf/incubator-inlong.git
commit e00a5ba1bfaf95360ae0648e140732b3a4229003
Author: Zijie Lu <ws...@gmail.com>
AuthorDate: Thu May 6 10:45:16 2021 +0800
Add missing file
Signed-off-by: Zijie Lu <ws...@gmail.com>
---
.../tubemq-client-go/codec/tubemq_codec.go | 138 +++++++++++++++++++++
1 file changed, 138 insertions(+)
diff --git a/tubemq-client-twins/tubemq-client-go/codec/tubemq_codec.go b/tubemq-client-twins/tubemq-client-go/codec/tubemq_codec.go
new file mode 100644
index 0000000..96ebfa9
--- /dev/null
+++ b/tubemq-client-twins/tubemq-client-go/codec/tubemq_codec.go
@@ -0,0 +1,138 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+// Package codec defines the encoding and decoding logic between TubeMQ.
+// If the protocol of encoding and decoding is changed, only this package
+// will need to be changed.
+package codec
+
+import (
+ "bufio"
+ "encoding/binary"
+ "errors"
+ "io"
+)
+
+const (
+ // The default begin token of TubeMQ RPC protocol.
+ RPCProtocolBeginToken uint32 = 0xFF7FF4FE
+ // The default max buffer size of the RPC response.
+ RPCMaxBufferSize uint32 = 8192
+ frameHeadLen uint32 = 12
+ maxBufferSize int = 128 * 1024
+ defaultMsgSize int = 4096
+ dataLen uint32 = 4
+ listSizeLen uint32 = 4
+ serialNoLen uint32 = 4
+ beginTokenLen uint32 = 4
+)
+
+// TubeMQDecoder is the implementation of the decoder of response from TubeMQ.
+type TubeMQDecoder struct {
+ reader io.Reader
+ msg []byte
+}
+
+// New will return a default TubeMQDecoder.
+func New(reader io.Reader) *TubeMQDecoder {
+ bufferReader := bufio.NewReaderSize(reader, maxBufferSize)
+ return &TubeMQDecoder{
+ msg: make([]byte, defaultMsgSize),
+ reader: bufferReader,
+ }
+}
+
+// Decode will decode the response from TubeMQ to Response according to
+// the RPC protocol of TubeMQ.
+func (t *TubeMQDecoder) Decode() (Response, error) {
+ var num int
+ var err error
+ if num, err = io.ReadFull(t.reader, t.msg[:frameHeadLen]); err != nil {
+ return nil, err
+ }
+ if num != int(frameHeadLen) {
+ return nil, errors.New("framer: read frame header num invalid")
+ }
+ if binary.BigEndian.Uint32(t.msg[:beginTokenLen]) != RPCProtocolBeginToken {
+ return nil, errors.New("framer: read framer rpc protocol begin token not match")
+ }
+ serialNo := binary.BigEndian.Uint32(t.msg[beginTokenLen : beginTokenLen+serialNoLen])
+ listSize := binary.BigEndian.Uint32(t.msg[beginTokenLen+serialNoLen : beginTokenLen+serialNoLen+listSizeLen])
+ totalLen := int(frameHeadLen)
+ for i := 0; i < int(listSize); i++ {
+ size := make([]byte, 4)
+ n, err := io.ReadFull(t.reader, size)
+ if err != nil {
+ return nil, err
+ }
+ if n != int(dataLen) {
+ return nil, errors.New("framer: read invalid size")
+ }
+
+ s := int(binary.BigEndian.Uint32(size))
+ if totalLen+s > len(t.msg) {
+ data := t.msg[:totalLen]
+ t.msg = make([]byte, 0, max(2*len(t.msg), totalLen+s))
+ copy(t.msg, data[:])
+ }
+
+ if num, err = io.ReadFull(t.reader, t.msg[totalLen:totalLen+s]); err != nil {
+ return nil, err
+ }
+ if num != s {
+ return nil, errors.New("framer: read invalid data")
+ }
+ totalLen += s
+ }
+
+ data := make([]byte, totalLen-int(frameHeadLen))
+ copy(data, t.msg[frameHeadLen:totalLen])
+
+ return &TubeMQResponse{
+ serialNo: serialNo,
+ Buffer: data,
+ }, nil
+}
+
+// TubeMQRequest is the implementation of TubeMQ request.
+type TubeMQRequest struct {
+ serialNo uint32
+ req []byte
+}
+
+// TubeMQResponse is the TubeMQ implementation of Response.
+type TubeMQResponse struct {
+ serialNo uint32
+ Buffer []byte
+}
+
+// GetSerialNo will return the SerialNo of Response.
+func (t TubeMQResponse) GetSerialNo() uint32 {
+ return t.serialNo
+}
+
+// GetBuffer will return the body of Response.
+func (t TubeMQResponse) GetBuffer() []byte {
+ return t.Buffer
+}
+
+func max(x, y int) int {
+ if x < y {
+ return y
+ }
+ return x
+}
[incubator-inlong] 02/12: Add license and remove the chinese comment
Posted by go...@apache.org.
This is an automated email from the ASF dual-hosted git repository.
gosonzhang pushed a commit to branch INLONG-25
in repository https://gitbox.apache.org/repos/asf/incubator-inlong.git
commit 0033c625af6c5e2e04805010c73d79615b7f7c2d
Author: Zijie Lu <ws...@gmail.com>
AuthorDate: Fri Apr 30 15:03:51 2021 +0800
Add license and remove the chinese comment
Signed-off-by: Zijie Lu <ws...@gmail.com>
---
tubemq-client-twins/tubemq-client-go/codec/codec.go | 17 +++++++++++++++++
tubemq-client-twins/tubemq-client-go/go.mod | 15 +++++++++++++++
.../tubemq-client-go/pool/multiplexed.go | 19 ++++++++++++++++++-
.../tubemq-client-go/pool/multlplexed_test.go | 19 ++++++++++++++++++-
4 files changed, 68 insertions(+), 2 deletions(-)
diff --git a/tubemq-client-twins/tubemq-client-go/codec/codec.go b/tubemq-client-twins/tubemq-client-go/codec/codec.go
index ee27e96..f66fac9 100644
--- a/tubemq-client-twins/tubemq-client-go/codec/codec.go
+++ b/tubemq-client-twins/tubemq-client-go/codec/codec.go
@@ -1,3 +1,20 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
package codec
import (
diff --git a/tubemq-client-twins/tubemq-client-go/go.mod b/tubemq-client-twins/tubemq-client-go/go.mod
index 7c1a676..4d0a345 100644
--- a/tubemq-client-twins/tubemq-client-go/go.mod
+++ b/tubemq-client-twins/tubemq-client-go/go.mod
@@ -1,3 +1,18 @@
+// Licensed to the Apache Software Foundation (ASF) under one or more
+// contributor license agreements. See the NOTICE file distributed with
+// this work for additional information regarding copyright ownership.
+// The ASF licenses this file to You under the Apache License, Version 2.0
+// (the "License"); you may not use this file except in compliance with
+// the License. You may obtain a copy of the License at
+// <p>
+// http://www.apache.org/licenses/LICENSE-2.0
+// <p>
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
module github.com/apache/incubator-inlong/tubemq-client-twins/tubemq-client-go
go 1.14
diff --git a/tubemq-client-twins/tubemq-client-go/pool/multiplexed.go b/tubemq-client-twins/tubemq-client-go/pool/multiplexed.go
index 5d38a14..d5a60b1 100644
--- a/tubemq-client-twins/tubemq-client-go/pool/multiplexed.go
+++ b/tubemq-client-twins/tubemq-client-go/pool/multiplexed.go
@@ -1,3 +1,20 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
package pool
import (
@@ -344,7 +361,7 @@ func dialWithTimeout(opts *DialOptions) (net.Conn, error) {
}
tlsConf := &tls.Config{}
- if opts.CACertFile == "none" { // 不需要检验服务证书
+ if opts.CACertFile == "none" {
tlsConf.InsecureSkipVerify = true
} else {
if len(opts.TLSServerName) == 0 {
diff --git a/tubemq-client-twins/tubemq-client-go/pool/multlplexed_test.go b/tubemq-client-twins/tubemq-client-go/pool/multlplexed_test.go
index 6377032..e136ebd 100644
--- a/tubemq-client-twins/tubemq-client-go/pool/multlplexed_test.go
+++ b/tubemq-client-twins/tubemq-client-go/pool/multlplexed_test.go
@@ -1,3 +1,20 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
package pool
import (
@@ -19,7 +36,7 @@ import (
)
var (
- address = "127.0.0.1:0"
+ address = "127.0.0.1:8888"
ch = make(chan struct{})
serialNo uint32 = 1
)
[incubator-inlong] 12/12: nitpick
Posted by go...@apache.org.
This is an automated email from the ASF dual-hosted git repository.
gosonzhang pushed a commit to branch INLONG-25
in repository https://gitbox.apache.org/repos/asf/incubator-inlong.git
commit 2cd11c324dba8f1f8749782bd1f071f6fb62f3c1
Author: Zijie Lu <ws...@gmail.com>
AuthorDate: Fri May 7 20:07:27 2021 +0800
nitpick
Signed-off-by: Zijie Lu <ws...@gmail.com>
---
tubemq-client-twins/tubemq-client-go/multiplexing/multiplexing.go | 2 +-
1 file changed, 1 insertion(+), 1 deletion(-)
diff --git a/tubemq-client-twins/tubemq-client-go/multiplexing/multiplexing.go b/tubemq-client-twins/tubemq-client-go/multiplexing/multiplexing.go
index 825636a..0c4239c 100644
--- a/tubemq-client-twins/tubemq-client-go/multiplexing/multiplexing.go
+++ b/tubemq-client-twins/tubemq-client-go/multiplexing/multiplexing.go
@@ -134,7 +134,7 @@ func dialWithTimeout(opts *DialOptions) (net.Conn, error) {
}
tlsConf := &tls.Config{}
- if opts.CACertFile == "none" {
+ if opts.CACertFile == "" {
tlsConf.InsecureSkipVerify = true
} else {
if len(opts.TLSServerName) == 0 {
[incubator-inlong] 06/12: Remove go.sum from git index
Posted by go...@apache.org.
This is an automated email from the ASF dual-hosted git repository.
gosonzhang pushed a commit to branch INLONG-25
in repository https://gitbox.apache.org/repos/asf/incubator-inlong.git
commit e0fd869f205e7115eae192ae9d65ef4a4f9ec37b
Author: Zijie Lu <ws...@gmail.com>
AuthorDate: Sat May 1 10:46:45 2021 +0800
Remove go.sum from git index
Signed-off-by: Zijie Lu <ws...@gmail.com>
---
tubemq-client-twins/tubemq-client-go/go.sum | 11 -----------
1 file changed, 11 deletions(-)
diff --git a/tubemq-client-twins/tubemq-client-go/go.sum b/tubemq-client-twins/tubemq-client-go/go.sum
deleted file mode 100644
index acb88a4..0000000
--- a/tubemq-client-twins/tubemq-client-go/go.sum
+++ /dev/null
@@ -1,11 +0,0 @@
-github.com/davecgh/go-spew v1.1.0 h1:ZDRjVQ15GmhC3fiQ8ni8+OwkZQO4DARzQgrnXU1Liz8=
-github.com/davecgh/go-spew v1.1.0/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38=
-github.com/pmezard/go-difflib v1.0.0 h1:4DBwDE0NGyQoBHbLQYPwSUPoCMWR5BEzIk/f1lZbAQM=
-github.com/pmezard/go-difflib v1.0.0/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4=
-github.com/stretchr/objx v0.1.0/go.mod h1:HFkY916IF+rwdDfMAkV7OtwuqBVzrE8GR6GFx+wExME=
-github.com/stretchr/testify v1.7.0 h1:nwc3DEeHmmLAfoZucVR881uASk0Mfjw8xYJ99tb5CcY=
-github.com/stretchr/testify v1.7.0/go.mod h1:6Fq8oRcR53rry900zMqJjRRixrwX3KX962/h/Wwjteg=
-gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405 h1:yhCVgyC4o1eVCa2tZl7eS0r+SDo693bJlVdllGtEeKM=
-gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0=
-gopkg.in/yaml.v3 v3.0.0-20200313102051-9f266ea9e77c h1:dUUwHk2QECo/6vqA44rthZ8ie2QXMNeKRTHCNY2nXvo=
-gopkg.in/yaml.v3 v3.0.0-20200313102051-9f266ea9e77c/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM=
[incubator-inlong] 05/12: Rename and add some comments
Posted by go...@apache.org.
This is an automated email from the ASF dual-hosted git repository.
gosonzhang pushed a commit to branch INLONG-25
in repository https://gitbox.apache.org/repos/asf/incubator-inlong.git
commit c6d3f751be782e5f62eb1f46b64366f169be8e38
Author: Zijie Lu <ws...@gmail.com>
AuthorDate: Sat May 1 10:04:27 2021 +0800
Rename and add some comments
Signed-off-by: Zijie Lu <ws...@gmail.com>
---
.../tubemq-client-go/codec/codec.go | 16 ++++
tubemq-client-twins/tubemq-client-go/go.sum | 11 +++
.../multiplexing.go} | 98 +++++++++++++---------
.../multlplexing_test.go} | 6 +-
4 files changed, 88 insertions(+), 43 deletions(-)
diff --git a/tubemq-client-twins/tubemq-client-go/codec/codec.go b/tubemq-client-twins/tubemq-client-go/codec/codec.go
index a6f3fee..b9ee1d7 100644
--- a/tubemq-client-twins/tubemq-client-go/codec/codec.go
+++ b/tubemq-client-twins/tubemq-client-go/codec/codec.go
@@ -15,6 +15,9 @@
* limitations under the License.
*/
+// Package codec defines the encoding and decoding logic for communicating with TubeMQ.
+// If the protocol of encoding and decoding is changed, only this package
+// will need to be changed.
package codec
import (
@@ -36,20 +39,27 @@ const (
beginTokenLen uint32 = 4
)
+// TransportResponse is the abstraction of the transport response.
type TransportResponse interface {
+ // GetSerialNo returns the `serialNo` of the corresponding request.
GetSerialNo() uint32
+ // GetResponseBuf returns the body of the response.
GetResponseBuf() []byte
}
+// Decoder is the abstraction of the decoder which is used to decode the response.
type Decoder interface {
+ // Decode will decode the response into frame head and body.
Decode() (TransportResponse, error)
}
+// TubeMQDecoder is the implementation of the decoder of response from TubeMQ.
type TubeMQDecoder struct {
reader io.Reader
msg []byte
}
+// New will return a default TubeMQDecoder.
func New(reader io.Reader) *TubeMQDecoder {
bufferReader := bufio.NewReaderSize(reader, maxBufferSize)
return &TubeMQDecoder{
@@ -58,6 +68,8 @@ func New(reader io.Reader) *TubeMQDecoder {
}
}
+// Decode will decode the response from TubeMQ to TransportResponse according to
+// the RPC protocol of TubeMQ.
func (t *TubeMQDecoder) Decode() (TransportResponse, error) {
num, err := io.ReadFull(t.reader, t.msg[:frameHeadLen])
if err != nil {
@@ -112,20 +124,24 @@ func (t *TubeMQDecoder) Decode() (TransportResponse, error) {
}, nil
}
+// TubeMQRequest is the implementation of TubeMQ request.
type TubeMQRequest struct {
serialNo uint32
req []byte
}
+// TubeMQResponse is the TubeMQ implementation of TransportResponse.
type TubeMQResponse struct {
serialNo uint32
responseBuf []byte
}
+// GetSerialNo will return the SerialNo of TubeMQResponse.
func (t TubeMQResponse) GetSerialNo() uint32 {
return t.serialNo
}
+// GetResponseBuf will return the body of TubeMQResponse.
func (t TubeMQResponse) GetResponseBuf() []byte {
return t.responseBuf
}
diff --git a/tubemq-client-twins/tubemq-client-go/go.sum b/tubemq-client-twins/tubemq-client-go/go.sum
new file mode 100644
index 0000000..acb88a4
--- /dev/null
+++ b/tubemq-client-twins/tubemq-client-go/go.sum
@@ -0,0 +1,11 @@
+github.com/davecgh/go-spew v1.1.0 h1:ZDRjVQ15GmhC3fiQ8ni8+OwkZQO4DARzQgrnXU1Liz8=
+github.com/davecgh/go-spew v1.1.0/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38=
+github.com/pmezard/go-difflib v1.0.0 h1:4DBwDE0NGyQoBHbLQYPwSUPoCMWR5BEzIk/f1lZbAQM=
+github.com/pmezard/go-difflib v1.0.0/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4=
+github.com/stretchr/objx v0.1.0/go.mod h1:HFkY916IF+rwdDfMAkV7OtwuqBVzrE8GR6GFx+wExME=
+github.com/stretchr/testify v1.7.0 h1:nwc3DEeHmmLAfoZucVR881uASk0Mfjw8xYJ99tb5CcY=
+github.com/stretchr/testify v1.7.0/go.mod h1:6Fq8oRcR53rry900zMqJjRRixrwX3KX962/h/Wwjteg=
+gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405 h1:yhCVgyC4o1eVCa2tZl7eS0r+SDo693bJlVdllGtEeKM=
+gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0=
+gopkg.in/yaml.v3 v3.0.0-20200313102051-9f266ea9e77c h1:dUUwHk2QECo/6vqA44rthZ8ie2QXMNeKRTHCNY2nXvo=
+gopkg.in/yaml.v3 v3.0.0-20200313102051-9f266ea9e77c/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM=
diff --git a/tubemq-client-twins/tubemq-client-go/multiplexed/multiplexed.go b/tubemq-client-twins/tubemq-client-go/multiplexing/multiplexing.go
similarity index 75%
rename from tubemq-client-twins/tubemq-client-go/multiplexed/multiplexed.go
rename to tubemq-client-twins/tubemq-client-go/multiplexing/multiplexing.go
index 3792035..06e09d2 100644
--- a/tubemq-client-twins/tubemq-client-go/multiplexed/multiplexed.go
+++ b/tubemq-client-twins/tubemq-client-go/multiplexing/multiplexing.go
@@ -15,11 +15,11 @@
* limitations under the License.
*/
-// Package multiplexed defines the multiplexed connection pool for sending
-// request and receiving response. After receiving the response, it will
-// be decoded and returned to the client. It is used for the underlying communication
+// Package multiplexing defines the multiplex connection pool for sending
+// request and receiving response. After receiving the response, the decoded
+// response will be returned to the client. It is used for the communication
// with TubeMQ.
-package multiplexed
+package multiplexing
import (
"context"
@@ -35,16 +35,17 @@ import (
)
var (
- // ErrConnClosed indicates that the connection is closed
- ErrConnClosed = errors.New("connection is closed")
- // ErrChanClose indicates the recv chan is closed
- ErrChanClose = errors.New("unexpected recv chan close")
+ // ErrConnClosed indicates the connection has been closed
+ ErrConnClosed = errors.New("connection has been closed")
+ // ErrChanClosed indicates the recv chan has been closed
+ ErrChanClosed = errors.New("unexpected recv chan closing")
// ErrWriteBufferDone indicates write buffer done
ErrWriteBufferDone = errors.New("write buffer done")
- // ErrAssertConnectionFail indicates connection assertion error
- ErrAssertConnectionFail = errors.New("assert connection slice fail")
+ // ErrAssertConnection indicates connection assertion error
+ ErrAssertConnection = errors.New("connection assertion error")
)
+// The state of the connection.
const (
Initial int = iota
Connected
@@ -54,10 +55,12 @@ const (
const queueSize = 10000
+// Pool maintains the multiplex connections of different addresses.
type Pool struct {
connections *sync.Map
}
+// NewPool will construct a default multiplex connections pool.
func NewPool() *Pool {
m := &Pool{
connections: new(sync.Map),
@@ -65,7 +68,10 @@ func NewPool() *Pool {
return m
}
-func (p *Pool) Get(ctx context.Context, address string, serialNo uint32) (*MultiplexedConnection, error) {
+// Get will return a multiplex connection:
+// 1. If no underlying TCP connection has been created, a TCP connection will be created first.
+// 2. A new multiplex connection with the serialNo will be created and returned.
+func (p *Pool) Get(ctx context.Context, address string, serialNo uint32) (*MultiplexConnection, error) {
select {
case <-ctx.Done():
return nil, ctx.Err()
@@ -76,12 +82,12 @@ func (p *Pool) Get(ctx context.Context, address string, serialNo uint32) (*Multi
if c, ok := v.(*Connection); ok {
return c.new(ctx, serialNo)
}
- return nil, ErrAssertConnectionFail
+ return nil, ErrAssertConnection
}
c := &Connection{
address: address,
- connections: make(map[uint32]*MultiplexedConnection),
+ connections: make(map[uint32]*MultiplexConnection),
done: make(chan struct{}),
mDone: make(chan struct{}),
state: Initial,
@@ -100,38 +106,28 @@ func (p *Pool) Get(ctx context.Context, address string, serialNo uint32) (*Multi
c.decoder = codec.New(conn)
c.conn = conn
c.state = Connected
+ c.pool = p
go c.reader()
go c.writer()
return c.new(ctx, serialNo)
}
-type writerBuffer struct {
- buffer chan []byte
- done <-chan struct{}
-}
-
-func (w *writerBuffer) get() ([]byte, error) {
- select {
- case req := <-w.buffer:
- return req, nil
- case <-w.done:
- return nil, ErrWriteBufferDone
- }
-}
-
type recvReader struct {
ctx context.Context
recv chan codec.TransportResponse
}
-type MultiplexedConnection struct {
+// MultiplexConnection is used to multiplex a TCP connection.
+// It is distinguished by the serialNo.
+type MultiplexConnection struct {
serialNo uint32
conn *Connection
reader *recvReader
done chan struct{}
}
-func (mc *MultiplexedConnection) Write(b []byte) error {
+// Write uses the underlying TCP connection to send the bytes.
+func (mc *MultiplexConnection) Write(b []byte) error {
if err := mc.conn.send(b); err != nil {
mc.conn.remove(mc.serialNo)
return err
@@ -139,7 +135,8 @@ func (mc *MultiplexedConnection) Write(b []byte) error {
return nil
}
-func (mc *MultiplexedConnection) Read() (codec.TransportResponse, error) {
+// Read returns the response from the multiplex connection.
+func (mc *MultiplexConnection) Read() (codec.TransportResponse, error) {
select {
case <-mc.reader.ctx.Done():
mc.conn.remove(mc.serialNo)
@@ -151,17 +148,19 @@ func (mc *MultiplexedConnection) Read() (codec.TransportResponse, error) {
if mc.conn.err != nil {
return nil, mc.conn.err
}
- return nil, ErrChanClose
+ return nil, ErrChanClosed
case <-mc.done:
return nil, mc.conn.err
}
}
-func (mc *MultiplexedConnection) recv(rsp *codec.TubeMQResponse) {
+func (mc *MultiplexConnection) recv(rsp *codec.TubeMQResponse) {
mc.reader.recv <- rsp
mc.conn.remove(rsp.GetSerialNo())
}
+// DialOptions represents the dial options of the TCP connection.
+// If TLS is not enabled, the configuration of TLS can be ignored.
type DialOptions struct {
Network string
Address string
@@ -172,11 +171,13 @@ type DialOptions struct {
TLSServerName string
}
+// Connection represents the underlying TCP connection of the multiplex connections of an address
+// and maintains the multiplex connections.
type Connection struct {
err error
address string
mu sync.RWMutex
- connections map[uint32]*MultiplexedConnection
+ connections map[uint32]*MultiplexConnection
decoder codec.Decoder
conn net.Conn
done chan struct{}
@@ -187,14 +188,14 @@ type Connection struct {
pool *Pool
}
-func (c *Connection) new(ctx context.Context, serialNo uint32) (*MultiplexedConnection, error) {
+func (c *Connection) new(ctx context.Context, serialNo uint32) (*MultiplexConnection, error) {
c.mu.Lock()
defer c.mu.Unlock()
if c.err != nil {
return nil, c.err
}
- vc := &MultiplexedConnection{
+ mc := &MultiplexConnection{
serialNo: serialNo,
conn: c,
done: c.mDone,
@@ -204,11 +205,11 @@ func (c *Connection) new(ctx context.Context, serialNo uint32) (*MultiplexedConn
},
}
- if prevConn, ok := c.connections[serialNo]; ok {
- close(prevConn.reader.recv)
+ if lastConn, ok := c.connections[serialNo]; ok {
+ close(lastConn.reader.recv)
}
- c.connections[serialNo] = vc
- return vc, nil
+ c.connections[serialNo] = mc
+ return mc, nil
}
func (c *Connection) close(lastErr error, done chan struct{}) {
@@ -230,7 +231,7 @@ func (c *Connection) close(lastErr error, done chan struct{}) {
c.state = Closing
c.err = lastErr
- c.connections = make(map[uint32]*MultiplexedConnection)
+ c.connections = make(map[uint32]*MultiplexConnection)
close(c.done)
if c.conn != nil {
c.conn.Close()
@@ -311,6 +312,9 @@ func (c *Connection) write(b []byte) error {
return nil
}
+// The response handling logic of the TCP connection.
+// 1. Read from the connection and decode it to the TransportResponse.
+// 2. Send the response to the corresponding multiplex connection based on the serialNo.
func (c *Connection) reader() {
var lastErr error
for {
@@ -337,6 +341,20 @@ func (c *Connection) reader() {
c.close(lastErr, c.done)
}
+type writerBuffer struct {
+ buffer chan []byte
+ done <-chan struct{}
+}
+
+func (w *writerBuffer) get() ([]byte, error) {
+ select {
+ case req := <-w.buffer:
+ return req, nil
+ case <-w.done:
+ return nil, ErrWriteBufferDone
+ }
+}
+
func dial(ctx context.Context, address string) (net.Conn, *DialOptions, error) {
var timeout time.Duration
t, ok := ctx.Deadline()
diff --git a/tubemq-client-twins/tubemq-client-go/multiplexed/multlplexed_test.go b/tubemq-client-twins/tubemq-client-go/multiplexing/multlplexing_test.go
similarity index 96%
rename from tubemq-client-twins/tubemq-client-go/multiplexed/multlplexed_test.go
rename to tubemq-client-twins/tubemq-client-go/multiplexing/multlplexing_test.go
index 4584607..af1d416 100644
--- a/tubemq-client-twins/tubemq-client-go/multiplexed/multlplexed_test.go
+++ b/tubemq-client-twins/tubemq-client-go/multiplexing/multlplexing_test.go
@@ -15,7 +15,7 @@
* limitations under the License.
*/
-package multiplexed
+package multiplexing
import (
"bytes"
@@ -87,7 +87,7 @@ func Encode(serialNo uint32, body []byte) ([]byte, error) {
return buf.Bytes(), nil
}
-func TestBasicMultiplexed(t *testing.T) {
+func TestBasicMultiplexing(t *testing.T) {
serialNo := atomic.AddUint32(&serialNo, 1)
ctx, cancel := context.WithTimeout(context.Background(), time.Second)
defer cancel()
@@ -107,7 +107,7 @@ func TestBasicMultiplexed(t *testing.T) {
assert.Equal(t, mc.Write(nil), nil)
}
-func TestConcurrentMultiplexed(t *testing.T) {
+func TestConcurrentMultiplexing(t *testing.T) {
count := 1000
m := NewPool()
wg := sync.WaitGroup{}