You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@rocketmq.apache.org by hu...@apache.org on 2019/08/26 02:51:58 UTC
[rocketmq-client-go] branch native updated: add ctx to rmqClient & remoteClient. resolve #176 (#177)
This is an automated email from the ASF dual-hosted git repository.
huzongtang pushed a commit to branch native
in repository https://gitbox.apache.org/repos/asf/rocketmq-client-go.git
The following commit(s) were added to refs/heads/native by this push:
new a1b4211 add ctx to rmqClient & remoteClient. resolve #176 (#177)
a1b4211 is described below
commit a1b42115ec7856d66f3563bbae36512d8a1cecbc
Author: xujianhai666 <52...@users.noreply.github.com>
AuthorDate: Mon Aug 26 10:51:54 2019 +0800
add ctx to rmqClient & remoteClient. resolve #176 (#177)
---
consumer/consumer.go | 12 ++---
consumer/offset_store.go | 5 +-
consumer/offset_store_test.go | 2 +-
consumer/push_consumer.go | 2 +-
internal/client.go | 24 ++++-----
internal/mock_client.go | 91 ++++++++---------------------------
internal/remote/future.go | 11 +++--
internal/remote/mock_remote_client.go | 25 +++++-----
internal/remote/remote_client.go | 27 ++++++-----
internal/remote/remote_client_test.go | 21 ++++----
internal/route.go | 3 +-
internal/route_test.go | 5 +-
internal/trace.go | 2 +-
producer/producer.go | 10 ++--
producer/producer_test.go | 8 +--
15 files changed, 102 insertions(+), 146 deletions(-)
diff --git a/consumer/consumer.go b/consumer/consumer.go
index 9bed213..b3454a6 100644
--- a/consumer/consumer.go
+++ b/consumer/consumer.go
@@ -546,7 +546,7 @@ func (dc *defaultConsumer) unlockAll(oneway bool) {
func (dc *defaultConsumer) doLock(addr string, body *lockBatchRequestBody) []primitive.MessageQueue {
data, _ := json.Marshal(body)
request := remote.NewRemotingCommand(internal.ReqLockBatchMQ, nil, data)
- response, err := dc.client.InvokeSync(addr, request, 1*time.Second)
+ response, err := dc.client.InvokeSync(context.Background(), addr, request, 1*time.Second)
if err != nil {
rlog.Errorf("lock mq to broker: %s error %s", addr, err.Error())
return nil
@@ -566,12 +566,12 @@ func (dc *defaultConsumer) doUnlock(addr string, body *lockBatchRequestBody, one
data, _ := json.Marshal(body)
request := remote.NewRemotingCommand(internal.ReqUnlockBatchMQ, nil, data)
if oneway {
- err := dc.client.InvokeOneWay(addr, request, 3*time.Second)
+ err := dc.client.InvokeOneWay(context.Background(), addr, request, 3*time.Second)
if err != nil {
rlog.Errorf("lock mq to broker with oneway: %s error %s", addr, err.Error())
}
} else {
- response, err := dc.client.InvokeSync(addr, request, 1*time.Second)
+ response, err := dc.client.InvokeSync(context.Background(), addr, request, 1*time.Second)
if err != nil {
rlog.Errorf("lock mq to broker: %s error %s", addr, err.Error())
}
@@ -832,7 +832,7 @@ func (dc *defaultConsumer) findConsumerList(topic string) []string {
ConsumerGroup: dc.consumerGroup,
}
cmd := remote.NewRemotingCommand(internal.ReqGetConsumerListByGroup, req, nil)
- res, err := dc.client.InvokeSync(brokerAddr, cmd, 3*time.Second) // TODO 超时机制有问题
+ res, err := dc.client.InvokeSync(context.Background(), brokerAddr, cmd, 3*time.Second) // TODO 超时机制有问题
if err != nil {
rlog.Errorf("get consumer list of [%s] from %s error: %s", dc.consumerGroup, brokerAddr, err.Error())
return nil
@@ -869,7 +869,7 @@ func (dc *defaultConsumer) queryMaxOffset(mq *primitive.MessageQueue) (int64, er
}
cmd := remote.NewRemotingCommand(internal.ReqGetMaxOffset, request, nil)
- response, err := dc.client.InvokeSync(brokerAddr, cmd, 3*time.Second)
+ response, err := dc.client.InvokeSync(context.Background(), brokerAddr, cmd, 3*time.Second)
if err != nil {
return -1, err
}
@@ -899,7 +899,7 @@ func (dc *defaultConsumer) searchOffsetByTimestamp(mq *primitive.MessageQueue, t
}
cmd := remote.NewRemotingCommand(internal.ReqSearchOffsetByTimestamp, request, nil)
- response, err := dc.client.InvokeSync(brokerAddr, cmd, 3*time.Second)
+ response, err := dc.client.InvokeSync(context.Background(), brokerAddr, cmd, 3*time.Second)
if err != nil {
return -1, err
}
diff --git a/consumer/offset_store.go b/consumer/offset_store.go
index 4d1b7e9..71d2c87 100644
--- a/consumer/offset_store.go
+++ b/consumer/offset_store.go
@@ -18,6 +18,7 @@ limitations under the License.
package consumer
import (
+ "context"
"encoding/json"
"fmt"
"os"
@@ -302,7 +303,7 @@ func (r *remoteBrokerOffsetStore) fetchConsumeOffsetFromBroker(group string, mq
QueueId: mq.QueueId,
}
cmd := remote.NewRemotingCommand(internal.ReqQueryConsumerOffset, queryOffsetRequest, nil)
- res, err := r.client.InvokeSync(broker, cmd, 3*time.Second)
+ res, err := r.client.InvokeSync(context.Background(), broker, cmd, 3*time.Second)
if err != nil {
return -1, err
}
@@ -336,7 +337,7 @@ func (r *remoteBrokerOffsetStore) updateConsumeOffsetToBroker(group, topic strin
CommitOffset: queue.Offset,
}
cmd := remote.NewRemotingCommand(internal.ReqUpdateConsumerOffset, updateOffsetRequest, nil)
- return r.client.InvokeOneWay(broker, cmd, 5*time.Second)
+ return r.client.InvokeOneWay(context.Background(), broker, cmd, 5*time.Second)
}
func readFromMemory(table map[string]map[int]*queueOffset, mq *primitive.MessageQueue) int64 {
diff --git a/consumer/offset_store_test.go b/consumer/offset_store_test.go
index aa81271..4f6e5ef 100644
--- a/consumer/offset_store_test.go
+++ b/consumer/offset_store_test.go
@@ -279,7 +279,7 @@ func TestRemoteBrokerOffsetStore(t *testing.T) {
"offset": "1",
},
}
- rmqClient.EXPECT().InvokeSync(gomock.Any(), gomock.Any(), gomock.Any()).Return(ret, nil)
+ rmqClient.EXPECT().InvokeSync(gomock.Any(), gomock.Any(), gomock.Any(), gomock.Any()).Return(ret, nil)
remoteStore.persist(queues)
offset := remoteStore.read(mq, _ReadFromStore)
diff --git a/consumer/push_consumer.go b/consumer/push_consumer.go
index d5a5ffc..e8aabdc 100644
--- a/consumer/push_consumer.go
+++ b/consumer/push_consumer.go
@@ -515,7 +515,7 @@ func (pc *pushConsumer) sendMessageBack(brokerName string, msg *primitive.Messag
} else {
brokerAddr = msg.StoreHost
}
- _, err := pc.client.InvokeSync(brokerAddr, pc.buildSendBackRequest(msg, delayLevel), 3*time.Second)
+ _, err := pc.client.InvokeSync(context.Background(), brokerAddr, pc.buildSendBackRequest(msg, delayLevel), 3*time.Second)
if err != nil {
return false
}
diff --git a/internal/client.go b/internal/client.go
index 8822e2d..cb4cfb6 100644
--- a/internal/client.go
+++ b/internal/client.go
@@ -129,11 +129,11 @@ type RMQClient interface {
ClientID() string
RegisterProducer(group string, producer InnerProducer)
- InvokeSync(addr string, request *remote.RemotingCommand,
+ InvokeSync(ctx context.Context, addr string, request *remote.RemotingCommand,
timeoutMillis time.Duration) (*remote.RemotingCommand, error)
- InvokeAsync(addr string, request *remote.RemotingCommand,
+ InvokeAsync(ctx context.Context, addr string, request *remote.RemotingCommand,
timeoutMillis time.Duration, f func(*remote.RemotingCommand, error)) error
- InvokeOneWay(addr string, request *remote.RemotingCommand,
+ InvokeOneWay(ctx context.Context, addr string, request *remote.RemotingCommand,
timeoutMillis time.Duration) error
CheckClientInBroker()
SendHeartbeatToAllBrokerWithLock()
@@ -291,31 +291,31 @@ func (c *rmqClient) ClientID() string {
return id
}
-func (c *rmqClient) InvokeSync(addr string, request *remote.RemotingCommand,
+func (c *rmqClient) InvokeSync(ctx context.Context, addr string, request *remote.RemotingCommand,
timeoutMillis time.Duration) (*remote.RemotingCommand, error) {
if c.close {
return nil, ErrServiceState
}
- return c.remoteClient.InvokeSync(addr, request, timeoutMillis)
+ return c.remoteClient.InvokeSync(ctx, addr, request, timeoutMillis)
}
-func (c *rmqClient) InvokeAsync(addr string, request *remote.RemotingCommand,
+func (c *rmqClient) InvokeAsync(ctx context.Context, addr string, request *remote.RemotingCommand,
timeoutMillis time.Duration, f func(*remote.RemotingCommand, error)) error {
if c.close {
return ErrServiceState
}
- return c.remoteClient.InvokeAsync(addr, request, timeoutMillis, func(future *remote.ResponseFuture) {
+ return c.remoteClient.InvokeAsync(ctx, addr, request, timeoutMillis, func(future *remote.ResponseFuture) {
f(future.ResponseCommand, future.Err)
})
}
-func (c *rmqClient) InvokeOneWay(addr string, request *remote.RemotingCommand,
+func (c *rmqClient) InvokeOneWay(ctx context.Context, addr string, request *remote.RemotingCommand,
timeoutMillis time.Duration) error {
if c.close {
return ErrServiceState
}
- return c.remoteClient.InvokeOneWay(addr, request, timeoutMillis)
+ return c.remoteClient.InvokeOneWay(ctx, addr, request, timeoutMillis)
}
func (c *rmqClient) CheckClientInBroker() {
@@ -357,7 +357,7 @@ func (c *rmqClient) SendHeartbeatToAllBrokerWithLock() {
data := value.(*BrokerData)
for id, addr := range data.BrokerAddresses {
cmd := remote.NewRemotingCommand(ReqHeartBeat, nil, hbData.encode())
- response, err := c.remoteClient.InvokeSync(addr, cmd, 3*time.Second)
+ response, err := c.remoteClient.InvokeSync(context.Background(), addr, cmd, 3*time.Second)
if err != nil {
rlog.Warnf("send heart beat to broker error: %s", err.Error())
return true
@@ -417,7 +417,7 @@ func (c *rmqClient) SendMessageAsync(ctx context.Context, brokerAddrs, brokerNam
func (c *rmqClient) SendMessageOneWay(ctx context.Context, brokerAddrs string, request *SendMessageRequest,
msgs []*primitive.Message) (*primitive.SendResult, error) {
cmd := remote.NewRemotingCommand(ReqSendBatchMessage, request, encodeMessages(msgs))
- err := c.remoteClient.InvokeOneWay(brokerAddrs, cmd, 3*time.Second)
+ err := c.remoteClient.InvokeOneWay(ctx, brokerAddrs, cmd, 3*time.Second)
if err != nil {
rlog.Warnf("send messages with oneway error: %v", err)
}
@@ -473,7 +473,7 @@ func (c *rmqClient) ProcessSendResponse(brokerName string, cmd *remote.RemotingC
// PullMessage with sync
func (c *rmqClient) PullMessage(ctx context.Context, brokerAddrs string, request *PullMessageRequest) (*primitive.PullResult, error) {
cmd := remote.NewRemotingCommand(ReqPullMessage, request, nil)
- res, err := c.remoteClient.InvokeSync(brokerAddrs, cmd, 10*time.Second)
+ res, err := c.remoteClient.InvokeSync(ctx, brokerAddrs, cmd, 10*time.Second)
if err != nil {
return nil, err
}
diff --git a/internal/mock_client.go b/internal/mock_client.go
index af9cf2c..a216e93 100644
--- a/internal/mock_client.go
+++ b/internal/mock_client.go
@@ -21,13 +21,14 @@ limitations under the License.
package internal
import (
- context "context"
- reflect "reflect"
- time "time"
+ "context"
+ "reflect"
+ "time"
- remote "github.com/apache/rocketmq-client-go/internal/remote"
- primitive "github.com/apache/rocketmq-client-go/primitive"
- gomock "github.com/golang/mock/gomock"
+ "github.com/golang/mock/gomock"
+
+ "github.com/apache/rocketmq-client-go/internal/remote"
+ "github.com/apache/rocketmq-client-go/primitive"
)
// MockInnerProducer is a mock of InnerProducer interface
@@ -55,7 +56,6 @@ func (m *MockInnerProducer) EXPECT() *MockInnerProducerMockRecorder {
// PublishTopicList mocks base method
func (m *MockInnerProducer) PublishTopicList() []string {
- m.ctrl.T.Helper()
ret := m.ctrl.Call(m, "PublishTopicList")
ret0, _ := ret[0].([]string)
return ret0
@@ -63,25 +63,21 @@ func (m *MockInnerProducer) PublishTopicList() []string {
// PublishTopicList indicates an expected call of PublishTopicList
func (mr *MockInnerProducerMockRecorder) PublishTopicList() *gomock.Call {
- mr.mock.ctrl.T.Helper()
return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "PublishTopicList", reflect.TypeOf((*MockInnerProducer)(nil).PublishTopicList))
}
// UpdateTopicPublishInfo mocks base method
func (m *MockInnerProducer) UpdateTopicPublishInfo(topic string, info *TopicPublishInfo) {
- m.ctrl.T.Helper()
m.ctrl.Call(m, "UpdateTopicPublishInfo", topic, info)
}
// UpdateTopicPublishInfo indicates an expected call of UpdateTopicPublishInfo
func (mr *MockInnerProducerMockRecorder) UpdateTopicPublishInfo(topic, info interface{}) *gomock.Call {
- mr.mock.ctrl.T.Helper()
return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "UpdateTopicPublishInfo", reflect.TypeOf((*MockInnerProducer)(nil).UpdateTopicPublishInfo), topic, info)
}
// IsPublishTopicNeedUpdate mocks base method
func (m *MockInnerProducer) IsPublishTopicNeedUpdate(topic string) bool {
- m.ctrl.T.Helper()
ret := m.ctrl.Call(m, "IsPublishTopicNeedUpdate", topic)
ret0, _ := ret[0].(bool)
return ret0
@@ -89,13 +85,11 @@ func (m *MockInnerProducer) IsPublishTopicNeedUpdate(topic string) bool {
// IsPublishTopicNeedUpdate indicates an expected call of IsPublishTopicNeedUpdate
func (mr *MockInnerProducerMockRecorder) IsPublishTopicNeedUpdate(topic interface{}) *gomock.Call {
- mr.mock.ctrl.T.Helper()
return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "IsPublishTopicNeedUpdate", reflect.TypeOf((*MockInnerProducer)(nil).IsPublishTopicNeedUpdate), topic)
}
// IsUnitMode mocks base method
func (m *MockInnerProducer) IsUnitMode() bool {
- m.ctrl.T.Helper()
ret := m.ctrl.Call(m, "IsUnitMode")
ret0, _ := ret[0].(bool)
return ret0
@@ -103,7 +97,6 @@ func (m *MockInnerProducer) IsUnitMode() bool {
// IsUnitMode indicates an expected call of IsUnitMode
func (mr *MockInnerProducerMockRecorder) IsUnitMode() *gomock.Call {
- mr.mock.ctrl.T.Helper()
return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "IsUnitMode", reflect.TypeOf((*MockInnerProducer)(nil).IsUnitMode))
}
@@ -132,7 +125,6 @@ func (m *MockInnerConsumer) EXPECT() *MockInnerConsumerMockRecorder {
// PersistConsumerOffset mocks base method
func (m *MockInnerConsumer) PersistConsumerOffset() error {
- m.ctrl.T.Helper()
ret := m.ctrl.Call(m, "PersistConsumerOffset")
ret0, _ := ret[0].(error)
return ret0
@@ -140,25 +132,21 @@ func (m *MockInnerConsumer) PersistConsumerOffset() error {
// PersistConsumerOffset indicates an expected call of PersistConsumerOffset
func (mr *MockInnerConsumerMockRecorder) PersistConsumerOffset() *gomock.Call {
- mr.mock.ctrl.T.Helper()
return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "PersistConsumerOffset", reflect.TypeOf((*MockInnerConsumer)(nil).PersistConsumerOffset))
}
// UpdateTopicSubscribeInfo mocks base method
func (m *MockInnerConsumer) UpdateTopicSubscribeInfo(topic string, mqs []*primitive.MessageQueue) {
- m.ctrl.T.Helper()
m.ctrl.Call(m, "UpdateTopicSubscribeInfo", topic, mqs)
}
// UpdateTopicSubscribeInfo indicates an expected call of UpdateTopicSubscribeInfo
func (mr *MockInnerConsumerMockRecorder) UpdateTopicSubscribeInfo(topic, mqs interface{}) *gomock.Call {
- mr.mock.ctrl.T.Helper()
return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "UpdateTopicSubscribeInfo", reflect.TypeOf((*MockInnerConsumer)(nil).UpdateTopicSubscribeInfo), topic, mqs)
}
// IsSubscribeTopicNeedUpdate mocks base method
func (m *MockInnerConsumer) IsSubscribeTopicNeedUpdate(topic string) bool {
- m.ctrl.T.Helper()
ret := m.ctrl.Call(m, "IsSubscribeTopicNeedUpdate", topic)
ret0, _ := ret[0].(bool)
return ret0
@@ -166,13 +154,11 @@ func (m *MockInnerConsumer) IsSubscribeTopicNeedUpdate(topic string) bool {
// IsSubscribeTopicNeedUpdate indicates an expected call of IsSubscribeTopicNeedUpdate
func (mr *MockInnerConsumerMockRecorder) IsSubscribeTopicNeedUpdate(topic interface{}) *gomock.Call {
- mr.mock.ctrl.T.Helper()
return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "IsSubscribeTopicNeedUpdate", reflect.TypeOf((*MockInnerConsumer)(nil).IsSubscribeTopicNeedUpdate), topic)
}
// SubscriptionDataList mocks base method
func (m *MockInnerConsumer) SubscriptionDataList() []*SubscriptionData {
- m.ctrl.T.Helper()
ret := m.ctrl.Call(m, "SubscriptionDataList")
ret0, _ := ret[0].([]*SubscriptionData)
return ret0
@@ -180,25 +166,21 @@ func (m *MockInnerConsumer) SubscriptionDataList() []*SubscriptionData {
// SubscriptionDataList indicates an expected call of SubscriptionDataList
func (mr *MockInnerConsumerMockRecorder) SubscriptionDataList() *gomock.Call {
- mr.mock.ctrl.T.Helper()
return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "SubscriptionDataList", reflect.TypeOf((*MockInnerConsumer)(nil).SubscriptionDataList))
}
// Rebalance mocks base method
func (m *MockInnerConsumer) Rebalance() {
- m.ctrl.T.Helper()
m.ctrl.Call(m, "Rebalance")
}
// Rebalance indicates an expected call of Rebalance
func (mr *MockInnerConsumerMockRecorder) Rebalance() *gomock.Call {
- mr.mock.ctrl.T.Helper()
return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "Rebalance", reflect.TypeOf((*MockInnerConsumer)(nil).Rebalance))
}
// IsUnitMode mocks base method
func (m *MockInnerConsumer) IsUnitMode() bool {
- m.ctrl.T.Helper()
ret := m.ctrl.Call(m, "IsUnitMode")
ret0, _ := ret[0].(bool)
return ret0
@@ -206,7 +188,6 @@ func (m *MockInnerConsumer) IsUnitMode() bool {
// IsUnitMode indicates an expected call of IsUnitMode
func (mr *MockInnerConsumerMockRecorder) IsUnitMode() *gomock.Call {
- mr.mock.ctrl.T.Helper()
return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "IsUnitMode", reflect.TypeOf((*MockInnerConsumer)(nil).IsUnitMode))
}
@@ -235,31 +216,26 @@ func (m *MockRMQClient) EXPECT() *MockRMQClientMockRecorder {
// Start mocks base method
func (m *MockRMQClient) Start() {
- m.ctrl.T.Helper()
m.ctrl.Call(m, "Start")
}
// Start indicates an expected call of Start
func (mr *MockRMQClientMockRecorder) Start() *gomock.Call {
- mr.mock.ctrl.T.Helper()
return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "Start", reflect.TypeOf((*MockRMQClient)(nil).Start))
}
// Shutdown mocks base method
func (m *MockRMQClient) Shutdown() {
- m.ctrl.T.Helper()
m.ctrl.Call(m, "Shutdown")
}
// Shutdown indicates an expected call of Shutdown
func (mr *MockRMQClientMockRecorder) Shutdown() *gomock.Call {
- mr.mock.ctrl.T.Helper()
return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "Shutdown", reflect.TypeOf((*MockRMQClient)(nil).Shutdown))
}
// ClientID mocks base method
func (m *MockRMQClient) ClientID() string {
- m.ctrl.T.Helper()
ret := m.ctrl.Call(m, "ClientID")
ret0, _ := ret[0].(string)
return ret0
@@ -267,104 +243,88 @@ func (m *MockRMQClient) ClientID() string {
// ClientID indicates an expected call of ClientID
func (mr *MockRMQClientMockRecorder) ClientID() *gomock.Call {
- mr.mock.ctrl.T.Helper()
return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "ClientID", reflect.TypeOf((*MockRMQClient)(nil).ClientID))
}
// RegisterProducer mocks base method
func (m *MockRMQClient) RegisterProducer(group string, producer InnerProducer) {
- m.ctrl.T.Helper()
m.ctrl.Call(m, "RegisterProducer", group, producer)
}
// RegisterProducer indicates an expected call of RegisterProducer
func (mr *MockRMQClientMockRecorder) RegisterProducer(group, producer interface{}) *gomock.Call {
- mr.mock.ctrl.T.Helper()
return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "RegisterProducer", reflect.TypeOf((*MockRMQClient)(nil).RegisterProducer), group, producer)
}
// InvokeSync mocks base method
-func (m *MockRMQClient) InvokeSync(addr string, request *remote.RemotingCommand, timeoutMillis time.Duration) (*remote.RemotingCommand, error) {
- m.ctrl.T.Helper()
- ret := m.ctrl.Call(m, "InvokeSync", addr, request, timeoutMillis)
+func (m *MockRMQClient) InvokeSync(ctx context.Context, addr string, request *remote.RemotingCommand, timeoutMillis time.Duration) (*remote.RemotingCommand, error) {
+ ret := m.ctrl.Call(m, "InvokeSync", ctx, addr, request, timeoutMillis)
ret0, _ := ret[0].(*remote.RemotingCommand)
ret1, _ := ret[1].(error)
return ret0, ret1
}
// InvokeSync indicates an expected call of InvokeSync
-func (mr *MockRMQClientMockRecorder) InvokeSync(addr, request, timeoutMillis interface{}) *gomock.Call {
- mr.mock.ctrl.T.Helper()
- return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "InvokeSync", reflect.TypeOf((*MockRMQClient)(nil).InvokeSync), addr, request, timeoutMillis)
+func (mr *MockRMQClientMockRecorder) InvokeSync(ctx, addr, request, timeoutMillis interface{}) *gomock.Call {
+ return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "InvokeSync", reflect.TypeOf((*MockRMQClient)(nil).InvokeSync), ctx, addr, request, timeoutMillis)
}
// InvokeAsync mocks base method
-func (m *MockRMQClient) InvokeAsync(addr string, request *remote.RemotingCommand, timeoutMillis time.Duration, f func(*remote.RemotingCommand, error)) error {
- m.ctrl.T.Helper()
- ret := m.ctrl.Call(m, "InvokeAsync", addr, request, timeoutMillis, f)
+func (m *MockRMQClient) InvokeAsync(ctx context.Context, addr string, request *remote.RemotingCommand, timeoutMillis time.Duration, f func(*remote.RemotingCommand, error)) error {
+ ret := m.ctrl.Call(m, "InvokeAsync", ctx, addr, request, timeoutMillis, f)
ret0, _ := ret[0].(error)
return ret0
}
// InvokeAsync indicates an expected call of InvokeAsync
-func (mr *MockRMQClientMockRecorder) InvokeAsync(addr, request, timeoutMillis, f interface{}) *gomock.Call {
- mr.mock.ctrl.T.Helper()
- return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "InvokeAsync", reflect.TypeOf((*MockRMQClient)(nil).InvokeAsync), addr, request, timeoutMillis, f)
+func (mr *MockRMQClientMockRecorder) InvokeAsync(ctx, addr, request, timeoutMillis, f interface{}) *gomock.Call {
+ return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "InvokeAsync", reflect.TypeOf((*MockRMQClient)(nil).InvokeAsync), ctx, addr, request, timeoutMillis, f)
}
// InvokeOneWay mocks base method
-func (m *MockRMQClient) InvokeOneWay(addr string, request *remote.RemotingCommand, timeoutMillis time.Duration) error {
- m.ctrl.T.Helper()
- ret := m.ctrl.Call(m, "InvokeOneWay", addr, request, timeoutMillis)
+func (m *MockRMQClient) InvokeOneWay(ctx context.Context, addr string, request *remote.RemotingCommand, timeoutMillis time.Duration) error {
+ ret := m.ctrl.Call(m, "InvokeOneWay", ctx, addr, request, timeoutMillis)
ret0, _ := ret[0].(error)
return ret0
}
// InvokeOneWay indicates an expected call of InvokeOneWay
-func (mr *MockRMQClientMockRecorder) InvokeOneWay(addr, request, timeoutMillis interface{}) *gomock.Call {
- mr.mock.ctrl.T.Helper()
- return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "InvokeOneWay", reflect.TypeOf((*MockRMQClient)(nil).InvokeOneWay), addr, request, timeoutMillis)
+func (mr *MockRMQClientMockRecorder) InvokeOneWay(ctx, addr, request, timeoutMillis interface{}) *gomock.Call {
+ return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "InvokeOneWay", reflect.TypeOf((*MockRMQClient)(nil).InvokeOneWay), ctx, addr, request, timeoutMillis)
}
// CheckClientInBroker mocks base method
func (m *MockRMQClient) CheckClientInBroker() {
- m.ctrl.T.Helper()
m.ctrl.Call(m, "CheckClientInBroker")
}
// CheckClientInBroker indicates an expected call of CheckClientInBroker
func (mr *MockRMQClientMockRecorder) CheckClientInBroker() *gomock.Call {
- mr.mock.ctrl.T.Helper()
return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "CheckClientInBroker", reflect.TypeOf((*MockRMQClient)(nil).CheckClientInBroker))
}
// SendHeartbeatToAllBrokerWithLock mocks base method
func (m *MockRMQClient) SendHeartbeatToAllBrokerWithLock() {
- m.ctrl.T.Helper()
m.ctrl.Call(m, "SendHeartbeatToAllBrokerWithLock")
}
// SendHeartbeatToAllBrokerWithLock indicates an expected call of SendHeartbeatToAllBrokerWithLock
func (mr *MockRMQClientMockRecorder) SendHeartbeatToAllBrokerWithLock() *gomock.Call {
- mr.mock.ctrl.T.Helper()
return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "SendHeartbeatToAllBrokerWithLock", reflect.TypeOf((*MockRMQClient)(nil).SendHeartbeatToAllBrokerWithLock))
}
// UpdateTopicRouteInfo mocks base method
func (m *MockRMQClient) UpdateTopicRouteInfo() {
- m.ctrl.T.Helper()
m.ctrl.Call(m, "UpdateTopicRouteInfo")
}
// UpdateTopicRouteInfo indicates an expected call of UpdateTopicRouteInfo
func (mr *MockRMQClientMockRecorder) UpdateTopicRouteInfo() *gomock.Call {
- mr.mock.ctrl.T.Helper()
return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "UpdateTopicRouteInfo", reflect.TypeOf((*MockRMQClient)(nil).UpdateTopicRouteInfo))
}
// ProcessSendResponse mocks base method
func (m *MockRMQClient) ProcessSendResponse(brokerName string, cmd *remote.RemotingCommand, resp *primitive.SendResult, msgs ...*primitive.Message) error {
- m.ctrl.T.Helper()
varargs := []interface{}{brokerName, cmd, resp}
for _, a := range msgs {
varargs = append(varargs, a)
@@ -376,14 +336,12 @@ func (m *MockRMQClient) ProcessSendResponse(brokerName string, cmd *remote.Remot
// ProcessSendResponse indicates an expected call of ProcessSendResponse
func (mr *MockRMQClientMockRecorder) ProcessSendResponse(brokerName, cmd, resp interface{}, msgs ...interface{}) *gomock.Call {
- mr.mock.ctrl.T.Helper()
varargs := append([]interface{}{brokerName, cmd, resp}, msgs...)
return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "ProcessSendResponse", reflect.TypeOf((*MockRMQClient)(nil).ProcessSendResponse), varargs...)
}
// RegisterConsumer mocks base method
func (m *MockRMQClient) RegisterConsumer(group string, consumer InnerConsumer) error {
- m.ctrl.T.Helper()
ret := m.ctrl.Call(m, "RegisterConsumer", group, consumer)
ret0, _ := ret[0].(error)
return ret0
@@ -391,25 +349,21 @@ func (m *MockRMQClient) RegisterConsumer(group string, consumer InnerConsumer) e
// RegisterConsumer indicates an expected call of RegisterConsumer
func (mr *MockRMQClientMockRecorder) RegisterConsumer(group, consumer interface{}) *gomock.Call {
- mr.mock.ctrl.T.Helper()
return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "RegisterConsumer", reflect.TypeOf((*MockRMQClient)(nil).RegisterConsumer), group, consumer)
}
// UnregisterConsumer mocks base method
func (m *MockRMQClient) UnregisterConsumer(group string) {
- m.ctrl.T.Helper()
m.ctrl.Call(m, "UnregisterConsumer", group)
}
// UnregisterConsumer indicates an expected call of UnregisterConsumer
func (mr *MockRMQClientMockRecorder) UnregisterConsumer(group interface{}) *gomock.Call {
- mr.mock.ctrl.T.Helper()
return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "UnregisterConsumer", reflect.TypeOf((*MockRMQClient)(nil).UnregisterConsumer), group)
}
// PullMessage mocks base method
func (m *MockRMQClient) PullMessage(ctx context.Context, brokerAddrs string, request *PullMessageRequest) (*primitive.PullResult, error) {
- m.ctrl.T.Helper()
ret := m.ctrl.Call(m, "PullMessage", ctx, brokerAddrs, request)
ret0, _ := ret[0].(*primitive.PullResult)
ret1, _ := ret[1].(error)
@@ -418,13 +372,11 @@ func (m *MockRMQClient) PullMessage(ctx context.Context, brokerAddrs string, req
// PullMessage indicates an expected call of PullMessage
func (mr *MockRMQClientMockRecorder) PullMessage(ctx, brokerAddrs, request interface{}) *gomock.Call {
- mr.mock.ctrl.T.Helper()
return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "PullMessage", reflect.TypeOf((*MockRMQClient)(nil).PullMessage), ctx, brokerAddrs, request)
}
// PullMessageAsync mocks base method
func (m *MockRMQClient) PullMessageAsync(ctx context.Context, brokerAddrs string, request *PullMessageRequest, f func(*primitive.PullResult)) error {
- m.ctrl.T.Helper()
ret := m.ctrl.Call(m, "PullMessageAsync", ctx, brokerAddrs, request, f)
ret0, _ := ret[0].(error)
return ret0
@@ -432,30 +384,25 @@ func (m *MockRMQClient) PullMessageAsync(ctx context.Context, brokerAddrs string
// PullMessageAsync indicates an expected call of PullMessageAsync
func (mr *MockRMQClientMockRecorder) PullMessageAsync(ctx, brokerAddrs, request, f interface{}) *gomock.Call {
- mr.mock.ctrl.T.Helper()
return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "PullMessageAsync", reflect.TypeOf((*MockRMQClient)(nil).PullMessageAsync), ctx, brokerAddrs, request, f)
}
// RebalanceImmediately mocks base method
func (m *MockRMQClient) RebalanceImmediately() {
- m.ctrl.T.Helper()
m.ctrl.Call(m, "RebalanceImmediately")
}
// RebalanceImmediately indicates an expected call of RebalanceImmediately
func (mr *MockRMQClientMockRecorder) RebalanceImmediately() *gomock.Call {
- mr.mock.ctrl.T.Helper()
return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "RebalanceImmediately", reflect.TypeOf((*MockRMQClient)(nil).RebalanceImmediately))
}
// UpdatePublishInfo mocks base method
func (m *MockRMQClient) UpdatePublishInfo(topic string, data *TopicRouteData) {
- m.ctrl.T.Helper()
m.ctrl.Call(m, "UpdatePublishInfo", topic, data)
}
// UpdatePublishInfo indicates an expected call of UpdatePublishInfo
func (mr *MockRMQClientMockRecorder) UpdatePublishInfo(topic, data interface{}) *gomock.Call {
- mr.mock.ctrl.T.Helper()
return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "UpdatePublishInfo", reflect.TypeOf((*MockRMQClient)(nil).UpdatePublishInfo), topic, data)
}
diff --git a/internal/remote/future.go b/internal/remote/future.go
index 8690644..4d3008f 100644
--- a/internal/remote/future.go
+++ b/internal/remote/future.go
@@ -18,6 +18,7 @@ limitations under the License.
package remote
import (
+ "context"
"sync"
"time"
@@ -35,16 +36,18 @@ type ResponseFuture struct {
BeginTimestamp time.Duration
Done chan bool
callbackOnce sync.Once
+ ctx context.Context
}
// NewResponseFuture create ResponseFuture with opaque, timeout and callback
-func NewResponseFuture(opaque int32, timeout time.Duration, callback func(*ResponseFuture)) *ResponseFuture {
+func NewResponseFuture(ctx context.Context, opaque int32, timeout time.Duration, callback func(*ResponseFuture)) *ResponseFuture {
return &ResponseFuture{
Opaque: opaque,
Done: make(chan bool),
Timeout: timeout,
callback: callback,
BeginTimestamp: time.Duration(time.Now().Unix()) * time.Second,
+ ctx: ctx,
}
}
@@ -66,19 +69,19 @@ func (r *ResponseFuture) waitResponse() (*RemotingCommand, error) {
cmd *RemotingCommand
err error
)
- timer := time.NewTimer(r.Timeout)
+ ctx, cancel := context.WithTimeout(r.ctx, r.Timeout)
+ defer cancel()
for {
select {
case <-r.Done:
cmd, err = r.ResponseCommand, r.Err
goto done
- case <-timer.C:
+ case <-ctx.Done():
err = utils.ErrRequestTimeout
r.Err = err
goto done
}
}
done:
- timer.Stop()
return cmd, err
}
diff --git a/internal/remote/mock_remote_client.go b/internal/remote/mock_remote_client.go
index 62a91c5..e264ab2 100644
--- a/internal/remote/mock_remote_client.go
+++ b/internal/remote/mock_remote_client.go
@@ -21,6 +21,7 @@
package remote
import (
+ context "context"
primitive "github.com/apache/rocketmq-client-go/primitive"
gomock "github.com/golang/mock/gomock"
reflect "reflect"
@@ -75,40 +76,40 @@ func (mr *MockRemotingClientMockRecorder) RegisterInterceptor(interceptors ...in
}
// InvokeSync mocks base method
-func (m *MockRemotingClient) InvokeSync(addr string, request *RemotingCommand, timeout time.Duration) (*RemotingCommand, error) {
- ret := m.ctrl.Call(m, "InvokeSync", addr, request, timeout)
+func (m *MockRemotingClient) InvokeSync(ctx context.Context, addr string, request *RemotingCommand, timeout time.Duration) (*RemotingCommand, error) {
+ ret := m.ctrl.Call(m, "InvokeSync", ctx, addr, request, timeout)
ret0, _ := ret[0].(*RemotingCommand)
ret1, _ := ret[1].(error)
return ret0, ret1
}
// InvokeSync indicates an expected call of InvokeSync
-func (mr *MockRemotingClientMockRecorder) InvokeSync(addr, request, timeout interface{}) *gomock.Call {
- return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "InvokeSync", reflect.TypeOf((*MockRemotingClient)(nil).InvokeSync), addr, request, timeout)
+func (mr *MockRemotingClientMockRecorder) InvokeSync(ctx, addr, request, timeout interface{}) *gomock.Call {
+ return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "InvokeSync", reflect.TypeOf((*MockRemotingClient)(nil).InvokeSync), ctx, addr, request, timeout)
}
// InvokeAsync mocks base method
-func (m *MockRemotingClient) InvokeAsync(addr string, request *RemotingCommand, timeout time.Duration, callback func(*ResponseFuture)) error {
- ret := m.ctrl.Call(m, "InvokeAsync", addr, request, timeout, callback)
+func (m *MockRemotingClient) InvokeAsync(ctx context.Context, addr string, request *RemotingCommand, timeout time.Duration, callback func(*ResponseFuture)) error {
+ ret := m.ctrl.Call(m, "InvokeAsync", ctx, addr, request, timeout, callback)
ret0, _ := ret[0].(error)
return ret0
}
// InvokeAsync indicates an expected call of InvokeAsync
-func (mr *MockRemotingClientMockRecorder) InvokeAsync(addr, request, timeout, callback interface{}) *gomock.Call {
- return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "InvokeAsync", reflect.TypeOf((*MockRemotingClient)(nil).InvokeAsync), addr, request, timeout, callback)
+func (mr *MockRemotingClientMockRecorder) InvokeAsync(ctx, addr, request, timeout, callback interface{}) *gomock.Call {
+ return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "InvokeAsync", reflect.TypeOf((*MockRemotingClient)(nil).InvokeAsync), ctx, addr, request, timeout, callback)
}
// InvokeOneWay mocks base method
-func (m *MockRemotingClient) InvokeOneWay(addr string, request *RemotingCommand, timeout time.Duration) error {
- ret := m.ctrl.Call(m, "InvokeOneWay", addr, request, timeout)
+func (m *MockRemotingClient) InvokeOneWay(ctx context.Context, addr string, request *RemotingCommand, timeout time.Duration) error {
+ ret := m.ctrl.Call(m, "InvokeOneWay", ctx, addr, request, timeout)
ret0, _ := ret[0].(error)
return ret0
}
// InvokeOneWay indicates an expected call of InvokeOneWay
-func (mr *MockRemotingClientMockRecorder) InvokeOneWay(addr, request, timeout interface{}) *gomock.Call {
- return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "InvokeOneWay", reflect.TypeOf((*MockRemotingClient)(nil).InvokeOneWay), addr, request, timeout)
+func (mr *MockRemotingClientMockRecorder) InvokeOneWay(ctx, addr, request, timeout interface{}) *gomock.Call {
+ return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "InvokeOneWay", reflect.TypeOf((*MockRemotingClient)(nil).InvokeOneWay), ctx, addr, request, timeout)
}
// ShutDown mocks base method
diff --git a/internal/remote/remote_client.go b/internal/remote/remote_client.go
index 01688fe..96f67a6 100644
--- a/internal/remote/remote_client.go
+++ b/internal/remote/remote_client.go
@@ -41,9 +41,9 @@ type TcpOption struct {
type RemotingClient interface {
RegisterRequestFunc(code int16, f ClientRequestFunc)
RegisterInterceptor(interceptors ...primitive.Interceptor)
- InvokeSync(addr string, request *RemotingCommand, timeout time.Duration) (*RemotingCommand, error)
- InvokeAsync(addr string, request *RemotingCommand, timeout time.Duration, callback func(*ResponseFuture)) error
- InvokeOneWay(addr string, request *RemotingCommand, timeout time.Duration) error
+ InvokeSync(ctx context.Context, addr string, request *RemotingCommand, timeout time.Duration) (*RemotingCommand, error)
+ InvokeAsync(ctx context.Context, addr string, request *RemotingCommand, timeout time.Duration, callback func(*ResponseFuture)) error
+ InvokeOneWay(ctx context.Context, addr string, request *RemotingCommand, timeout time.Duration) error
ShutDown()
}
@@ -69,12 +69,12 @@ func (c *remotingClient) RegisterRequestFunc(code int16, f ClientRequestFunc) {
}
// TODO: merge sync and async model. sync should run on async model by blocking on chan
-func (c *remotingClient) InvokeSync(addr string, request *RemotingCommand, timeout time.Duration) (*RemotingCommand, error) {
- conn, err := c.connect(addr)
+func (c *remotingClient) InvokeSync(ctx context.Context, addr string, request *RemotingCommand, timeout time.Duration) (*RemotingCommand, error) {
+ conn, err := c.connect(ctx, addr)
if err != nil {
return nil, err
}
- resp := NewResponseFuture(request.Opaque, timeout, nil)
+ resp := NewResponseFuture(ctx, request.Opaque, timeout, nil)
c.responseTable.Store(resp.Opaque, resp)
defer c.responseTable.Delete(request.Opaque)
err = c.sendRequest(conn, request)
@@ -86,12 +86,12 @@ func (c *remotingClient) InvokeSync(addr string, request *RemotingCommand, timeo
}
// InvokeAsync send request without blocking, just return immediately.
-func (c *remotingClient) InvokeAsync(addr string, request *RemotingCommand, timeout time.Duration, callback func(*ResponseFuture)) error {
- conn, err := c.connect(addr)
+func (c *remotingClient) InvokeAsync(ctx context.Context, addr string, request *RemotingCommand, timeout time.Duration, callback func(*ResponseFuture)) error {
+ conn, err := c.connect(ctx, addr)
if err != nil {
return err
}
- resp := NewResponseFuture(request.Opaque, timeout, callback)
+ resp := NewResponseFuture(ctx, request.Opaque, timeout, callback)
c.responseTable.Store(resp.Opaque, resp)
err = c.sendRequest(conn, request)
if err != nil {
@@ -109,15 +109,15 @@ func (c *remotingClient) receiveAsync(f *ResponseFuture) {
}
}
-func (c *remotingClient) InvokeOneWay(addr string, request *RemotingCommand, timeout time.Duration) error {
- conn, err := c.connect(addr)
+func (c *remotingClient) InvokeOneWay(ctx context.Context, addr string, request *RemotingCommand, timeout time.Duration) error {
+ conn, err := c.connect(ctx, addr)
if err != nil {
return err
}
return c.sendRequest(conn, request)
}
-func (c *remotingClient) connect(addr string) (net.Conn, error) {
+func (c *remotingClient) connect(ctx context.Context, addr string) (net.Conn, error) {
//it needs additional locker.
c.connectionLocker.Lock()
defer c.connectionLocker.Unlock()
@@ -125,7 +125,8 @@ func (c *remotingClient) connect(addr string) (net.Conn, error) {
if ok {
return conn.(net.Conn), nil
}
- tcpConn, err := net.Dial("tcp", addr)
+ var d net.Dialer
+ tcpConn, err := d.DialContext(ctx, "tcp", addr)
if err != nil {
return nil, err
}
diff --git a/internal/remote/remote_client_test.go b/internal/remote/remote_client_test.go
index 9cbb117..1d2c033 100644
--- a/internal/remote/remote_client_test.go
+++ b/internal/remote/remote_client_test.go
@@ -18,6 +18,7 @@ package remote
import (
"bytes"
+ "context"
"errors"
"math/rand"
"net"
@@ -32,7 +33,7 @@ import (
)
func TestNewResponseFuture(t *testing.T) {
- future := NewResponseFuture(10, time.Duration(1000), nil)
+ future := NewResponseFuture(context.Background(), 10, time.Duration(1000), nil)
if future.Opaque != 10 {
t.Errorf("wrong ResponseFuture's opaque. want=%d, got=%d", 10, future.Opaque)
}
@@ -62,7 +63,7 @@ func TestResponseFutureTimeout(t *testing.T) {
r.ResponseCommand.Remark = r.ResponseCommand.Remark + "Go Client"
}
}
- future := NewResponseFuture(10, time.Duration(1000), callback)
+ future := NewResponseFuture(context.Background(), 10, time.Duration(1000), callback)
future.ResponseCommand = NewRemotingCommand(200,
nil, nil)
@@ -83,7 +84,7 @@ func TestResponseFutureTimeout(t *testing.T) {
}
func TestResponseFutureIsTimeout(t *testing.T) {
- future := NewResponseFuture(10, 500*time.Millisecond, nil)
+ future := NewResponseFuture(context.Background(), 10, 500*time.Millisecond, nil)
if future.isTimeout() != false {
t.Errorf("wrong ResponseFuture's istimeout. want=%t, got=%t", false, future.isTimeout())
}
@@ -92,12 +93,12 @@ func TestResponseFutureIsTimeout(t *testing.T) {
}
func TestResponseFutureWaitResponse(t *testing.T) {
- future := NewResponseFuture(10, 500*time.Millisecond, nil)
+ future := NewResponseFuture(context.Background(), 10, 500*time.Millisecond, nil)
if _, err := future.waitResponse(); err != utils.ErrRequestTimeout {
t.Errorf("wrong ResponseFuture waitResponse. want=%v, got=%v",
utils.ErrRequestTimeout, err)
}
- future = NewResponseFuture(10, 500*time.Millisecond, nil)
+ future = NewResponseFuture(context.Background(), 10, 500*time.Millisecond, nil)
responseError := errors.New("response error")
go func() {
time.Sleep(100 * time.Millisecond)
@@ -108,7 +109,7 @@ func TestResponseFutureWaitResponse(t *testing.T) {
t.Errorf("wrong ResponseFuture waitResponse. want=%v. got=%v",
responseError, err)
}
- future = NewResponseFuture(10, 500*time.Millisecond, nil)
+ future = NewResponseFuture(context.Background(), 10, 500*time.Millisecond, nil)
responseRemotingCommand := NewRemotingCommand(202, nil, nil)
go func() {
time.Sleep(100 * time.Millisecond)
@@ -173,7 +174,7 @@ func TestInvokeSync(t *testing.T) {
go func() {
clientSend.Wait()
- receiveCommand, err := client.InvokeSync(addr,
+ receiveCommand, err := client.InvokeSync(context.Background(), addr,
clientSendRemtingCommand, time.Second)
if err != nil {
t.Fatalf("failed to invoke synchronous. %s", err)
@@ -237,7 +238,7 @@ func TestInvokeAsync(t *testing.T) {
time.Sleep(time.Duration(rand.Intn(100)) * time.Millisecond)
t.Logf("[Send: %d] asychronous message", index)
sendRemotingCommand := randomNewRemotingCommand()
- err := client.InvokeAsync(addr, sendRemotingCommand, time.Second, func(r *ResponseFuture) {
+ err := client.InvokeAsync(context.Background(), addr, sendRemotingCommand, time.Second, func(r *ResponseFuture) {
t.Logf("[Receive: %d] asychronous message response", index)
if string(sendRemotingCommand.Body) != string(r.ResponseCommand.Body) {
t.Errorf("wrong response message. want=%s, got=%s", string(sendRemotingCommand.Body),
@@ -303,7 +304,7 @@ func TestInvokeAsyncTimeout(t *testing.T) {
clientSend.Add(1)
go func() {
clientSend.Wait()
- err := client.InvokeAsync(addr, clientSendRemtingCommand,
+ err := client.InvokeAsync(context.Background(), addr, clientSendRemtingCommand,
time.Duration(1000), func(r *ResponseFuture) {
assert.NotNil(t, r.Err)
assert.Equal(t, utils.ErrRequestTimeout, r.Err)
@@ -348,7 +349,7 @@ func TestInvokeOneWay(t *testing.T) {
clientSend.Add(1)
go func() {
clientSend.Wait()
- err := client.InvokeOneWay(addr, clientSendRemtingCommand, 3*time.Second)
+ err := client.InvokeOneWay(context.Background(), addr, clientSendRemtingCommand, 3*time.Second)
if err != nil {
t.Fatalf("failed to invoke synchronous. %s", err)
}
diff --git a/internal/route.go b/internal/route.go
index c6b7e70..3611bb9 100644
--- a/internal/route.go
+++ b/internal/route.go
@@ -18,6 +18,7 @@ limitations under the License.
package internal
import (
+ "context"
"encoding/json"
"errors"
"math/rand"
@@ -304,7 +305,7 @@ func (s *namesrvs) queryTopicRouteInfoFromServer(topic string) (*TopicRouteData,
)
for i := 0; i < s.Size(); i++ {
rc := remote.NewRemotingCommand(ReqGetRouteInfoByTopic, request, nil)
- response, err = s.nameSrvClient.InvokeSync(s.getNameServerAddress(), rc, requestTimeout)
+ response, err = s.nameSrvClient.InvokeSync(context.Background(), s.getNameServerAddress(), rc, requestTimeout)
if err == nil {
break
diff --git a/internal/route_test.go b/internal/route_test.go
index 77f79c0..aa944b0 100644
--- a/internal/route_test.go
+++ b/internal/route_test.go
@@ -18,6 +18,7 @@ limitations under the License.
package internal
import (
+ "context"
"testing"
"time"
@@ -48,8 +49,8 @@ func TestQueryTopicRouteInfoFromServer(t *testing.T) {
Convey("When marshal producer trace data", func() {
count := 0
- remotingCli.EXPECT().InvokeSync(gomock.Any(), gomock.Any(), gomock.Any()).DoAndReturn(
- func(addr string, request *remote.RemotingCommand, timeout time.Duration) (*remote.RemotingCommand, error) {
+ remotingCli.EXPECT().InvokeSync(gomock.Any(), gomock.Any(), gomock.Any(), gomock.Any()).DoAndReturn(
+ func(ctx context.Context, addr string, request *remote.RemotingCommand, timeout time.Duration) (*remote.RemotingCommand, error) {
count++
if count < 3 {
return nil, errors.New("not existed")
diff --git a/internal/trace.go b/internal/trace.go
index ca60eea..8bc6748 100644
--- a/internal/trace.go
+++ b/internal/trace.go
@@ -397,7 +397,7 @@ func (td *traceDispatcher) sendTraceDataByMQ(keyset Keyset, regionID string, dat
}
var req = td.buildSendRequest(mq, msg)
- td.cli.InvokeAsync(addr, req, 5000*time.Millisecond, func(command *remote.RemotingCommand, e error) {
+ td.cli.InvokeAsync(context.Background(), addr, req, 5000*time.Millisecond, func(command *remote.RemotingCommand, e error) {
if e != nil {
rlog.Error("send trace data ,the traceData is %v", data)
}
diff --git a/producer/producer.go b/producer/producer.go
index 0b0367f..25e3210 100644
--- a/producer/producer.go
+++ b/producer/producer.go
@@ -166,7 +166,7 @@ func (p *defaultProducer) sendSync(ctx context.Context, msg *primitive.Message,
producerCtx.MQ = *mq
}
- res, _err := p.client.InvokeSync(addr, p.buildSendRequest(mq, msg), 3*time.Second)
+ res, _err := p.client.InvokeSync(ctx, addr, p.buildSendRequest(mq, msg), 3*time.Second)
if _err != nil {
err = _err
continue
@@ -205,7 +205,7 @@ func (p *defaultProducer) sendAsync(ctx context.Context, msg *primitive.Message,
return errors.Errorf("topic=%s route info not found", mq.Topic)
}
- return p.client.InvokeAsync(addr, p.buildSendRequest(mq, msg), 3*time.Second, func(command *remote.RemotingCommand, err error) {
+ return p.client.InvokeAsync(ctx, addr, p.buildSendRequest(mq, msg), 3*time.Second, func(command *remote.RemotingCommand, err error) {
resp := new(primitive.SendResult)
if err != nil {
h(ctx, nil, err)
@@ -251,7 +251,7 @@ func (p *defaultProducer) sendOneWay(ctx context.Context, msg *primitive.Message
return fmt.Errorf("topic=%s route info not found", mq.Topic)
}
- _err := p.client.InvokeOneWay(addr, p.buildSendRequest(mq, msg), 3*time.Second)
+ _err := p.client.InvokeOneWay(ctx, addr, p.buildSendRequest(mq, msg), 3*time.Second)
if _err != nil {
err = _err
continue
@@ -398,7 +398,7 @@ func (tp *transactionProducer) checkTransactionState() {
req := remote.NewRemotingCommand(internal.ReqENDTransaction, header, nil)
req.Remark = tp.errRemark(nil)
- tp.producer.client.InvokeOneWay(callback.Addr.String(), req, tp.producer.options.SendMsgTimeout)
+ tp.producer.client.InvokeOneWay(context.Background(), callback.Addr.String(), req, tp.producer.options.SendMsgTimeout)
default:
rlog.Error("unknow type %v", ch)
}
@@ -467,7 +467,7 @@ func (tp *transactionProducer) endTransaction(result primitive.SendResult, err e
req := remote.NewRemotingCommand(internal.ReqENDTransaction, requestHeader, nil)
req.Remark = tp.errRemark(err)
- return tp.producer.client.InvokeOneWay(brokerAddr, req, tp.producer.options.SendMsgTimeout)
+ return tp.producer.client.InvokeOneWay(context.Background(), brokerAddr, req, tp.producer.options.SendMsgTimeout)
}
func (tp *transactionProducer) errRemark(err error) string {
diff --git a/producer/producer_test.go b/producer/producer_test.go
index 849978a..d4c58c2 100644
--- a/producer/producer_test.go
+++ b/producer/producer_test.go
@@ -130,7 +130,7 @@ func TestSync(t *testing.T) {
mockB4Send(p)
- client.EXPECT().InvokeSync(gomock.Any(), gomock.Any(), gomock.Any()).Return(nil, nil)
+ client.EXPECT().InvokeSync(gomock.Any(), gomock.Any(), gomock.Any(), gomock.Any()).Return(nil, nil)
client.EXPECT().ProcessSendResponse(gomock.Any(), gomock.Any(), gomock.Any(), gomock.Any()).Do(
func(brokerName string, cmd *remote.RemotingCommand, resp *primitive.SendResult, msgs ...*primitive.Message) {
resp.Status = expectedResp.Status
@@ -181,8 +181,8 @@ func TestASync(t *testing.T) {
mockB4Send(p)
- client.EXPECT().InvokeAsync(gomock.Any(), gomock.Any(), gomock.Any(), gomock.Any()).DoAndReturn(
- func(addr string, request *remote.RemotingCommand,
+ client.EXPECT().InvokeAsync(gomock.Any(), gomock.Any(), gomock.Any(), gomock.Any(), gomock.Any()).DoAndReturn(
+ func(ctx context.Context, addr string, request *remote.RemotingCommand,
timeoutMillis time.Duration, f func(*remote.RemotingCommand, error)) error {
// mock invoke callback
f(nil, nil)
@@ -226,7 +226,7 @@ func TestOneway(t *testing.T) {
mockB4Send(p)
- client.EXPECT().InvokeOneWay(gomock.Any(), gomock.Any(), gomock.Any()).Return(nil).AnyTimes()
+ client.EXPECT().InvokeOneWay(gomock.Any(), gomock.Any(), gomock.Any(), gomock.Any()).Return(nil).AnyTimes()
err = p.SendOneWay(ctx, msg)
assert.Nil(t, err)