You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@inlong.apache.org by go...@apache.org on 2021/05/24 09:32:43 UTC

[incubator-inlong] 03/03: Pass context from client

This is an automated email from the ASF dual-hosted git repository.

gosonzhang pushed a commit to branch INLONG-25
in repository https://gitbox.apache.org/repos/asf/incubator-inlong.git

commit ddfab69aa7ed27c9b3b46c49338a87030349f245
Author: Zijie Lu <ws...@gmail.com>
AuthorDate: Mon May 24 16:34:56 2021 +0800

    Pass context from client
    
    Signed-off-by: Zijie Lu <ws...@gmail.com>
---
 .../tubemq-client-go/client/remote.go              |  16 +-
 tubemq-client-twins/tubemq-client-go/errs/errs.go  |  15 +-
 tubemq-client-twins/tubemq-client-go/rpc/broker.go | 181 ++++++++++-----------
 tubemq-client-twins/tubemq-client-go/rpc/client.go |  52 ++++--
 tubemq-client-twins/tubemq-client-go/rpc/master.go | 104 ++++++------
 5 files changed, 186 insertions(+), 182 deletions(-)

diff --git a/tubemq-client-twins/tubemq-client-go/client/remote.go b/tubemq-client-twins/tubemq-client-go/client/remote.go
index fd60425..e36b69f 100644
--- a/tubemq-client-twins/tubemq-client-go/client/remote.go
+++ b/tubemq-client-twins/tubemq-client-go/client/remote.go
@@ -27,14 +27,14 @@ import (
 
 // RmtDataCache represents the data returned from TubeMQ.
 type RmtDataCache struct {
-	consumerID       string
-	groupName        string
-	underGroupCtrl   bool
-	defFlowCtrlID    int64
-	groupFlowCtrlID  int64
-	subscribeInfo    []*metadata.SubscribeInfo
-	rebalanceResults []*metadata.ConsumerEvent
-	mu               sync.Mutex
+	consumerID         string
+	groupName          string
+	underGroupCtrl     bool
+	defFlowCtrlID      int64
+	groupFlowCtrlID    int64
+	subscribeInfo      []*metadata.SubscribeInfo
+	rebalanceResults   []*metadata.ConsumerEvent
+	mu                 sync.Mutex
 	brokerToPartitions map[*metadata.Node][]*metadata.Partition
 }
 
diff --git a/tubemq-client-twins/tubemq-client-go/errs/errs.go b/tubemq-client-twins/tubemq-client-go/errs/errs.go
index 637e381..0ef803d 100644
--- a/tubemq-client-twins/tubemq-client-go/errs/errs.go
+++ b/tubemq-client-twins/tubemq-client-go/errs/errs.go
@@ -23,12 +23,19 @@ import (
 )
 
 const (
-	RetMarshalFailure    = 1
+	// RetMarshalFailure represents the error code of marshal error.
+	RetMarshalFailure = 1
+	// RetResponseException represents the error code of response exception.
 	RetResponseException = 2
-	RetUnMarshalFailure  = 3
-	RetAssertionFailure  = 4
+	// RetUnMarshalFailure represents the error code of unmarshal error.
+	RetUnMarshalFailure = 3
+	// RetAssertionFailure represents the error code of assertion error.
+	RetAssertionFailure = 4
+	// RetRequestFailure represents the error code of request error.
+	RetRequestFailure = 5
 )
 
+// ErrAssertionFailure represents RetAssertionFailure error.
 var ErrAssertionFailure = New(RetAssertionFailure, "AssertionFailure")
 
 // Error provides a TubeMQ-specific error container
@@ -37,10 +44,12 @@ type Error struct {
 	Msg  string
 }
 
+// Error implements the built-in error interface.
 func (e *Error) Error() string {
 	return fmt.Sprintf("code: %d, msg:%s", e.Code, e.Msg)
 }
 
+// New returns a self-defined error.
 func New(code int32, msg string) error {
 	err := &Error{
 		Code: code,
diff --git a/tubemq-client-twins/tubemq-client-go/rpc/broker.go b/tubemq-client-twins/tubemq-client-go/rpc/broker.go
index b2b614d..7f9ead2 100644
--- a/tubemq-client-twins/tubemq-client-go/rpc/broker.go
+++ b/tubemq-client-twins/tubemq-client-go/rpc/broker.go
@@ -30,26 +30,26 @@ import (
 )
 
 const (
-	Register   = 31
-	Unregister = 32
+	register   = 31
+	unregister = 32
 )
 
 const (
-	BrokerProducerRegister = iota + 11
-	BrokerProducerHeartbeat
-	BrokerProducerSendMsg
-	BrokerProducerClose
-	BrokerConsumerRegister
-	BrokerConsumerHeartbeat
-	BrokerConsumerGetMsg
-	BrokerConsumerCommit
-	BrokerConsumerClose
+	brokerProducerRegister = iota + 11
+	brokerProducerHeartbeat
+	brokerProducerSendMsg
+	brokerProducerClose
+	brokerConsumerRegister
+	brokerConsumerHeartbeat
+	brokerConsumerGetMsg
+	brokerConsumerCommit
+	brokerConsumerClose
 )
 
 // RegisterRequestC2B implements the RegisterRequestC2B interface according to TubeMQ RPC protocol.
-func (c *rpcClient) RegisterRequestC2B(metadata *metadata.Metadata, sub *client.SubInfo) (*protocol.RegisterResponseB2C, error) {
+func (c *rpcClient) RegisterRequestC2B(ctx context.Context, metadata *metadata.Metadata, sub *client.SubInfo) (*protocol.RegisterResponseB2C, error) {
 	reqC2B := &protocol.RegisterRequestC2B{
-		OpType:        proto.Int32(Register),
+		OpType:        proto.Int32(register),
 		ClientId:      proto.String(sub.GetClientID()),
 		GroupName:     proto.String(metadata.GetSubscribeInfo().GetGroup()),
 		TopicName:     proto.String(metadata.GetSubscribeInfo().GetPartition().GetTopic()),
@@ -78,35 +78,32 @@ func (c *rpcClient) RegisterRequestC2B(metadata *metadata.Metadata, sub *client.
 		Flag: proto.Int32(0),
 	}
 	req.RequestHeader = &protocol.RequestHeader{
-		ServiceType: proto.Int32(ReadService),
+		ServiceType: proto.Int32(brokerReadService),
 		ProtocolVer: proto.Int32(2),
 	}
 	req.RequestBody = &protocol.RequestBody{
-		Method:  proto.Int32(BrokerConsumerRegister),
+		Method:  proto.Int32(brokerConsumerRegister),
 		Request: data,
 		Timeout: proto.Int64(c.config.Net.ReadTimeout.Milliseconds()),
 	}
-	ctx, cancel := context.WithTimeout(context.Background(), c.config.Net.ReadTimeout)
-	defer cancel()
-	rsp, err := c.client.DoRequest(ctx, req)
-	if v, ok := rsp.(*codec.TubeMQRPCResponse); ok {
-		if v.ResponseException != nil {
-			return nil, errs.New(errs.RetResponseException, v.ResponseException.String())
-		}
-		rspC2B := &protocol.RegisterResponseB2C{}
-		err := proto.Unmarshal(v.ResponseBody.Data, rspC2B)
-		if err != nil {
-			return nil, errs.New(errs.RetUnMarshalFailure, err.Error())
-		}
-		return rspC2B, nil
+
+	rspBody, err := c.doRequest(ctx, req)
+	if err != nil {
+		return nil, err
 	}
-	return nil, errs.ErrAssertionFailure
+
+	rspC2B := &protocol.RegisterResponseB2C{}
+	err = proto.Unmarshal(rspBody.Data, rspC2B)
+	if err != nil {
+		return nil, errs.New(errs.RetUnMarshalFailure, err.Error())
+	}
+	return rspC2B, nil
 }
 
 // UnregisterRequestC2B implements the UnregisterRequestC2B interface according to TubeMQ RPC protocol.
-func (c *rpcClient) UnregisterRequestC2B(metadata metadata.Metadata, sub *client.SubInfo) (*protocol.RegisterResponseB2C, error) {
+func (c *rpcClient) UnregisterRequestC2B(ctx context.Context, metadata metadata.Metadata, sub *client.SubInfo) (*protocol.RegisterResponseB2C, error) {
 	reqC2B := &protocol.RegisterRequestC2B{
-		OpType:      proto.Int32(Unregister),
+		OpType:      proto.Int32(unregister),
 		ClientId:    proto.String(sub.GetClientID()),
 		GroupName:   proto.String(metadata.GetSubscribeInfo().GetGroup()),
 		TopicName:   proto.String(metadata.GetSubscribeInfo().GetPartition().GetTopic()),
@@ -123,33 +120,30 @@ func (c *rpcClient) UnregisterRequestC2B(metadata metadata.Metadata, sub *client
 		return nil, errs.New(errs.RetMarshalFailure, err.Error())
 	}
 	req.RequestHeader = &protocol.RequestHeader{
-		ServiceType: proto.Int32(ReadService),
+		ServiceType: proto.Int32(brokerReadService),
 		ProtocolVer: proto.Int32(2),
 	}
 	req.RequestBody = &protocol.RequestBody{
-		Method:  proto.Int32(BrokerConsumerRegister),
+		Method:  proto.Int32(brokerConsumerRegister),
 		Request: data,
 		Timeout: proto.Int64(c.config.Net.ReadTimeout.Milliseconds()),
 	}
-	ctx, cancel := context.WithTimeout(context.Background(), c.config.Net.ReadTimeout)
-	defer cancel()
-	rsp, err := c.client.DoRequest(ctx, req)
-	if v, ok := rsp.(*codec.TubeMQRPCResponse); ok {
-		if v.ResponseException != nil {
-			return nil, errs.New(errs.RetResponseException, v.ResponseException.String())
-		}
-		rspC2B := &protocol.RegisterResponseB2C{}
-		err := proto.Unmarshal(v.ResponseBody.Data, rspC2B)
-		if err != nil {
-			return nil, errs.New(errs.RetUnMarshalFailure, err.Error())
-		}
-		return rspC2B, nil
+
+	rspBody, err := c.doRequest(ctx, req)
+	if err != nil {
+		return nil, err
+	}
+
+	rspC2B := &protocol.RegisterResponseB2C{}
+	err = proto.Unmarshal(rspBody.Data, rspC2B)
+	if err != nil {
+		return nil, errs.New(errs.RetUnMarshalFailure, err.Error())
 	}
-	return nil, errs.ErrAssertionFailure
+	return rspC2B, nil
 }
 
 // GetMessageRequestC2B implements the GetMessageRequestC2B interface according to TubeMQ RPC protocol.
-func (c *rpcClient) GetMessageRequestC2B(metadata *metadata.Metadata, sub *client.SubInfo, r *client.RmtDataCache) (*protocol.GetMessageResponseB2C, error) {
+func (c *rpcClient) GetMessageRequestC2B(ctx context.Context, metadata *metadata.Metadata, sub *client.SubInfo, r *client.RmtDataCache) (*protocol.GetMessageResponseB2C, error) {
 	reqC2B := &protocol.GetMessageRequestC2B{
 		ClientId:           proto.String(sub.GetClientID()),
 		PartitionId:        proto.Int32(metadata.GetSubscribeInfo().GetPartition().GetPartitionID()),
@@ -168,33 +162,30 @@ func (c *rpcClient) GetMessageRequestC2B(metadata *metadata.Metadata, sub *clien
 		return nil, errs.New(errs.RetMarshalFailure, err.Error())
 	}
 	req.RequestHeader = &protocol.RequestHeader{
-		ServiceType: proto.Int32(ReadService),
+		ServiceType: proto.Int32(brokerReadService),
 		ProtocolVer: proto.Int32(2),
 	}
 	req.RequestBody = &protocol.RequestBody{
-		Method:  proto.Int32(BrokerConsumerGetMsg),
+		Method:  proto.Int32(brokerConsumerGetMsg),
 		Request: data,
 		Timeout: proto.Int64(c.config.Net.ReadTimeout.Milliseconds()),
 	}
-	ctx, cancel := context.WithTimeout(context.Background(), c.config.Net.ReadTimeout)
-	defer cancel()
-	rsp, err := c.client.DoRequest(ctx, req)
-	if v, ok := rsp.(*codec.TubeMQRPCResponse); ok {
-		if v.ResponseException != nil {
-			return nil, errs.New(errs.RetResponseException, v.ResponseException.String())
-		}
-		rspC2B := &protocol.GetMessageResponseB2C{}
-		err := proto.Unmarshal(v.ResponseBody.Data, rspC2B)
-		if err != nil {
-			return nil, errs.New(errs.RetUnMarshalFailure, err.Error())
-		}
-		return rspC2B, nil
+
+	rspBody, err := c.doRequest(ctx, req)
+	if err != nil {
+		return nil, err
 	}
-	return nil, errs.ErrAssertionFailure
+
+	rspC2B := &protocol.GetMessageResponseB2C{}
+	err = proto.Unmarshal(rspBody.Data, rspC2B)
+	if err != nil {
+		return nil, errs.New(errs.RetUnMarshalFailure, err.Error())
+	}
+	return rspC2B, nil
 }
 
 // CommitOffsetRequestC2B implements the CommitOffsetRequestC2B interface according to TubeMQ RPC protocol.
-func (c *rpcClient) CommitOffsetRequestC2B(metadata *metadata.Metadata, sub *client.SubInfo) (*protocol.CommitOffsetResponseB2C, error) {
+func (c *rpcClient) CommitOffsetRequestC2B(ctx context.Context, metadata *metadata.Metadata, sub *client.SubInfo) (*protocol.CommitOffsetResponseB2C, error) {
 	reqC2B := &protocol.CommitOffsetRequestC2B{
 		ClientId:         proto.String(sub.GetClientID()),
 		TopicName:        proto.String(metadata.GetSubscribeInfo().GetPartition().GetTopic()),
@@ -211,33 +202,30 @@ func (c *rpcClient) CommitOffsetRequestC2B(metadata *metadata.Metadata, sub *cli
 		return nil, errs.New(errs.RetMarshalFailure, err.Error())
 	}
 	req.RequestHeader = &protocol.RequestHeader{
-		ServiceType: proto.Int32(ReadService),
+		ServiceType: proto.Int32(brokerReadService),
 		ProtocolVer: proto.Int32(2),
 	}
 	req.RequestBody = &protocol.RequestBody{
-		Method:  proto.Int32(BrokerConsumerHeartbeat),
+		Method:  proto.Int32(brokerConsumerHeartbeat),
 		Request: data,
 		Timeout: proto.Int64(c.config.Net.ReadTimeout.Milliseconds()),
 	}
-	ctx, cancel := context.WithTimeout(context.Background(), c.config.Net.ReadTimeout)
-	defer cancel()
-	rsp, err := c.client.DoRequest(ctx, req)
-	if v, ok := rsp.(*codec.TubeMQRPCResponse); ok {
-		if v.ResponseException != nil {
-			return nil, errs.New(errs.RetResponseException, v.ResponseException.String())
-		}
-		rspC2B := &protocol.CommitOffsetResponseB2C{}
-		err := proto.Unmarshal(v.ResponseBody.Data, rspC2B)
-		if err != nil {
-			return nil, errs.New(errs.RetUnMarshalFailure, err.Error())
-		}
-		return rspC2B, nil
+
+	rspBody, err := c.doRequest(ctx, req)
+	if err != nil {
+		return nil, err
 	}
-	return nil, errs.ErrAssertionFailure
+
+	rspC2B := &protocol.CommitOffsetResponseB2C{}
+	err = proto.Unmarshal(rspBody.Data, rspC2B)
+	if err != nil {
+		return nil, errs.New(errs.RetUnMarshalFailure, err.Error())
+	}
+	return rspC2B, nil
 }
 
 // HeartbeatRequestC2B implements the HeartbeatRequestC2B interface according to TubeMQ RPC protocol.
-func (c *rpcClient) HeartbeatRequestC2B(metadata *metadata.Metadata, sub *client.SubInfo, r *client.RmtDataCache) (*protocol.HeartBeatResponseB2C, error) {
+func (c *rpcClient) HeartbeatRequestC2B(ctx context.Context, metadata *metadata.Metadata, sub *client.SubInfo, r *client.RmtDataCache) (*protocol.HeartBeatResponseB2C, error) {
 	reqC2B := &protocol.HeartBeatRequestC2B{
 		ClientId:      proto.String(sub.GetClientID()),
 		GroupName:     proto.String(metadata.GetSubscribeInfo().GetGroup()),
@@ -256,30 +244,27 @@ func (c *rpcClient) HeartbeatRequestC2B(metadata *metadata.Metadata, sub *client
 	}
 	req := codec.NewRPCRequest()
 	req.RequestHeader = &protocol.RequestHeader{
-		ServiceType: proto.Int32(ReadService),
+		ServiceType: proto.Int32(brokerReadService),
 		ProtocolVer: proto.Int32(2),
 	}
 	req.RequestBody = &protocol.RequestBody{
-		Method:  proto.Int32(BrokerConsumerHeartbeat),
+		Method:  proto.Int32(brokerConsumerHeartbeat),
 		Request: data,
 		Timeout: proto.Int64(c.config.Net.ReadTimeout.Milliseconds()),
 	}
 	req.RpcHeader = &protocol.RpcConnHeader{
 		Flag: proto.Int32(0),
 	}
-	ctx, cancel := context.WithTimeout(context.Background(), c.config.Net.ReadTimeout)
-	defer cancel()
-	rsp, err := c.client.DoRequest(ctx, req)
-	if v, ok := rsp.(*codec.TubeMQRPCResponse); ok {
-		if v.ResponseException != nil {
-			return nil, errs.New(errs.RetResponseException, v.ResponseException.String())
-		}
-		rspC2B := &protocol.HeartBeatResponseB2C{}
-		err := proto.Unmarshal(v.ResponseBody.Data, rspC2B)
-		if err != nil {
-			return nil, errs.New(errs.RetUnMarshalFailure, err.Error())
-		}
-		return rspC2B, nil
+
+	rspBody, err := c.doRequest(ctx, req)
+	if err != nil {
+		return nil, err
+	}
+
+	rspC2B := &protocol.HeartBeatResponseB2C{}
+	err = proto.Unmarshal(rspBody.Data, rspC2B)
+	if err != nil {
+		return nil, errs.New(errs.RetUnMarshalFailure, err.Error())
 	}
-	return nil, errs.ErrAssertionFailure
+	return rspC2B, nil
 }
diff --git a/tubemq-client-twins/tubemq-client-go/rpc/client.go b/tubemq-client-twins/tubemq-client-go/rpc/client.go
index 46961cd..71123c7 100644
--- a/tubemq-client-twins/tubemq-client-go/rpc/client.go
+++ b/tubemq-client-twins/tubemq-client-go/rpc/client.go
@@ -19,8 +19,12 @@
 package rpc
 
 import (
+	"context"
+
 	"github.com/apache/incubator-inlong/tubemq-client-twins/tubemq-client-go/client"
+	"github.com/apache/incubator-inlong/tubemq-client-twins/tubemq-client-go/codec"
 	"github.com/apache/incubator-inlong/tubemq-client-twins/tubemq-client-go/config"
+	"github.com/apache/incubator-inlong/tubemq-client-twins/tubemq-client-go/errs"
 	"github.com/apache/incubator-inlong/tubemq-client-twins/tubemq-client-go/metadata"
 	"github.com/apache/incubator-inlong/tubemq-client-twins/tubemq-client-go/multiplexing"
 	"github.com/apache/incubator-inlong/tubemq-client-twins/tubemq-client-go/protocol"
@@ -28,28 +32,36 @@ import (
 )
 
 const (
-	ReadService = 2
-	AdminService = 4
+	brokerReadService = 2
+	masterService     = 4
 )
 
 // RPCClient is the rpc level client to interact with TubeMQ.
 type RPCClient interface {
 	// RegisterRequestC2B is the rpc request for a consumer to register to a broker.
-	RegisterRequestC2B(metadata *metadata.Metadata, sub *client.SubInfo) (*protocol.RegisterResponseB2C, error)
+	RegisterRequestC2B(ctx context.Context, metadata *metadata.Metadata, sub *client.SubInfo) (*protocol.RegisterResponseB2C, error)
 	// UnregisterRequestC2B is the rpc request for a consumer to unregister to a broker.
-	UnregisterRequestC2B(metadata metadata.Metadata, sub *client.SubInfo) (*protocol.RegisterResponseB2C, error)
+	UnregisterRequestC2B(ctx context.Context, metadata metadata.Metadata, sub *client.SubInfo) (*protocol.RegisterResponseB2C, error)
 	// GetMessageRequestC2B is the rpc request for a consumer to get message from a broker.
-	GetMessageRequestC2B(metadata *metadata.Metadata, sub *client.SubInfo, r *client.RmtDataCache) (*protocol.GetMessageResponseB2C, error)
+	GetMessageRequestC2B(ctx context.Context, metadata *metadata.Metadata, sub *client.SubInfo, r *client.RmtDataCache) (*protocol.GetMessageResponseB2C, error)
 	// CommitOffsetRequestC2B is the rpc request for a consumer to commit offset to a broker.
-	CommitOffsetRequestC2B(metadata *metadata.Metadata, sub *client.SubInfo) (*protocol.CommitOffsetResponseB2C, error)
+	CommitOffsetRequestC2B(ctx context.Context, metadata *metadata.Metadata, sub *client.SubInfo) (*protocol.CommitOffsetResponseB2C, error)
 	// HeartbeatRequestC2B is the rpc request for a consumer to send heartbeat to a broker.
-	HeartbeatRequestC2B(metadata *metadata.Metadata, sub *client.SubInfo, r *client.RmtDataCache) (*protocol.HeartBeatResponseB2C, error)
+	HeartbeatRequestC2B(ctx context.Context, metadata *metadata.Metadata, sub *client.SubInfo, r *client.RmtDataCache) (*protocol.HeartBeatResponseB2C, error)
 	// RegisterRequestC2M is the rpc request for a consumer to register request to master.
-	RegisterRequestC2M(metadata *metadata.Metadata, sub *client.SubInfo, r *client.RmtDataCache) (*protocol.RegisterResponseM2C, error)
+	RegisterRequestC2M(ctx context.Context, metadata *metadata.Metadata, sub *client.SubInfo, r *client.RmtDataCache) (*protocol.RegisterResponseM2C, error)
 	// HeartRequestC2M is the rpc request for a consumer to send heartbeat to master.
-	HeartRequestC2M(metadata *metadata.Metadata, sub *client.SubInfo, r *client.RmtDataCache) (*protocol.HeartResponseM2C, error)
+	HeartRequestC2M(ctx context.Context, metadata *metadata.Metadata, sub *client.SubInfo, r *client.RmtDataCache) (*protocol.HeartResponseM2C, error)
 	// CloseRequestC2M is the rpc request for a consumer to be closed to master.
-	CloseRequestC2M(metadata *metadata.Metadata, sub *client.SubInfo) (*protocol.CloseResponseM2C, error)
+	CloseRequestC2M(ctx context.Context, metadata *metadata.Metadata, sub *client.SubInfo) (*protocol.CloseResponseM2C, error)
+}
+
+// New returns a default TubeMQ RPC client.
+func New(pool *multiplexing.Pool, opts *transport.Options) RPCClient {
+	return &rpcClient{
+		pool:   pool,
+		client: transport.New(opts, pool),
+	}
 }
 
 type rpcClient struct {
@@ -58,11 +70,19 @@ type rpcClient struct {
 	config *config.Config
 }
 
-// New returns a default TubeMQ rpc Client
-func New(pool *multiplexing.Pool, opts *transport.Options, config *config.Config) RPCClient {
-	return &rpcClient{
-		pool:   pool,
-		client: transport.New(opts, pool),
-		config: config,
+func (c *rpcClient) doRequest(ctx context.Context, req codec.RPCRequest) (*protocol.RspResponseBody, error) {
+	rsp, err := c.client.DoRequest(ctx, req)
+	if err != nil {
+		return nil, errs.New(errs.RetRequestFailure, err.Error())
+	}
+
+	if _, ok := rsp.(*codec.TubeMQRPCResponse); !ok {
+		return nil, errs.ErrAssertionFailure
+	}
+
+	v := rsp.(*codec.TubeMQRPCResponse)
+	if v.ResponseException != nil {
+		return nil, errs.New(errs.RetResponseException, v.ResponseException.String())
 	}
+	return v.ResponseBody, nil
 }
diff --git a/tubemq-client-twins/tubemq-client-go/rpc/master.go b/tubemq-client-twins/tubemq-client-go/rpc/master.go
index 38e2b68..9eb336a 100644
--- a/tubemq-client-twins/tubemq-client-go/rpc/master.go
+++ b/tubemq-client-twins/tubemq-client-go/rpc/master.go
@@ -30,16 +30,16 @@ import (
 )
 
 const (
-	MasterProducerRegister = iota + 1
-	MasterProducerHeartbeat
-	MasterProducerClose
-	MasterConsumerRegister
-	MasterConsumerHeartbeat
-	MasterConsumerClose
+	masterProducerRegister = iota + 1
+	masterProducerHeartbeat
+	masterProducerClose
+	masterConsumerRegister
+	masterConsumerHeartbeat
+	masterConsumerClose
 )
 
 // RegisterRequestRequestC2M implements the RegisterRequestRequestC2M interface according to TubeMQ RPC protocol.
-func (c *rpcClient) RegisterRequestC2M(metadata *metadata.Metadata, sub *client.SubInfo, r *client.RmtDataCache) (*protocol.RegisterResponseM2C, error) {
+func (c *rpcClient) RegisterRequestC2M(ctx context.Context, metadata *metadata.Metadata, sub *client.SubInfo, r *client.RmtDataCache) (*protocol.RegisterResponseM2C, error) {
 	reqC2M := &protocol.RegisterRequestC2M{
 		ClientId:         proto.String(sub.GetClientID()),
 		HostName:         proto.String(metadata.GetNode().GetHost()),
@@ -83,34 +83,30 @@ func (c *rpcClient) RegisterRequestC2M(metadata *metadata.Metadata, sub *client.
 		Flag: proto.Int32(0),
 	}
 	req.RequestHeader = &protocol.RequestHeader{
-		ServiceType: proto.Int32(AdminService),
+		ServiceType: proto.Int32(masterService),
 		ProtocolVer: proto.Int32(2),
 	}
 	req.RequestBody = &protocol.RequestBody{
-		Method:  proto.Int32(MasterConsumerRegister),
+		Method:  proto.Int32(masterConsumerRegister),
 		Request: data,
 		Timeout: proto.Int64(c.config.Net.ReadTimeout.Milliseconds()),
 	}
 
-	ctx, cancel := context.WithTimeout(context.Background(), c.config.Net.ReadTimeout)
-	defer cancel()
-	rsp, err := c.client.DoRequest(ctx, req)
-	if v, ok := rsp.(*codec.TubeMQRPCResponse); ok {
-		if v.ResponseException != nil {
-			return nil, errs.New(errs.RetResponseException, err.Error())
-		}
-		rspM2C := &protocol.RegisterResponseM2C{}
-		err := proto.Unmarshal(v.ResponseBody.Data, rspM2C)
-		if err != nil {
-			return nil, errs.New(errs.RetUnMarshalFailure, err.Error())
-		}
-		return rspM2C, nil
+	rspBody, err := c.doRequest(ctx, req)
+	if err != nil {
+		return nil, err
+	}
+
+	rspM2C := &protocol.RegisterResponseM2C{}
+	err = proto.Unmarshal(rspBody.Data, rspM2C)
+	if err != nil {
+		return nil, errs.New(errs.RetUnMarshalFailure, err.Error())
 	}
-	return nil, errs.ErrAssertionFailure
+	return rspM2C, nil
 }
 
 // HeartRequestC2M implements the HeartRequestC2M interface according to TubeMQ RPC protocol.
-func (c *rpcClient) HeartRequestC2M(metadata *metadata.Metadata, sub *client.SubInfo, r *client.RmtDataCache) (*protocol.HeartResponseM2C, error) {
+func (c *rpcClient) HeartRequestC2M(ctx context.Context, metadata *metadata.Metadata, sub *client.SubInfo, r *client.RmtDataCache) (*protocol.HeartResponseM2C, error) {
 	reqC2M := &protocol.HeartRequestC2M{
 		ClientId:            proto.String(sub.GetClientID()),
 		GroupName:           proto.String(metadata.GetSubscribeInfo().GetGroup()),
@@ -151,36 +147,33 @@ func (c *rpcClient) HeartRequestC2M(metadata *metadata.Metadata, sub *client.Sub
 	}
 	req := codec.NewRPCRequest()
 	req.RequestHeader = &protocol.RequestHeader{
-		ServiceType: proto.Int32(AdminService),
+		ServiceType: proto.Int32(masterService),
 		ProtocolVer: proto.Int32(2),
 	}
 	req.RequestBody = &protocol.RequestBody{
-		Method:  proto.Int32(MasterConsumerHeartbeat),
+		Method:  proto.Int32(masterConsumerHeartbeat),
 		Request: data,
 		Timeout: proto.Int64(c.config.Net.ReadTimeout.Milliseconds()),
 	}
 	req.RpcHeader = &protocol.RpcConnHeader{
 		Flag: proto.Int32(0),
 	}
-	ctx, cancel := context.WithTimeout(context.Background(), c.config.Net.ReadTimeout)
-	defer cancel()
-	rsp, err := c.client.DoRequest(ctx, req)
-	if v, ok := rsp.(*codec.TubeMQRPCResponse); ok {
-		if v.ResponseException != nil {
-			return nil, errs.New(errs.RetResponseException, err.Error())
-		}
-		rspM2C := &protocol.HeartResponseM2C{}
-		err := proto.Unmarshal(v.ResponseBody.Data, rspM2C)
-		if err != nil {
-			return nil, errs.New(errs.RetUnMarshalFailure, err.Error())
-		}
-		return rspM2C, nil
+
+	rspBody, err := c.doRequest(ctx, req)
+	if err != nil {
+		return nil, err
 	}
-	return nil, errs.ErrAssertionFailure
+
+	rspM2C := &protocol.HeartResponseM2C{}
+	err = proto.Unmarshal(rspBody.Data, rspM2C)
+	if err != nil {
+		return nil, errs.New(errs.RetUnMarshalFailure, err.Error())
+	}
+	return rspM2C, nil
 }
 
 // CloseRequestC2M implements the CloseRequestC2M interface according to TubeMQ RPC protocol.
-func (c *rpcClient) CloseRequestC2M(metadata *metadata.Metadata, sub *client.SubInfo) (*protocol.CloseResponseM2C, error) {
+func (c *rpcClient) CloseRequestC2M(ctx context.Context, metadata *metadata.Metadata, sub *client.SubInfo) (*protocol.CloseResponseM2C, error) {
 	reqC2M := &protocol.CloseRequestC2M{
 		ClientId:  proto.String(sub.GetClientID()),
 		GroupName: proto.String(metadata.GetSubscribeInfo().GetGroup()),
@@ -192,30 +185,27 @@ func (c *rpcClient) CloseRequestC2M(metadata *metadata.Metadata, sub *client.Sub
 	}
 	req := codec.NewRPCRequest()
 	req.RequestHeader = &protocol.RequestHeader{
-		ServiceType: proto.Int32(AdminService),
+		ServiceType: proto.Int32(masterService),
 		ProtocolVer: proto.Int32(2),
 	}
 	req.RequestBody = &protocol.RequestBody{
-		Method:  proto.Int32(MasterConsumerClose),
+		Method:  proto.Int32(masterConsumerClose),
 		Request: data,
 		Timeout: proto.Int64(c.config.Net.ReadTimeout.Milliseconds()),
 	}
 	req.RpcHeader = &protocol.RpcConnHeader{
 		Flag: proto.Int32(0),
 	}
-	ctx, cancel := context.WithTimeout(context.Background(), c.config.Net.ReadTimeout)
-	defer cancel()
-	rsp, err := c.client.DoRequest(ctx, req)
-	if v, ok := rsp.(*codec.TubeMQRPCResponse); ok {
-		if v.ResponseException != nil {
-			return nil, errs.New(errs.RetResponseException, err.Error())
-		}
-		rspM2C := &protocol.CloseResponseM2C{}
-		err := proto.Unmarshal(v.ResponseBody.Data, rspM2C)
-		if err != nil {
-			return nil, errs.New(errs.RetUnMarshalFailure, err.Error())
-		}
-		return rspM2C, nil
+
+	rspBody, err := c.doRequest(ctx, req)
+	if err != nil {
+		return nil, err
+	}
+
+	rspM2C := &protocol.CloseResponseM2C{}
+	err = proto.Unmarshal(rspBody.Data, rspM2C)
+	if err != nil {
+		return nil, errs.New(errs.RetUnMarshalFailure, err.Error())
 	}
-	return nil, errs.ErrAssertionFailure
+	return rspM2C, nil
 }