You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@rocketmq.apache.org by ma...@apache.org on 2021/07/06 13:50:36 UTC

[rocketmq-client-go] branch master updated: fix: call cancel() as soon as possible to release the resources associatd with context (#691)

This is an automated email from the ASF dual-hosted git repository.

maixiaohai pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/rocketmq-client-go.git


The following commit(s) were added to refs/heads/master by this push:
     new 8612bf7  fix: call cancel() as soon as possible to release the resources associatd with context (#691)
8612bf7 is described below

commit 8612bf76869222a16bc1c55838f15aba1ad24dd2
Author: Berlin <fe...@qq.com>
AuthorDate: Tue Jul 6 21:50:30 2021 +0800

    fix: call cancel() as soon as possible to release the resources associatd with context (#691)
---
 internal/client.go                    | 12 +++++++++---
 internal/remote/remote_client_test.go |  6 ++++--
 internal/route.go                     |  4 +++-
 internal/trace.go                     |  3 ++-
 producer/producer.go                  |  4 +++-
 5 files changed, 21 insertions(+), 8 deletions(-)

diff --git a/internal/client.go b/internal/client.go
index bf0437a..6e665ea 100644
--- a/internal/client.go
+++ b/internal/client.go
@@ -448,7 +448,9 @@ func (c *rmqClient) InvokeSync(ctx context.Context, addr string, request *remote
 	if c.close {
 		return nil, ErrServiceState
 	}
-	ctx, _ = context.WithTimeout(ctx, timeoutMillis)
+	var cancel context.CancelFunc
+	ctx, cancel = context.WithTimeout(ctx, timeoutMillis)
+	defer cancel()
 	return c.remoteClient.InvokeSync(ctx, addr, request)
 }
 
@@ -524,14 +526,16 @@ func (c *rmqClient) SendHeartbeatToAllBrokerWithLock() {
 			}
 			cmd := remote.NewRemotingCommand(ReqHeartBeat, nil, hbData.encode())
 
-			ctx, _ := context.WithTimeout(context.Background(), 3*time.Second)
+			ctx, cancel := context.WithTimeout(context.Background(), 3*time.Second)
 			response, err := c.remoteClient.InvokeSync(ctx, addr, cmd)
 			if err != nil {
+				cancel()
 				rlog.Warning("send heart beat to broker error", map[string]interface{}{
 					rlog.LogKeyUnderlayError: err,
 				})
 				return true
 			}
+			cancel()
 			if response.Code == ResSuccess {
 				c.namesrvs.AddBrokerVersion(brokerName, addr, int32(response.Version))
 				rlog.Debug("send heart beat to broker success", map[string]interface{}{
@@ -633,7 +637,9 @@ func (c *rmqClient) ProcessSendResponse(brokerName string, cmd *remote.RemotingC
 // PullMessage with sync
 func (c *rmqClient) PullMessage(ctx context.Context, brokerAddrs string, request *PullMessageRequestHeader) (*primitive.PullResult, error) {
 	cmd := remote.NewRemotingCommand(ReqPullMessage, request, nil)
-	ctx, _ = context.WithTimeout(ctx, 30*time.Second)
+	var cancel context.CancelFunc
+	ctx, cancel = context.WithTimeout(ctx, 30*time.Second)
+	defer cancel()
 	res, err := c.remoteClient.InvokeSync(ctx, brokerAddrs, cmd)
 	if err != nil {
 		return nil, err
diff --git a/internal/remote/remote_client_test.go b/internal/remote/remote_client_test.go
index 5b69fd6..f160674 100644
--- a/internal/remote/remote_client_test.go
+++ b/internal/remote/remote_client_test.go
@@ -77,7 +77,8 @@ func TestResponseFutureTimeout(t *testing.T) {
 }
 
 func TestResponseFutureWaitResponse(t *testing.T) {
-	ctx, _ := context.WithTimeout(context.Background(), time.Duration(1000))
+	ctx, cancel := context.WithTimeout(context.Background(), time.Duration(1000))
+	defer cancel()
 	future := NewResponseFuture(ctx, 10, nil)
 	if _, err := future.waitResponse(); err != utils.ErrRequestTimeout {
 		t.Errorf("wrong ResponseFuture waitResponse. want=%v, got=%v",
@@ -289,7 +290,8 @@ func TestInvokeAsyncTimeout(t *testing.T) {
 	clientSend.Add(1)
 	go func() {
 		clientSend.Wait()
-		ctx, _ := context.WithTimeout(context.Background(), time.Duration(10*time.Second))
+		ctx, cancel := context.WithTimeout(context.Background(), time.Duration(10*time.Second))
+		defer cancel()
 		err := client.InvokeAsync(ctx, addr, clientSendRemtingCommand,
 			func(r *ResponseFuture) {
 				assert.NotNil(t, r.Err)
diff --git a/internal/route.go b/internal/route.go
index 7b27c9d..66be96d 100644
--- a/internal/route.go
+++ b/internal/route.go
@@ -371,12 +371,14 @@ func (s *namesrvs) queryTopicRouteInfoFromServer(topic string) (*TopicRouteData,
 
 	for i := 0; i < s.Size(); i++ {
 		rc := remote.NewRemotingCommand(ReqGetRouteInfoByTopic, request, nil)
-		ctx, _ := context.WithTimeout(context.Background(), requestTimeout)
+		ctx, cancel := context.WithTimeout(context.Background(), requestTimeout)
 		response, err = s.nameSrvClient.InvokeSync(ctx, s.getNameServerAddress(), rc)
 
 		if err == nil {
+			cancel()
 			break
 		}
+		cancel()
 	}
 	if err != nil {
 		rlog.Error("connect to namesrv failed.", map[string]interface{}{
diff --git a/internal/trace.go b/internal/trace.go
index a7fee67..cef1634 100644
--- a/internal/trace.go
+++ b/internal/trace.go
@@ -458,7 +458,8 @@ func (td *traceDispatcher) sendTraceDataByMQ(keySet Keyset, regionID string, dat
 	}
 
 	var req = td.buildSendRequest(mq, msg)
-	ctx, _ := context.WithTimeout(context.Background(), 5*time.Second)
+	ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
+	defer cancel()
 	err := td.cli.InvokeAsync(ctx, addr, req, func(command *remote.RemotingCommand, e error) {
 		resp := primitive.NewSendResult()
 		if e != nil {
diff --git a/producer/producer.go b/producer/producer.go
index 8ebb660..f3b5afe 100644
--- a/producer/producer.go
+++ b/producer/producer.go
@@ -241,7 +241,9 @@ func (p *defaultProducer) sendAsync(ctx context.Context, msg *primitive.Message,
 		return errors.Errorf("topic=%s route info not found", mq.Topic)
 	}
 
-	ctx, _ = context.WithTimeout(ctx, 3*time.Second)
+	var cancel context.CancelFunc
+	ctx, cancel = context.WithTimeout(ctx, 3*time.Second)
+	defer cancel()
 	return p.client.InvokeAsync(ctx, addr, p.buildSendRequest(mq, msg), func(command *remote.RemotingCommand, err error) {
 		resp := primitive.NewSendResult()
 		if err != nil {