You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@inlong.apache.org by go...@apache.org on 2021/05/24 09:32:43 UTC
[incubator-inlong] 03/03: Pass context from client
This is an automated email from the ASF dual-hosted git repository.
gosonzhang pushed a commit to branch INLONG-25
in repository https://gitbox.apache.org/repos/asf/incubator-inlong.git
commit ddfab69aa7ed27c9b3b46c49338a87030349f245
Author: Zijie Lu <ws...@gmail.com>
AuthorDate: Mon May 24 16:34:56 2021 +0800
Pass context from client
Signed-off-by: Zijie Lu <ws...@gmail.com>
---
.../tubemq-client-go/client/remote.go | 16 +-
tubemq-client-twins/tubemq-client-go/errs/errs.go | 15 +-
tubemq-client-twins/tubemq-client-go/rpc/broker.go | 181 ++++++++++-----------
tubemq-client-twins/tubemq-client-go/rpc/client.go | 52 ++++--
tubemq-client-twins/tubemq-client-go/rpc/master.go | 104 ++++++------
5 files changed, 186 insertions(+), 182 deletions(-)
diff --git a/tubemq-client-twins/tubemq-client-go/client/remote.go b/tubemq-client-twins/tubemq-client-go/client/remote.go
index fd60425..e36b69f 100644
--- a/tubemq-client-twins/tubemq-client-go/client/remote.go
+++ b/tubemq-client-twins/tubemq-client-go/client/remote.go
@@ -27,14 +27,14 @@ import (
// RmtDataCache represents the data returned from TubeMQ.
type RmtDataCache struct {
- consumerID string
- groupName string
- underGroupCtrl bool
- defFlowCtrlID int64
- groupFlowCtrlID int64
- subscribeInfo []*metadata.SubscribeInfo
- rebalanceResults []*metadata.ConsumerEvent
- mu sync.Mutex
+ consumerID string
+ groupName string
+ underGroupCtrl bool
+ defFlowCtrlID int64
+ groupFlowCtrlID int64
+ subscribeInfo []*metadata.SubscribeInfo
+ rebalanceResults []*metadata.ConsumerEvent
+ mu sync.Mutex
brokerToPartitions map[*metadata.Node][]*metadata.Partition
}
diff --git a/tubemq-client-twins/tubemq-client-go/errs/errs.go b/tubemq-client-twins/tubemq-client-go/errs/errs.go
index 637e381..0ef803d 100644
--- a/tubemq-client-twins/tubemq-client-go/errs/errs.go
+++ b/tubemq-client-twins/tubemq-client-go/errs/errs.go
@@ -23,12 +23,19 @@ import (
)
const (
- RetMarshalFailure = 1
+ // RetMarshalFailure represents the error code of marshal error.
+ RetMarshalFailure = 1
+ // RetResponseException represents the error code of response exception.
RetResponseException = 2
- RetUnMarshalFailure = 3
- RetAssertionFailure = 4
+ // RetUnMarshalFailure represents the error code of unmarshal error.
+ RetUnMarshalFailure = 3
+ // RetAssertionFailure represents the error code of assertion error.
+ RetAssertionFailure = 4
+ // RetRequestFailure represents the error code of request error.
+ RetRequestFailure = 5
)
+// ErrAssertionFailure represents RetAssertionFailure error.
var ErrAssertionFailure = New(RetAssertionFailure, "AssertionFailure")
// Error provides a TubeMQ-specific error container
@@ -37,10 +44,12 @@ type Error struct {
Msg string
}
+// Error implements the error interface.
func (e *Error) Error() string {
return fmt.Sprintf("code: %d, msg:%s", e.Code, e.Msg)
}
+// New returns a self-defined error.
func New(code int32, msg string) error {
err := &Error{
Code: code,
diff --git a/tubemq-client-twins/tubemq-client-go/rpc/broker.go b/tubemq-client-twins/tubemq-client-go/rpc/broker.go
index b2b614d..7f9ead2 100644
--- a/tubemq-client-twins/tubemq-client-go/rpc/broker.go
+++ b/tubemq-client-twins/tubemq-client-go/rpc/broker.go
@@ -30,26 +30,26 @@ import (
)
const (
- Register = 31
- Unregister = 32
+ register = 31
+ unregister = 32
)
const (
- BrokerProducerRegister = iota + 11
- BrokerProducerHeartbeat
- BrokerProducerSendMsg
- BrokerProducerClose
- BrokerConsumerRegister
- BrokerConsumerHeartbeat
- BrokerConsumerGetMsg
- BrokerConsumerCommit
- BrokerConsumerClose
+ brokerProducerRegister = iota + 11
+ brokerProducerHeartbeat
+ brokerProducerSendMsg
+ brokerProducerClose
+ brokerConsumerRegister
+ brokerConsumerHeartbeat
+ brokerConsumerGetMsg
+ brokerConsumerCommit
+ brokerConsumerClose
)
// RegisterRequestC2B implements the RegisterRequestC2B interface according to TubeMQ RPC protocol.
-func (c *rpcClient) RegisterRequestC2B(metadata *metadata.Metadata, sub *client.SubInfo) (*protocol.RegisterResponseB2C, error) {
+func (c *rpcClient) RegisterRequestC2B(ctx context.Context, metadata *metadata.Metadata, sub *client.SubInfo) (*protocol.RegisterResponseB2C, error) {
reqC2B := &protocol.RegisterRequestC2B{
- OpType: proto.Int32(Register),
+ OpType: proto.Int32(register),
ClientId: proto.String(sub.GetClientID()),
GroupName: proto.String(metadata.GetSubscribeInfo().GetGroup()),
TopicName: proto.String(metadata.GetSubscribeInfo().GetPartition().GetTopic()),
@@ -78,35 +78,32 @@ func (c *rpcClient) RegisterRequestC2B(metadata *metadata.Metadata, sub *client.
Flag: proto.Int32(0),
}
req.RequestHeader = &protocol.RequestHeader{
- ServiceType: proto.Int32(ReadService),
+ ServiceType: proto.Int32(brokerReadService),
ProtocolVer: proto.Int32(2),
}
req.RequestBody = &protocol.RequestBody{
- Method: proto.Int32(BrokerConsumerRegister),
+ Method: proto.Int32(brokerConsumerRegister),
Request: data,
Timeout: proto.Int64(c.config.Net.ReadTimeout.Milliseconds()),
}
- ctx, cancel := context.WithTimeout(context.Background(), c.config.Net.ReadTimeout)
- defer cancel()
- rsp, err := c.client.DoRequest(ctx, req)
- if v, ok := rsp.(*codec.TubeMQRPCResponse); ok {
- if v.ResponseException != nil {
- return nil, errs.New(errs.RetResponseException, v.ResponseException.String())
- }
- rspC2B := &protocol.RegisterResponseB2C{}
- err := proto.Unmarshal(v.ResponseBody.Data, rspC2B)
- if err != nil {
- return nil, errs.New(errs.RetUnMarshalFailure, err.Error())
- }
- return rspC2B, nil
+
+ rspBody, err := c.doRequest(ctx, req)
+ if err != nil {
+ return nil, err
}
- return nil, errs.ErrAssertionFailure
+
+ rspC2B := &protocol.RegisterResponseB2C{}
+ err = proto.Unmarshal(rspBody.Data, rspC2B)
+ if err != nil {
+ return nil, errs.New(errs.RetUnMarshalFailure, err.Error())
+ }
+ return rspC2B, nil
}
// UnregisterRequestC2B implements the UnregisterRequestC2B interface according to TubeMQ RPC protocol.
-func (c *rpcClient) UnregisterRequestC2B(metadata metadata.Metadata, sub *client.SubInfo) (*protocol.RegisterResponseB2C, error) {
+func (c *rpcClient) UnregisterRequestC2B(ctx context.Context, metadata metadata.Metadata, sub *client.SubInfo) (*protocol.RegisterResponseB2C, error) {
reqC2B := &protocol.RegisterRequestC2B{
- OpType: proto.Int32(Unregister),
+ OpType: proto.Int32(unregister),
ClientId: proto.String(sub.GetClientID()),
GroupName: proto.String(metadata.GetSubscribeInfo().GetGroup()),
TopicName: proto.String(metadata.GetSubscribeInfo().GetPartition().GetTopic()),
@@ -123,33 +120,30 @@ func (c *rpcClient) UnregisterRequestC2B(metadata metadata.Metadata, sub *client
return nil, errs.New(errs.RetMarshalFailure, err.Error())
}
req.RequestHeader = &protocol.RequestHeader{
- ServiceType: proto.Int32(ReadService),
+ ServiceType: proto.Int32(brokerReadService),
ProtocolVer: proto.Int32(2),
}
req.RequestBody = &protocol.RequestBody{
- Method: proto.Int32(BrokerConsumerRegister),
+ Method: proto.Int32(brokerConsumerRegister),
Request: data,
Timeout: proto.Int64(c.config.Net.ReadTimeout.Milliseconds()),
}
- ctx, cancel := context.WithTimeout(context.Background(), c.config.Net.ReadTimeout)
- defer cancel()
- rsp, err := c.client.DoRequest(ctx, req)
- if v, ok := rsp.(*codec.TubeMQRPCResponse); ok {
- if v.ResponseException != nil {
- return nil, errs.New(errs.RetResponseException, v.ResponseException.String())
- }
- rspC2B := &protocol.RegisterResponseB2C{}
- err := proto.Unmarshal(v.ResponseBody.Data, rspC2B)
- if err != nil {
- return nil, errs.New(errs.RetUnMarshalFailure, err.Error())
- }
- return rspC2B, nil
+
+ rspBody, err := c.doRequest(ctx, req)
+ if err != nil {
+ return nil, err
+ }
+
+ rspC2B := &protocol.RegisterResponseB2C{}
+ err = proto.Unmarshal(rspBody.Data, rspC2B)
+ if err != nil {
+ return nil, errs.New(errs.RetUnMarshalFailure, err.Error())
}
- return nil, errs.ErrAssertionFailure
+ return rspC2B, nil
}
// GetMessageRequestC2B implements the GetMessageRequestC2B interface according to TubeMQ RPC protocol.
-func (c *rpcClient) GetMessageRequestC2B(metadata *metadata.Metadata, sub *client.SubInfo, r *client.RmtDataCache) (*protocol.GetMessageResponseB2C, error) {
+func (c *rpcClient) GetMessageRequestC2B(ctx context.Context, metadata *metadata.Metadata, sub *client.SubInfo, r *client.RmtDataCache) (*protocol.GetMessageResponseB2C, error) {
reqC2B := &protocol.GetMessageRequestC2B{
ClientId: proto.String(sub.GetClientID()),
PartitionId: proto.Int32(metadata.GetSubscribeInfo().GetPartition().GetPartitionID()),
@@ -168,33 +162,30 @@ func (c *rpcClient) GetMessageRequestC2B(metadata *metadata.Metadata, sub *clien
return nil, errs.New(errs.RetMarshalFailure, err.Error())
}
req.RequestHeader = &protocol.RequestHeader{
- ServiceType: proto.Int32(ReadService),
+ ServiceType: proto.Int32(brokerReadService),
ProtocolVer: proto.Int32(2),
}
req.RequestBody = &protocol.RequestBody{
- Method: proto.Int32(BrokerConsumerGetMsg),
+ Method: proto.Int32(brokerConsumerGetMsg),
Request: data,
Timeout: proto.Int64(c.config.Net.ReadTimeout.Milliseconds()),
}
- ctx, cancel := context.WithTimeout(context.Background(), c.config.Net.ReadTimeout)
- defer cancel()
- rsp, err := c.client.DoRequest(ctx, req)
- if v, ok := rsp.(*codec.TubeMQRPCResponse); ok {
- if v.ResponseException != nil {
- return nil, errs.New(errs.RetResponseException, v.ResponseException.String())
- }
- rspC2B := &protocol.GetMessageResponseB2C{}
- err := proto.Unmarshal(v.ResponseBody.Data, rspC2B)
- if err != nil {
- return nil, errs.New(errs.RetUnMarshalFailure, err.Error())
- }
- return rspC2B, nil
+
+ rspBody, err := c.doRequest(ctx, req)
+ if err != nil {
+ return nil, err
}
- return nil, errs.ErrAssertionFailure
+
+ rspC2B := &protocol.GetMessageResponseB2C{}
+ err = proto.Unmarshal(rspBody.Data, rspC2B)
+ if err != nil {
+ return nil, errs.New(errs.RetUnMarshalFailure, err.Error())
+ }
+ return rspC2B, nil
}
// CommitOffsetRequestC2B implements the CommitOffsetRequestC2B interface according to TubeMQ RPC protocol.
-func (c *rpcClient) CommitOffsetRequestC2B(metadata *metadata.Metadata, sub *client.SubInfo) (*protocol.CommitOffsetResponseB2C, error) {
+func (c *rpcClient) CommitOffsetRequestC2B(ctx context.Context, metadata *metadata.Metadata, sub *client.SubInfo) (*protocol.CommitOffsetResponseB2C, error) {
reqC2B := &protocol.CommitOffsetRequestC2B{
ClientId: proto.String(sub.GetClientID()),
TopicName: proto.String(metadata.GetSubscribeInfo().GetPartition().GetTopic()),
@@ -211,33 +202,30 @@ func (c *rpcClient) CommitOffsetRequestC2B(metadata *metadata.Metadata, sub *cli
return nil, errs.New(errs.RetMarshalFailure, err.Error())
}
req.RequestHeader = &protocol.RequestHeader{
- ServiceType: proto.Int32(ReadService),
+ ServiceType: proto.Int32(brokerReadService),
ProtocolVer: proto.Int32(2),
}
req.RequestBody = &protocol.RequestBody{
- Method: proto.Int32(BrokerConsumerHeartbeat),
+ Method: proto.Int32(brokerConsumerHeartbeat),
Request: data,
Timeout: proto.Int64(c.config.Net.ReadTimeout.Milliseconds()),
}
- ctx, cancel := context.WithTimeout(context.Background(), c.config.Net.ReadTimeout)
- defer cancel()
- rsp, err := c.client.DoRequest(ctx, req)
- if v, ok := rsp.(*codec.TubeMQRPCResponse); ok {
- if v.ResponseException != nil {
- return nil, errs.New(errs.RetResponseException, v.ResponseException.String())
- }
- rspC2B := &protocol.CommitOffsetResponseB2C{}
- err := proto.Unmarshal(v.ResponseBody.Data, rspC2B)
- if err != nil {
- return nil, errs.New(errs.RetUnMarshalFailure, err.Error())
- }
- return rspC2B, nil
+
+ rspBody, err := c.doRequest(ctx, req)
+ if err != nil {
+ return nil, err
}
- return nil, errs.ErrAssertionFailure
+
+ rspC2B := &protocol.CommitOffsetResponseB2C{}
+ err = proto.Unmarshal(rspBody.Data, rspC2B)
+ if err != nil {
+ return nil, errs.New(errs.RetUnMarshalFailure, err.Error())
+ }
+ return rspC2B, nil
}
// HeartbeatRequestC2B implements the HeartbeatRequestC2B interface according to TubeMQ RPC protocol.
-func (c *rpcClient) HeartbeatRequestC2B(metadata *metadata.Metadata, sub *client.SubInfo, r *client.RmtDataCache) (*protocol.HeartBeatResponseB2C, error) {
+func (c *rpcClient) HeartbeatRequestC2B(ctx context.Context, metadata *metadata.Metadata, sub *client.SubInfo, r *client.RmtDataCache) (*protocol.HeartBeatResponseB2C, error) {
reqC2B := &protocol.HeartBeatRequestC2B{
ClientId: proto.String(sub.GetClientID()),
GroupName: proto.String(metadata.GetSubscribeInfo().GetGroup()),
@@ -256,30 +244,27 @@ func (c *rpcClient) HeartbeatRequestC2B(metadata *metadata.Metadata, sub *client
}
req := codec.NewRPCRequest()
req.RequestHeader = &protocol.RequestHeader{
- ServiceType: proto.Int32(ReadService),
+ ServiceType: proto.Int32(brokerReadService),
ProtocolVer: proto.Int32(2),
}
req.RequestBody = &protocol.RequestBody{
- Method: proto.Int32(BrokerConsumerHeartbeat),
+ Method: proto.Int32(brokerConsumerHeartbeat),
Request: data,
Timeout: proto.Int64(c.config.Net.ReadTimeout.Milliseconds()),
}
req.RpcHeader = &protocol.RpcConnHeader{
Flag: proto.Int32(0),
}
- ctx, cancel := context.WithTimeout(context.Background(), c.config.Net.ReadTimeout)
- defer cancel()
- rsp, err := c.client.DoRequest(ctx, req)
- if v, ok := rsp.(*codec.TubeMQRPCResponse); ok {
- if v.ResponseException != nil {
- return nil, errs.New(errs.RetResponseException, v.ResponseException.String())
- }
- rspC2B := &protocol.HeartBeatResponseB2C{}
- err := proto.Unmarshal(v.ResponseBody.Data, rspC2B)
- if err != nil {
- return nil, errs.New(errs.RetUnMarshalFailure, err.Error())
- }
- return rspC2B, nil
+
+ rspBody, err := c.doRequest(ctx, req)
+ if err != nil {
+ return nil, err
+ }
+
+ rspC2B := &protocol.HeartBeatResponseB2C{}
+ err = proto.Unmarshal(rspBody.Data, rspC2B)
+ if err != nil {
+ return nil, errs.New(errs.RetUnMarshalFailure, err.Error())
}
- return nil, errs.ErrAssertionFailure
+ return rspC2B, nil
}
diff --git a/tubemq-client-twins/tubemq-client-go/rpc/client.go b/tubemq-client-twins/tubemq-client-go/rpc/client.go
index 46961cd..71123c7 100644
--- a/tubemq-client-twins/tubemq-client-go/rpc/client.go
+++ b/tubemq-client-twins/tubemq-client-go/rpc/client.go
@@ -19,8 +19,12 @@
package rpc
import (
+ "context"
+
"github.com/apache/incubator-inlong/tubemq-client-twins/tubemq-client-go/client"
+ "github.com/apache/incubator-inlong/tubemq-client-twins/tubemq-client-go/codec"
"github.com/apache/incubator-inlong/tubemq-client-twins/tubemq-client-go/config"
+ "github.com/apache/incubator-inlong/tubemq-client-twins/tubemq-client-go/errs"
"github.com/apache/incubator-inlong/tubemq-client-twins/tubemq-client-go/metadata"
"github.com/apache/incubator-inlong/tubemq-client-twins/tubemq-client-go/multiplexing"
"github.com/apache/incubator-inlong/tubemq-client-twins/tubemq-client-go/protocol"
@@ -28,28 +32,36 @@ import (
)
const (
- ReadService = 2
- AdminService = 4
+ brokerReadService = 2
+ masterService = 4
)
// RPCClient is the rpc level client to interact with TubeMQ.
type RPCClient interface {
// RegisterRequestC2B is the rpc request for a consumer to register to a broker.
- RegisterRequestC2B(metadata *metadata.Metadata, sub *client.SubInfo) (*protocol.RegisterResponseB2C, error)
+ RegisterRequestC2B(ctx context.Context, metadata *metadata.Metadata, sub *client.SubInfo) (*protocol.RegisterResponseB2C, error)
// UnregisterRequestC2B is the rpc request for a consumer to unregister from a broker.
- UnregisterRequestC2B(metadata metadata.Metadata, sub *client.SubInfo) (*protocol.RegisterResponseB2C, error)
+ UnregisterRequestC2B(ctx context.Context, metadata metadata.Metadata, sub *client.SubInfo) (*protocol.RegisterResponseB2C, error)
// GetMessageRequestC2B is the rpc request for a consumer to get message from a broker.
- GetMessageRequestC2B(metadata *metadata.Metadata, sub *client.SubInfo, r *client.RmtDataCache) (*protocol.GetMessageResponseB2C, error)
+ GetMessageRequestC2B(ctx context.Context, metadata *metadata.Metadata, sub *client.SubInfo, r *client.RmtDataCache) (*protocol.GetMessageResponseB2C, error)
// CommitOffsetRequestC2B is the rpc request for a consumer to commit offset to a broker.
- CommitOffsetRequestC2B(metadata *metadata.Metadata, sub *client.SubInfo) (*protocol.CommitOffsetResponseB2C, error)
+ CommitOffsetRequestC2B(ctx context.Context, metadata *metadata.Metadata, sub *client.SubInfo) (*protocol.CommitOffsetResponseB2C, error)
// HeartbeatRequestC2B is the rpc request for a consumer to send heartbeat to a broker.
- HeartbeatRequestC2B(metadata *metadata.Metadata, sub *client.SubInfo, r *client.RmtDataCache) (*protocol.HeartBeatResponseB2C, error)
+ HeartbeatRequestC2B(ctx context.Context, metadata *metadata.Metadata, sub *client.SubInfo, r *client.RmtDataCache) (*protocol.HeartBeatResponseB2C, error)
// RegisterRequestC2M is the rpc request for a consumer to register to master.
- RegisterRequestC2M(metadata *metadata.Metadata, sub *client.SubInfo, r *client.RmtDataCache) (*protocol.RegisterResponseM2C, error)
+ RegisterRequestC2M(ctx context.Context, metadata *metadata.Metadata, sub *client.SubInfo, r *client.RmtDataCache) (*protocol.RegisterResponseM2C, error)
// HeartRequestC2M is the rpc request for a consumer to send heartbeat to master.
- HeartRequestC2M(metadata *metadata.Metadata, sub *client.SubInfo, r *client.RmtDataCache) (*protocol.HeartResponseM2C, error)
+ HeartRequestC2M(ctx context.Context, metadata *metadata.Metadata, sub *client.SubInfo, r *client.RmtDataCache) (*protocol.HeartResponseM2C, error)
// CloseRequestC2M is the rpc request for a consumer to be closed to master.
- CloseRequestC2M(metadata *metadata.Metadata, sub *client.SubInfo) (*protocol.CloseResponseM2C, error)
+ CloseRequestC2M(ctx context.Context, metadata *metadata.Metadata, sub *client.SubInfo) (*protocol.CloseResponseM2C, error)
+}
+
+// New returns a default TubeMQ rpc Client
+func New(pool *multiplexing.Pool, opts *transport.Options) RPCClient {
+ return &rpcClient{
+ pool: pool,
+ client: transport.New(opts, pool),
+ }
}
type rpcClient struct {
@@ -58,11 +70,19 @@ type rpcClient struct {
config *config.Config
}
-// New returns a default TubeMQ rpc Client
-func New(pool *multiplexing.Pool, opts *transport.Options, config *config.Config) RPCClient {
- return &rpcClient{
- pool: pool,
- client: transport.New(opts, pool),
- config: config,
+func (c *rpcClient) doRequest(ctx context.Context, req codec.RPCRequest) (*protocol.RspResponseBody, error) {
+ rsp, err := c.client.DoRequest(ctx, req)
+ if err != nil {
+ return nil, errs.New(errs.RetRequestFailure, err.Error())
+ }
+
+ if _, ok := rsp.(*codec.TubeMQRPCResponse); !ok {
+ return nil, errs.ErrAssertionFailure
+ }
+
+ v := rsp.(*codec.TubeMQRPCResponse)
+ if v.ResponseException != nil {
+ return nil, errs.New(errs.RetResponseException, v.ResponseException.String())
}
+ return v.ResponseBody, nil
}
diff --git a/tubemq-client-twins/tubemq-client-go/rpc/master.go b/tubemq-client-twins/tubemq-client-go/rpc/master.go
index 38e2b68..9eb336a 100644
--- a/tubemq-client-twins/tubemq-client-go/rpc/master.go
+++ b/tubemq-client-twins/tubemq-client-go/rpc/master.go
@@ -30,16 +30,16 @@ import (
)
const (
- MasterProducerRegister = iota + 1
- MasterProducerHeartbeat
- MasterProducerClose
- MasterConsumerRegister
- MasterConsumerHeartbeat
- MasterConsumerClose
+ masterProducerRegister = iota + 1
+ masterProducerHeartbeat
+ masterProducerClose
+ masterConsumerRegister
+ masterConsumerHeartbeat
+ masterConsumerClose
)
// RegisterRequestC2M implements the RegisterRequestC2M interface according to TubeMQ RPC protocol.
-func (c *rpcClient) RegisterRequestC2M(metadata *metadata.Metadata, sub *client.SubInfo, r *client.RmtDataCache) (*protocol.RegisterResponseM2C, error) {
+func (c *rpcClient) RegisterRequestC2M(ctx context.Context, metadata *metadata.Metadata, sub *client.SubInfo, r *client.RmtDataCache) (*protocol.RegisterResponseM2C, error) {
reqC2M := &protocol.RegisterRequestC2M{
ClientId: proto.String(sub.GetClientID()),
HostName: proto.String(metadata.GetNode().GetHost()),
@@ -83,34 +83,30 @@ func (c *rpcClient) RegisterRequestC2M(metadata *metadata.Metadata, sub *client.
Flag: proto.Int32(0),
}
req.RequestHeader = &protocol.RequestHeader{
- ServiceType: proto.Int32(AdminService),
+ ServiceType: proto.Int32(masterService),
ProtocolVer: proto.Int32(2),
}
req.RequestBody = &protocol.RequestBody{
- Method: proto.Int32(MasterConsumerRegister),
+ Method: proto.Int32(masterConsumerRegister),
Request: data,
Timeout: proto.Int64(c.config.Net.ReadTimeout.Milliseconds()),
}
- ctx, cancel := context.WithTimeout(context.Background(), c.config.Net.ReadTimeout)
- defer cancel()
- rsp, err := c.client.DoRequest(ctx, req)
- if v, ok := rsp.(*codec.TubeMQRPCResponse); ok {
- if v.ResponseException != nil {
- return nil, errs.New(errs.RetResponseException, err.Error())
- }
- rspM2C := &protocol.RegisterResponseM2C{}
- err := proto.Unmarshal(v.ResponseBody.Data, rspM2C)
- if err != nil {
- return nil, errs.New(errs.RetUnMarshalFailure, err.Error())
- }
- return rspM2C, nil
+ rspBody, err := c.doRequest(ctx, req)
+ if err != nil {
+ return nil, err
+ }
+
+ rspM2C := &protocol.RegisterResponseM2C{}
+ err = proto.Unmarshal(rspBody.Data, rspM2C)
+ if err != nil {
+ return nil, errs.New(errs.RetUnMarshalFailure, err.Error())
}
- return nil, errs.ErrAssertionFailure
+ return rspM2C, nil
}
// HeartRequestC2M implements the HeartRequestC2M interface according to TubeMQ RPC protocol.
-func (c *rpcClient) HeartRequestC2M(metadata *metadata.Metadata, sub *client.SubInfo, r *client.RmtDataCache) (*protocol.HeartResponseM2C, error) {
+func (c *rpcClient) HeartRequestC2M(ctx context.Context, metadata *metadata.Metadata, sub *client.SubInfo, r *client.RmtDataCache) (*protocol.HeartResponseM2C, error) {
reqC2M := &protocol.HeartRequestC2M{
ClientId: proto.String(sub.GetClientID()),
GroupName: proto.String(metadata.GetSubscribeInfo().GetGroup()),
@@ -151,36 +147,33 @@ func (c *rpcClient) HeartRequestC2M(metadata *metadata.Metadata, sub *client.Sub
}
req := codec.NewRPCRequest()
req.RequestHeader = &protocol.RequestHeader{
- ServiceType: proto.Int32(AdminService),
+ ServiceType: proto.Int32(masterService),
ProtocolVer: proto.Int32(2),
}
req.RequestBody = &protocol.RequestBody{
- Method: proto.Int32(MasterConsumerHeartbeat),
+ Method: proto.Int32(masterConsumerHeartbeat),
Request: data,
Timeout: proto.Int64(c.config.Net.ReadTimeout.Milliseconds()),
}
req.RpcHeader = &protocol.RpcConnHeader{
Flag: proto.Int32(0),
}
- ctx, cancel := context.WithTimeout(context.Background(), c.config.Net.ReadTimeout)
- defer cancel()
- rsp, err := c.client.DoRequest(ctx, req)
- if v, ok := rsp.(*codec.TubeMQRPCResponse); ok {
- if v.ResponseException != nil {
- return nil, errs.New(errs.RetResponseException, err.Error())
- }
- rspM2C := &protocol.HeartResponseM2C{}
- err := proto.Unmarshal(v.ResponseBody.Data, rspM2C)
- if err != nil {
- return nil, errs.New(errs.RetUnMarshalFailure, err.Error())
- }
- return rspM2C, nil
+
+ rspBody, err := c.doRequest(ctx, req)
+ if err != nil {
+ return nil, err
}
- return nil, errs.ErrAssertionFailure
+
+ rspM2C := &protocol.HeartResponseM2C{}
+ err = proto.Unmarshal(rspBody.Data, rspM2C)
+ if err != nil {
+ return nil, errs.New(errs.RetUnMarshalFailure, err.Error())
+ }
+ return rspM2C, nil
}
// CloseRequestC2M implements the CloseRequestC2M interface according to TubeMQ RPC protocol.
-func (c *rpcClient) CloseRequestC2M(metadata *metadata.Metadata, sub *client.SubInfo) (*protocol.CloseResponseM2C, error) {
+func (c *rpcClient) CloseRequestC2M(ctx context.Context, metadata *metadata.Metadata, sub *client.SubInfo) (*protocol.CloseResponseM2C, error) {
reqC2M := &protocol.CloseRequestC2M{
ClientId: proto.String(sub.GetClientID()),
GroupName: proto.String(metadata.GetSubscribeInfo().GetGroup()),
@@ -192,30 +185,27 @@ func (c *rpcClient) CloseRequestC2M(metadata *metadata.Metadata, sub *client.Sub
}
req := codec.NewRPCRequest()
req.RequestHeader = &protocol.RequestHeader{
- ServiceType: proto.Int32(AdminService),
+ ServiceType: proto.Int32(masterService),
ProtocolVer: proto.Int32(2),
}
req.RequestBody = &protocol.RequestBody{
- Method: proto.Int32(MasterConsumerClose),
+ Method: proto.Int32(masterConsumerClose),
Request: data,
Timeout: proto.Int64(c.config.Net.ReadTimeout.Milliseconds()),
}
req.RpcHeader = &protocol.RpcConnHeader{
Flag: proto.Int32(0),
}
- ctx, cancel := context.WithTimeout(context.Background(), c.config.Net.ReadTimeout)
- defer cancel()
- rsp, err := c.client.DoRequest(ctx, req)
- if v, ok := rsp.(*codec.TubeMQRPCResponse); ok {
- if v.ResponseException != nil {
- return nil, errs.New(errs.RetResponseException, err.Error())
- }
- rspM2C := &protocol.CloseResponseM2C{}
- err := proto.Unmarshal(v.ResponseBody.Data, rspM2C)
- if err != nil {
- return nil, errs.New(errs.RetUnMarshalFailure, err.Error())
- }
- return rspM2C, nil
+
+ rspBody, err := c.doRequest(ctx, req)
+ if err != nil {
+ return nil, err
+ }
+
+ rspM2C := &protocol.CloseResponseM2C{}
+ err = proto.Unmarshal(rspBody.Data, rspM2C)
+ if err != nil {
+ return nil, errs.New(errs.RetUnMarshalFailure, err.Error())
}
- return nil, errs.ErrAssertionFailure
+ return rspM2C, nil
}