Posted to dev@inlong.apache.org by GitBox <gi...@apache.org> on 2021/04/30 07:35:55 UTC

[GitHub] [incubator-inlong] TszKitLo40 opened a new pull request #464: [INLONG-600]Multiplexed connection pool for Go sdk

TszKitLo40 opened a new pull request #464:
URL: https://github.com/apache/incubator-inlong/pull/464


   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [incubator-inlong] codecov-commenter commented on pull request #464: [INLONG-600]Multiplexed connection pool for Go sdk

Posted by GitBox <gi...@apache.org>.
codecov-commenter commented on pull request #464:
URL: https://github.com/apache/incubator-inlong/pull/464#issuecomment-830031922


   # [Codecov](https://codecov.io/gh/apache/incubator-inlong/pull/464?src=pr&el=h1&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation) Report
   > Merging [#464](https://codecov.io/gh/apache/incubator-inlong/pull/464?src=pr&el=desc&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation) (db00c03) into [INLONG-25](https://codecov.io/gh/apache/incubator-inlong/commit/3249de37acf054a9c43677131cfbb09fc6d366d1?el=desc&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation) (3249de3) will **not change** coverage.
   > The diff coverage is `n/a`.
   
   > :exclamation: Current head db00c03 differs from pull request most recent head 877be5d. Consider uploading reports for the commit 877be5d to get more accurate results
   [![Impacted file tree graph](https://codecov.io/gh/apache/incubator-inlong/pull/464/graphs/tree.svg?width=650&height=150&src=pr&token=1EUK92O9K2&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation)](https://codecov.io/gh/apache/incubator-inlong/pull/464?src=pr&el=tree&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation)
   
   ```diff
   @@             Coverage Diff             @@
   ##             INLONG-25    #464   +/-   ##
   ===========================================
     Coverage         7.52%   7.52%           
     Complexity         481     481           
   ===========================================
     Files              267     267           
     Lines            29500   29500           
     Branches          4843    4843           
   ===========================================
     Hits              2219    2219           
     Misses           26808   26808           
     Partials           473     473           
   ```
   
   
   
   ------
   
   [Continue to review full report at Codecov](https://codecov.io/gh/apache/incubator-inlong/pull/464?src=pr&el=continue&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation).
   > **Legend** - [Click here to learn more](https://docs.codecov.io/docs/codecov-delta?utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation)
   > `Δ = absolute <relative> (impact)`, `ø = not affected`, `? = missing data`
   > Powered by [Codecov](https://codecov.io/gh/apache/incubator-inlong/pull/464?src=pr&el=footer&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation). Last update [3249de3...877be5d](https://codecov.io/gh/apache/incubator-inlong/pull/464?src=pr&el=lastupdated&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation). Read the [comment docs](https://docs.codecov.io/docs/pull-request-comments?utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation).
   





[GitHub] [incubator-inlong] codecov-commenter edited a comment on pull request #464: [INLONG-600]Multiplexed connection pool for Go sdk

Posted by GitBox <gi...@apache.org>.
codecov-commenter edited a comment on pull request #464:
URL: https://github.com/apache/incubator-inlong/pull/464#issuecomment-830031922


   # [Codecov](https://codecov.io/gh/apache/incubator-inlong/pull/464?src=pr&el=h1&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation) Report
   > Merging [#464](https://codecov.io/gh/apache/incubator-inlong/pull/464?src=pr&el=desc&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation) (e3e958a) into [INLONG-25](https://codecov.io/gh/apache/incubator-inlong/commit/3249de37acf054a9c43677131cfbb09fc6d366d1?el=desc&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation) (3249de3) will **decrease** coverage by `0.01%`.
   > The diff coverage is `n/a`.
   
   [![Impacted file tree graph](https://codecov.io/gh/apache/incubator-inlong/pull/464/graphs/tree.svg?width=650&height=150&src=pr&token=1EUK92O9K2&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation)](https://codecov.io/gh/apache/incubator-inlong/pull/464?src=pr&el=tree&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation)
   
   ```diff
   @@              Coverage Diff               @@
   ##             INLONG-25    #464      +/-   ##
   ==============================================
   - Coverage         7.52%   7.51%   -0.02%     
   + Complexity         481     479       -2     
   ==============================================
     Files              267     267              
     Lines            29500   29500              
     Branches          4843    4843              
   ==============================================
   - Hits              2219    2216       -3     
   - Misses           26808   26810       +2     
   - Partials           473     474       +1     
   ```
   
   
   | [Impacted Files](https://codecov.io/gh/apache/incubator-inlong/pull/464?src=pr&el=tree&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation) | Coverage Δ | Complexity Δ | |
   |---|---|---|---|
   | [.../apache/tubemq/corebase/policies/FlowCtrlItem.java](https://codecov.io/gh/apache/incubator-inlong/pull/464/diff?src=pr&el=tree&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation#diff-dHViZW1xLWNvcmUvc3JjL21haW4vamF2YS9vcmcvYXBhY2hlL3R1YmVtcS9jb3JlYmFzZS9wb2xpY2llcy9GbG93Q3RybEl0ZW0uamF2YQ==) | `38.88% <0.00%> (-1.12%)` | `8.00% <0.00%> (-1.00%)` | |
   | [.../tubemq/corebase/policies/FlowCtrlRuleHandler.java](https://codecov.io/gh/apache/incubator-inlong/pull/464/diff?src=pr&el=tree&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation#diff-dHViZW1xLWNvcmUvc3JjL21haW4vamF2YS9vcmcvYXBhY2hlL3R1YmVtcS9jb3JlYmFzZS9wb2xpY2llcy9GbG93Q3RybFJ1bGVIYW5kbGVyLmphdmE=) | `43.98% <0.00%> (-0.59%)` | `34.00% <0.00%> (-1.00%)` | |
   
   ------
   
   [Continue to review full report at Codecov](https://codecov.io/gh/apache/incubator-inlong/pull/464?src=pr&el=continue&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation).
   > **Legend** - [Click here to learn more](https://docs.codecov.io/docs/codecov-delta?utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation)
   > `Δ = absolute <relative> (impact)`, `ø = not affected`, `? = missing data`
   > Powered by [Codecov](https://codecov.io/gh/apache/incubator-inlong/pull/464?src=pr&el=footer&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation). Last update [3249de3...e3e958a](https://codecov.io/gh/apache/incubator-inlong/pull/464?src=pr&el=lastupdated&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation). Read the [comment docs](https://docs.codecov.io/docs/pull-request-comments?utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation).
   





[GitHub] [incubator-inlong] codecov-commenter edited a comment on pull request #464: [INLONG-600]Multiplexed connection pool for Go sdk

Posted by GitBox <gi...@apache.org>.
codecov-commenter edited a comment on pull request #464:
URL: https://github.com/apache/incubator-inlong/pull/464#issuecomment-830031922


   # [Codecov](https://codecov.io/gh/apache/incubator-inlong/pull/464?src=pr&el=h1&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation) Report
   > Merging [#464](https://codecov.io/gh/apache/incubator-inlong/pull/464?src=pr&el=desc&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation) (aa29190) into [INLONG-25](https://codecov.io/gh/apache/incubator-inlong/commit/3249de37acf054a9c43677131cfbb09fc6d366d1?el=desc&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation) (3249de3) will **decrease** coverage by `0.02%`.
   > The diff coverage is `n/a`.
   
   [![Impacted file tree graph](https://codecov.io/gh/apache/incubator-inlong/pull/464/graphs/tree.svg?width=650&height=150&src=pr&token=1EUK92O9K2&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation)](https://codecov.io/gh/apache/incubator-inlong/pull/464?src=pr&el=tree&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation)
   
   ```diff
   @@              Coverage Diff               @@
   ##             INLONG-25    #464      +/-   ##
   ==============================================
   - Coverage         7.52%   7.49%   -0.03%     
     Complexity         481     481              
   ==============================================
     Files              267     267              
     Lines            29500   29500              
     Branches          4843    4843              
   ==============================================
   - Hits              2219    2212       -7     
   - Misses           26808   26814       +6     
   - Partials           473     474       +1     
   ```
   
   
   | [Impacted Files](https://codecov.io/gh/apache/incubator-inlong/pull/464?src=pr&el=tree&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation) | Coverage Δ | Complexity Δ | |
   |---|---|---|---|
   | [.../java/org/apache/flume/sink/tubemq/TubemqSink.java](https://codecov.io/gh/apache/incubator-inlong/pull/464/diff?src=pr&el=tree&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation#diff-dHViZW1xLWNvbm5lY3RvcnMvdHViZW1xLWNvbm5lY3Rvci1mbHVtZS9zcmMvbWFpbi9qYXZhL29yZy9hcGFjaGUvZmx1bWUvc2luay90dWJlbXEvVHViZW1xU2luay5qYXZh) | `51.42% <0.00%> (-4.00%)` | `14.00% <0.00%> (ø%)` | |
   
   ------
   
   [Continue to review full report at Codecov](https://codecov.io/gh/apache/incubator-inlong/pull/464?src=pr&el=continue&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation).
   > **Legend** - [Click here to learn more](https://docs.codecov.io/docs/codecov-delta?utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation)
   > `Δ = absolute <relative> (impact)`, `ø = not affected`, `? = missing data`
   > Powered by [Codecov](https://codecov.io/gh/apache/incubator-inlong/pull/464?src=pr&el=footer&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation). Last update [3249de3...aa29190](https://codecov.io/gh/apache/incubator-inlong/pull/464?src=pr&el=lastupdated&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation). Read the [comment docs](https://docs.codecov.io/docs/pull-request-comments?utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation).
   





[GitHub] [incubator-inlong] charlely commented on a change in pull request #464: [INLONG-600]Multiplexed connection pool for Go sdk

Posted by GitBox <gi...@apache.org>.
charlely commented on a change in pull request #464:
URL: https://github.com/apache/incubator-inlong/pull/464#discussion_r625823974



##########
File path: tubemq-client-twins/tubemq-client-go/codec/codec.go
##########
@@ -0,0 +1,144 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+// Package codec defines the encoding and decoding logic between TubeMQ.
+// If the protocol of encoding and decoding is changed, only this package
+// will need to be changed.
+package codec
+
+import (
+	"bufio"
+	"encoding/binary"
+	"errors"
+	"io"
+)
+
+const (
+	RPCProtocolBeginToken uint32 = 0xFF7FF4FE

Review comment:
       Exported constants, variables, and functions require doc comments.
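
A minimal sketch of what this comment asks for, assuming the usual Go convention that a doc comment starts with the name of the identifier it describes. The constant value is copied from the diff; the comment wording and the `IsBeginToken` helper are hypothetical and are not part of the pull request.

```go
package main

import "fmt"

// RPCProtocolBeginToken is the magic number expected in the first four bytes
// of a frame header. The value is copied from the diff under review; the
// wording of this comment is only a suggestion.
const RPCProtocolBeginToken uint32 = 0xFF7FF4FE

// IsBeginToken reports whether v equals RPCProtocolBeginToken.
// It is a hypothetical helper, shown only as an example of a documented
// exported function.
func IsBeginToken(v uint32) bool {
	return v == RPCProtocolBeginToken
}

func main() {
	fmt.Println(IsBeginToken(0xFF7FF4FE)) // prints "true"
}
```

Unexported constants such as frameHeadLen do not strictly need doc comments, though a short note on each still helps readers of the codec.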

##########
File path: tubemq-client-twins/tubemq-client-go/codec/codec.go
##########
@@ -0,0 +1,144 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+// Package codec defines the encoding and decoding logic between TubeMQ.
+// If the protocol of encoding and decoding is changed, only this package
+// will need to be changed.
+package codec
+
+import (
+	"bufio"
+	"encoding/binary"
+	"errors"
+	"io"
+)
+
+const (
+	RPCProtocolBeginToken uint32 = 0xFF7FF4FE
+	RPCMaxBufferSize      uint32 = 8192
+	frameHeadLen          uint32 = 12
+	maxBufferSize         int    = 128 * 1024
+	defaultMsgSize        int    = 4096
+	dataLen               uint32 = 4
+	listSizeLen           uint32 = 4
+	serialNoLen           uint32 = 4
+	beginTokenLen         uint32 = 4
+)
+
+// TransportResponse is the abstraction of the transport response.
+type TransportResponse interface {

Review comment:
       You can name this Response, because callers will already refer to it as codec.Response.

##########
File path: tubemq-client-twins/tubemq-client-go/codec/codec.go
##########
@@ -0,0 +1,144 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+// Package codec defines the encoding and decoding logic between TubeMQ.
+// If the protocol of encoding and decoding is changed, only this package
+// will need to be changed.
+package codec
+
+import (
+	"bufio"
+	"encoding/binary"
+	"errors"
+	"io"
+)
+
+const (
+	RPCProtocolBeginToken uint32 = 0xFF7FF4FE
+	RPCMaxBufferSize      uint32 = 8192
+	frameHeadLen          uint32 = 12
+	maxBufferSize         int    = 128 * 1024
+	defaultMsgSize        int    = 4096
+	dataLen               uint32 = 4
+	listSizeLen           uint32 = 4
+	serialNoLen           uint32 = 4
+	beginTokenLen         uint32 = 4
+)
+
+// TransportResponse is the abstraction of the transport response.
+type TransportResponse interface {
+	// GetSerialNo returns the `serialNo` of the corresponding request.
+	GetSerialNo() uint32
+	// GetResponseBuf returns the body of the response.
+	GetResponseBuf() []byte

Review comment:
       You can rename this to GetBuffer, because the interface itself is already a Response.
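
A sketch of how the interface might look with both naming suggestions applied. It is written as a single `package main` file so it runs on its own; in the pull request the type would live in package codec and be referred to as codec.Response. The `frame` type is a hypothetical implementation used only for the demonstration.

```go
package main

import "fmt"

// Response is the abstraction of the transport response.
// From another package it would read as codec.Response, so the
// "Transport" prefix adds little.
type Response interface {
	// GetSerialNo returns the serialNo of the corresponding request.
	GetSerialNo() uint32
	// GetBuffer returns the body of the response.
	GetBuffer() []byte
}

// frame is a hypothetical implementation, not a type from the pull request.
type frame struct {
	serialNo uint32
	body     []byte
}

func (f *frame) GetSerialNo() uint32 { return f.serialNo }
func (f *frame) GetBuffer() []byte   { return f.body }

func main() {
	var r Response = &frame{serialNo: 7, body: []byte("ok")}
	fmt.Println(r.GetSerialNo(), string(r.GetBuffer())) // prints "7 ok"
}
```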

##########
File path: tubemq-client-twins/tubemq-client-go/codec/codec.go
##########
@@ -0,0 +1,144 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+// Package codec defines the encoding and decoding logic between TubeMQ.
+// If the protocol of encoding and decoding is changed, only this package
+// will need to be changed.
+package codec
+
+import (
+	"bufio"
+	"encoding/binary"
+	"errors"
+	"io"
+)
+
+const (
+	RPCProtocolBeginToken uint32 = 0xFF7FF4FE
+	RPCMaxBufferSize      uint32 = 8192
+	frameHeadLen          uint32 = 12
+	maxBufferSize         int    = 128 * 1024
+	defaultMsgSize        int    = 4096
+	dataLen               uint32 = 4
+	listSizeLen           uint32 = 4
+	serialNoLen           uint32 = 4
+	beginTokenLen         uint32 = 4
+)
+
+// TransportResponse is the abstraction of the transport response.
+type TransportResponse interface {
+	// GetSerialNo returns the `serialNo` of the corresponding request.
+	GetSerialNo() uint32
+	// GetResponseBuf returns the body of the response.
+	GetResponseBuf() []byte
+}
+
+// Decoder is the abstraction of the decoder which is used to decode the response.
+type Decoder interface {
+	// Decode will decode the response to frame head and body.
+	Decode() (TransportResponse, error)
+}
+
+// TubeMQDecoder is the implementation of the decoder of response from TubeMQ.
+type TubeMQDecoder struct {
+	reader io.Reader
+	msg    []byte
+}
+
+// New will return a default TubeMQDecoder.
+func New(reader io.Reader) *TubeMQDecoder {
+	bufferReader := bufio.NewReaderSize(reader, maxBufferSize)
+	return &TubeMQDecoder{
+		msg:    make([]byte, defaultMsgSize),
+		reader: bufferReader,
+	}
+}
+
+// Decode will decode the response from TubeMQ to TransportResponse according to
+// the RPC protocol of TubeMQ.
+func (t *TubeMQDecoder) Decode() (TransportResponse, error) {
+	num, err := io.ReadFull(t.reader, t.msg[:frameHeadLen])
+	if err != nil {
+		return nil, err
+	}
+	if num != int(frameHeadLen) {
+		return nil, errors.New("framer: read frame header num invalid")
+	}
+	token := binary.BigEndian.Uint32(t.msg[:beginTokenLen])
+	if token != RPCProtocolBeginToken {

Review comment:
       This can be merged into a single condition: if binary.BigEndian.Uint32(t.msg[:beginTokenLen]) != RPCProtocolBeginToken {}

##########
File path: tubemq-client-twins/tubemq-client-go/multiplexing/multiplexing.go
##########
@@ -0,0 +1,419 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+// Package multiplexing defines the multiplex connection pool for sending
+// request and receiving response. After receiving the response, the decoded
+// response will be returned to the client. It is used for the communication
+// with TubeMQ.
+package multiplexing
+
+import (
+	"context"
+	"crypto/tls"
+	"crypto/x509"
+	"errors"
+	"io/ioutil"
+	"net"
+	"sync"
+	"time"
+
+	"github.com/apache/incubator-inlong/tubemq-client-twins/tubemq-client-go/codec"
+)
+
+var (
+	// ErrConnClosed indicates the connection has been closed
+	ErrConnClosed = errors.New("connection has been closed")
+	// ErrChanClosed indicates the recv chan has been closed
+	ErrChanClosed = errors.New("unexpected recv chan closing")
+	// ErrWriteBufferDone indicates write buffer done
+	ErrWriteBufferDone = errors.New("write buffer done")
+	// ErrAssertConnection indicates connection assertion error
+	ErrAssertConnection = errors.New("connection assertion error")
+)
+
+// The state of the connection.
+const (
+	Initial int = iota
+	Connected
+	Closing
+	Closed
+)
+
+const queueSize = 10000
+
+// Pool maintains the multiplex connections of different addresses.
+type Pool struct {
+	connections *sync.Map
+}
+
+// NewPool will construct a default multiplex connections pool.
+func NewPool() *Pool {
+	m := &Pool{
+		connections: new(sync.Map),
+	}
+	return m
+}
+
+// Get will return a multiplex connection
+// 1. If no underlying TCP connection has been created, a TCP connection will be created first.
+// 2. A new multiplex connection with the serialNo will be created and returned.
+func (p *Pool) Get(ctx context.Context, address string, serialNo uint32, opts *DialOptions) (*MultiplexConnection, error) {
+	select {
+	case <-ctx.Done():
+		return nil, ctx.Err()
+	default:
+	}
+
+	if v, ok := p.connections.Load(address); ok {
+		if c, ok := v.(*Connection); ok {
+			return c.new(ctx, serialNo)
+		}
+		return nil, ErrAssertConnection
+	}
+
+	c := &Connection{
+		address:     address,
+		connections: make(map[uint32]*MultiplexConnection),
+		done:        make(chan struct{}),
+		mDone:       make(chan struct{}),
+		state:       Initial,
+	}
+	c.buffer = &writerBuffer{
+		buffer: make(chan []byte, queueSize),
+		done:   c.done,
+	}
+	p.connections.Store(address, c)
+
+	conn, dialOpts, err := dial(ctx, address, opts)
+	c.dialOpts = dialOpts
+	if err != nil {
+		return nil, err
+	}
+	c.decoder = codec.New(conn)
+	c.conn = conn
+	c.state = Connected
+	c.pool = p
+	go c.reader()
+	go c.writer()
+	return c.new(ctx, serialNo)
+}
+
+func dial(ctx context.Context, address string, opts *DialOptions) (net.Conn, *DialOptions, error) {
+	var timeout time.Duration
+	t, ok := ctx.Deadline()
+	if ok {
+		timeout = t.Sub(time.Now())
+	}
+	opts.Timeout = timeout
+	select {
+	case <-ctx.Done():
+		return nil, opts, ctx.Err()
+	default:
+	}
+	conn, err := dialWithTimeout(opts)
+	return conn, opts, err
+}
+
+func dialWithTimeout(opts *DialOptions) (net.Conn, error) {
+	if len(opts.CACertFile) == 0 {
+		return net.DialTimeout(opts.Network, opts.Address, opts.Timeout)
+	}
+
+	tlsConf := &tls.Config{}
+	if opts.CACertFile == "none" {
+		tlsConf.InsecureSkipVerify = true
+	} else {
+		if len(opts.TLSServerName) == 0 {
+			opts.TLSServerName = opts.Address
+		}
+		tlsConf.ServerName = opts.TLSServerName
+		certPool, err := getCertPool(opts.CACertFile)
+		if err != nil {
+			return nil, err
+		}
+
+		tlsConf.RootCAs = certPool
+
+		if len(opts.TLSCertFile) != 0 {
+			cert, err := tls.LoadX509KeyPair(opts.TLSCertFile, opts.TLSKeyFile)
+			if err != nil {
+				return nil, err
+			}
+			tlsConf.Certificates = []tls.Certificate{cert}
+		}
+	}
+	return tls.DialWithDialer(&net.Dialer{Timeout: opts.Timeout}, opts.Network, opts.Address, tlsConf)
+}
+
+func getCertPool(caCertFile string) (*x509.CertPool, error) {
+	if caCertFile != "root" {
+		ca, err := ioutil.ReadFile(caCertFile)
+		if err != nil {
+			return nil, err
+		}
+		certPool := x509.NewCertPool()
+		ok := certPool.AppendCertsFromPEM(ca)
+		if !ok {
+			return nil, err
+		}
+		return certPool, nil
+	}
+	return nil, nil
+}
+
+type recvReader struct {
+	ctx  context.Context
+	recv chan codec.TransportResponse
+}
+
+// MultiplexConnection is used to multiplex a TCP connection.
+// It is distinguished by the serialNo.
+type MultiplexConnection struct {
+	serialNo uint32
+	conn     *Connection
+	reader   *recvReader
+	done     chan struct{}
+}
+
+// Write uses the underlying TCP connection to send the bytes.
+func (mc *MultiplexConnection) Write(b []byte) error {
+	if err := mc.conn.send(b); err != nil {
+		mc.conn.remove(mc.serialNo)
+		return err
+	}
+	return nil
+}
+
+// Read returns the response from the multiplex connection.
+func (mc *MultiplexConnection) Read() (codec.TransportResponse, error) {
+	select {
+	case <-mc.reader.ctx.Done():
+		mc.conn.remove(mc.serialNo)
+		return nil, mc.reader.ctx.Err()
+	case v, ok := <-mc.reader.recv:
+		if ok {
+			return v, nil
+		}
+		if mc.conn.err != nil {
+			return nil, mc.conn.err
+		}
+		return nil, ErrChanClosed
+	case <-mc.done:
+		return nil, mc.conn.err
+	}
+}
+
+func (mc *MultiplexConnection) recv(rsp *codec.TubeMQResponse) {
+	mc.reader.recv <- rsp
+	mc.conn.remove(rsp.GetSerialNo())
+}
+
+// DialOptions represents the dial options of the TCP connection.
+// If TLS is not enabled, the configuration of TLS can be ignored.
+type DialOptions struct {
+	Network       string
+	Address       string
+	Timeout       time.Duration
+	CACertFile    string
+	TLSCertFile   string
+	TLSKeyFile    string
+	TLSServerName string
+}
+
+// Connection represents the underlying TCP connection of the multiplex connections of an address
+// and maintains the multiplex connections.
+type Connection struct {
+	err         error
+	address     string
+	mu          sync.RWMutex
+	connections map[uint32]*MultiplexConnection
+	decoder     codec.Decoder
+	conn        net.Conn
+	done        chan struct{}
+	mDone       chan struct{}
+	buffer      *writerBuffer
+	dialOpts    *DialOptions
+	state       int
+	pool        *Pool
+}
+
+func (c *Connection) new(ctx context.Context, serialNo uint32) (*MultiplexConnection, error) {
+	c.mu.Lock()
+	defer c.mu.Unlock()
+	if c.err != nil {
+		return nil, c.err
+	}
+
+	mc := &MultiplexConnection{
+		serialNo: serialNo,
+		conn:     c,
+		done:     c.mDone,
+		reader: &recvReader{
+			ctx:  ctx,
+			recv: make(chan codec.TransportResponse, 1),
+		},
+	}
+
+	if lastConn, ok := c.connections[serialNo]; ok {
+		close(lastConn.reader.recv)
+	}
+	c.connections[serialNo] = mc
+	return mc, nil
+}
+
+func (c *Connection) close(lastErr error, done chan struct{}) {
+	if lastErr == nil {
+		return
+	}
+	c.mu.Lock()
+	defer c.mu.Unlock()
+
+	if c.state == Closed {
+		return
+	}
+
+	select {
+	case <-done:
+		return
+	default:
+	}
+
+	c.state = Closing
+	c.err = lastErr
+	c.connections = make(map[uint32]*MultiplexConnection)
+	close(c.done)
+	if c.conn != nil {
+		c.conn.Close()
+	}
+	err := c.reconnect()
+	if err != nil {
+		c.state = Closed
+		close(c.mDone)
+		c.pool.connections.Delete(c)
+	}
+}
+
+func (c *Connection) reconnect() error {
+	conn, err := dialWithTimeout(c.dialOpts)
+	if err != nil {
+		return err
+	}
+	c.done = make(chan struct{})
+	c.conn = conn
+	c.decoder = codec.New(conn)
+	c.buffer.done = c.done
+	c.state = Connected
+	c.err = nil
+	go c.reader()
+	go c.writer()
+	return nil
+}
+
+// The response handling logic of the TCP connection.
+// 1. Read from the connection and decode it to the TransportResponse.
+// 2. Send the response to the corresponding multiplex connection based on the serialNo.
+func (c *Connection) reader() {
+	var lastErr error
+	for {
+		select {
+		case <-c.done:
+			return
+		default:
+		}
+		rsp, err := c.decoder.Decode()
+		if err != nil {
+			lastErr = err
+			break
+		}
+		serialNo := rsp.GetSerialNo()
+		c.mu.RLock()
+		mc, ok := c.connections[serialNo]
+		c.mu.RUnlock()
+		if !ok {
+			continue
+		}
+		mc.reader.recv <- rsp
+		mc.conn.remove(rsp.GetSerialNo())

Review comment:
       You can reuse the local variable here: mc.conn.remove(serialNo).
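
A simplified sketch of the dispatch step with the serialNo read once and reused. The `conn`, `response`, `register`, and `dispatch` names are hypothetical stand-ins for the Connection and MultiplexConnection types in the diff, reduced to the parts needed to run the example.

```go
package main

import (
	"fmt"
	"sync"
)

// response is a hypothetical stand-in for the decoded response type.
type response struct {
	serialNo uint32
	body     []byte
}

// conn is a hypothetical, reduced version of Connection: it only tracks
// which serialNo is waiting on which channel.
type conn struct {
	mu      sync.RWMutex
	waiters map[uint32]chan response
}

func newConn() *conn { return &conn{waiters: make(map[uint32]chan response)} }

// register creates a buffered channel (capacity 1) for the given serialNo.
func (c *conn) register(serialNo uint32) chan response {
	ch := make(chan response, 1)
	c.mu.Lock()
	c.waiters[serialNo] = ch
	c.mu.Unlock()
	return ch
}

func (c *conn) remove(serialNo uint32) {
	c.mu.Lock()
	delete(c.waiters, serialNo)
	c.mu.Unlock()
}

// dispatch delivers rsp to its waiter and reports whether one was found.
func (c *conn) dispatch(rsp response) bool {
	serialNo := rsp.serialNo // read once, reused for lookup and removal
	c.mu.RLock()
	ch, ok := c.waiters[serialNo]
	c.mu.RUnlock()
	if !ok {
		return false
	}
	ch <- rsp // does not block: the channel has capacity 1 and one sender
	c.remove(serialNo)
	return true
}

func main() {
	c := newConn()
	ch := c.register(7)
	fmt.Println(c.dispatch(response{serialNo: 7, body: []byte("pong")})) // prints "true"
	fmt.Println(string((<-ch).body))                                     // prints "pong"
}
```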

##########
File path: tubemq-client-twins/tubemq-client-go/codec/codec.go
##########
@@ -0,0 +1,144 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+// Package codec defines the encoding and decoding logic between TubeMQ.
+// If the protocol of encoding and decoding is changed, only this package
+// will need to be changed.
+package codec
+
+import (
+	"bufio"
+	"encoding/binary"
+	"errors"
+	"io"
+)
+
+const (
+	RPCProtocolBeginToken uint32 = 0xFF7FF4FE
+	RPCMaxBufferSize      uint32 = 8192
+	frameHeadLen          uint32 = 12
+	maxBufferSize         int    = 128 * 1024
+	defaultMsgSize        int    = 4096
+	dataLen               uint32 = 4
+	listSizeLen           uint32 = 4
+	serialNoLen           uint32 = 4
+	beginTokenLen         uint32 = 4
+)
+
+// TransportResponse is the abstraction of the transport response.
+type TransportResponse interface {
+	// GetSerialNo returns the `serialNo` of the corresponding request.
+	GetSerialNo() uint32
+	// GetResponseBuf returns the body of the response.
+	GetResponseBuf() []byte
+}
+
+// Decoder is the abstraction of the decoder which is used to decode the response.
+type Decoder interface {
+	// Decode will decode the response to frame head and body.
+	Decode() (TransportResponse, error)
+}
+
+// TubeMQDecoder is the implementation of the decoder of response from TubeMQ.
+type TubeMQDecoder struct {
+	reader io.Reader
+	msg    []byte
+}
+
+// New will return a default TubeMQDecoder.
+func New(reader io.Reader) *TubeMQDecoder {
+	bufferReader := bufio.NewReaderSize(reader, maxBufferSize)
+	return &TubeMQDecoder{
+		msg:    make([]byte, defaultMsgSize),
+		reader: bufferReader,
+	}
+}
+
+// Decode will decode the response from TubeMQ to TransportResponse according to
+// the RPC protocol of TubeMQ.
+func (t *TubeMQDecoder) Decode() (TransportResponse, error) {
+	num, err := io.ReadFull(t.reader, t.msg[:frameHeadLen])
+	if err != nil {
+		return nil, err
+	}
+	if num != int(frameHeadLen) {
+		return nil, errors.New("framer: read frame header num invalid")
+	}
+	token := binary.BigEndian.Uint32(t.msg[:beginTokenLen])
+	if token != RPCProtocolBeginToken {
+		return nil, errors.New("framer: read framer rpc protocol begin token not match")
+	}
+	serialNo := binary.BigEndian.Uint32(t.msg[beginTokenLen : beginTokenLen+serialNoLen])
+	listSize := binary.BigEndian.Uint32(t.msg[beginTokenLen+serialNoLen : beginTokenLen+serialNoLen+listSizeLen])
+	totalLen := int(frameHeadLen)
+	size := make([]byte, 4)
+	for i := 0; i < int(listSize); i++ {
+		n, err := io.ReadFull(t.reader, size)
+		if err != nil {
+			return nil, err
+		}
+		if n != int(dataLen) {
+			return nil, errors.New("framer: read invalid size")
+		}
+
+		s := int(binary.BigEndian.Uint32(size))
+		if totalLen+s > len(t.msg) {
+			data := t.msg[:totalLen]
+			t.msg = make([]byte, totalLen+s)
+			copy(t.msg, data[:])
+		}
+
+		num, err = io.ReadFull(t.reader, t.msg[totalLen:totalLen+s])

Review comment:
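       This can use the if-with-initializer form. Decode returns (TransportResponse, error), so the error path returns nil, err:
   if num, err = io.ReadFull(t.reader, t.msg[totalLen:totalLen+s]); err != nil {
       return nil, err
   }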

##########
File path: tubemq-client-twins/tubemq-client-go/codec/codec.go
##########
@@ -0,0 +1,144 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+// Package codec defines the encoding and decoding logic between TubeMQ.
+// If the protocol of encoding and decoding is changed, only this package
+// will need to be changed.
+package codec
+
+import (
+	"bufio"
+	"encoding/binary"
+	"errors"
+	"io"
+)
+
+const (
+	RPCProtocolBeginToken uint32 = 0xFF7FF4FE
+	RPCMaxBufferSize      uint32 = 8192
+	frameHeadLen          uint32 = 12
+	maxBufferSize         int    = 128 * 1024
+	defaultMsgSize        int    = 4096
+	dataLen               uint32 = 4
+	listSizeLen           uint32 = 4
+	serialNoLen           uint32 = 4
+	beginTokenLen         uint32 = 4
+)
+
+// TransportResponse is the abstraction of the transport response.
+type TransportResponse interface {
+	// GetSerialNo returns the `serialNo` of the corresponding request.
+	GetSerialNo() uint32
+	// GetResponseBuf returns the body of the response.
+	GetResponseBuf() []byte
+}
+
+// Decoder is the abstraction of the decoder which is used to decode the response.
+type Decoder interface {
+	// Decode will decode the response to frame head and body.
+	Decode() (TransportResponse, error)
+}
+
+// TubeMQDecoder is the implementation of the decoder of response from TubeMQ.
+type TubeMQDecoder struct {

Review comment:
       TubeMQDecoder should be moved into a separate file.

##########
File path: tubemq-client-twins/tubemq-client-go/codec/codec.go
##########
@@ -0,0 +1,144 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+// Package codec defines the encoding and decoding logic between TubeMQ.
+// If the protocol of encoding and decoding is changed, only this package
+// will need to be changed.
+package codec
+
+import (
+	"bufio"
+	"encoding/binary"
+	"errors"
+	"io"
+)
+
+const (
+	RPCProtocolBeginToken uint32 = 0xFF7FF4FE
+	RPCMaxBufferSize      uint32 = 8192
+	frameHeadLen          uint32 = 12
+	maxBufferSize         int    = 128 * 1024
+	defaultMsgSize        int    = 4096
+	dataLen               uint32 = 4
+	listSizeLen           uint32 = 4
+	serialNoLen           uint32 = 4
+	beginTokenLen         uint32 = 4
+)
+
+// TransportResponse is the abstraction of the transport response.
+type TransportResponse interface {
+	// GetSerialNo returns the `serialNo` of the corresponding request.
+	GetSerialNo() uint32
+	// GetResponseBuf returns the body of the response.
+	GetResponseBuf() []byte
+}
+
+// Decoder is the abstraction of the decoder which is used to decode the response.
+type Decoder interface {
+	// Decode will decode the response to frame head and body.
+	Decode() (TransportResponse, error)
+}
+
+// TubeMQDecoder is the implementation of the decoder of response from TubeMQ.
+type TubeMQDecoder struct {
+	reader io.Reader
+	msg    []byte
+}
+
+// New will return a default TubeMQDecoder.
+func New(reader io.Reader) *TubeMQDecoder {
+	bufferReader := bufio.NewReaderSize(reader, maxBufferSize)
+	return &TubeMQDecoder{
+		msg:    make([]byte, defaultMsgSize),
+		reader: bufferReader,
+	}
+}
+
+// Decode will decode the response from TubeMQ to TransportResponse according to
+// the RPC protocol of TubeMQ.
+func (t *TubeMQDecoder) Decode() (TransportResponse, error) {
+	num, err := io.ReadFull(t.reader, t.msg[:frameHeadLen])
+	if err != nil {
+		return nil, err
+	}
+	if num != int(frameHeadLen) {
+		return nil, errors.New("framer: read frame header num invalid")
+	}
+	token := binary.BigEndian.Uint32(t.msg[:beginTokenLen])
+	if token != RPCProtocolBeginToken {
+		return nil, errors.New("framer: read framer rpc protocol begin token not match")
+	}
+	serialNo := binary.BigEndian.Uint32(t.msg[beginTokenLen : beginTokenLen+serialNoLen])
+	listSize := binary.BigEndian.Uint32(t.msg[beginTokenLen+serialNoLen : beginTokenLen+serialNoLen+listSizeLen])
+	totalLen := int(frameHeadLen)
+	size := make([]byte, 4)
+	for i := 0; i < int(listSize); i++ {
+		n, err := io.ReadFull(t.reader, size)
+		if err != nil {
+			return nil, err
+		}
+		if n != int(dataLen) {
+			return nil, errors.New("framer: read invalid size")
+		}
+
+		s := int(binary.BigEndian.Uint32(size))
+		if totalLen+s > len(t.msg) {
+			data := t.msg[:totalLen]
+			t.msg = make([]byte, totalLen+s)

Review comment:
       More space may be needed again later, so the buffer should grow by a multiple (for example, by doubling) instead of to the exact size required.
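
One way to read this suggestion is geometric growth, sketched below. The `grow` helper is hypothetical and not part of the PR; it doubles the length until the requested size fits and keeps the bytes already read. A production version would probably also cap the maximum size.

```go
package main

import "fmt"

// grow is a hypothetical helper. It returns a slice whose length is at
// least need and whose first `used` bytes are copied from buf. The length
// is doubled until it fits, so repeated small increases do not each force
// a new allocation.
func grow(buf []byte, used, need int) []byte {
	if need <= len(buf) {
		return buf
	}
	newLen := len(buf)
	if newLen == 0 {
		newLen = 1
	}
	for newLen < need {
		newLen *= 2
	}
	out := make([]byte, newLen)
	copy(out, buf[:used])
	return out
}

func main() {
	msg := make([]byte, 4096)
	copy(msg, "head")

	msg = grow(msg, 4, 5000)
	fmt.Println(len(msg), string(msg[:4])) // 8192 head

	// The next, slightly larger request fits without reallocating.
	msg = grow(msg, 4, 6000)
	fmt.Println(len(msg)) // 8192
}
```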







[GitHub] [incubator-inlong] charlely commented on a change in pull request #464: [INLONG-600]Multiplexed connection pool for Go sdk

Posted by GitBox <gi...@apache.org>.
charlely commented on a change in pull request #464:
URL: https://github.com/apache/incubator-inlong/pull/464#discussion_r623691371



##########
File path: tubemq-client-twins/tubemq-client-go/pool/multiplexed.go
##########
@@ -0,0 +1,403 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package pool

Review comment:
       Please add a package comment.







[GitHub] [incubator-inlong] charlely commented on a change in pull request #464: [INLONG-600]Multiplexed connection pool for Go sdk

Posted by GitBox <gi...@apache.org>.
charlely commented on a change in pull request #464:
URL: https://github.com/apache/incubator-inlong/pull/464#discussion_r623693097



##########
File path: tubemq-client-twins/tubemq-client-go/pool/multiplexed.go
##########
@@ -0,0 +1,403 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package pool
+
+import (
+	"context"
+	"crypto/tls"
+	"crypto/x509"
+	"errors"
+	"io/ioutil"
+	"net"
+	"sync"
+	"time"
+
+	"github.com/apache/incubator-inlong/tubemq-client-twins/tubemq-client-go/codec"
+)
+
+var DefaultMultiplexedPool = New()
+
+var (
+	// ErrConnClosed indicates that the connection is closed
+	ErrConnClosed = errors.New("connection is closed")
+	// ErrChanClose indicates the recv chan is closed
+	ErrChanClose = errors.New("unexpected recv chan close")
+	// ErrWriteBufferDone indicates write buffer done
+	ErrWriteBufferDone = errors.New("write buffer done")
+	// ErrAssertConnectionFail indicates connection assertion error
+	ErrAssertConnectionFail = errors.New("assert connection slice fail")
+)
+
+const (
+	Initial int = iota
+	Connected
+	Closing
+	Closed
+)
+
+var queueSize = 10000

Review comment:
       This can be a const.
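
A minimal sketch of the change, assuming queueSize is used as a channel capacity (the quoted diff does not show where it is used, so that part is an assumption):

```go
package main

import "fmt"

// queueSize is declared as an untyped constant, so it cannot be
// reassigned at run time and can be used wherever an int is expected.
const queueSize = 10000

func main() {
	// Assumed use: the capacity of a buffered queue.
	queue := make(chan []byte, queueSize)
	fmt.Println(cap(queue)) // 10000
}
```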







[GitHub] [incubator-inlong] charlely commented on a change in pull request #464: [INLONG-600]Multiplexed connection pool for Go sdk

Posted by GitBox <gi...@apache.org>.
charlely commented on a change in pull request #464:
URL: https://github.com/apache/incubator-inlong/pull/464#discussion_r623698662



##########
File path: tubemq-client-twins/tubemq-client-go/pool/multiplexed.go
##########
@@ -0,0 +1,403 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package pool
+
+import (
+	"context"
+	"crypto/tls"
+	"crypto/x509"
+	"errors"
+	"io/ioutil"
+	"net"
+	"sync"
+	"time"
+
+	"github.com/apache/incubator-inlong/tubemq-client-twins/tubemq-client-go/codec"
+)
+
+var DefaultMultiplexedPool = New()
+
+var (
+	// ErrConnClosed indicates that the connection is closed
+	ErrConnClosed = errors.New("connection is closed")
+	// ErrChanClose indicates the recv chan is closed
+	ErrChanClose = errors.New("unexpected recv chan close")
+	// ErrWriteBufferDone indicates write buffer done
+	ErrWriteBufferDone = errors.New("write buffer done")
+	// ErrAssertConnectionFail indicates connection assertion error
+	ErrAssertConnectionFail = errors.New("assert connection slice fail")
+)
+
+const (
+	Initial int = iota
+	Connected
+	Closing
+	Closed
+)
+
+var queueSize = 10000
+
+func New() *Multiplexed {

Review comment:
       Suggested code order:
   1. New pool.
   2. pool, and pool.Get.
   3. MultiplexedConnection.
   4. Connection.
   5. writerBuffer.







[GitHub] [incubator-inlong] charlely commented on a change in pull request #464: [INLONG-600]Multiplexed connection pool for Go sdk

Posted by GitBox <gi...@apache.org>.
charlely commented on a change in pull request #464:
URL: https://github.com/apache/incubator-inlong/pull/464#discussion_r623694550



##########
File path: tubemq-client-twins/tubemq-client-go/pool/multiplexed.go
##########
@@ -0,0 +1,403 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package pool
+
+import (
+	"context"
+	"crypto/tls"
+	"crypto/x509"
+	"errors"
+	"io/ioutil"
+	"net"
+	"sync"
+	"time"
+
+	"github.com/apache/incubator-inlong/tubemq-client-twins/tubemq-client-go/codec"
+)
+
+var DefaultMultiplexedPool = New()
+
+var (
+	// ErrConnClosed indicates that the connection is closed
+	ErrConnClosed = errors.New("connection is closed")
+	// ErrChanClose indicates the recv chan is closed
+	ErrChanClose = errors.New("unexpected recv chan close")
+	// ErrWriteBufferDone indicates write buffer done
+	ErrWriteBufferDone = errors.New("write buffer done")
+	// ErrAssertConnectionFail indicates connection assertion error
+	ErrAssertConnectionFail = errors.New("assert connection slice fail")
+)
+
+const (
+	Initial int = iota
+	Connected
+	Closing
+	Closed
+)
+
+var queueSize = 10000
+
+func New() *Multiplexed {

Review comment:
       Multiplexed should be named Pool. If the package is named multiplexed, callers can then refer to it as multiplexed.Pool.







[GitHub] [incubator-inlong] charlely commented on a change in pull request #464: [INLONG-600]Multiplexed connection pool for Go sdk

Posted by GitBox <gi...@apache.org>.
charlely commented on a change in pull request #464:
URL: https://github.com/apache/incubator-inlong/pull/464#discussion_r623692625



##########
File path: tubemq-client-twins/tubemq-client-go/pool/multiplexed.go
##########
@@ -0,0 +1,403 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package pool

Review comment:
       Consider naming this package multiplexedpool. A bare "pool" could be mistaken for a buffer pool or an I/O pool.







[GitHub] [incubator-inlong] charlely commented on a change in pull request #464: [INLONG-600]Multiplexed connection pool for Go sdk

Posted by GitBox <gi...@apache.org>.
charlely commented on a change in pull request #464:
URL: https://github.com/apache/incubator-inlong/pull/464#discussion_r623694550



##########
File path: tubemq-client-twins/tubemq-client-go/pool/multiplexed.go
##########
@@ -0,0 +1,403 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package pool
+
+import (
+	"context"
+	"crypto/tls"
+	"crypto/x509"
+	"errors"
+	"io/ioutil"
+	"net"
+	"sync"
+	"time"
+
+	"github.com/apache/incubator-inlong/tubemq-client-twins/tubemq-client-go/codec"
+)
+
+var DefaultMultiplexedPool = New()
+
+var (
+	// ErrConnClosed indicates that the connection is closed
+	ErrConnClosed = errors.New("connection is closed")
+	// ErrChanClose indicates the recv chan is closed
+	ErrChanClose = errors.New("unexpected recv chan close")
+	// ErrWriteBufferDone indicates write buffer done
+	ErrWriteBufferDone = errors.New("write buffer done")
+	// ErrAssertConnectionFail indicates connection assertion error
+	ErrAssertConnectionFail = errors.New("assert connection slice fail")
+)
+
+const (
+	Initial int = iota
+	Connected
+	Closing
+	Closed
+)
+
+var queueSize = 10000
+
+func New() *Multiplexed {

Review comment:
       Multiplexed should be named Pool. If the package is named multiplexedpool, callers would then use multiplexedpool.Pool.







[GitHub] [incubator-inlong] codecov-commenter edited a comment on pull request #464: [INLONG-600]Multiplexed connection pool for Go sdk

Posted by GitBox <gi...@apache.org>.
codecov-commenter edited a comment on pull request #464:
URL: https://github.com/apache/incubator-inlong/pull/464#issuecomment-830031922


   # [Codecov](https://codecov.io/gh/apache/incubator-inlong/pull/464?src=pr&el=h1&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation) Report
   > Merging [#464](https://codecov.io/gh/apache/incubator-inlong/pull/464?src=pr&el=desc&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation) (6cfd84e) into [INLONG-25](https://codecov.io/gh/apache/incubator-inlong/commit/3249de37acf054a9c43677131cfbb09fc6d366d1?el=desc&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation) (3249de3) will **decrease** coverage by `0.01%`.
   > The diff coverage is `n/a`.
   
   [![Impacted file tree graph](https://codecov.io/gh/apache/incubator-inlong/pull/464/graphs/tree.svg?width=650&height=150&src=pr&token=1EUK92O9K2&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation)](https://codecov.io/gh/apache/incubator-inlong/pull/464?src=pr&el=tree&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation)
   
   ```diff
   @@              Coverage Diff               @@
   ##             INLONG-25    #464      +/-   ##
   ==============================================
   - Coverage         7.52%   7.51%   -0.02%     
   + Complexity         481     479       -2     
   ==============================================
     Files              267     267              
     Lines            29500   29500              
     Branches          4843    4843              
   ==============================================
   - Hits              2219    2216       -3     
   - Misses           26808   26810       +2     
   - Partials           473     474       +1     
   ```
   
   
   | [Impacted Files](https://codecov.io/gh/apache/incubator-inlong/pull/464?src=pr&el=tree&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation) | Coverage Δ | Complexity Δ | |
   |---|---|---|---|
   | [.../apache/tubemq/corebase/policies/FlowCtrlItem.java](https://codecov.io/gh/apache/incubator-inlong/pull/464/diff?src=pr&el=tree&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation#diff-dHViZW1xLWNvcmUvc3JjL21haW4vamF2YS9vcmcvYXBhY2hlL3R1YmVtcS9jb3JlYmFzZS9wb2xpY2llcy9GbG93Q3RybEl0ZW0uamF2YQ==) | `38.88% <0.00%> (-1.12%)` | `8.00% <0.00%> (-1.00%)` | |
   | [.../tubemq/corebase/policies/FlowCtrlRuleHandler.java](https://codecov.io/gh/apache/incubator-inlong/pull/464/diff?src=pr&el=tree&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation#diff-dHViZW1xLWNvcmUvc3JjL21haW4vamF2YS9vcmcvYXBhY2hlL3R1YmVtcS9jb3JlYmFzZS9wb2xpY2llcy9GbG93Q3RybFJ1bGVIYW5kbGVyLmphdmE=) | `43.98% <0.00%> (-0.59%)` | `34.00% <0.00%> (-1.00%)` | |
   
   ------
   
   [Continue to review full report at Codecov](https://codecov.io/gh/apache/incubator-inlong/pull/464?src=pr&el=continue&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation).
   > **Legend** - [Click here to learn more](https://docs.codecov.io/docs/codecov-delta?utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation)
   > `Δ = absolute <relative> (impact)`, `ø = not affected`, `? = missing data`
   > Powered by [Codecov](https://codecov.io/gh/apache/incubator-inlong/pull/464?src=pr&el=footer&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation). Last update [3249de3...6cfd84e](https://codecov.io/gh/apache/incubator-inlong/pull/464?src=pr&el=lastupdated&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation). Read the [comment docs](https://docs.codecov.io/docs/pull-request-comments?utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation).
   





[GitHub] [incubator-inlong] charlely commented on a change in pull request #464: [INLONG-600]Multiplexed connection pool for Go sdk

Posted by GitBox <gi...@apache.org>.
charlely commented on a change in pull request #464:
URL: https://github.com/apache/incubator-inlong/pull/464#discussion_r623698662



##########
File path: tubemq-client-twins/tubemq-client-go/pool/multiplexed.go
##########
@@ -0,0 +1,403 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package pool
+
+import (
+	"context"
+	"crypto/tls"
+	"crypto/x509"
+	"errors"
+	"io/ioutil"
+	"net"
+	"sync"
+	"time"
+
+	"github.com/apache/incubator-inlong/tubemq-client-twins/tubemq-client-go/codec"
+)
+
+var DefaultMultiplexedPool = New()
+
+var (
+	// ErrConnClosed indicates that the connection is closed
+	ErrConnClosed = errors.New("connection is closed")
+	// ErrChanClose indicates the recv chan is closed
+	ErrChanClose = errors.New("unexpected recv chan close")
+	// ErrWriteBufferDone indicates write buffer done
+	ErrWriteBufferDone = errors.New("write buffer done")
+	// ErrAssertConnectionFail indicates connection assertion error
+	ErrAssertConnectionFail = errors.New("assert connection slice fail")
+)
+
+const (
+	Initial int = iota
+	Connected
+	Closing
+	Closed
+)
+
+var queueSize = 10000
+
+func New() *Multiplexed {

Review comment:
       Suggested code order:
   1. New pool.
   2. pool, and pool.Get.
   3. MultiplexedConnection.
   4. Connection.







[GitHub] [incubator-inlong] codecov-commenter edited a comment on pull request #464: [INLONG-600]Multiplexed connection pool for Go sdk

Posted by GitBox <gi...@apache.org>.
codecov-commenter edited a comment on pull request #464:
URL: https://github.com/apache/incubator-inlong/pull/464#issuecomment-830031922


   # [Codecov](https://codecov.io/gh/apache/incubator-inlong/pull/464?src=pr&el=h1&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation) Report
   > Merging [#464](https://codecov.io/gh/apache/incubator-inlong/pull/464?src=pr&el=desc&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation) (0db0d06) into [INLONG-25](https://codecov.io/gh/apache/incubator-inlong/commit/3249de37acf054a9c43677131cfbb09fc6d366d1?el=desc&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation) (3249de3) will **decrease** coverage by `0.00%`.
   > The diff coverage is `n/a`.
   
   > :exclamation: Current head 0db0d06 differs from pull request most recent head 423a75d. Consider uploading reports for the commit 423a75d to get more accurate results
   [![Impacted file tree graph](https://codecov.io/gh/apache/incubator-inlong/pull/464/graphs/tree.svg?width=650&height=150&src=pr&token=1EUK92O9K2&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation)](https://codecov.io/gh/apache/incubator-inlong/pull/464?src=pr&el=tree&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation)
   
   ```diff
   @@              Coverage Diff               @@
   ##             INLONG-25    #464      +/-   ##
   ==============================================
   - Coverage         7.52%   7.51%   -0.01%     
   + Complexity         481     480       -1     
   ==============================================
     Files              267     267              
     Lines            29500   29500              
     Branches          4843    4843              
   ==============================================
   - Hits              2219    2217       -2     
   - Misses           26808   26810       +2     
     Partials           473     473              
   ```
   
   
   | [Impacted Files](https://codecov.io/gh/apache/incubator-inlong/pull/464?src=pr&el=tree&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation) | Coverage Δ | Complexity Δ | |
   |---|---|---|---|
   | [.../apache/tubemq/corebase/policies/FlowCtrlItem.java](https://codecov.io/gh/apache/incubator-inlong/pull/464/diff?src=pr&el=tree&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation#diff-dHViZW1xLWNvcmUvc3JjL21haW4vamF2YS9vcmcvYXBhY2hlL3R1YmVtcS9jb3JlYmFzZS9wb2xpY2llcy9GbG93Q3RybEl0ZW0uamF2YQ==) | `38.88% <0.00%> (-1.12%)` | `8.00% <0.00%> (-1.00%)` | |
   | [.../tubemq/corebase/policies/FlowCtrlRuleHandler.java](https://codecov.io/gh/apache/incubator-inlong/pull/464/diff?src=pr&el=tree&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation#diff-dHViZW1xLWNvcmUvc3JjL21haW4vamF2YS9vcmcvYXBhY2hlL3R1YmVtcS9jb3JlYmFzZS9wb2xpY2llcy9GbG93Q3RybFJ1bGVIYW5kbGVyLmphdmE=) | `43.98% <0.00%> (-0.59%)` | `34.00% <0.00%> (-1.00%)` | |
   | [.../producer/qltystats/DefaultBrokerRcvQltyStats.java](https://codecov.io/gh/apache/incubator-inlong/pull/464/diff?src=pr&el=tree&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation#diff-dHViZW1xLWNsaWVudC9zcmMvbWFpbi9qYXZhL29yZy9hcGFjaGUvdHViZW1xL2NsaWVudC9wcm9kdWNlci9xbHR5c3RhdHMvRGVmYXVsdEJyb2tlclJjdlFsdHlTdGF0cy5qYXZh) | `46.09% <0.00%> (+0.39%)` | `22.00% <0.00%> (+1.00%)` | |
   
   ------
   
   [Continue to review full report at Codecov](https://codecov.io/gh/apache/incubator-inlong/pull/464?src=pr&el=continue&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation).
   > **Legend** - [Click here to learn more](https://docs.codecov.io/docs/codecov-delta?utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation)
   > `Δ = absolute <relative> (impact)`, `ø = not affected`, `? = missing data`
   > Powered by [Codecov](https://codecov.io/gh/apache/incubator-inlong/pull/464?src=pr&el=footer&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation). Last update [3249de3...423a75d](https://codecov.io/gh/apache/incubator-inlong/pull/464?src=pr&el=lastupdated&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation). Read the [comment docs](https://docs.codecov.io/docs/pull-request-comments?utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation).
   





[GitHub] [incubator-inlong] charlely commented on a change in pull request #464: [INLONG-600]Multiplexed connection pool for Go sdk

Posted by GitBox <gi...@apache.org>.
charlely commented on a change in pull request #464:
URL: https://github.com/apache/incubator-inlong/pull/464#discussion_r627456782



##########
File path: tubemq-client-twins/tubemq-client-go/codec/tubemq_codec.go
##########
@@ -0,0 +1,138 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+// Package codec defines the encoding and decoding logic for communication with TubeMQ.
+// If the protocol of encoding and decoding is changed, only this package
+// will need to be changed.
+package codec
+
+import (
+	"bufio"
+	"encoding/binary"
+	"errors"
+	"io"
+)
+
+const (
+	// RPCProtocolBeginToken is the default begin token of the TubeMQ RPC protocol.
+	RPCProtocolBeginToken uint32 = 0xFF7FF4FE
+	// RPCMaxBufferSize is the default max buffer size of the RPC response.
+	RPCMaxBufferSize uint32 = 8192
+	frameHeadLen     uint32 = 12
+	maxBufferSize    int    = 128 * 1024
+	defaultMsgSize   int    = 4096
+	dataLen          uint32 = 4
+	listSizeLen      uint32 = 4
+	serialNoLen      uint32 = 4
+	beginTokenLen    uint32 = 4
+)
+
+// TubeMQDecoder is the implementation of the decoder of response from TubeMQ.
+type TubeMQDecoder struct {
+	reader io.Reader
+	msg    []byte
+}
+
+// New will return a default TubeMQDecoder.
+func New(reader io.Reader) *TubeMQDecoder {
+	bufferReader := bufio.NewReaderSize(reader, maxBufferSize)
+	return &TubeMQDecoder{
+		msg:    make([]byte, defaultMsgSize),
+		reader: bufferReader,
+	}
+}
+
+// Decode will decode the response from TubeMQ to Response according to
+// the RPC protocol of TubeMQ.
+func (t *TubeMQDecoder) Decode() (Response, error) {
+	var num int
+	var err error
+	if num, err = io.ReadFull(t.reader, t.msg[:frameHeadLen]); err != nil {
+		return nil, err
+	}
+	if num != int(frameHeadLen) {
+		return nil, errors.New("framer: read frame header num invalid")
+	}
+	if binary.BigEndian.Uint32(t.msg[:beginTokenLen]) != RPCProtocolBeginToken {
+		return nil, errors.New("framer: read framer rpc protocol begin token not match")
+	}
+	serialNo := binary.BigEndian.Uint32(t.msg[beginTokenLen : beginTokenLen+serialNoLen])
+	listSize := binary.BigEndian.Uint32(t.msg[beginTokenLen+serialNoLen : beginTokenLen+serialNoLen+listSizeLen])
+	totalLen := int(frameHeadLen)
+	for i := 0; i < int(listSize); i++ {
+		size := make([]byte, 4)
+		n, err := io.ReadFull(t.reader, size)
+		if err != nil {
+			return nil, err
+		}
+		if n != int(dataLen) {
+			return nil, errors.New("framer: read invalid size")
+		}
+
+		s := int(binary.BigEndian.Uint32(size))
+		if totalLen+s > len(t.msg) {
+			data := t.msg[:totalLen]
+			t.msg = make([]byte, max(2*len(t.msg), totalLen+s))
+			copy(t.msg, data[:])
+		}
+
+		if num, err = io.ReadFull(t.reader, t.msg[totalLen:totalLen+s]); err != nil {
+			return nil, err
+		}
+		if num != s {
+			return nil, errors.New("framer: read invalid data")
+		}
+		totalLen += s
+	}
+
+	data := make([]byte, totalLen-int(frameHeadLen))
+	copy(data, t.msg[frameHeadLen:totalLen])
+
+	return &TubeMQResponse{
+		serialNo: serialNo,
+		Buffer:   data,
+	}, nil
+}
+
+// TubeMQRequest is the implementation of TubeMQ request.
+type TubeMQRequest struct {
+	serialNo uint32
+	req      []byte
+}
+
+// TubeMQResponse is the TubeMQ implementation of Response.
+type TubeMQResponse struct {
+	serialNo uint32
+	Buffer   []byte
+}
+
+// GetSerialNo will return the SerialNo of Response.
+func (t TubeMQResponse) GetSerialNo() uint32 {
+	return t.serialNo
+}
+
+// GetBuffer will return the body of Response.
+func (t TubeMQResponse) GetBuffer() []byte {
+	return t.Buffer
+}
+
+func max(x, y int) int {

Review comment:
       You can use the math library directly
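
For reference, a minimal sketch of the two options behind this suggestion. `math.Max` in the standard library takes and returns `float64`, so using it for `int` sizes needs a conversion in each direction; Go 1.21 and later also provide a built-in `max` for ordered types. The helper names `maxInt` and `maxViaMath` are hypothetical and are not part of the PR.

```go
package main

import "math"

// maxInt returns the larger of two ints using integer comparison only.
// The name is hypothetical; it avoids shadowing the max builtin of Go 1.21+.
func maxInt(x, y int) int {
	if x > y {
		return x
	}
	return y
}

// maxViaMath takes the math-package route. math.Max works on float64,
// so the int operands are converted on the way in and on the way out.
func maxViaMath(x, y int) int {
	return int(math.Max(float64(x), float64(y)))
}
```

For buffer sizes in the range used by this decoder both helpers should agree; the integer version simply avoids the round trip through floating point.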







[GitHub] [incubator-inlong] charlely commented on a change in pull request #464: [INLONG-600]Multiplexed connection pool for Go sdk

Posted by GitBox <gi...@apache.org>.
charlely commented on a change in pull request #464:
URL: https://github.com/apache/incubator-inlong/pull/464#discussion_r623730734



##########
File path: tubemq-client-twins/tubemq-client-go/codec/codec.go
##########
@@ -0,0 +1,131 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+

Review comment:
       Please add a package comment.







[GitHub] [incubator-inlong] TszKitLo40 commented on a change in pull request #464: [INLONG-600]Multiplexed connection pool for Go sdk

Posted by GitBox <gi...@apache.org>.
TszKitLo40 commented on a change in pull request #464:
URL: https://github.com/apache/incubator-inlong/pull/464#discussion_r627951715



##########
File path: tubemq-client-twins/tubemq-client-go/codec/tubemq_codec.go
##########
@@ -0,0 +1,138 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+// Package codec defines the encoding and decoding logic for communication with TubeMQ.
+// If the protocol of encoding and decoding is changed, only this package
+// will need to be changed.
+package codec
+
+import (
+	"bufio"
+	"encoding/binary"
+	"errors"
+	"io"
+)
+
+const (
+	// RPCProtocolBeginToken is the default begin token of the TubeMQ RPC protocol.
+	RPCProtocolBeginToken uint32 = 0xFF7FF4FE
+	// RPCMaxBufferSize is the default max buffer size of the RPC response.
+	RPCMaxBufferSize uint32 = 8192
+	frameHeadLen     uint32 = 12
+	maxBufferSize    int    = 128 * 1024
+	defaultMsgSize   int    = 4096
+	dataLen          uint32 = 4
+	listSizeLen      uint32 = 4
+	serialNoLen      uint32 = 4
+	beginTokenLen    uint32 = 4
+)
+
+// TubeMQDecoder is the implementation of the decoder of response from TubeMQ.
+type TubeMQDecoder struct {
+	reader io.Reader
+	msg    []byte
+}
+
+// New will return a default TubeMQDecoder.
+func New(reader io.Reader) *TubeMQDecoder {
+	bufferReader := bufio.NewReaderSize(reader, maxBufferSize)
+	return &TubeMQDecoder{
+		msg:    make([]byte, defaultMsgSize),
+		reader: bufferReader,
+	}
+}
+
+// Decode will decode the response from TubeMQ to Response according to
+// the RPC protocol of TubeMQ.
+func (t *TubeMQDecoder) Decode() (Response, error) {
+	var num int
+	var err error
+	if num, err = io.ReadFull(t.reader, t.msg[:frameHeadLen]); err != nil {
+		return nil, err
+	}
+	if num != int(frameHeadLen) {
+		return nil, errors.New("framer: read frame header num invalid")
+	}
+	if binary.BigEndian.Uint32(t.msg[:beginTokenLen]) != RPCProtocolBeginToken {
+		return nil, errors.New("framer: read framer rpc protocol begin token not match")
+	}
+	serialNo := binary.BigEndian.Uint32(t.msg[beginTokenLen : beginTokenLen+serialNoLen])
+	listSize := binary.BigEndian.Uint32(t.msg[beginTokenLen+serialNoLen : beginTokenLen+serialNoLen+listSizeLen])
+	totalLen := int(frameHeadLen)
+	for i := 0; i < int(listSize); i++ {
+		size := make([]byte, 4)
+		n, err := io.ReadFull(t.reader, size)
+		if err != nil {
+			return nil, err
+		}
+		if n != int(dataLen) {
+			return nil, errors.New("framer: read invalid size")
+		}
+
+		s := int(binary.BigEndian.Uint32(size))
+		if totalLen+s > len(t.msg) {
+			data := t.msg[:totalLen]
+			t.msg = make([]byte, max(2*len(t.msg), totalLen+s))
+			copy(t.msg, data[:])
+		}
+
+		if num, err = io.ReadFull(t.reader, t.msg[totalLen:totalLen+s]); err != nil {
+			return nil, err
+		}
+		if num != s {
+			return nil, errors.New("framer: read invalid data")
+		}
+		totalLen += s
+	}
+
+	data := make([]byte, totalLen-int(frameHeadLen))
+	copy(data, t.msg[frameHeadLen:totalLen])
+
+	return &TubeMQResponse{
+		serialNo: serialNo,
+		Buffer:   data,
+	}, nil
+}
+
+// TubeMQRequest is the implementation of TubeMQ request.
+type TubeMQRequest struct {
+	serialNo uint32
+	req      []byte
+}
+
+// TubeMQResponse is the TubeMQ implementation of Response.
+type TubeMQResponse struct {
+	serialNo uint32
+	Buffer   []byte
+}
+
+// GetSerialNo will return the SerialNo of Response.
+func (t TubeMQResponse) GetSerialNo() uint32 {
+	return t.serialNo
+}
+
+// GetBuffer will return the body of Response.
+func (t TubeMQResponse) GetBuffer() []byte {
+	return t.Buffer
+}
+
+func max(x, y int) int {

Review comment:
       Fixed
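
The frame layout that the quoted `Decode` reads can be sketched as a round trip: a 12-byte head (begin token, serial number, list size, each a big-endian `uint32`) followed by one length-prefixed chunk per list item. This is only an illustration under those assumptions; `encodeFrame`, `decodeFrame`, and `roundTrip` are hypothetical names, and the sketch grows its buffer with `append` rather than reproducing the decoder's own buffer handling.

```go
package main

import (
	"bytes"
	"encoding/binary"
	"errors"
	"io"
)

// beginToken mirrors RPCProtocolBeginToken from the quoted diff.
const beginToken uint32 = 0xFF7FF4FE

// encodeFrame is a hypothetical helper that writes the head and then each
// chunk prefixed by its 4-byte big-endian length.
func encodeFrame(serialNo uint32, chunks [][]byte) []byte {
	var buf bytes.Buffer
	var u32 [4]byte
	for _, v := range []uint32{beginToken, serialNo, uint32(len(chunks))} {
		binary.BigEndian.PutUint32(u32[:], v)
		buf.Write(u32[:])
	}
	for _, c := range chunks {
		binary.BigEndian.PutUint32(u32[:], uint32(len(c)))
		buf.Write(u32[:])
		buf.Write(c)
	}
	return buf.Bytes()
}

// decodeFrame reads one frame and returns the serial number together with
// the concatenated chunk bodies.
func decodeFrame(r io.Reader) (uint32, []byte, error) {
	head := make([]byte, 12)
	if _, err := io.ReadFull(r, head); err != nil {
		return 0, nil, err
	}
	if binary.BigEndian.Uint32(head[0:4]) != beginToken {
		return 0, nil, errors.New("begin token does not match")
	}
	serialNo := binary.BigEndian.Uint32(head[4:8])
	listSize := binary.BigEndian.Uint32(head[8:12])

	var body []byte
	size := make([]byte, 4)
	for i := uint32(0); i < listSize; i++ {
		if _, err := io.ReadFull(r, size); err != nil {
			return 0, nil, err
		}
		n := int(binary.BigEndian.Uint32(size))
		start := len(body)
		// append extends the slice and keeps the bytes read so far.
		body = append(body, make([]byte, n)...)
		if _, err := io.ReadFull(r, body[start:]); err != nil {
			return 0, nil, err
		}
	}
	return serialNo, body, nil
}

// roundTrip encodes the chunks into a frame and decodes it again.
func roundTrip(serialNo uint32, chunks [][]byte) (uint32, []byte, error) {
	return decodeFrame(bytes.NewReader(encodeFrame(serialNo, chunks)))
}
```

Calling `roundTrip` with several chunks checks that bytes from earlier chunks survive when the buffer grows, which is the case the resize branch in `Decode` has to handle.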







[GitHub] [incubator-inlong] codecov-commenter edited a comment on pull request #464: [INLONG-600]Multiplexed connection pool for Go sdk

Posted by GitBox <gi...@apache.org>.
codecov-commenter edited a comment on pull request #464:
URL: https://github.com/apache/incubator-inlong/pull/464#issuecomment-830031922


   # [Codecov](https://codecov.io/gh/apache/incubator-inlong/pull/464?src=pr&el=h1&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation) Report
   > Merging [#464](https://codecov.io/gh/apache/incubator-inlong/pull/464?src=pr&el=desc&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation) (28d9d97) into [INLONG-25](https://codecov.io/gh/apache/incubator-inlong/commit/3249de37acf054a9c43677131cfbb09fc6d366d1?el=desc&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation) (3249de3) will **decrease** coverage by `0.02%`.
   > The diff coverage is `n/a`.
   
   > :exclamation: Current head 28d9d97 differs from pull request most recent head 877be5d. Consider uploading reports for the commit 877be5d to get more accurate results
   [![Impacted file tree graph](https://codecov.io/gh/apache/incubator-inlong/pull/464/graphs/tree.svg?width=650&height=150&src=pr&token=1EUK92O9K2&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation)](https://codecov.io/gh/apache/incubator-inlong/pull/464?src=pr&el=tree&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation)
   
   ```diff
   @@              Coverage Diff               @@
   ##             INLONG-25    #464      +/-   ##
   ==============================================
   - Coverage         7.52%   7.49%   -0.03%     
   + Complexity         481     480       -1     
   ==============================================
     Files              267     267              
     Lines            29500   29500              
     Branches          4843    4843              
   ==============================================
   - Hits              2219    2211       -8     
   - Misses           26808   26814       +6     
   - Partials           473     475       +2     
   ```
   
   
   | [Impacted Files](https://codecov.io/gh/apache/incubator-inlong/pull/464?src=pr&el=tree&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation) | Coverage Δ | Complexity Δ | |
   |---|---|---|---|
   | [.../java/org/apache/flume/sink/tubemq/TubemqSink.java](https://codecov.io/gh/apache/incubator-inlong/pull/464/diff?src=pr&el=tree&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation#diff-dHViZW1xLWNvbm5lY3RvcnMvdHViZW1xLWNvbm5lY3Rvci1mbHVtZS9zcmMvbWFpbi9qYXZhL29yZy9hcGFjaGUvZmx1bWUvc2luay90dWJlbXEvVHViZW1xU2luay5qYXZh) | `51.42% <0.00%> (-4.00%)` | `14.00% <0.00%> (ø%)` | |
   | [.../producer/qltystats/DefaultBrokerRcvQltyStats.java](https://codecov.io/gh/apache/incubator-inlong/pull/464/diff?src=pr&el=tree&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation#diff-dHViZW1xLWNsaWVudC9zcmMvbWFpbi9qYXZhL29yZy9hcGFjaGUvdHViZW1xL2NsaWVudC9wcm9kdWNlci9xbHR5c3RhdHMvRGVmYXVsdEJyb2tlclJjdlFsdHlTdGF0cy5qYXZh) | `45.31% <0.00%> (-0.40%)` | `20.00% <0.00%> (-1.00%)` | |
   
   ------
   
   [Continue to review full report at Codecov](https://codecov.io/gh/apache/incubator-inlong/pull/464?src=pr&el=continue&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation).
   > **Legend** - [Click here to learn more](https://docs.codecov.io/docs/codecov-delta?utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation)
   > `Δ = absolute <relative> (impact)`, `ø = not affected`, `? = missing data`
   > Powered by [Codecov](https://codecov.io/gh/apache/incubator-inlong/pull/464?src=pr&el=footer&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation). Last update [3249de3...877be5d](https://codecov.io/gh/apache/incubator-inlong/pull/464?src=pr&el=lastupdated&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation). Read the [comment docs](https://docs.codecov.io/docs/pull-request-comments?utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation).
   





[GitHub] [incubator-inlong] codecov-commenter edited a comment on pull request #464: [INLONG-600]Multiplexed connection pool for Go sdk

Posted by GitBox <gi...@apache.org>.
codecov-commenter edited a comment on pull request #464:
URL: https://github.com/apache/incubator-inlong/pull/464#issuecomment-830031922


   # [Codecov](https://codecov.io/gh/apache/incubator-inlong/pull/464?src=pr&el=h1&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation) Report
   > Merging [#464](https://codecov.io/gh/apache/incubator-inlong/pull/464?src=pr&el=desc&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation) (644a440) into [INLONG-25](https://codecov.io/gh/apache/incubator-inlong/commit/3249de37acf054a9c43677131cfbb09fc6d366d1?el=desc&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation) (3249de3) will **decrease** coverage by `0.00%`.
   > The diff coverage is `n/a`.
   
   [![Impacted file tree graph](https://codecov.io/gh/apache/incubator-inlong/pull/464/graphs/tree.svg?width=650&height=150&src=pr&token=1EUK92O9K2&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation)](https://codecov.io/gh/apache/incubator-inlong/pull/464?src=pr&el=tree&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation)
   
   ```diff
   @@              Coverage Diff               @@
   ##             INLONG-25    #464      +/-   ##
   ==============================================
   - Coverage         7.52%   7.51%   -0.01%     
   + Complexity         481     480       -1     
   ==============================================
     Files              267     267              
     Lines            29500   29500              
     Branches          4843    4843              
   ==============================================
   - Hits              2219    2217       -2     
   - Misses           26808   26810       +2     
     Partials           473     473              
   ```
   
   
   | [Impacted Files](https://codecov.io/gh/apache/incubator-inlong/pull/464?src=pr&el=tree&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation) | Coverage Δ | Complexity Δ | |
   |---|---|---|---|
   | [.../apache/tubemq/corebase/policies/FlowCtrlItem.java](https://codecov.io/gh/apache/incubator-inlong/pull/464/diff?src=pr&el=tree&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation#diff-dHViZW1xLWNvcmUvc3JjL21haW4vamF2YS9vcmcvYXBhY2hlL3R1YmVtcS9jb3JlYmFzZS9wb2xpY2llcy9GbG93Q3RybEl0ZW0uamF2YQ==) | `38.88% <0.00%> (-1.12%)` | `8.00% <0.00%> (-1.00%)` | |
   | [.../tubemq/corebase/policies/FlowCtrlRuleHandler.java](https://codecov.io/gh/apache/incubator-inlong/pull/464/diff?src=pr&el=tree&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation#diff-dHViZW1xLWNvcmUvc3JjL21haW4vamF2YS9vcmcvYXBhY2hlL3R1YmVtcS9jb3JlYmFzZS9wb2xpY2llcy9GbG93Q3RybFJ1bGVIYW5kbGVyLmphdmE=) | `43.98% <0.00%> (-0.59%)` | `34.00% <0.00%> (-1.00%)` | |
   | [.../producer/qltystats/DefaultBrokerRcvQltyStats.java](https://codecov.io/gh/apache/incubator-inlong/pull/464/diff?src=pr&el=tree&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation#diff-dHViZW1xLWNsaWVudC9zcmMvbWFpbi9qYXZhL29yZy9hcGFjaGUvdHViZW1xL2NsaWVudC9wcm9kdWNlci9xbHR5c3RhdHMvRGVmYXVsdEJyb2tlclJjdlFsdHlTdGF0cy5qYXZh) | `46.09% <0.00%> (+0.39%)` | `22.00% <0.00%> (+1.00%)` | |
   
   ------
   
   [Continue to review full report at Codecov](https://codecov.io/gh/apache/incubator-inlong/pull/464?src=pr&el=continue&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation).
   > **Legend** - [Click here to learn more](https://docs.codecov.io/docs/codecov-delta?utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation)
   > `Δ = absolute <relative> (impact)`, `ø = not affected`, `? = missing data`
   > Powered by [Codecov](https://codecov.io/gh/apache/incubator-inlong/pull/464?src=pr&el=footer&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation). Last update [3249de3...644a440](https://codecov.io/gh/apache/incubator-inlong/pull/464?src=pr&el=lastupdated&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation). Read the [comment docs](https://docs.codecov.io/docs/pull-request-comments?utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation).
   





[GitHub] [incubator-inlong] codecov-commenter edited a comment on pull request #464: [INLONG-600]Multiplexed connection pool for Go sdk

Posted by GitBox <gi...@apache.org>.
codecov-commenter edited a comment on pull request #464:
URL: https://github.com/apache/incubator-inlong/pull/464#issuecomment-830031922


   # [Codecov](https://codecov.io/gh/apache/incubator-inlong/pull/464?src=pr&el=h1&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation) Report
   > Merging [#464](https://codecov.io/gh/apache/incubator-inlong/pull/464?src=pr&el=desc&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation) (423a75d) into [INLONG-25](https://codecov.io/gh/apache/incubator-inlong/commit/3249de37acf054a9c43677131cfbb09fc6d366d1?el=desc&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation) (3249de3) will **decrease** coverage by `0.01%`.
   > The diff coverage is `n/a`.
   
   [![Impacted file tree graph](https://codecov.io/gh/apache/incubator-inlong/pull/464/graphs/tree.svg?width=650&height=150&src=pr&token=1EUK92O9K2&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation)](https://codecov.io/gh/apache/incubator-inlong/pull/464?src=pr&el=tree&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation)
   
   ```diff
   @@              Coverage Diff               @@
   ##             INLONG-25    #464      +/-   ##
   ==============================================
   - Coverage         7.52%   7.51%   -0.02%     
   + Complexity         481     479       -2     
   ==============================================
     Files              267     267              
     Lines            29500   29500              
     Branches          4843    4843              
   ==============================================
   - Hits              2219    2216       -3     
   - Misses           26808   26810       +2     
   - Partials           473     474       +1     
   ```
   
   
   | [Impacted Files](https://codecov.io/gh/apache/incubator-inlong/pull/464?src=pr&el=tree&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation) | Coverage Δ | Complexity Δ | |
   |---|---|---|---|
   | [.../apache/tubemq/corebase/policies/FlowCtrlItem.java](https://codecov.io/gh/apache/incubator-inlong/pull/464/diff?src=pr&el=tree&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation#diff-dHViZW1xLWNvcmUvc3JjL21haW4vamF2YS9vcmcvYXBhY2hlL3R1YmVtcS9jb3JlYmFzZS9wb2xpY2llcy9GbG93Q3RybEl0ZW0uamF2YQ==) | `38.88% <0.00%> (-1.12%)` | `8.00% <0.00%> (-1.00%)` | |
   | [.../tubemq/corebase/policies/FlowCtrlRuleHandler.java](https://codecov.io/gh/apache/incubator-inlong/pull/464/diff?src=pr&el=tree&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation#diff-dHViZW1xLWNvcmUvc3JjL21haW4vamF2YS9vcmcvYXBhY2hlL3R1YmVtcS9jb3JlYmFzZS9wb2xpY2llcy9GbG93Q3RybFJ1bGVIYW5kbGVyLmphdmE=) | `43.98% <0.00%> (-0.59%)` | `34.00% <0.00%> (-1.00%)` | |
   
   ------
   
   [Continue to review full report at Codecov](https://codecov.io/gh/apache/incubator-inlong/pull/464?src=pr&el=continue&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation).
   > **Legend** - [Click here to learn more](https://docs.codecov.io/docs/codecov-delta?utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation)
   > `Δ = absolute <relative> (impact)`, `ø = not affected`, `? = missing data`
   > Powered by [Codecov](https://codecov.io/gh/apache/incubator-inlong/pull/464?src=pr&el=footer&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation). Last update [3249de3...423a75d](https://codecov.io/gh/apache/incubator-inlong/pull/464?src=pr&el=lastupdated&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation). Read the [comment docs](https://docs.codecov.io/docs/pull-request-comments?utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation).
   





[GitHub] [incubator-inlong] charlely commented on a change in pull request #464: [INLONG-600]Multiplexed connection pool for Go sdk

Posted by GitBox <gi...@apache.org>.
charlely commented on a change in pull request #464:
URL: https://github.com/apache/incubator-inlong/pull/464#discussion_r623732105



##########
File path: tubemq-client-twins/tubemq-client-go/codec/codec.go
##########
@@ -0,0 +1,131 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package codec
+
+import (
+	"bufio"
+	"encoding/binary"
+	"errors"
+	"io"
+)
+
+const (
+	RPCProtocolBeginToken uint32 = 0xFF7FF4FE
+	RPCMaxBufferSize      uint32 = 8192
+	frameHeadLen          uint32 = 8
+	maxBufferSize         int    = 128 * 1024
+	defaultMsgSize        int    = 4096
+	dataLen               uint32 = 4
+	listSizeLen           uint32 = 4
+	serialNoLen           uint32 = 4
+	beginTokenLen         uint32 = 4
+)
+
+type TransportResponse interface {
+	GetSerialNo() uint32
+	GetResponseBuf() []byte
+}
+
+type Decoder interface {

Review comment:
       Exported variables and functions require doc comments.
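
As an illustration of the convention being requested, a minimal sketch follows: each exported identifier gets a comment that starts with its own name, and the package clause gets a package comment. The wording of the comments and the `exampleResponse` type are assumptions made for this sketch, not text from the PR; `package main` stands in for `package codec` so that the snippet compiles on its own.

```go
// Package main stands in for the codec package in this sketch. In the real
// file the comment would begin with "Package codec".
package main

// TransportResponse is the abstraction of a response read from the transport.
type TransportResponse interface {
	// GetSerialNo returns the serial number carried by the response.
	GetSerialNo() uint32
	// GetResponseBuf returns the raw body of the response.
	GetResponseBuf() []byte
}

// exampleResponse is a hypothetical implementation used only for illustration.
type exampleResponse struct {
	serialNo uint32
	buf      []byte
}

// GetSerialNo returns the serial number carried by the response.
func (e exampleResponse) GetSerialNo() uint32 {
	return e.serialNo
}

// GetResponseBuf returns the raw body of the response.
func (e exampleResponse) GetResponseBuf() []byte {
	return e.buf
}
```

Linters such as `golint` typically flag exported identifiers whose comment is missing or does not start with the identifier name.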







[GitHub] [incubator-inlong] charlely commented on a change in pull request #464: [INLONG-600]Multiplexed connection pool for Go sdk

Posted by GitBox <gi...@apache.org>.
charlely commented on a change in pull request #464:
URL: https://github.com/apache/incubator-inlong/pull/464#discussion_r623692625



##########
File path: tubemq-client-twins/tubemq-client-go/pool/multiplexed.go
##########
@@ -0,0 +1,403 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package pool

Review comment:
       Could this package be named `multiplexed` instead? `pool` could just as well mean a buffer pool or an I/O pool.







[GitHub] [incubator-inlong] codecov-commenter edited a comment on pull request #464: [INLONG-600]Multiplexed connection pool for Go sdk

Posted by GitBox <gi...@apache.org>.
codecov-commenter edited a comment on pull request #464:
URL: https://github.com/apache/incubator-inlong/pull/464#issuecomment-830031922


   # [Codecov](https://codecov.io/gh/apache/incubator-inlong/pull/464?src=pr&el=h1&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation) Report
   > Merging [#464](https://codecov.io/gh/apache/incubator-inlong/pull/464?src=pr&el=desc&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation) (aec9b22) into [INLONG-25](https://codecov.io/gh/apache/incubator-inlong/commit/3249de37acf054a9c43677131cfbb09fc6d366d1?el=desc&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation) (3249de3) will **decrease** coverage by `0.01%`.
   > The diff coverage is `n/a`.
   
   [![Impacted file tree graph](https://codecov.io/gh/apache/incubator-inlong/pull/464/graphs/tree.svg?width=650&height=150&src=pr&token=1EUK92O9K2&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation)](https://codecov.io/gh/apache/incubator-inlong/pull/464?src=pr&el=tree&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation)
   
   ```diff
   @@              Coverage Diff               @@
   ##             INLONG-25    #464      +/-   ##
   ==============================================
   - Coverage         7.52%   7.51%   -0.02%     
   + Complexity         481     479       -2     
   ==============================================
     Files              267     267              
     Lines            29500   29500              
     Branches          4843    4843              
   ==============================================
   - Hits              2219    2216       -3     
   - Misses           26808   26810       +2     
   - Partials           473     474       +1     
   ```
   
   
   | [Impacted Files](https://codecov.io/gh/apache/incubator-inlong/pull/464?src=pr&el=tree&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation) | Coverage Δ | Complexity Δ | |
   |---|---|---|---|
   | [.../apache/tubemq/corebase/policies/FlowCtrlItem.java](https://codecov.io/gh/apache/incubator-inlong/pull/464/diff?src=pr&el=tree&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation#diff-dHViZW1xLWNvcmUvc3JjL21haW4vamF2YS9vcmcvYXBhY2hlL3R1YmVtcS9jb3JlYmFzZS9wb2xpY2llcy9GbG93Q3RybEl0ZW0uamF2YQ==) | `38.88% <0.00%> (-1.12%)` | `8.00% <0.00%> (-1.00%)` | |
   | [.../tubemq/corebase/policies/FlowCtrlRuleHandler.java](https://codecov.io/gh/apache/incubator-inlong/pull/464/diff?src=pr&el=tree&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation#diff-dHViZW1xLWNvcmUvc3JjL21haW4vamF2YS9vcmcvYXBhY2hlL3R1YmVtcS9jb3JlYmFzZS9wb2xpY2llcy9GbG93Q3RybFJ1bGVIYW5kbGVyLmphdmE=) | `43.98% <0.00%> (-0.59%)` | `34.00% <0.00%> (-1.00%)` | |
   
   ------
   
   [Continue to review full report at Codecov](https://codecov.io/gh/apache/incubator-inlong/pull/464?src=pr&el=continue&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation).
   > **Legend** - [Click here to learn more](https://docs.codecov.io/docs/codecov-delta?utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation)
   > `Δ = absolute <relative> (impact)`, `ø = not affected`, `? = missing data`
   > Powered by [Codecov](https://codecov.io/gh/apache/incubator-inlong/pull/464?src=pr&el=footer&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation). Last update [3249de3...aec9b22](https://codecov.io/gh/apache/incubator-inlong/pull/464?src=pr&el=lastupdated&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation). Read the [comment docs](https://docs.codecov.io/docs/pull-request-comments?utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation).
   





[GitHub] [incubator-inlong] TszKitLo40 commented on pull request #464: [INLONG-600]Multiplexed connection pool for Go sdk

Posted by GitBox <gi...@apache.org>.
TszKitLo40 commented on pull request #464:
URL: https://github.com/apache/incubator-inlong/pull/464#issuecomment-829904786


   @gosonzhang @charlely PTAL





[GitHub] [incubator-inlong] TszKitLo40 commented on a change in pull request #464: [INLONG-600]Multiplexed connection pool for Go sdk

Posted by GitBox <gi...@apache.org>.
TszKitLo40 commented on a change in pull request #464:
URL: https://github.com/apache/incubator-inlong/pull/464#discussion_r624365229



##########
File path: tubemq-client-twins/tubemq-client-go/pool/multiplexed.go
##########
@@ -0,0 +1,403 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package pool
+
+import (
+	"context"
+	"crypto/tls"
+	"crypto/x509"
+	"errors"
+	"io/ioutil"
+	"net"
+	"sync"
+	"time"
+
+	"github.com/apache/incubator-inlong/tubemq-client-twins/tubemq-client-go/codec"
+)
+
+var DefaultMultiplexedPool = New()
+
+var (
+	// ErrConnClosed indicates that the connection is closed
+	ErrConnClosed = errors.New("connection is closed")
+	// ErrChanClose indicates the recv chan is closed
+	ErrChanClose = errors.New("unexpected recv chan close")
+	// ErrWriteBufferDone indicates write buffer done
+	ErrWriteBufferDone = errors.New("write buffer done")
+	// ErrAssertConnectionFail indicates connection assertion error
+	ErrAssertConnectionFail = errors.New("assert connection slice fail")
+)
+
+const (
+	Initial int = iota
+	Connected
+	Closing
+	Closed
+)
+
+var queueSize = 10000
+
+func New() *Multiplexed {

Review comment:
       I have renamed both the file and the package to `multiplexing`.







[GitHub] [incubator-inlong] gosonzhang merged pull request #464: [INLONG-600]Multiplexed connection pool for Go sdk

Posted by GitBox <gi...@apache.org>.
gosonzhang merged pull request #464:
URL: https://github.com/apache/incubator-inlong/pull/464


   

