You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@rocketmq.apache.org by ma...@apache.org on 2021/07/06 13:50:36 UTC
[rocketmq-client-go] branch master updated: fix: call cancel() as
soon as possible to release the resources associated with context (#691)
This is an automated email from the ASF dual-hosted git repository.
maixiaohai pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/rocketmq-client-go.git
The following commit(s) were added to refs/heads/master by this push:
new 8612bf7 fix: call cancel() as soon as possible to release the resources associated with context (#691)
8612bf7 is described below
commit 8612bf76869222a16bc1c55838f15aba1ad24dd2
Author: Berlin <fe...@qq.com>
AuthorDate: Tue Jul 6 21:50:30 2021 +0800
fix: call cancel() as soon as possible to release the resources associated with context (#691)
---
internal/client.go | 12 +++++++++---
internal/remote/remote_client_test.go | 6 ++++--
internal/route.go | 4 +++-
internal/trace.go | 3 ++-
producer/producer.go | 4 +++-
5 files changed, 21 insertions(+), 8 deletions(-)
diff --git a/internal/client.go b/internal/client.go
index bf0437a..6e665ea 100644
--- a/internal/client.go
+++ b/internal/client.go
@@ -448,7 +448,9 @@ func (c *rmqClient) InvokeSync(ctx context.Context, addr string, request *remote
if c.close {
return nil, ErrServiceState
}
- ctx, _ = context.WithTimeout(ctx, timeoutMillis)
+ var cancel context.CancelFunc
+ ctx, cancel = context.WithTimeout(ctx, timeoutMillis)
+ defer cancel()
return c.remoteClient.InvokeSync(ctx, addr, request)
}
@@ -524,14 +526,16 @@ func (c *rmqClient) SendHeartbeatToAllBrokerWithLock() {
}
cmd := remote.NewRemotingCommand(ReqHeartBeat, nil, hbData.encode())
- ctx, _ := context.WithTimeout(context.Background(), 3*time.Second)
+ ctx, cancel := context.WithTimeout(context.Background(), 3*time.Second)
response, err := c.remoteClient.InvokeSync(ctx, addr, cmd)
if err != nil {
+ cancel()
rlog.Warning("send heart beat to broker error", map[string]interface{}{
rlog.LogKeyUnderlayError: err,
})
return true
}
+ cancel()
if response.Code == ResSuccess {
c.namesrvs.AddBrokerVersion(brokerName, addr, int32(response.Version))
rlog.Debug("send heart beat to broker success", map[string]interface{}{
@@ -633,7 +637,9 @@ func (c *rmqClient) ProcessSendResponse(brokerName string, cmd *remote.RemotingC
// PullMessage with sync
func (c *rmqClient) PullMessage(ctx context.Context, brokerAddrs string, request *PullMessageRequestHeader) (*primitive.PullResult, error) {
cmd := remote.NewRemotingCommand(ReqPullMessage, request, nil)
- ctx, _ = context.WithTimeout(ctx, 30*time.Second)
+ var cancel context.CancelFunc
+ ctx, cancel = context.WithTimeout(ctx, 30*time.Second)
+ defer cancel()
res, err := c.remoteClient.InvokeSync(ctx, brokerAddrs, cmd)
if err != nil {
return nil, err
diff --git a/internal/remote/remote_client_test.go b/internal/remote/remote_client_test.go
index 5b69fd6..f160674 100644
--- a/internal/remote/remote_client_test.go
+++ b/internal/remote/remote_client_test.go
@@ -77,7 +77,8 @@ func TestResponseFutureTimeout(t *testing.T) {
}
func TestResponseFutureWaitResponse(t *testing.T) {
- ctx, _ := context.WithTimeout(context.Background(), time.Duration(1000))
+ ctx, cancel := context.WithTimeout(context.Background(), time.Duration(1000))
+ defer cancel()
future := NewResponseFuture(ctx, 10, nil)
if _, err := future.waitResponse(); err != utils.ErrRequestTimeout {
t.Errorf("wrong ResponseFuture waitResponse. want=%v, got=%v",
@@ -289,7 +290,8 @@ func TestInvokeAsyncTimeout(t *testing.T) {
clientSend.Add(1)
go func() {
clientSend.Wait()
- ctx, _ := context.WithTimeout(context.Background(), time.Duration(10*time.Second))
+ ctx, cancel := context.WithTimeout(context.Background(), time.Duration(10*time.Second))
+ defer cancel()
err := client.InvokeAsync(ctx, addr, clientSendRemtingCommand,
func(r *ResponseFuture) {
assert.NotNil(t, r.Err)
diff --git a/internal/route.go b/internal/route.go
index 7b27c9d..66be96d 100644
--- a/internal/route.go
+++ b/internal/route.go
@@ -371,12 +371,14 @@ func (s *namesrvs) queryTopicRouteInfoFromServer(topic string) (*TopicRouteData,
for i := 0; i < s.Size(); i++ {
rc := remote.NewRemotingCommand(ReqGetRouteInfoByTopic, request, nil)
- ctx, _ := context.WithTimeout(context.Background(), requestTimeout)
+ ctx, cancel := context.WithTimeout(context.Background(), requestTimeout)
response, err = s.nameSrvClient.InvokeSync(ctx, s.getNameServerAddress(), rc)
if err == nil {
+ cancel()
break
}
+ cancel()
}
if err != nil {
rlog.Error("connect to namesrv failed.", map[string]interface{}{
diff --git a/internal/trace.go b/internal/trace.go
index a7fee67..cef1634 100644
--- a/internal/trace.go
+++ b/internal/trace.go
@@ -458,7 +458,8 @@ func (td *traceDispatcher) sendTraceDataByMQ(keySet Keyset, regionID string, dat
}
var req = td.buildSendRequest(mq, msg)
- ctx, _ := context.WithTimeout(context.Background(), 5*time.Second)
+ ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
+ defer cancel()
err := td.cli.InvokeAsync(ctx, addr, req, func(command *remote.RemotingCommand, e error) {
resp := primitive.NewSendResult()
if e != nil {
diff --git a/producer/producer.go b/producer/producer.go
index 8ebb660..f3b5afe 100644
--- a/producer/producer.go
+++ b/producer/producer.go
@@ -241,7 +241,9 @@ func (p *defaultProducer) sendAsync(ctx context.Context, msg *primitive.Message,
return errors.Errorf("topic=%s route info not found", mq.Topic)
}
- ctx, _ = context.WithTimeout(ctx, 3*time.Second)
+ var cancel context.CancelFunc
+ ctx, cancel = context.WithTimeout(ctx, 3*time.Second)
+ defer cancel()
return p.client.InvokeAsync(ctx, addr, p.buildSendRequest(mq, msg), func(command *remote.RemotingCommand, err error) {
resp := primitive.NewSendResult()
if err != nil {