You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@inlong.apache.org by go...@apache.org on 2021/05/10 10:32:01 UTC
[incubator-inlong] 08/12: nitpick for codec and dialopts
This is an automated email from the ASF dual-hosted git repository.
gosonzhang pushed a commit to branch INLONG-25
in repository https://gitbox.apache.org/repos/asf/incubator-inlong.git
commit 459405a5dfd214f448b05bb796baccfd76081891
Author: Zijie Lu <ws...@gmail.com>
AuthorDate: Sun May 2 15:00:59 2021 +0800
nitpick for codec and dialopts
Signed-off-by: Zijie Lu <ws...@gmail.com>
---
tubemq-client-twins/tubemq-client-go/codec/codec.go | 11 ++++-------
.../tubemq-client-go/multiplexing/multiplexing.go | 18 +++++++-----------
.../tubemq-client-go/multiplexing/multlplexing_test.go | 12 ++++++++++--
3 files changed, 21 insertions(+), 20 deletions(-)
diff --git a/tubemq-client-twins/tubemq-client-go/codec/codec.go b/tubemq-client-twins/tubemq-client-go/codec/codec.go
index b9ee1d7..fb5c945 100644
--- a/tubemq-client-twins/tubemq-client-go/codec/codec.go
+++ b/tubemq-client-twins/tubemq-client-go/codec/codec.go
@@ -30,7 +30,7 @@ import (
const (
RPCProtocolBeginToken uint32 = 0xFF7FF4FE
RPCMaxBufferSize uint32 = 8192
- frameHeadLen uint32 = 8
+ frameHeadLen uint32 = 12
maxBufferSize int = 128 * 1024
defaultMsgSize int = 4096
dataLen uint32 = 4
@@ -82,11 +82,8 @@ func (t *TubeMQDecoder) Decode() (TransportResponse, error) {
if token != RPCProtocolBeginToken {
return nil, errors.New("framer: read framer rpc protocol begin token not match")
}
- num, err = io.ReadFull(t.reader, t.msg[frameHeadLen:frameHeadLen+listSizeLen])
- if num != int(listSizeLen) {
- return nil, errors.New("framer: read invalid list size num")
- }
- listSize := binary.BigEndian.Uint32(t.msg[frameHeadLen : frameHeadLen+listSizeLen])
+ serialNo := binary.BigEndian.Uint32(t.msg[beginTokenLen : beginTokenLen+serialNoLen])
+ listSize := binary.BigEndian.Uint32(t.msg[beginTokenLen+serialNoLen : beginTokenLen+serialNoLen+listSizeLen])
totalLen := int(frameHeadLen)
size := make([]byte, 4)
for i := 0; i < int(listSize); i++ {
@@ -119,7 +116,7 @@ func (t *TubeMQDecoder) Decode() (TransportResponse, error) {
copy(data, t.msg[frameHeadLen:totalLen])
return TubeMQResponse{
- serialNo: binary.BigEndian.Uint32(t.msg[beginTokenLen : beginTokenLen+serialNoLen]),
+ serialNo: serialNo,
responseBuf: data,
}, nil
}
diff --git a/tubemq-client-twins/tubemq-client-go/multiplexing/multiplexing.go b/tubemq-client-twins/tubemq-client-go/multiplexing/multiplexing.go
index d1ee603..783c8e8 100644
--- a/tubemq-client-twins/tubemq-client-go/multiplexing/multiplexing.go
+++ b/tubemq-client-twins/tubemq-client-go/multiplexing/multiplexing.go
@@ -71,7 +71,7 @@ func NewPool() *Pool {
// Get will return a multiplex connection
// 1. If no underlying TCP connection has been created, a TCP connection will be created first.
// 2. A new multiplex connection with the serialNo will be created and returned.
-func (p *Pool) Get(ctx context.Context, address string, serialNo uint32) (*MultiplexConnection, error) {
+func (p *Pool) Get(ctx context.Context, address string, serialNo uint32, opts *DialOptions) (*MultiplexConnection, error) {
select {
case <-ctx.Done():
return nil, ctx.Err()
@@ -98,7 +98,7 @@ func (p *Pool) Get(ctx context.Context, address string, serialNo uint32) (*Multi
}
p.connections.Store(address, c)
- conn, dialOpts, err := dial(ctx, address)
+ conn, dialOpts, err := dial(ctx, address, opts)
c.dialOpts = dialOpts
if err != nil {
return nil, err
@@ -112,24 +112,20 @@ func (p *Pool) Get(ctx context.Context, address string, serialNo uint32) (*Multi
return c.new(ctx, serialNo)
}
-func dial(ctx context.Context, address string) (net.Conn, *DialOptions, error) {
+func dial(ctx context.Context, address string, opts *DialOptions) (net.Conn, *DialOptions, error) {
var timeout time.Duration
t, ok := ctx.Deadline()
if ok {
timeout = t.Sub(time.Now())
}
- dialOpts := &DialOptions{
- Network: "tcp",
- Address: address,
- Timeout: timeout,
- }
+ opts.Timeout = timeout
select {
case <-ctx.Done():
- return nil, dialOpts, ctx.Err()
+ return nil, opts, ctx.Err()
default:
}
- conn, err := dialWithTimeout(dialOpts)
- return conn, dialOpts, err
+ conn, err := dialWithTimeout(opts)
+ return conn, opts, err
}
func dialWithTimeout(opts *DialOptions) (net.Conn, error) {
diff --git a/tubemq-client-twins/tubemq-client-go/multiplexing/multlplexing_test.go b/tubemq-client-twins/tubemq-client-go/multiplexing/multlplexing_test.go
index af1d416..0c6d6b9 100644
--- a/tubemq-client-twins/tubemq-client-go/multiplexing/multlplexing_test.go
+++ b/tubemq-client-twins/tubemq-client-go/multiplexing/multlplexing_test.go
@@ -93,7 +93,11 @@ func TestBasicMultiplexing(t *testing.T) {
defer cancel()
m := NewPool()
- mc, err := m.Get(ctx, address, serialNo)
+ opts := &DialOptions{
+ Network: "tcp",
+ Address: address,
+ }
+ mc, err := m.Get(ctx, address, serialNo, opts)
body := []byte("hello world")
buf, err := Encode(serialNo, body)
@@ -118,7 +122,11 @@ func TestConcurrentMultiplexing(t *testing.T) {
ctx, cancel := context.WithTimeout(context.Background(), time.Second)
defer cancel()
serialNo := atomic.AddUint32(&serialNo, 1)
- mc, err := m.Get(ctx, address, serialNo)
+ opts := &DialOptions{
+ Network: "tcp",
+ Address: address,
+ }
+ mc, err := m.Get(ctx, address, serialNo, opts)
assert.Nil(t, err)
body := []byte("hello world" + strconv.Itoa(i))