You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@rocketmq.apache.org by hu...@apache.org on 2019/08/26 02:51:58 UTC

[rocketmq-client-go] branch native updated: add ctx to rmqClient & remoteClient. resolve #176 (#177)

This is an automated email from the ASF dual-hosted git repository.

huzongtang pushed a commit to branch native
in repository https://gitbox.apache.org/repos/asf/rocketmq-client-go.git


The following commit(s) were added to refs/heads/native by this push:
     new a1b4211  add ctx to rmqClient & remoteClient. resolve #176 (#177)
a1b4211 is described below

commit a1b42115ec7856d66f3563bbae36512d8a1cecbc
Author: xujianhai666 <52...@users.noreply.github.com>
AuthorDate: Mon Aug 26 10:51:54 2019 +0800

    add ctx to rmqClient & remoteClient. resolve #176 (#177)
---
 consumer/consumer.go                  | 12 ++---
 consumer/offset_store.go              |  5 +-
 consumer/offset_store_test.go         |  2 +-
 consumer/push_consumer.go             |  2 +-
 internal/client.go                    | 24 ++++-----
 internal/mock_client.go               | 91 ++++++++---------------------------
 internal/remote/future.go             | 11 +++--
 internal/remote/mock_remote_client.go | 25 +++++-----
 internal/remote/remote_client.go      | 27 ++++++-----
 internal/remote/remote_client_test.go | 21 ++++----
 internal/route.go                     |  3 +-
 internal/route_test.go                |  5 +-
 internal/trace.go                     |  2 +-
 producer/producer.go                  | 10 ++--
 producer/producer_test.go             |  8 +--
 15 files changed, 102 insertions(+), 146 deletions(-)

