You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@rocketmq.apache.org by di...@apache.org on 2020/02/12 07:05:26 UTC

[rocketmq-client-go] branch native updated: [ISSUE #402] fix(consumer): improve retry topic support (#403)

This is an automated email from the ASF dual-hosted git repository.

dinglei pushed a commit to branch native
in repository https://gitbox.apache.org/repos/asf/rocketmq-client-go.git


The following commit(s) were added to refs/heads/native by this push:
     new 4f9013e  [ISSUE #402] fix(consumer): improve retry topic support (#403)
4f9013e is described below

commit 4f9013ed90a244cc49fc1b392d6c967a2631c5e9
Author: xujianhai666 <52...@users.noreply.github.com>
AuthorDate: Wed Feb 12 15:05:18 2020 +0800

    [ISSUE #402] fix(consumer): improve retry topic support (#403)
    
    * fix(consumer): improve retry topic support
---
 consumer/consumer.go      |  1 -
 consumer/push_consumer.go | 13 +---------
 internal/client.go        | 62 ++++++++++++++++++++++++++++++++---------------
 3 files changed, 43 insertions(+), 33 deletions(-)

diff --git a/consumer/consumer.go b/consumer/consumer.go
index 5a5d7d6..c46917e 100644
--- a/consumer/consumer.go
+++ b/consumer/consumer.go
@@ -290,7 +290,6 @@ func (dc *defaultConsumer) start() error {
 		dc.storage = NewLocalFileOffsetStore(dc.consumerGroup, dc.client.ClientID())
 	}
 
-	dc.client.UpdateTopicRouteInfo()
 	dc.client.Start()
 	atomic.StoreInt32(&dc.state, int32(internal.StateRunning))
 	dc.consumerStartTimestamp = time.Now().UnixNano() / int64(time.Millisecond)
diff --git a/consumer/push_consumer.go b/consumer/push_consumer.go
index c84e524..3bef44c 100644
--- a/consumer/push_consumer.go
+++ b/consumer/push_consumer.go
@@ -163,9 +163,6 @@ func (pc *pushConsumer) Start() error {
 			}
 		}()
 
-		pc.Rebalance()
-		time.Sleep(1 * time.Second)
-
 		go primitive.WithRecover(func() {
 			// initial lock.
 			if !pc.consumeOrderly {
@@ -203,9 +200,9 @@ func (pc *pushConsumer) Start() error {
 			return fmt.Errorf("the topic=%s route info not found, it may not exist", k)
 		}
 	}
-	pc.client.RebalanceImmediately()
 	pc.client.CheckClientInBroker()
 	pc.client.SendHeartbeatToAllBrokerWithLock()
+	pc.client.RebalanceImmediately()
 
 	return err
 }
@@ -234,14 +231,6 @@ func (pc *pushConsumer) Subscribe(topic string, selector MessageSelector,
 	pc.subscriptionDataTable.Store(topic, data)
 	pc.subscribedTopic[topic] = ""
 
-	if pc.option.ConsumerModel == Clustering {
-		// add retry topic for clustering mode
-		retryTopic := internal.GetRetryTopic(pc.consumerGroup)
-		retryData := buildSubscriptionData(retryTopic, MessageSelector{Expression: _SubAll})
-		pc.subscriptionDataTable.Store(retryTopic, retryData)
-		pc.subscribedTopic[retryTopic] = ""
-	}
-
 	pc.consumeFunc.Add(&PushConsumerCallback{
 		f:     f,
 		topic: topic,
diff --git a/internal/client.go b/internal/client.go
index 08bbcb3..56bda03 100644
--- a/internal/client.go
+++ b/internal/client.go
@@ -262,15 +262,19 @@ func (c *rmqClient) Start() {
 		}
 		// fetchNameServerAddr
 		if len(c.option.NameServerAddrs) == 0 {
-			go func() {
-				// delay
-				ticker := time.NewTicker(60 * 2 * time.Second)
+			go primitive.WithRecover(func() {
+				op := func() {
+					c.namesrvs.UpdateNameServerAddress(c.option.NameServerDomain, c.option.InstanceName)
+				}
+				time.Sleep(10 * time.Second)
+				op()
+
+				ticker := time.NewTicker(2 * time.Minute)
 				defer ticker.Stop()
-				time.Sleep(50 * time.Millisecond)
 				for {
 					select {
 					case <-ticker.C:
-						c.namesrvs.UpdateNameServerAddress(c.option.NameServerDomain, c.option.InstanceName)
+						op()
 					case <-c.done:
 						rlog.Info("The RMQClient stopping update name server domain info.", map[string]interface{}{
 							"clientID": c.ClientID(),
@@ -278,19 +282,24 @@ func (c *rmqClient) Start() {
 						return
 					}
 				}
-			}()
+			})
 		}
 
 		// schedule update route info
 		go primitive.WithRecover(func() {
 			// delay
+			op := func() {
+				c.UpdateTopicRouteInfo()
+			}
+			time.Sleep(10 * time.Millisecond)
+			op()
+
 			ticker := time.NewTicker(_PullNameServerInterval)
 			defer ticker.Stop()
-			time.Sleep(50 * time.Millisecond)
 			for {
 				select {
 				case <-ticker.C:
-					c.UpdateTopicRouteInfo()
+					op()
 				case <-c.done:
 					rlog.Info("The RMQClient stopping update topic route info.", map[string]interface{}{
 						"clientID": c.ClientID(),
@@ -301,13 +310,20 @@ func (c *rmqClient) Start() {
 		})
 
 		go primitive.WithRecover(func() {
+			op := func() {
+				c.namesrvs.cleanOfflineBroker()
+				c.SendHeartbeatToAllBrokerWithLock()
+			}
+
+			time.Sleep(time.Second)
+			op()
+
 			ticker := time.NewTicker(_HeartbeatBrokerInterval)
 			defer ticker.Stop()
 			for {
 				select {
 				case <-ticker.C:
-					c.namesrvs.cleanOfflineBroker()
-					c.SendHeartbeatToAllBrokerWithLock()
+					op()
 				case <-c.done:
 					rlog.Info("The RMQClient stopping clean off line broker and heart beat", map[string]interface{}{
 						"clientID": c.ClientID(),
@@ -319,21 +335,27 @@ func (c *rmqClient) Start() {
 
 		// schedule persist offset
 		go primitive.WithRecover(func() {
+			op := func() {
+				c.consumerMap.Range(func(key, value interface{}) bool {
+					consumer := value.(InnerConsumer)
+					err := consumer.PersistConsumerOffset()
+					if err != nil {
+						rlog.Error("persist offset failed", map[string]interface{}{
+							rlog.LogKeyUnderlayError: err,
+						})
+					}
+					return true
+				})
+			}
+			time.Sleep(10 * time.Second)
+			op()
+
 			ticker := time.NewTicker(_PersistOffsetInterval)
 			defer ticker.Stop()
 			for {
 				select {
 				case <-ticker.C:
-					c.consumerMap.Range(func(key, value interface{}) bool {
-						consumer := value.(InnerConsumer)
-						err := consumer.PersistConsumerOffset()
-						if err != nil {
-							rlog.Error("persist offset failed", map[string]interface{}{
-								rlog.LogKeyUnderlayError: err,
-							})
-						}
-						return true
-					})
+					op()
 				case <-c.done:
 					rlog.Info("The RMQClient stopping persist offset", map[string]interface{}{
 						"clientID": c.ClientID(),