You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@inlong.apache.org by GitBox <gi...@apache.org> on 2021/05/24 07:43:01 UTC

[GitHub] [incubator-inlong] charlely commented on a change in pull request #472: [INLONG-604]rpc request for Go SDK

charlely commented on a change in pull request #472:
URL: https://github.com/apache/incubator-inlong/pull/472#discussion_r637663475



##########
File path: tubemq-client-twins/tubemq-client-go/rpc/broker.go
##########
@@ -0,0 +1,273 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package rpc
+
+import (
+	"github.com/golang/protobuf/proto"

Review comment:
       Change to google.golang.org/protobuf.
   
   

##########
File path: tubemq-client-twins/tubemq-client-go/rpc/broker.go
##########
@@ -0,0 +1,273 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package rpc
+
+import (
+	"github.com/golang/protobuf/proto"
+
+	"github.com/apache/incubator-inlong/tubemq-client-twins/tubemq-client-go/client"
+	"github.com/apache/incubator-inlong/tubemq-client-twins/tubemq-client-go/codec"
+	"github.com/apache/incubator-inlong/tubemq-client-twins/tubemq-client-go/errs"
+	"github.com/apache/incubator-inlong/tubemq-client-twins/tubemq-client-go/metadata"
+	"github.com/apache/incubator-inlong/tubemq-client-twins/tubemq-client-go/protocol"
+)
+
+const (
+	Register   = 31
+	Unregister = 32
+)
+
+const (
+	BrokerProducerRegister = iota + 11
+	BrokerProducerHeartbeat
+	BrokerProducerSendMsg
+	BrokerProducerClose
+	BrokerConsumerRegister
+	BrokerConsumerHeartbeat
+	BrokerConsumerGetMsg
+	BrokerConsumerCommit
+	BrokerConsumerClose
+)
+
+// RegisterRequestC2B implements the RegisterRequestC2B interface according to TubeMQ RPC protocol.
+func (c *rpcClient) RegisterRequestC2B(metadata *metadata.Metadata, sub *client.SubInfo) (*protocol.RegisterResponseB2C, error) {
+	reqC2B := &protocol.RegisterRequestC2B{
+		OpType:        proto.Int32(Register),
+		ClientId:      proto.String(sub.GetClientID()),
+		GroupName:     proto.String(metadata.GetSubscribeInfo().GetGroup()),
+		TopicName:     proto.String(metadata.GetSubscribeInfo().GetPartition().GetTopic()),
+		PartitionId:   proto.Int32(metadata.GetSubscribeInfo().GetPartition().GetPartitionID()),
+		QryPriorityId: proto.Int32(metadata.GetSubscribeInfo().GetQryPriorityID()),
+		ReadStatus:    proto.Int32(metadata.GetReadStatus()),
+		AuthInfo:      sub.GetAuthorizedInfo(),
+	}
+	if sub.IsFiltered(metadata.GetSubscribeInfo().GetPartition().GetTopic()) {
+		tfs := sub.GetTopicFilters()
+		reqC2B.FilterCondStr = make([]string, 0, len(tfs[metadata.GetSubscribeInfo().GetPartition().GetTopic()]))
+		for _, tf := range tfs[metadata.GetSubscribeInfo().GetPartition().GetTopic()] {
+			reqC2B.FilterCondStr = append(reqC2B.FilterCondStr, tf)
+		}
+	}
+	offset := sub.GetAssignedPartOffset(metadata.GetSubscribeInfo().GetPartition().GetPartitionKey())
+	if offset != client.InValidOffset {
+		reqC2B.CurrOffset = proto.Int64(offset)
+	}
+	data, err := proto.Marshal(reqC2B)
+	if err != nil {
+		return nil, errs.New(errs.RetMarshalFailure, err.Error())
+	}
+	req := codec.NewRPCRequest()
+	req.RpcHeader = &protocol.RpcConnHeader{
+		Flag: proto.Int32(0),
+	}
+	req.RequestHeader = &protocol.RequestHeader{
+		ServiceType: proto.Int32(ReadService),
+		ProtocolVer: proto.Int32(2),
+	}
+	req.RequestBody = &protocol.RequestBody{
+		Method:  proto.Int32(BrokerConsumerRegister),
+		Request: data,
+		Timeout: proto.Int64(c.config.Net.ReadTimeout.Milliseconds()),
+	}
+	rsp, err := c.client.DoRequest(c.ctx, req)
+	if v, ok := rsp.(*codec.TubeMQRPCResponse); ok {
+		if v.ResponseException != nil {
+			return nil, errs.New(errs.RetResponseException, v.ResponseException.String())
+		}
+		rspC2B := &protocol.RegisterResponseB2C{}
+		err := proto.Unmarshal(v.ResponseBody.Data, rspC2B)
+		if err != nil {
+			return nil, errs.New(errs.RetUnMarshalFailure, err.Error())
+		}
+		return rspC2B, nil
+	}
+	return nil, errs.ErrAssertionFailure
+}
+
+// UnregisterRequestC2B implements the UnregisterRequestC2B interface according to TubeMQ RPC protocol.
+func (c *rpcClient) UnregisterRequestC2B(metadata metadata.Metadata, sub *client.SubInfo) (*protocol.RegisterResponseB2C, error) {
+	reqC2B := &protocol.RegisterRequestC2B{
+		OpType:      proto.Int32(Unregister),
+		ClientId:    proto.String(sub.GetClientID()),
+		GroupName:   proto.String(metadata.GetSubscribeInfo().GetGroup()),
+		TopicName:   proto.String(metadata.GetSubscribeInfo().GetPartition().GetTopic()),
+		PartitionId: proto.Int32(metadata.GetSubscribeInfo().GetPartition().GetPartitionID()),
+		ReadStatus:  proto.Int32(metadata.GetReadStatus()),
+		AuthInfo:    sub.GetAuthorizedInfo(),
+	}
+	req := codec.NewRPCRequest()
+	req.RpcHeader = &protocol.RpcConnHeader{
+		Flag: proto.Int32(0),
+	}
+	data, err := proto.Marshal(reqC2B)
+	if err != nil {
+		return nil, errs.New(errs.RetMarshalFailure, err.Error())
+	}
+	req.RequestHeader = &protocol.RequestHeader{
+		ServiceType: proto.Int32(ReadService),
+		ProtocolVer: proto.Int32(2),
+	}
+	req.RequestBody = &protocol.RequestBody{
+		Method:  proto.Int32(BrokerConsumerRegister),
+		Request: data,
+		Timeout: proto.Int64(c.config.Net.ReadTimeout.Milliseconds()),
+	}
+	rsp, err := c.client.DoRequest(c.ctx, req)
+	if v, ok := rsp.(*codec.TubeMQRPCResponse); ok {
+		if v.ResponseException != nil {
+			return nil, errs.New(errs.RetResponseException, v.ResponseException.String())
+		}
+		rspC2B := &protocol.RegisterResponseB2C{}
+		err := proto.Unmarshal(v.ResponseBody.Data, rspC2B)
+		if err != nil {
+			return nil, errs.New(errs.RetUnMarshalFailure, err.Error())
+		}
+		return rspC2B, nil
+	}
+	return nil, errs.ErrAssertionFailure
+}
+
+// GetMessageRequestC2B implements the GetMessageRequestC2B interface according to TubeMQ RPC protocol.
+func (c *rpcClient) GetMessageRequestC2B(metadata *metadata.Metadata, sub *client.SubInfo, r *client.RmtDataCache) (*protocol.GetMessageResponseB2C, error) {
+	reqC2B := &protocol.GetMessageRequestC2B{
+		ClientId:           proto.String(sub.GetClientID()),
+		PartitionId:        proto.Int32(metadata.GetSubscribeInfo().GetPartition().GetPartitionID()),
+		GroupName:          proto.String(metadata.GetSubscribeInfo().GetGroup()),
+		TopicName:          proto.String(metadata.GetSubscribeInfo().GetPartition().GetTopic()),
+		EscFlowCtrl:        proto.Bool(r.GetUnderGroupCtrl()),
+		LastPackConsumed:   proto.Bool(metadata.GetSubscribeInfo().GetPartition().GetLastConsumed()),
+		ManualCommitOffset: proto.Bool(false),
+	}
+	req := codec.NewRPCRequest()
+	req.RpcHeader = &protocol.RpcConnHeader{
+		Flag: proto.Int32(0),
+	}
+	data, err := proto.Marshal(reqC2B)
+	if err != nil {
+		return nil, errs.New(errs.RetMarshalFailure, err.Error())
+	}
+	req.RequestHeader = &protocol.RequestHeader{
+		ServiceType: proto.Int32(ReadService),
+		ProtocolVer: proto.Int32(2),
+	}
+	req.RequestBody = &protocol.RequestBody{
+		Method:  proto.Int32(BrokerConsumerGetMsg),
+		Request: data,
+		Timeout: proto.Int64(c.config.Net.ReadTimeout.Milliseconds()),
+	}
+	rsp, err := c.client.DoRequest(c.ctx, req)
+	if v, ok := rsp.(*codec.TubeMQRPCResponse); ok {
+		if v.ResponseException != nil {
+			return nil, errs.New(errs.RetResponseException, v.ResponseException.String())
+		}
+		rspC2B := &protocol.GetMessageResponseB2C{}
+		err := proto.Unmarshal(v.ResponseBody.Data, rspC2B)
+		if err != nil {
+			return nil, errs.New(errs.RetUnMarshalFailure, err.Error())
+		}
+		return rspC2B, nil
+	}
+	return nil, errs.ErrAssertionFailure
+}
+
+// CommitOffsetRequestC2B implements the CommitOffsetRequestC2B interface according to TubeMQ RPC protocol.
+func (c *rpcClient) CommitOffsetRequestC2B(metadata *metadata.Metadata, sub *client.SubInfo) (*protocol.CommitOffsetResponseB2C, error) {
+	reqC2B := &protocol.CommitOffsetRequestC2B{
+		ClientId:         proto.String(sub.GetClientID()),
+		TopicName:        proto.String(metadata.GetSubscribeInfo().GetPartition().GetTopic()),
+		PartitionId:      proto.Int32(metadata.GetSubscribeInfo().GetPartition().GetPartitionID()),
+		GroupName:        proto.String(metadata.GetSubscribeInfo().GetGroup()),
+		LastPackConsumed: proto.Bool(metadata.GetSubscribeInfo().GetPartition().GetLastConsumed()),
+	}
+	req := codec.NewRPCRequest()
+	req.RpcHeader = &protocol.RpcConnHeader{
+		Flag: proto.Int32(10),
+	}
+	data, err := proto.Marshal(reqC2B)
+	if err != nil {
+		return nil, errs.New(errs.RetMarshalFailure, err.Error())
+	}
+	req.RequestHeader = &protocol.RequestHeader{
+		ServiceType: proto.Int32(ReadService),
+		ProtocolVer: proto.Int32(2),
+	}
+	req.RequestBody = &protocol.RequestBody{
+		Method:  proto.Int32(BrokerConsumerHeartbeat),
+		Request: data,
+		Timeout: proto.Int64(c.config.Net.ReadTimeout.Milliseconds()),
+	}
+	rsp, err := c.client.DoRequest(c.ctx, req)
+	if v, ok := rsp.(*codec.TubeMQRPCResponse); ok {
+		if v.ResponseException != nil {
+			return nil, errs.New(errs.RetResponseException, v.ResponseException.String())
+		}
+		rspC2B := &protocol.CommitOffsetResponseB2C{}
+		err := proto.Unmarshal(v.ResponseBody.Data, rspC2B)
+		if err != nil {
+			return nil, errs.New(errs.RetUnMarshalFailure, err.Error())
+		}
+		return rspC2B, nil
+	}
+	return nil, errs.ErrAssertionFailure
+}
+
+// HeartbeatRequestC2B implements the HeartbeatRequestC2B interface according to TubeMQ RPC protocol.
+func (c *rpcClient) HeartbeatRequestC2B(metadata *metadata.Metadata, sub *client.SubInfo, r *client.RmtDataCache) (*protocol.HeartBeatResponseB2C, error) {
+	reqC2B := &protocol.HeartBeatRequestC2B{
+		ClientId:      proto.String(sub.GetClientID()),
+		GroupName:     proto.String(metadata.GetSubscribeInfo().GetGroup()),
+		ReadStatus:    proto.Int32(metadata.GetReadStatus()),
+		QryPriorityId: proto.Int32(metadata.GetSubscribeInfo().GetQryPriorityID()),
+		AuthInfo:      sub.GetAuthorizedInfo(),
+	}
+	partitions := r.GetPartitionByBroker(metadata.GetNode())
+	reqC2B.PartitionInfo = make([]string, 0, len(partitions))
+	for _, partition := range partitions {
+		reqC2B.PartitionInfo = append(reqC2B.PartitionInfo, partition.String())
+	}
+	data, err := proto.Marshal(reqC2B)
+	if err != nil {
+		return nil, errs.New(errs.RetMarshalFailure, err.Error())
+	}
+	req := codec.NewRPCRequest()
+	req.RequestHeader = &protocol.RequestHeader{
+		ServiceType: proto.Int32(ReadService),
+		ProtocolVer: proto.Int32(2),
+	}
+	req.RequestBody = &protocol.RequestBody{
+		Method:  proto.Int32(BrokerConsumerHeartbeat),
+		Request: data,
+		Timeout: proto.Int64(c.config.Net.ReadTimeout.Milliseconds()),
+	}
+	req.RpcHeader = &protocol.RpcConnHeader{
+		Flag: proto.Int32(0),
+	}
+	rsp, err := c.client.DoRequest(c.ctx, req)
+	if v, ok := rsp.(*codec.TubeMQRPCResponse); ok {

Review comment:
       Return to failure quickly.

##########
File path: tubemq-client-twins/tubemq-client-go/rpc/broker.go
##########
@@ -0,0 +1,273 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package rpc
+
+import (
+	"github.com/golang/protobuf/proto"
+
+	"github.com/apache/incubator-inlong/tubemq-client-twins/tubemq-client-go/client"
+	"github.com/apache/incubator-inlong/tubemq-client-twins/tubemq-client-go/codec"
+	"github.com/apache/incubator-inlong/tubemq-client-twins/tubemq-client-go/errs"
+	"github.com/apache/incubator-inlong/tubemq-client-twins/tubemq-client-go/metadata"
+	"github.com/apache/incubator-inlong/tubemq-client-twins/tubemq-client-go/protocol"
+)
+
+const (
+	Register   = 31

Review comment:
       Need add annotation.

##########
File path: tubemq-client-twins/tubemq-client-go/rpc/broker.go
##########
@@ -0,0 +1,273 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package rpc
+
+import (
+	"github.com/golang/protobuf/proto"
+
+	"github.com/apache/incubator-inlong/tubemq-client-twins/tubemq-client-go/client"
+	"github.com/apache/incubator-inlong/tubemq-client-twins/tubemq-client-go/codec"
+	"github.com/apache/incubator-inlong/tubemq-client-twins/tubemq-client-go/errs"
+	"github.com/apache/incubator-inlong/tubemq-client-twins/tubemq-client-go/metadata"
+	"github.com/apache/incubator-inlong/tubemq-client-twins/tubemq-client-go/protocol"
+)
+
+const (
+	Register   = 31
+	Unregister = 32
+)
+
+const (
+	BrokerProducerRegister = iota + 11

Review comment:
       Need add annotation.

##########
File path: tubemq-client-twins/tubemq-client-go/go.mod
##########
@@ -0,0 +1,24 @@
+// Licensed to the Apache Software Foundation (ASF) under one or more
+// contributor license agreements.  See the NOTICE file distributed with
+// this work for additional information regarding copyright ownership.
+// The ASF licenses this file to You under the Apache License, Version 2.0
+// (the "License"); you may not use this file except in compliance with
+// the License.  You may obtain a copy of the License at
+// <p>
+// http://www.apache.org/licenses/LICENSE-2.0
+// <p>
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+module github.com/apache/incubator-inlong/tubemq-client-twins/tubemq-client-go
+
+go 1.14
+
+require (
+	github.com/golang/protobuf v1.4.3

Review comment:
       Protobuf lib is same, need keep one.
   github.com/golang/protobuf v1.4.3 need delete. 
   
   google.golang.org/protobuf v1.23.0   keep.
   
   

##########
File path: tubemq-client-twins/tubemq-client-go/rpc/client.go
##########
@@ -0,0 +1,72 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+// Package rpc encapsulates all the rpc request to TubeMQ.
+package rpc
+
+import (
+	"context"
+
+	"github.com/apache/incubator-inlong/tubemq-client-twins/tubemq-client-go/client"
+	"github.com/apache/incubator-inlong/tubemq-client-twins/tubemq-client-go/config"
+	"github.com/apache/incubator-inlong/tubemq-client-twins/tubemq-client-go/metadata"
+	"github.com/apache/incubator-inlong/tubemq-client-twins/tubemq-client-go/multiplexing"
+	"github.com/apache/incubator-inlong/tubemq-client-twins/tubemq-client-go/protocol"
+	"github.com/apache/incubator-inlong/tubemq-client-twins/tubemq-client-go/transport"
+)
+
+const (
+	ReadService = 2

Review comment:
       Need add annotation.

##########
File path: tubemq-client-twins/tubemq-client-go/rpc/client.go
##########
@@ -0,0 +1,72 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+// Package rpc encapsulates all the rpc request to TubeMQ.
+package rpc
+
+import (
+	"context"
+
+	"github.com/apache/incubator-inlong/tubemq-client-twins/tubemq-client-go/client"
+	"github.com/apache/incubator-inlong/tubemq-client-twins/tubemq-client-go/config"
+	"github.com/apache/incubator-inlong/tubemq-client-twins/tubemq-client-go/metadata"
+	"github.com/apache/incubator-inlong/tubemq-client-twins/tubemq-client-go/multiplexing"
+	"github.com/apache/incubator-inlong/tubemq-client-twins/tubemq-client-go/protocol"
+	"github.com/apache/incubator-inlong/tubemq-client-twins/tubemq-client-go/transport"
+)
+
+const (
+	ReadService = 2
+	AdminService = 4
+)
+
+// RPCClient is the rpc level client to interact with TubeMQ.
+type RPCClient interface {
+	// RegisterRequestC2B is the rpc request for a consumer to register to a broker.
+	RegisterRequestC2B(metadata *metadata.Metadata, sub *client.SubInfo) (*protocol.RegisterResponseB2C, error)
+	// UnregisterRequestC2B is the rpc request for a consumer to unregister to a broker.
+	UnregisterRequestC2B(metadata metadata.Metadata, sub *client.SubInfo) (*protocol.RegisterResponseB2C, error)
+	// GetMessageRequestC2B is the rpc request for a consumer to get message from a broker.
+	GetMessageRequestC2B(metadata *metadata.Metadata, sub *client.SubInfo, r *client.RmtDataCache) (*protocol.GetMessageResponseB2C, error)
+	// CommitOffsetRequestC2B is the rpc request for a consumer to commit offset to a broker.
+	CommitOffsetRequestC2B(metadata *metadata.Metadata, sub *client.SubInfo) (*protocol.CommitOffsetResponseB2C, error)
+	// HeartbeatRequestC2B is the rpc request for a consumer to send heartbeat to a broker.
+	HeartbeatRequestC2B(metadata *metadata.Metadata, sub *client.SubInfo, r *client.RmtDataCache) (*protocol.HeartBeatResponseB2C, error)
+	// RegisterRequestC2M is the rpc request for a consumer to register request to master.
+	RegisterRequestC2M(metadata *metadata.Metadata, sub *client.SubInfo, r *client.RmtDataCache) (*protocol.RegisterResponseM2C, error)
+	// HeartRequestC2M is the rpc request for a consumer to send heartbeat to master.
+	HeartRequestC2M(metadata *metadata.Metadata, sub *client.SubInfo, r *client.RmtDataCache) (*protocol.HeartResponseM2C, error)
+	// CloseRequestC2M is the rpc request for a consumer to be closed to master.
+	CloseRequestC2M(metadata *metadata.Metadata, sub *client.SubInfo) (*protocol.CloseResponseM2C, error)
+}
+
+type rpcClient struct {
+	pool   *multiplexing.Pool
+	client *transport.Client
+	ctx    context.Context
+	config *config.Config
+}
+
+// New returns a default TubeMQ rpc Client
+func New(pool *multiplexing.Pool, opts *transport.Options, ctx context.Context, config *config.Config) RPCClient {
+	return &rpcClient{
+		pool:   pool,
+		client: transport.New(opts, pool),
+		ctx:    ctx,

Review comment:
       Why use global context? How to control the timeout for each request?




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org