You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@rocketmq.apache.org by di...@apache.org on 2020/02/12 07:05:26 UTC
[rocketmq-client-go] branch native updated: [ISSUE #402] fix(consumer): improve retry topic support (#403)
This is an automated email from the ASF dual-hosted git repository.
dinglei pushed a commit to branch native
in repository https://gitbox.apache.org/repos/asf/rocketmq-client-go.git
The following commit(s) were added to refs/heads/native by this push:
new 4f9013e [ISSUE #402] fix(consumer): improve retry topic support (#403)
4f9013e is described below
commit 4f9013ed90a244cc49fc1b392d6c967a2631c5e9
Author: xujianhai666 <52...@users.noreply.github.com>
AuthorDate: Wed Feb 12 15:05:18 2020 +0800
[ISSUE #402] fix(consumer): improve retry topic support (#403)
* fix(consumer): improve retry topic support
---
consumer/consumer.go | 1 -
consumer/push_consumer.go | 13 +---------
internal/client.go | 62 ++++++++++++++++++++++++++++++++---------------
3 files changed, 43 insertions(+), 33 deletions(-)
diff --git a/consumer/consumer.go b/consumer/consumer.go
index 5a5d7d6..c46917e 100644
--- a/consumer/consumer.go
+++ b/consumer/consumer.go
@@ -290,7 +290,6 @@ func (dc *defaultConsumer) start() error {
dc.storage = NewLocalFileOffsetStore(dc.consumerGroup, dc.client.ClientID())
}
- dc.client.UpdateTopicRouteInfo()
dc.client.Start()
atomic.StoreInt32(&dc.state, int32(internal.StateRunning))
dc.consumerStartTimestamp = time.Now().UnixNano() / int64(time.Millisecond)
diff --git a/consumer/push_consumer.go b/consumer/push_consumer.go
index c84e524..3bef44c 100644
--- a/consumer/push_consumer.go
+++ b/consumer/push_consumer.go
@@ -163,9 +163,6 @@ func (pc *pushConsumer) Start() error {
}
}()
- pc.Rebalance()
- time.Sleep(1 * time.Second)
-
go primitive.WithRecover(func() {
// initial lock.
if !pc.consumeOrderly {
@@ -203,9 +200,9 @@ func (pc *pushConsumer) Start() error {
return fmt.Errorf("the topic=%s route info not found, it may not exist", k)
}
}
- pc.client.RebalanceImmediately()
pc.client.CheckClientInBroker()
pc.client.SendHeartbeatToAllBrokerWithLock()
+ pc.client.RebalanceImmediately()
return err
}
@@ -234,14 +231,6 @@ func (pc *pushConsumer) Subscribe(topic string, selector MessageSelector,
pc.subscriptionDataTable.Store(topic, data)
pc.subscribedTopic[topic] = ""
- if pc.option.ConsumerModel == Clustering {
- // add retry topic for clustering mode
- retryTopic := internal.GetRetryTopic(pc.consumerGroup)
- retryData := buildSubscriptionData(retryTopic, MessageSelector{Expression: _SubAll})
- pc.subscriptionDataTable.Store(retryTopic, retryData)
- pc.subscribedTopic[retryTopic] = ""
- }
-
pc.consumeFunc.Add(&PushConsumerCallback{
f: f,
topic: topic,
diff --git a/internal/client.go b/internal/client.go
index 08bbcb3..56bda03 100644
--- a/internal/client.go
+++ b/internal/client.go
@@ -262,15 +262,19 @@ func (c *rmqClient) Start() {
}
// fetchNameServerAddr
if len(c.option.NameServerAddrs) == 0 {
- go func() {
- // delay
- ticker := time.NewTicker(60 * 2 * time.Second)
+ go primitive.WithRecover(func() {
+ op := func() {
+ c.namesrvs.UpdateNameServerAddress(c.option.NameServerDomain, c.option.InstanceName)
+ }
+ time.Sleep(10 * time.Second)
+ op()
+
+ ticker := time.NewTicker(2 * time.Minute)
defer ticker.Stop()
- time.Sleep(50 * time.Millisecond)
for {
select {
case <-ticker.C:
- c.namesrvs.UpdateNameServerAddress(c.option.NameServerDomain, c.option.InstanceName)
+ op()
case <-c.done:
rlog.Info("The RMQClient stopping update name server domain info.", map[string]interface{}{
"clientID": c.ClientID(),
@@ -278,19 +282,24 @@ func (c *rmqClient) Start() {
return
}
}
- }()
+ })
}
// schedule update route info
go primitive.WithRecover(func() {
// delay
+ op := func() {
+ c.UpdateTopicRouteInfo()
+ }
+ time.Sleep(10 * time.Millisecond)
+ op()
+
ticker := time.NewTicker(_PullNameServerInterval)
defer ticker.Stop()
- time.Sleep(50 * time.Millisecond)
for {
select {
case <-ticker.C:
- c.UpdateTopicRouteInfo()
+ op()
case <-c.done:
rlog.Info("The RMQClient stopping update topic route info.", map[string]interface{}{
"clientID": c.ClientID(),
@@ -301,13 +310,20 @@ func (c *rmqClient) Start() {
})
go primitive.WithRecover(func() {
+ op := func() {
+ c.namesrvs.cleanOfflineBroker()
+ c.SendHeartbeatToAllBrokerWithLock()
+ }
+
+ time.Sleep(time.Second)
+ op()
+
ticker := time.NewTicker(_HeartbeatBrokerInterval)
defer ticker.Stop()
for {
select {
case <-ticker.C:
- c.namesrvs.cleanOfflineBroker()
- c.SendHeartbeatToAllBrokerWithLock()
+ op()
case <-c.done:
rlog.Info("The RMQClient stopping clean off line broker and heart beat", map[string]interface{}{
"clientID": c.ClientID(),
@@ -319,21 +335,27 @@ func (c *rmqClient) Start() {
// schedule persist offset
go primitive.WithRecover(func() {
+ op := func() {
+ c.consumerMap.Range(func(key, value interface{}) bool {
+ consumer := value.(InnerConsumer)
+ err := consumer.PersistConsumerOffset()
+ if err != nil {
+ rlog.Error("persist offset failed", map[string]interface{}{
+ rlog.LogKeyUnderlayError: err,
+ })
+ }
+ return true
+ })
+ }
+ time.Sleep(10 * time.Second)
+ op()
+
ticker := time.NewTicker(_PersistOffsetInterval)
defer ticker.Stop()
for {
select {
case <-ticker.C:
- c.consumerMap.Range(func(key, value interface{}) bool {
- consumer := value.(InnerConsumer)
- err := consumer.PersistConsumerOffset()
- if err != nil {
- rlog.Error("persist offset failed", map[string]interface{}{
- rlog.LogKeyUnderlayError: err,
- })
- }
- return true
- })
+ op()
case <-c.done:
rlog.Info("The RMQClient stopping persist offset", map[string]interface{}{
"clientID": c.ClientID(),