You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@rocketmq.apache.org by ti...@apache.org on 2023/03/20 02:59:04 UTC

[rocketmq-client-go] branch master updated: fix: unlock all queues when consumer shutdown in orderly model

This is an automated email from the ASF dual-hosted git repository.

tigerlee pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/rocketmq-client-go.git


The following commit(s) were added to refs/heads/master by this push:
     new 9bca88d  fix: unlock all queues when consumer shutdown in orderly model
     new 3b8b238  Merge pull request #1015 from cserwen/orderly_consume_github
9bca88d is described below

commit 9bca88dcb44a0f6a4e21639bb269271eafafbe64
Author: dengzhiwen1 <de...@xiaomi.com>
AuthorDate: Fri Mar 10 14:03:58 2023 +0800

    fix: unlock all queues when consumer shutdown in orderly model
---
 consumer/consumer.go      | 12 ++++++------
 consumer/push_consumer.go |  4 +++-
 2 files changed, 9 insertions(+), 7 deletions(-)

diff --git a/consumer/consumer.go b/consumer/consumer.go
index 1511f01..7bd2b02 100644
--- a/consumer/consumer.go
+++ b/consumer/consumer.go
@@ -631,12 +631,12 @@ func (dc *defaultConsumer) doUnlock(addr string, body *lockBatchRequestBody, one
 		}
 	} else {
 		response, err := dc.client.InvokeSync(context.Background(), addr, request, 1*time.Second)
-		rlog.Error("lock MessageQueue to broker invoke error", map[string]interface{}{
-			rlog.LogKeyBroker:        addr,
-			rlog.LogKeyUnderlayError: err,
-		})
-		if response.Code != internal.ResSuccess {
-			// TODO error
+		if err != nil || response == nil || response.Code != internal.ResSuccess {
+			rlog.Error("lock MessageQueue to broker invoke error", map[string]interface{}{
+				rlog.LogKeyBroker:        addr,
+				rlog.LogKeyUnderlayError: err,
+				"response":               response,
+			})
 		}
 	}
 }
diff --git a/consumer/push_consumer.go b/consumer/push_consumer.go
index 7d585de..85f9725 100644
--- a/consumer/push_consumer.go
+++ b/consumer/push_consumer.go
@@ -277,7 +277,9 @@ func (pc *pushConsumer) Shutdown() error {
 			pc.option.TraceDispatcher.Close()
 		}
 		close(pc.done)
-
+		if pc.consumeOrderly && pc.model == Clustering {
+			pc.unlockAll(false)
+		}
 		pc.client.UnregisterConsumer(pc.consumerGroup)
 		err = pc.defaultConsumer.shutdown()
 	})