diff --git a/consumer/consumer.go b/consumer/consumer.go
index 9bed213..b3454a6 100644
--- a/consumer/consumer.go
+++ b/consumer/consumer.go
@@ -546,7 +546,7 @@ func (dc *defaultConsumer) unlockAll(oneway bool) {
 func (dc *defaultConsumer) doLock(addr string, body *lockBatchRequestBody) []primitive.MessageQueue {
 	data, _ := json.Marshal(body)
 	request := remote.NewRemotingCommand(internal.ReqLockBatchMQ, nil, data)
-	response, err := dc.client.InvokeSync(addr, request, 1*time.Second)
+	response, err := dc.client.InvokeSync(context.Background(), addr, request, 1*time.Second)
 	if err != nil {
 		rlog.Errorf("lock mq to broker: %s error %s", addr, err.Error())
 		return nil
@@ -566,12 +566,12 @@ func (dc *defaultConsumer) doUnlock(addr string, body *lockBatchRequestBody, one
 	data, _ := json.Marshal(body)
 	request := remote.NewRemotingCommand(internal.ReqUnlockBatchMQ, nil, data)
 	if oneway {
-		err := dc.client.InvokeOneWay(addr, request, 3*time.Second)
+		err := dc.client.InvokeOneWay(context.Background(), addr, request, 3*time.Second)
 		if err != nil {
 			rlog.Errorf("lock mq to broker with oneway: %s error %s", addr, err.Error())
 		}
 	} else {
-		response, err := dc.client.InvokeSync(addr, request, 1*time.Second)
+		response, err := dc.client.InvokeSync(context.Background(), addr, request, 1*time.Second)
 		if err != nil {
 			rlog.Errorf("lock mq to broker: %s error %s", addr, err.Error())
 		}
@@ -832,7 +832,7 @@ func (dc *defaultConsumer) findConsumerList(topic string) []string {
 			ConsumerGroup: dc.consumerGroup,
 		}
 		cmd := remote.NewRemotingCommand(internal.ReqGetConsumerListByGroup, req, nil)
-		res, err := dc.client.InvokeSync(brokerAddr, cmd, 3*time.Second) // TODO 超时机制有问题
+		res, err := dc.client.InvokeSync(context.Background(), brokerAddr, cmd, 3*time.Second) // TODO 超时机制有问题
 		if err != nil {
 			rlog.Errorf("get consumer list of [%s] from %s error: %s", dc.consumerGroup, brokerAddr, err.Error())
 			return nil
@@ -869,7 +869,7 @@ func (dc *defaultConsumer) queryMaxOffset(mq *primitive.MessageQueue) (int64, er
 	}
 
 	cmd := remote.NewRemotingCommand(internal.ReqGetMaxOffset, request, nil)
-	response, err := dc.client.InvokeSync(brokerAddr, cmd, 3*time.Second)
+	response, err := dc.client.InvokeSync(context.Background(), brokerAddr, cmd, 3*time.Second)
 	if err != nil {
 		return -1, err
 	}
@@ -899,7 +899,7 @@ func (dc *defaultConsumer) searchOffsetByTimestamp(mq *primitive.MessageQueue, t
 	}
 
 	cmd := remote.NewRemotingCommand(internal.ReqSearchOffsetByTimestamp, request, nil)
-	response, err := dc.client.InvokeSync(brokerAddr, cmd, 3*time.Second)
+	response, err := dc.client.InvokeSync(context.Background(), brokerAddr, cmd, 3*time.Second)
 	if err != nil {
 		return -1, err
 	}
diff --git a/consumer/offset_store.go b/consumer/offset_store.go
index 4d1b7e9..71d2c87 100644
--- a/consumer/offset_store.go
+++ b/consumer/offset_store.go
@@ -18,6 +18,7 @@ limitations under the License.
 package consumer
 
 import (
+	"context"
 	"encoding/json"
 	"fmt"
 	"os"
@@ -302,7 +303,7 @@ func (r *remoteBrokerOffsetStore) fetchConsumeOffsetFromBroker(group string, mq
 		QueueId:       mq.QueueId,
 	}
 	cmd := remote.NewRemotingCommand(internal.ReqQueryConsumerOffset, queryOffsetRequest, nil)
-	res, err := r.client.InvokeSync(broker, cmd, 3*time.Second)
+	res, err := r.client.InvokeSync(context.Background(), broker, cmd, 3*time.Second)
 	if err != nil {
 		return -1, err
 	}
@@ -336,7 +337,7 @@ func (r *remoteBrokerOffsetStore) updateConsumeOffsetToBroker(group, topic strin
 		CommitOffset:  queue.Offset,
 	}
 	cmd := remote.NewRemotingCommand(internal.ReqUpdateConsumerOffset, updateOffsetRequest, nil)
-	return r.client.InvokeOneWay(broker, cmd, 5*time.Second)
+	return r.client.InvokeOneWay(context.Background(), broker, cmd, 5*time.Second)
 }
 
 func readFromMemory(table map[string]map[int]*queueOffset, mq *primitive.MessageQueue) int64 {
diff --git a/consumer/offset_store_test.go b/consumer/offset_store_test.go
index aa81271..4f6e5ef 100644
--- a/consumer/offset_store_test.go
+++ b/consumer/offset_store_test.go
@@ -279,7 +279,7 @@ func TestRemoteBrokerOffsetStore(t *testing.T) {
 					"offset": "1",
 				},
 			}
-			rmqClient.EXPECT().InvokeSync(gomock.Any(), gomock.Any(), gomock.Any()).Return(ret, nil)
+			rmqClient.EXPECT().InvokeSync(gomock.Any(), gomock.Any(), gomock.Any(), gomock.Any()).Return(ret, nil)
 
 			remoteStore.persist(queues)
 			offset := remoteStore.read(mq, _ReadFromStore)
diff --git a/consumer/push_consumer.go b/consumer/push_consumer.go
index d5a5ffc..e8aabdc 100644
--- a/consumer/push_consumer.go
+++ b/consumer/push_consumer.go
@@ -515,7 +515,7 @@ func (pc *pushConsumer) sendMessageBack(brokerName string, msg *primitive.Messag
 	} else {
 		brokerAddr = msg.StoreHost
 	}
-	_, err := pc.client.InvokeSync(brokerAddr, pc.buildSendBackRequest(msg, delayLevel), 3*time.Second)
+	_, err := pc.client.InvokeSync(context.Background(), brokerAddr, pc.buildSendBackRequest(msg, delayLevel), 3*time.Second)
 	if err != nil {
 		return false
 	}
diff --git a/internal/client.go b/internal/client.go
index 8822e2d..cb4cfb6 100644
--- a/internal/client.go
+++ b/internal/client.go
@@ -129,11 +129,11 @@ type RMQClient interface {
 	ClientID() string
 
 	RegisterProducer(group string, producer InnerProducer)
-	InvokeSync(addr string, request *remote.RemotingCommand,
+	InvokeSync(ctx context.Context, addr string, request *remote.RemotingCommand,
 		timeoutMillis time.Duration) (*remote.RemotingCommand, error)
-	InvokeAsync(addr string, request *remote.RemotingCommand,
+	InvokeAsync(ctx context.Context, addr string, request *remote.RemotingCommand,
 		timeoutMillis time.Duration, f func(*remote.RemotingCommand, error)) error
-	InvokeOneWay(addr string, request *remote.RemotingCommand,
+	InvokeOneWay(ctx context.Context, addr string, request *remote.RemotingCommand,
 		timeoutMillis time.Duration) error
 	CheckClientInBroker()
 	SendHeartbeatToAllBrokerWithLock()
@@ -291,31 +291,31 @@ func (c *rmqClient) ClientID() string {
 	return id
 }
 
-func (c *rmqClient) InvokeSync(addr string, request *remote.RemotingCommand,
+func (c *rmqClient) InvokeSync(ctx context.Context, addr string, request *remote.RemotingCommand,
 	timeoutMillis time.Duration) (*remote.RemotingCommand, error) {
 	if c.close {
 		return nil, ErrServiceState
 	}
-	return c.remoteClient.InvokeSync(addr, request, timeoutMillis)
+	return c.remoteClient.InvokeSync(ctx, addr, request, timeoutMillis)
 }
 
-func (c *rmqClient) InvokeAsync(addr string, request *remote.RemotingCommand,
+func (c *rmqClient) InvokeAsync(ctx context.Context, addr string, request *remote.RemotingCommand,
 	timeoutMillis time.Duration, f func(*remote.RemotingCommand, error)) error {
 	if c.close {
 		return ErrServiceState
 	}
-	return c.remoteClient.InvokeAsync(addr, request, timeoutMillis, func(future *remote.ResponseFuture) {
+	return c.remoteClient.InvokeAsync(ctx, addr, request, timeoutMillis, func(future *remote.ResponseFuture) {
 		f(future.ResponseCommand, future.Err)
 	})
 
 }
 
-func (c *rmqClient) InvokeOneWay(addr string, request *remote.RemotingCommand,
+func (c *rmqClient) InvokeOneWay(ctx context.Context, addr string, request *remote.RemotingCommand,
 	timeoutMillis time.Duration) error {
 	if c.close {
 		return ErrServiceState
 	}
-	return c.remoteClient.InvokeOneWay(addr, request, timeoutMillis)
+	return c.remoteClient.InvokeOneWay(ctx, addr, request, timeoutMillis)
 }
 
 func (c *rmqClient) CheckClientInBroker() {
@@ -357,7 +357,7 @@ func (c *rmqClient) SendHeartbeatToAllBrokerWithLock() {
 		data := value.(*BrokerData)
 		for id, addr := range data.BrokerAddresses {
 			cmd := remote.NewRemotingCommand(ReqHeartBeat, nil, hbData.encode())
-			response, err := c.remoteClient.InvokeSync(addr, cmd, 3*time.Second)
+			response, err := c.remoteClient.InvokeSync(context.Background(), addr, cmd, 3*time.Second)
 			if err != nil {
 				rlog.Warnf("send heart beat to broker error: %s", err.Error())
 				return true
@@ -417,7 +417,7 @@ func (c *rmqClient) SendMessageAsync(ctx context.Context, brokerAddrs, brokerNam
 func (c *rmqClient) SendMessageOneWay(ctx context.Context, brokerAddrs string, request *SendMessageRequest,
 	msgs []*primitive.Message) (*primitive.SendResult, error) {
 	cmd := remote.NewRemotingCommand(ReqSendBatchMessage, request, encodeMessages(msgs))
-	err := c.remoteClient.InvokeOneWay(brokerAddrs, cmd, 3*time.Second)
+	err := c.remoteClient.InvokeOneWay(ctx, brokerAddrs, cmd, 3*time.Second)
 	if err != nil {
 		rlog.Warnf("send messages with oneway error: %v", err)
 	}
@@ -473,7 +473,7 @@ func (c *rmqClient) ProcessSendResponse(brokerName string, cmd *remote.RemotingC
 // PullMessage with sync
 func (c *rmqClient) PullMessage(ctx context.Context, brokerAddrs string, request *PullMessageRequest) (*primitive.PullResult, error) {
 	cmd := remote.NewRemotingCommand(ReqPullMessage, request, nil)
-	res, err := c.remoteClient.InvokeSync(brokerAddrs, cmd, 10*time.Second)
+	res, err := c.remoteClient.InvokeSync(ctx, brokerAddrs, cmd, 10*time.Second)
 	if err != nil {
 		return nil, err
 	}
diff --git a/internal/mock_client.go b/internal/mock_client.go
index af9cf2c..a216e93 100644
--- a/internal/mock_client.go
+++ b/internal/mock_client.go
@@ -21,13 +21,14 @@ limitations under the License.
 package internal
 
 import (
-	context "context"
-	reflect "reflect"
-	time "time"
+	"context"
+	"reflect"
+	"time"
 
-	remote "github.com/apache/rocketmq-client-go/internal/remote"
-	primitive "github.com/apache/rocketmq-client-go/primitive"
-	gomock "github.com/golang/mock/gomock"
+	"github.com/golang/mock/gomock"
+
+	"github.com/apache/rocketmq-client-go/internal/remote"
+	"github.com/apache/rocketmq-client-go/primitive"
 )
 
 // MockInnerProducer is a mock of InnerProducer interface
@@ -55,7 +56,6 @@ func (m *MockInnerProducer) EXPECT() *MockInnerProducerMockRecorder {
 
 // PublishTopicList mocks base method
 func (m *MockInnerProducer) PublishTopicList() []string {
-	m.ctrl.T.Helper()
 	ret := m.ctrl.Call(m, "PublishTopicList")
 	ret0, _ := ret[0].([]string)
 	return ret0
@@ -63,25 +63,21 @@ func (m *MockInnerProducer) PublishTopicList() []string {
 
 // PublishTopicList indicates an expected call of PublishTopicList
 func (mr *MockInnerProducerMockRecorder) PublishTopicList() *gomock.Call {
-	mr.mock.ctrl.T.Helper()
 	return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "PublishTopicList", reflect.TypeOf((*MockInnerProducer)(nil).PublishTopicList))
 }
 
 // UpdateTopicPublishInfo mocks base method
 func (m *MockInnerProducer) UpdateTopicPublishInfo(topic string, info *TopicPublishInfo) {
-	m.ctrl.T.Helper()
 	m.ctrl.Call(m, "UpdateTopicPublishInfo", topic, info)
 }
 
 // UpdateTopicPublishInfo indicates an expected call of UpdateTopicPublishInfo
 func (mr *MockInnerProducerMockRecorder) UpdateTopicPublishInfo(topic, info interface{}) *gomock.Call {
-	mr.mock.ctrl.T.Helper()
 	return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "UpdateTopicPublishInfo", reflect.TypeOf((*MockInnerProducer)(nil).UpdateTopicPublishInfo), topic, info)
 }
 
 // IsPublishTopicNeedUpdate mocks base method
 func (m *MockInnerProducer) IsPublishTopicNeedUpdate(topic string) bool {
-	m.ctrl.T.Helper()
 	ret := m.ctrl.Call(m, "IsPublishTopicNeedUpdate", topic)
 	ret0, _ := ret[0].(bool)
 	return ret0
@@ -89,13 +85,11 @@ func (m *MockInnerProducer) IsPublishTopicNeedUpdate(topic string) bool {
 
 // IsPublishTopicNeedUpdate indicates an expected call of IsPublishTopicNeedUpdate
 func (mr *MockInnerProducerMockRecorder) IsPublishTopicNeedUpdate(topic interface{}) *gomock.Call {
-	mr.mock.ctrl.T.Helper()
 	return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "IsPublishTopicNeedUpdate", reflect.TypeOf((*MockInnerProducer)(nil).IsPublishTopicNeedUpdate), topic)
 }
 
 // IsUnitMode mocks base method
 func (m *MockInnerProducer) IsUnitMode() bool {
-	m.ctrl.T.Helper()
 	ret := m.ctrl.Call(m, "IsUnitMode")
 	ret0, _ := ret[0].(bool)
 	return ret0
@@ -103,7 +97,6 @@ func (m *MockInnerProducer) IsUnitMode() bool {
 
 // IsUnitMode indicates an expected call of IsUnitMode
 func (mr *MockInnerProducerMockRecorder) IsUnitMode() *gomock.Call {
-	mr.mock.ctrl.T.Helper()
 	return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "IsUnitMode", reflect.TypeOf((*MockInnerProducer)(nil).IsUnitMode))
 }
 
@@ -132,7 +125,6 @@ func (m *MockInnerConsumer) EXPECT() *MockInnerConsumerMockRecorder {
 
 // PersistConsumerOffset mocks base method
 func (m *MockInnerConsumer) PersistConsumerOffset() error {
-	m.ctrl.T.Helper()
 	ret := m.ctrl.Call(m, "PersistConsumerOffset")
 	ret0, _ := ret[0].(error)
 	return ret0
@@ -140,25 +132,21 @@ func (m *MockInnerConsumer) PersistConsumerOffset() error {
 
 // PersistConsumerOffset indicates an expected call of PersistConsumerOffset
 func (mr *MockInnerConsumerMockRecorder) PersistConsumerOffset() *gomock.Call {
-	mr.mock.ctrl.T.Helper()
 	return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "PersistConsumerOffset", reflect.TypeOf((*MockInnerConsumer)(nil).PersistConsumerOffset))
 }
 
 // UpdateTopicSubscribeInfo mocks base method
 func (m *MockInnerConsumer) UpdateTopicSubscribeInfo(topic string, mqs []*primitive.MessageQueue) {
-	m.ctrl.T.Helper()
 	m.ctrl.Call(m, "UpdateTopicSubscribeInfo", topic, mqs)
 }
 
 // UpdateTopicSubscribeInfo indicates an expected call of UpdateTopicSubscribeInfo
 func (mr *MockInnerConsumerMockRecorder) UpdateTopicSubscribeInfo(topic, mqs interface{}) *gomock.Call {
-	mr.mock.ctrl.T.Helper()
 	return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "UpdateTopicSubscribeInfo", reflect.TypeOf((*MockInnerConsumer)(nil).UpdateTopicSubscribeInfo), topic, mqs)
 }
 
 // IsSubscribeTopicNeedUpdate mocks base method
 func (m *MockInnerConsumer) IsSubscribeTopicNeedUpdate(topic string) bool {
-	m.ctrl.T.Helper()
 	ret := m.ctrl.Call(m, "IsSubscribeTopicNeedUpdate", topic)
 	ret0, _ := ret[0].(bool)
 	return ret0
@@ -166,13 +154,11 @@ func (m *MockInnerConsumer) IsSubscribeTopicNeedUpdate(topic string) bool {
 
 // IsSubscribeTopicNeedUpdate indicates an expected call of IsSubscribeTopicNeedUpdate
 func (mr *MockInnerConsumerMockRecorder) IsSubscribeTopicNeedUpdate(topic interface{}) *gomock.Call {
-	mr.mock.ctrl.T.Helper()
 	return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "IsSubscribeTopicNeedUpdate", reflect.TypeOf((*MockInnerConsumer)(nil).IsSubscribeTopicNeedUpdate), topic)
 }
 
 // SubscriptionDataList mocks base method
 func (m *MockInnerConsumer) SubscriptionDataList() []*SubscriptionData {
-	m.ctrl.T.Helper()
 	ret := m.ctrl.Call(m, "SubscriptionDataList")
 	ret0, _ := ret[0].([]*SubscriptionData)
 	return ret0
@@ -180,25 +166,21 @@ func (m *MockInnerConsumer) SubscriptionDataList() []*SubscriptionData {
 
 // SubscriptionDataList indicates an expected call of SubscriptionDataList
 func (mr *MockInnerConsumerMockRecorder) SubscriptionDataList() *gomock.Call {
-	mr.mock.ctrl.T.Helper()
 	return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "SubscriptionDataList", reflect.TypeOf((*MockInnerConsumer)(nil).SubscriptionDataList))
 }
 
 // Rebalance mocks base method
 func (m *MockInnerConsumer) Rebalance() {
-	m.ctrl.T.Helper()
 	m.ctrl.Call(m, "Rebalance")
 }
 
 // Rebalance indicates an expected call of Rebalance
 func (mr *MockInnerConsumerMockRecorder) Rebalance() *gomock.Call {
-	mr.mock.ctrl.T.Helper()
 	return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "Rebalance", reflect.TypeOf((*MockInnerConsumer)(nil).Rebalance))
 }
 
 // IsUnitMode mocks base method
 func (m *MockInnerConsumer) IsUnitMode() bool {
-	m.ctrl.T.Helper()
 	ret := m.ctrl.Call(m, "IsUnitMode")
 	ret0, _ := ret[0].(bool)
 	return ret0
@@ -206,7 +188,6 @@ func (m *MockInnerConsumer) IsUnitMode() bool {
 
 // IsUnitMode indicates an expected call of IsUnitMode
 func (mr *MockInnerConsumerMockRecorder) IsUnitMode() *gomock.Call {
-	mr.mock.ctrl.T.Helper()
 	return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "IsUnitMode", reflect.TypeOf((*MockInnerConsumer)(nil).IsUnitMode))
 }
 
@@ -235,31 +216,26 @@ func (m *MockRMQClient) EXPECT() *MockRMQClientMockRecorder {
 
 // Start mocks base method
 func (m *MockRMQClient) Start() {
-	m.ctrl.T.Helper()
 	m.ctrl.Call(m, "Start")
 }
 
 // Start indicates an expected call of Start
 func (mr *MockRMQClientMockRecorder) Start() *gomock.Call {
-	mr.mock.ctrl.T.Helper()
 	return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "Start", reflect.TypeOf((*MockRMQClient)(nil).Start))
 }
 
 // Shutdown mocks base method
 func (m *MockRMQClient) Shutdown() {
-	m.ctrl.T.Helper()
 	m.ctrl.Call(m, "Shutdown")
 }
 
 // Shutdown indicates an expected call of Shutdown
 func (mr *MockRMQClientMockRecorder) Shutdown() *gomock.Call {
-	mr.mock.ctrl.T.Helper()
 	return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "Shutdown", reflect.TypeOf((*MockRMQClient)(nil).Shutdown))
 }
 
 // ClientID mocks base method
 func (m *MockRMQClient) ClientID() string {
-	m.ctrl.T.Helper()
 	ret := m.ctrl.Call(m, "ClientID")
 	ret0, _ := ret[0].(string)
 	return ret0
@@ -267,104 +243,88 @@ func (m *MockRMQClient) ClientID() string {
 
 // ClientID indicates an expected call of ClientID
 func (mr *MockRMQClientMockRecorder) ClientID() *gomock.Call {
-	mr.mock.ctrl.T.Helper()
 	return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "ClientID", reflect.TypeOf((*MockRMQClient)(nil).ClientID))
 }
 
 // RegisterProducer mocks base method
 func (m *MockRMQClient) RegisterProducer(group string, producer InnerProducer) {
-	m.ctrl.T.Helper()
 	m.ctrl.Call(m, "RegisterProducer", group, producer)
 }
 
 // RegisterProducer indicates an expected call of RegisterProducer
 func (mr *MockRMQClientMockRecorder) RegisterProducer(group, producer interface{}) *gomock.Call {
-	mr.mock.ctrl.T.Helper()
 	return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "RegisterProducer", reflect.TypeOf((*MockRMQClient)(nil).RegisterProducer), group, producer)
 }
 
 // InvokeSync mocks base method
-func (m *MockRMQClient) InvokeSync(addr string, request *remote.RemotingCommand, timeoutMillis time.Duration) (*remote.RemotingCommand, error) {
-	m.ctrl.T.Helper()
-	ret := m.ctrl.Call(m, "InvokeSync", addr, request, timeoutMillis)
+func (m *MockRMQClient) InvokeSync(ctx context.Context, addr string, request *remote.RemotingCommand, timeoutMillis time.Duration) (*remote.RemotingCommand, error) {
+	ret := m.ctrl.Call(m, "InvokeSync", ctx, addr, request, timeoutMillis)
 	ret0, _ := ret[0].(*remote.RemotingCommand)
 	ret1, _ := ret[1].(error)
 	return ret0, ret1
 }
 
 // InvokeSync indicates an expected call of InvokeSync
-func (mr *MockRMQClientMockRecorder) InvokeSync(addr, request, timeoutMillis interface{}) *gomock.Call {
-	mr.mock.ctrl.T.Helper()
-	return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "InvokeSync", reflect.TypeOf((*MockRMQClient)(nil).InvokeSync), addr, request, timeoutMillis)
+func (mr *MockRMQClientMockRecorder) InvokeSync(ctx, addr, request, timeoutMillis interface{}) *gomock.Call {
+	return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "InvokeSync", reflect.TypeOf((*MockRMQClient)(nil).InvokeSync), ctx, addr, request, timeoutMillis)
 }
 
 // InvokeAsync mocks base method
-func (m *MockRMQClient) InvokeAsync(addr string, request *remote.RemotingCommand, timeoutMillis time.Duration, f func(*remote.RemotingCommand, error)) error {
-	m.ctrl.T.Helper()
-	ret := m.ctrl.Call(m, "InvokeAsync", addr, request, timeoutMillis, f)
+func (m *MockRMQClient) InvokeAsync(ctx context.Context, addr string, request *remote.RemotingCommand, timeoutMillis time.Duration, f func(*remote.RemotingCommand, error)) error {
+	ret := m.ctrl.Call(m, "InvokeAsync", ctx, addr, request, timeoutMillis, f)
 	ret0, _ := ret[0].(error)
 	return ret0
 }
 
 // InvokeAsync indicates an expected call of InvokeAsync
-func (mr *MockRMQClientMockRecorder) InvokeAsync(addr, request, timeoutMillis, f interface{}) *gomock.Call {
-	mr.mock.ctrl.T.Helper()
-	return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "InvokeAsync", reflect.TypeOf((*MockRMQClient)(nil).InvokeAsync), addr, request, timeoutMillis, f)
+func (mr *MockRMQClientMockRecorder) InvokeAsync(ctx, addr, request, timeoutMillis, f interface{}) *gomock.Call {
+	return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "InvokeAsync", reflect.TypeOf((*MockRMQClient)(nil).InvokeAsync), ctx, addr, request, timeoutMillis, f)
 }
 
 // InvokeOneWay mocks base method
-func (m *MockRMQClient) InvokeOneWay(addr string, request *remote.RemotingCommand, timeoutMillis time.Duration) error {
-	m.ctrl.T.Helper()
-	ret := m.ctrl.Call(m, "InvokeOneWay", addr, request, timeoutMillis)
+func (m *MockRMQClient) InvokeOneWay(ctx context.Context, addr string, request *remote.RemotingCommand, timeoutMillis time.Duration) error {
+	ret := m.ctrl.Call(m, "InvokeOneWay", ctx, addr, request, timeoutMillis)
 	ret0, _ := ret[0].(error)
 	return ret0
 }
 
 // InvokeOneWay indicates an expected call of InvokeOneWay
-func (mr *MockRMQClientMockRecorder) InvokeOneWay(addr, request, timeoutMillis interface{}) *gomock.Call {
-	mr.mock.ctrl.T.Helper()
-	return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "InvokeOneWay", reflect.TypeOf((*MockRMQClient)(nil).InvokeOneWay), addr, request, timeoutMillis)
+func (mr *MockRMQClientMockRecorder) InvokeOneWay(ctx, addr, request, timeoutMillis interface{}) *gomock.Call {
+	return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "InvokeOneWay", reflect.TypeOf((*MockRMQClient)(nil).InvokeOneWay), ctx, addr, request, timeoutMillis)
 }
 
 // CheckClientInBroker mocks base method
 func (m *MockRMQClient) CheckClientInBroker() {
-	m.ctrl.T.Helper()
 	m.ctrl.Call(m, "CheckClientInBroker")
 }
 
 // CheckClientInBroker indicates an expected call of CheckClientInBroker
 func (mr *MockRMQClientMockRecorder) CheckClientInBroker() *gomock.Call {
-	mr.mock.ctrl.T.Helper()
 	return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "CheckClientInBroker", reflect.TypeOf((*MockRMQClient)(nil).CheckClientInBroker))
 }
 
 // SendHeartbeatToAllBrokerWithLock mocks base method
 func (m *MockRMQClient) SendHeartbeatToAllBrokerWithLock() {
-	m.ctrl.T.Helper()
 	m.ctrl.Call(m, "SendHeartbeatToAllBrokerWithLock")
 }
 
 // SendHeartbeatToAllBrokerWithLock indicates an expected call of SendHeartbeatToAllBrokerWithLock
 func (mr *MockRMQClientMockRecorder) SendHeartbeatToAllBrokerWithLock() *gomock.Call {
-	mr.mock.ctrl.T.Helper()
 	return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "SendHeartbeatToAllBrokerWithLock", reflect.TypeOf((*MockRMQClient)(nil).SendHeartbeatToAllBrokerWithLock))
 }
 
 // UpdateTopicRouteInfo mocks base method
 func (m *MockRMQClient) UpdateTopicRouteInfo() {
-	m.ctrl.T.Helper()
 	m.ctrl.Call(m, "UpdateTopicRouteInfo")
 }
 
 // UpdateTopicRouteInfo indicates an expected call of UpdateTopicRouteInfo
 func (mr *MockRMQClientMockRecorder) UpdateTopicRouteInfo() *gomock.Call {
-	mr.mock.ctrl.T.Helper()
 	return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "UpdateTopicRouteInfo", reflect.TypeOf((*MockRMQClient)(nil).UpdateTopicRouteInfo))
 }
 
 // ProcessSendResponse mocks base method
 func (m *MockRMQClient) ProcessSendResponse(brokerName string, cmd *remote.RemotingCommand, resp *primitive.SendResult, msgs ...*primitive.Message) error {
-	m.ctrl.T.Helper()
 	varargs := []interface{}{brokerName, cmd, resp}
 	for _, a := range msgs {
 		varargs = append(varargs, a)
@@ -376,14 +336,12 @@ func (m *MockRMQClient) ProcessSendResponse(brokerName string, cmd *remote.Remot
 
 // ProcessSendResponse indicates an expected call of ProcessSendResponse
 func (mr *MockRMQClientMockRecorder) ProcessSendResponse(brokerName, cmd, resp interface{}, msgs ...interface{}) *gomock.Call {
-	mr.mock.ctrl.T.Helper()
 	varargs := append([]interface{}{brokerName, cmd, resp}, msgs...)
 	return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "ProcessSendResponse", reflect.TypeOf((*MockRMQClient)(nil).ProcessSendResponse), varargs...)
 }
 
 // RegisterConsumer mocks base method
 func (m *MockRMQClient) RegisterConsumer(group string, consumer InnerConsumer) error {
-	m.ctrl.T.Helper()
 	ret := m.ctrl.Call(m, "RegisterConsumer", group, consumer)
 	ret0, _ := ret[0].(error)
 	return ret0
@@ -391,25 +349,21 @@ func (m *MockRMQClient) RegisterConsumer(group string, consumer InnerConsumer) e
 
 // RegisterConsumer indicates an expected call of RegisterConsumer
 func (mr *MockRMQClientMockRecorder) RegisterConsumer(group, consumer interface{}) *gomock.Call {
-	mr.mock.ctrl.T.Helper()
 	return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "RegisterConsumer", reflect.TypeOf((*MockRMQClient)(nil).RegisterConsumer), group, consumer)
 }
 
 // UnregisterConsumer mocks base method
 func (m *MockRMQClient) UnregisterConsumer(group string) {
-	m.ctrl.T.Helper()
 	m.ctrl.Call(m, "UnregisterConsumer", group)
 }
 
 // UnregisterConsumer indicates an expected call of UnregisterConsumer
 func (mr *MockRMQClientMockRecorder) UnregisterConsumer(group interface{}) *gomock.Call {
-	mr.mock.ctrl.T.Helper()
 	return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "UnregisterConsumer", reflect.TypeOf((*MockRMQClient)(nil).UnregisterConsumer), group)
 }
 
 // PullMessage mocks base method
 func (m *MockRMQClient) PullMessage(ctx context.Context, brokerAddrs string, request *PullMessageRequest) (*primitive.PullResult, error) {
-	m.ctrl.T.Helper()
 	ret := m.ctrl.Call(m, "PullMessage", ctx, brokerAddrs, request)
 	ret0, _ := ret[0].(*primitive.PullResult)
 	ret1, _ := ret[1].(error)
@@ -418,13 +372,11 @@ func (m *MockRMQClient) PullMessage(ctx context.Context, brokerAddrs string, req
 
 // PullMessage indicates an expected call of PullMessage
 func (mr *MockRMQClientMockRecorder) PullMessage(ctx, brokerAddrs, request interface{}) *gomock.Call {
-	mr.mock.ctrl.T.Helper()
 	return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "PullMessage", reflect.TypeOf((*MockRMQClient)(nil).PullMessage), ctx, brokerAddrs, request)
 }
 
 // PullMessageAsync mocks base method
 func (m *MockRMQClient) PullMessageAsync(ctx context.Context, brokerAddrs string, request *PullMessageRequest, f func(*primitive.PullResult)) error {
-	m.ctrl.T.Helper()
 	ret := m.ctrl.Call(m, "PullMessageAsync", ctx, brokerAddrs, request, f)
 	ret0, _ := ret[0].(error)
 	return ret0
@@ -432,30 +384,25 @@ func (m *MockRMQClient) PullMessageAsync(ctx context.Context, brokerAddrs string
 
 // PullMessageAsync indicates an expected call of PullMessageAsync
 func (mr *MockRMQClientMockRecorder) PullMessageAsync(ctx, brokerAddrs, request, f interface{}) *gomock.Call {
-	mr.mock.ctrl.T.Helper()
 	return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "PullMessageAsync", reflect.TypeOf((*MockRMQClient)(nil).PullMessageAsync), ctx, brokerAddrs, request, f)
 }
 
 // RebalanceImmediately mocks base method
 func (m *MockRMQClient) RebalanceImmediately() {
-	m.ctrl.T.Helper()
 	m.ctrl.Call(m, "RebalanceImmediately")
 }
 
 // RebalanceImmediately indicates an expected call of RebalanceImmediately
 func (mr *MockRMQClientMockRecorder) RebalanceImmediately() *gomock.Call {
-	mr.mock.ctrl.T.Helper()
 	return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "RebalanceImmediately", reflect.TypeOf((*MockRMQClient)(nil).RebalanceImmediately))
 }
 
 // UpdatePublishInfo mocks base method
 func (m *MockRMQClient) UpdatePublishInfo(topic string, data *TopicRouteData) {
-	m.ctrl.T.Helper()
 	m.ctrl.Call(m, "UpdatePublishInfo", topic, data)
 }
 
 // UpdatePublishInfo indicates an expected call of UpdatePublishInfo
 func (mr *MockRMQClientMockRecorder) UpdatePublishInfo(topic, data interface{}) *gomock.Call {
-	mr.mock.ctrl.T.Helper()
 	return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "UpdatePublishInfo", reflect.TypeOf((*MockRMQClient)(nil).UpdatePublishInfo), topic, data)
 }
diff --git a/internal/remote/future.go b/internal/remote/future.go
index 8690644..4d3008f 100644
--- a/internal/remote/future.go
+++ b/internal/remote/future.go
@@ -18,6 +18,7 @@ limitations under the License.
 package remote
 
 import (
+	"context"
 	"sync"
 	"time"
 
@@ -35,16 +36,18 @@ type ResponseFuture struct {
 	BeginTimestamp  time.Duration
 	Done            chan bool
 	callbackOnce    sync.Once
+	ctx             context.Context
 }
 
 // NewResponseFuture create ResponseFuture with opaque, timeout and callback
-func NewResponseFuture(opaque int32, timeout time.Duration, callback func(*ResponseFuture)) *ResponseFuture {
+func NewResponseFuture(ctx context.Context, opaque int32, timeout time.Duration, callback func(*ResponseFuture)) *ResponseFuture {
 	return &ResponseFuture{
 		Opaque:         opaque,
 		Done:           make(chan bool),
 		Timeout:        timeout,
 		callback:       callback,
 		BeginTimestamp: time.Duration(time.Now().Unix()) * time.Second,
+		ctx:            ctx,
 	}
 }
 
@@ -66,19 +69,19 @@ func (r *ResponseFuture) waitResponse() (*RemotingCommand, error) {
 		cmd *RemotingCommand
 		err error
 	)
-	timer := time.NewTimer(r.Timeout)
+	ctx, cancel := context.WithTimeout(r.ctx, r.Timeout)
+	defer cancel()
 	for {
 		select {
 		case <-r.Done:
 			cmd, err = r.ResponseCommand, r.Err
 			goto done
-		case <-timer.C:
+		case <-ctx.Done():
 			err = utils.ErrRequestTimeout
 			r.Err = err
 			goto done
 		}
 	}
 done:
-	timer.Stop()
 	return cmd, err
 }
diff --git a/internal/remote/mock_remote_client.go b/internal/remote/mock_remote_client.go
index 62a91c5..e264ab2 100644
--- a/internal/remote/mock_remote_client.go
+++ b/internal/remote/mock_remote_client.go
@@ -21,6 +21,7 @@
 package remote
 
 import (
+	context "context"
 	primitive "github.com/apache/rocketmq-client-go/primitive"
 	gomock "github.com/golang/mock/gomock"
 	reflect "reflect"
@@ -75,40 +76,40 @@ func (mr *MockRemotingClientMockRecorder) RegisterInterceptor(interceptors ...in
 }
 
 // InvokeSync mocks base method
-func (m *MockRemotingClient) InvokeSync(addr string, request *RemotingCommand, timeout time.Duration) (*RemotingCommand, error) {
-	ret := m.ctrl.Call(m, "InvokeSync", addr, request, timeout)
+func (m *MockRemotingClient) InvokeSync(ctx context.Context, addr string, request *RemotingCommand, timeout time.Duration) (*RemotingCommand, error) {
+	ret := m.ctrl.Call(m, "InvokeSync", ctx, addr, request, timeout)
 	ret0, _ := ret[0].(*RemotingCommand)
 	ret1, _ := ret[1].(error)
 	return ret0, ret1
 }
 
 // InvokeSync indicates an expected call of InvokeSync
-func (mr *MockRemotingClientMockRecorder) InvokeSync(addr, request, timeout interface{}) *gomock.Call {
-	return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "InvokeSync", reflect.TypeOf((*MockRemotingClient)(nil).InvokeSync), addr, request, timeout)
+func (mr *MockRemotingClientMockRecorder) InvokeSync(ctx, addr, request, timeout interface{}) *gomock.Call {
+	return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "InvokeSync", reflect.TypeOf((*MockRemotingClient)(nil).InvokeSync), ctx, addr, request, timeout)
 }
 
 // InvokeAsync mocks base method
-func (m *MockRemotingClient) InvokeAsync(addr string, request *RemotingCommand, timeout time.Duration, callback func(*ResponseFuture)) error {
-	ret := m.ctrl.Call(m, "InvokeAsync", addr, request, timeout, callback)
+func (m *MockRemotingClient) InvokeAsync(ctx context.Context, addr string, request *RemotingCommand, timeout time.Duration, callback func(*ResponseFuture)) error {
+	ret := m.ctrl.Call(m, "InvokeAsync", ctx, addr, request, timeout, callback)
 	ret0, _ := ret[0].(error)
 	return ret0
 }
 
 // InvokeAsync indicates an expected call of InvokeAsync
-func (mr *MockRemotingClientMockRecorder) InvokeAsync(addr, request, timeout, callback interface{}) *gomock.Call {
-	return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "InvokeAsync", reflect.TypeOf((*MockRemotingClient)(nil).InvokeAsync), addr, request, timeout, callback)
+func (mr *MockRemotingClientMockRecorder) InvokeAsync(ctx, addr, request, timeout, callback interface{}) *gomock.Call {
+	return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "InvokeAsync", reflect.TypeOf((*MockRemotingClient)(nil).InvokeAsync), ctx, addr, request, timeout, callback)
 }
 
 // InvokeOneWay mocks base method
-func (m *MockRemotingClient) InvokeOneWay(addr string, request *RemotingCommand, timeout time.Duration) error {
-	ret := m.ctrl.Call(m, "InvokeOneWay", addr, request, timeout)
+func (m *MockRemotingClient) InvokeOneWay(ctx context.Context, addr string, request *RemotingCommand, timeout time.Duration) error {
+	ret := m.ctrl.Call(m, "InvokeOneWay", ctx, addr, request, timeout)
 	ret0, _ := ret[0].(error)
 	return ret0
 }
 
 // InvokeOneWay indicates an expected call of InvokeOneWay
-func (mr *MockRemotingClientMockRecorder) InvokeOneWay(addr, request, timeout interface{}) *gomock.Call {
-	return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "InvokeOneWay", reflect.TypeOf((*MockRemotingClient)(nil).InvokeOneWay), addr, request, timeout)
+func (mr *MockRemotingClientMockRecorder) InvokeOneWay(ctx, addr, request, timeout interface{}) *gomock.Call {
+	return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "InvokeOneWay", reflect.TypeOf((*MockRemotingClient)(nil).InvokeOneWay), ctx, addr, request, timeout)
 }
 
 // ShutDown mocks base method
diff --git a/internal/remote/remote_client.go b/internal/remote/remote_client.go
index 01688fe..96f67a6 100644
--- a/internal/remote/remote_client.go
+++ b/internal/remote/remote_client.go
@@ -41,9 +41,9 @@ type TcpOption struct {
 type RemotingClient interface {
 	RegisterRequestFunc(code int16, f ClientRequestFunc)
 	RegisterInterceptor(interceptors ...primitive.Interceptor)
-	InvokeSync(addr string, request *RemotingCommand, timeout time.Duration) (*RemotingCommand, error)
-	InvokeAsync(addr string, request *RemotingCommand, timeout time.Duration, callback func(*ResponseFuture)) error
-	InvokeOneWay(addr string, request *RemotingCommand, timeout time.Duration) error
+	InvokeSync(ctx context.Context, addr string, request *RemotingCommand, timeout time.Duration) (*RemotingCommand, error)
+	InvokeAsync(ctx context.Context, addr string, request *RemotingCommand, timeout time.Duration, callback func(*ResponseFuture)) error
+	InvokeOneWay(ctx context.Context, addr string, request *RemotingCommand, timeout time.Duration) error
 	ShutDown()
 }
 
@@ -69,12 +69,12 @@ func (c *remotingClient) RegisterRequestFunc(code int16, f ClientRequestFunc) {
 }
 
 // TODO: merge sync and async model. sync should run on async model by blocking on chan
-func (c *remotingClient) InvokeSync(addr string, request *RemotingCommand, timeout time.Duration) (*RemotingCommand, error) {
-	conn, err := c.connect(addr)
+func (c *remotingClient) InvokeSync(ctx context.Context, addr string, request *RemotingCommand, timeout time.Duration) (*RemotingCommand, error) {
+	conn, err := c.connect(ctx, addr)
 	if err != nil {
 		return nil, err
 	}
-	resp := NewResponseFuture(request.Opaque, timeout, nil)
+	resp := NewResponseFuture(ctx, request.Opaque, timeout, nil)
 	c.responseTable.Store(resp.Opaque, resp)
 	defer c.responseTable.Delete(request.Opaque)
 	err = c.sendRequest(conn, request)
@@ -86,12 +86,12 @@ func (c *remotingClient) InvokeSync(addr string, request *RemotingCommand, timeo
 }
 
 // InvokeAsync send request without blocking, just return immediately.
-func (c *remotingClient) InvokeAsync(addr string, request *RemotingCommand, timeout time.Duration, callback func(*ResponseFuture)) error {
-	conn, err := c.connect(addr)
+func (c *remotingClient) InvokeAsync(ctx context.Context, addr string, request *RemotingCommand, timeout time.Duration, callback func(*ResponseFuture)) error {
+	conn, err := c.connect(ctx, addr)
 	if err != nil {
 		return err
 	}
-	resp := NewResponseFuture(request.Opaque, timeout, callback)
+	resp := NewResponseFuture(ctx, request.Opaque, timeout, callback)
 	c.responseTable.Store(resp.Opaque, resp)
 	err = c.sendRequest(conn, request)
 	if err != nil {
@@ -109,15 +109,15 @@ func (c *remotingClient) receiveAsync(f *ResponseFuture) {
 	}
 }
 
-func (c *remotingClient) InvokeOneWay(addr string, request *RemotingCommand, timeout time.Duration) error {
-	conn, err := c.connect(addr)
+func (c *remotingClient) InvokeOneWay(ctx context.Context, addr string, request *RemotingCommand, timeout time.Duration) error {
+	conn, err := c.connect(ctx, addr)
 	if err != nil {
 		return err
 	}
 	return c.sendRequest(conn, request)
 }
 
-func (c *remotingClient) connect(addr string) (net.Conn, error) {
+func (c *remotingClient) connect(ctx context.Context, addr string) (net.Conn, error) {
 	//it needs additional locker.
 	c.connectionLocker.Lock()
 	defer c.connectionLocker.Unlock()
@@ -125,7 +125,8 @@ func (c *remotingClient) connect(addr string) (net.Conn, error) {
 	if ok {
 		return conn.(net.Conn), nil
 	}
-	tcpConn, err := net.Dial("tcp", addr)
+	var d net.Dialer
+	tcpConn, err := d.DialContext(ctx, "tcp", addr)
 	if err != nil {
 		return nil, err
 	}
diff --git a/internal/remote/remote_client_test.go b/internal/remote/remote_client_test.go
index 9cbb117..1d2c033 100644
--- a/internal/remote/remote_client_test.go
+++ b/internal/remote/remote_client_test.go
@@ -18,6 +18,7 @@ package remote
 
 import (
 	"bytes"
+	"context"
 	"errors"
 	"math/rand"
 	"net"
@@ -32,7 +33,7 @@ import (
 )
 
 func TestNewResponseFuture(t *testing.T) {
-	future := NewResponseFuture(10, time.Duration(1000), nil)
+	future := NewResponseFuture(context.Background(), 10, time.Duration(1000), nil)
 	if future.Opaque != 10 {
 		t.Errorf("wrong ResponseFuture's opaque. want=%d, got=%d", 10, future.Opaque)
 	}
@@ -62,7 +63,7 @@ func TestResponseFutureTimeout(t *testing.T) {
 			r.ResponseCommand.Remark = r.ResponseCommand.Remark + "Go Client"
 		}
 	}
-	future := NewResponseFuture(10, time.Duration(1000), callback)
+	future := NewResponseFuture(context.Background(), 10, time.Duration(1000), callback)
 	future.ResponseCommand = NewRemotingCommand(200,
 		nil, nil)
 
@@ -83,7 +84,7 @@ func TestResponseFutureTimeout(t *testing.T) {
 }
 
 func TestResponseFutureIsTimeout(t *testing.T) {
-	future := NewResponseFuture(10, 500*time.Millisecond, nil)
+	future := NewResponseFuture(context.Background(), 10, 500*time.Millisecond, nil)
 	if future.isTimeout() != false {
 		t.Errorf("wrong ResponseFuture's istimeout. want=%t, got=%t", false, future.isTimeout())
 	}
@@ -92,12 +93,12 @@ func TestResponseFutureIsTimeout(t *testing.T) {
 }
 
 func TestResponseFutureWaitResponse(t *testing.T) {
-	future := NewResponseFuture(10, 500*time.Millisecond, nil)
+	future := NewResponseFuture(context.Background(), 10, 500*time.Millisecond, nil)
 	if _, err := future.waitResponse(); err != utils.ErrRequestTimeout {
 		t.Errorf("wrong ResponseFuture waitResponse. want=%v, got=%v",
 			utils.ErrRequestTimeout, err)
 	}
-	future = NewResponseFuture(10, 500*time.Millisecond, nil)
+	future = NewResponseFuture(context.Background(), 10, 500*time.Millisecond, nil)
 	responseError := errors.New("response error")
 	go func() {
 		time.Sleep(100 * time.Millisecond)
@@ -108,7 +109,7 @@ func TestResponseFutureWaitResponse(t *testing.T) {
 		t.Errorf("wrong ResponseFuture waitResponse. want=%v. got=%v",
 			responseError, err)
 	}
-	future = NewResponseFuture(10, 500*time.Millisecond, nil)
+	future = NewResponseFuture(context.Background(), 10, 500*time.Millisecond, nil)
 	responseRemotingCommand := NewRemotingCommand(202, nil, nil)
 	go func() {
 		time.Sleep(100 * time.Millisecond)
@@ -173,7 +174,7 @@ func TestInvokeSync(t *testing.T) {
 
 	go func() {
 		clientSend.Wait()
-		receiveCommand, err := client.InvokeSync(addr,
+		receiveCommand, err := client.InvokeSync(context.Background(), addr,
 			clientSendRemtingCommand, time.Second)
 		if err != nil {
 			t.Fatalf("failed to invoke synchronous. %s", err)
@@ -237,7 +238,7 @@ func TestInvokeAsync(t *testing.T) {
 			time.Sleep(time.Duration(rand.Intn(100)) * time.Millisecond)
 			t.Logf("[Send: %d] asychronous message", index)
 			sendRemotingCommand := randomNewRemotingCommand()
-			err := client.InvokeAsync(addr, sendRemotingCommand, time.Second, func(r *ResponseFuture) {
+			err := client.InvokeAsync(context.Background(), addr, sendRemotingCommand, time.Second, func(r *ResponseFuture) {
 				t.Logf("[Receive: %d] asychronous message response", index)
 				if string(sendRemotingCommand.Body) != string(r.ResponseCommand.Body) {
 					t.Errorf("wrong response message. want=%s, got=%s", string(sendRemotingCommand.Body),
@@ -303,7 +304,7 @@ func TestInvokeAsyncTimeout(t *testing.T) {
 	clientSend.Add(1)
 	go func() {
 		clientSend.Wait()
-		err := client.InvokeAsync(addr, clientSendRemtingCommand,
+		err := client.InvokeAsync(context.Background(), addr, clientSendRemtingCommand,
 			time.Duration(1000), func(r *ResponseFuture) {
 				assert.NotNil(t, r.Err)
 				assert.Equal(t, utils.ErrRequestTimeout, r.Err)
@@ -348,7 +349,7 @@ func TestInvokeOneWay(t *testing.T) {
 	clientSend.Add(1)
 	go func() {
 		clientSend.Wait()
-		err := client.InvokeOneWay(addr, clientSendRemtingCommand, 3*time.Second)
+		err := client.InvokeOneWay(context.Background(), addr, clientSendRemtingCommand, 3*time.Second)
 		if err != nil {
 			t.Fatalf("failed to invoke synchronous. %s", err)
 		}
diff --git a/internal/route.go b/internal/route.go
index c6b7e70..3611bb9 100644
--- a/internal/route.go
+++ b/internal/route.go
@@ -18,6 +18,7 @@ limitations under the License.
 package internal
 
 import (
+	"context"
 	"encoding/json"
 	"errors"
 	"math/rand"
@@ -304,7 +305,7 @@ func (s *namesrvs) queryTopicRouteInfoFromServer(topic string) (*TopicRouteData,
 	)
 	for i := 0; i < s.Size(); i++ {
 		rc := remote.NewRemotingCommand(ReqGetRouteInfoByTopic, request, nil)
-		response, err = s.nameSrvClient.InvokeSync(s.getNameServerAddress(), rc, requestTimeout)
+		response, err = s.nameSrvClient.InvokeSync(context.Background(), s.getNameServerAddress(), rc, requestTimeout)
 
 		if err == nil {
 			break
diff --git a/internal/route_test.go b/internal/route_test.go
index 77f79c0..aa944b0 100644
--- a/internal/route_test.go
+++ b/internal/route_test.go
@@ -18,6 +18,7 @@ limitations under the License.
 package internal
 
 import (
+	"context"
 	"testing"
 	"time"
 
@@ -48,8 +49,8 @@ func TestQueryTopicRouteInfoFromServer(t *testing.T) {
 		Convey("When marshal producer trace data", func() {
 
 			count := 0
-			remotingCli.EXPECT().InvokeSync(gomock.Any(), gomock.Any(), gomock.Any()).DoAndReturn(
-				func(addr string, request *remote.RemotingCommand, timeout time.Duration) (*remote.RemotingCommand, error) {
+			remotingCli.EXPECT().InvokeSync(gomock.Any(), gomock.Any(), gomock.Any(), gomock.Any()).DoAndReturn(
+				func(ctx context.Context, addr string, request *remote.RemotingCommand, timeout time.Duration) (*remote.RemotingCommand, error) {
 					count++
 					if count < 3 {
 						return nil, errors.New("not existed")
diff --git a/internal/trace.go b/internal/trace.go
index ca60eea..8bc6748 100644
--- a/internal/trace.go
+++ b/internal/trace.go
@@ -397,7 +397,7 @@ func (td *traceDispatcher) sendTraceDataByMQ(keyset Keyset, regionID string, dat
 	}
 
 	var req = td.buildSendRequest(mq, msg)
-	td.cli.InvokeAsync(addr, req, 5000*time.Millisecond, func(command *remote.RemotingCommand, e error) {
+	td.cli.InvokeAsync(context.Background(), addr, req, 5000*time.Millisecond, func(command *remote.RemotingCommand, e error) {
 		if e != nil {
 			rlog.Error("send trace data ,the traceData is %v", data)
 		}
diff --git a/producer/producer.go b/producer/producer.go
index 0b0367f..25e3210 100644
--- a/producer/producer.go
+++ b/producer/producer.go
@@ -166,7 +166,7 @@ func (p *defaultProducer) sendSync(ctx context.Context, msg *primitive.Message,
 			producerCtx.MQ = *mq
 		}
 
-		res, _err := p.client.InvokeSync(addr, p.buildSendRequest(mq, msg), 3*time.Second)
+		res, _err := p.client.InvokeSync(ctx, addr, p.buildSendRequest(mq, msg), 3*time.Second)
 		if _err != nil {
 			err = _err
 			continue
@@ -205,7 +205,7 @@ func (p *defaultProducer) sendAsync(ctx context.Context, msg *primitive.Message,
 		return errors.Errorf("topic=%s route info not found", mq.Topic)
 	}
 
-	return p.client.InvokeAsync(addr, p.buildSendRequest(mq, msg), 3*time.Second, func(command *remote.RemotingCommand, err error) {
+	return p.client.InvokeAsync(ctx, addr, p.buildSendRequest(mq, msg), 3*time.Second, func(command *remote.RemotingCommand, err error) {
 		resp := new(primitive.SendResult)
 		if err != nil {
 			h(ctx, nil, err)
@@ -251,7 +251,7 @@ func (p *defaultProducer) sendOneWay(ctx context.Context, msg *primitive.Message
 			return fmt.Errorf("topic=%s route info not found", mq.Topic)
 		}
 
-		_err := p.client.InvokeOneWay(addr, p.buildSendRequest(mq, msg), 3*time.Second)
+		_err := p.client.InvokeOneWay(ctx, addr, p.buildSendRequest(mq, msg), 3*time.Second)
 		if _err != nil {
 			err = _err
 			continue
@@ -398,7 +398,7 @@ func (tp *transactionProducer) checkTransactionState() {
 			req := remote.NewRemotingCommand(internal.ReqENDTransaction, header, nil)
 			req.Remark = tp.errRemark(nil)
 
-			tp.producer.client.InvokeOneWay(callback.Addr.String(), req, tp.producer.options.SendMsgTimeout)
+			tp.producer.client.InvokeOneWay(context.Background(), callback.Addr.String(), req, tp.producer.options.SendMsgTimeout)
 		default:
 			rlog.Error("unknow type %v", ch)
 		}
@@ -467,7 +467,7 @@ func (tp *transactionProducer) endTransaction(result primitive.SendResult, err e
 	req := remote.NewRemotingCommand(internal.ReqENDTransaction, requestHeader, nil)
 	req.Remark = tp.errRemark(err)
 
-	return tp.producer.client.InvokeOneWay(brokerAddr, req, tp.producer.options.SendMsgTimeout)
+	return tp.producer.client.InvokeOneWay(context.Background(), brokerAddr, req, tp.producer.options.SendMsgTimeout)
 }
 
 func (tp *transactionProducer) errRemark(err error) string {
diff --git a/producer/producer_test.go b/producer/producer_test.go
index 849978a..d4c58c2 100644
--- a/producer/producer_test.go
+++ b/producer/producer_test.go
@@ -130,7 +130,7 @@ func TestSync(t *testing.T) {
 
 	mockB4Send(p)
 
-	client.EXPECT().InvokeSync(gomock.Any(), gomock.Any(), gomock.Any()).Return(nil, nil)
+	client.EXPECT().InvokeSync(gomock.Any(), gomock.Any(), gomock.Any(), gomock.Any()).Return(nil, nil)
 	client.EXPECT().ProcessSendResponse(gomock.Any(), gomock.Any(), gomock.Any(), gomock.Any()).Do(
 		func(brokerName string, cmd *remote.RemotingCommand, resp *primitive.SendResult, msgs ...*primitive.Message) {
 			resp.Status = expectedResp.Status
@@ -181,8 +181,8 @@ func TestASync(t *testing.T) {
 
 	mockB4Send(p)
 
-	client.EXPECT().InvokeAsync(gomock.Any(), gomock.Any(), gomock.Any(), gomock.Any()).DoAndReturn(
-		func(addr string, request *remote.RemotingCommand,
+	client.EXPECT().InvokeAsync(gomock.Any(), gomock.Any(), gomock.Any(), gomock.Any(), gomock.Any()).DoAndReturn(
+		func(ctx context.Context, addr string, request *remote.RemotingCommand,
 			timeoutMillis time.Duration, f func(*remote.RemotingCommand, error)) error {
 			// mock invoke callback
 			f(nil, nil)
@@ -226,7 +226,7 @@ func TestOneway(t *testing.T) {
 
 	mockB4Send(p)
 
-	client.EXPECT().InvokeOneWay(gomock.Any(), gomock.Any(), gomock.Any()).Return(nil).AnyTimes()
+	client.EXPECT().InvokeOneWay(gomock.Any(), gomock.Any(), gomock.Any(), gomock.Any()).Return(nil).AnyTimes()
 
 	err = p.SendOneWay(ctx, msg)
 	assert.Nil(t, err)