You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by yo...@apache.org on 2021/09/29 09:47:25 UTC
[pulsar-client-go] branch master updated: Fix reconnection logic
when topic is deleted (#627)
This is an automated email from the ASF dual-hosted git repository.
yong pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar-client-go.git
The following commit(s) were added to refs/heads/master by this push:
new fafa8c3 Fix reconnection logic when topic is deleted (#627)
fafa8c3 is described below
commit fafa8c37407ba2f3fb3066ec9b60124b8bb16847
Author: xiaolong ran <rx...@apache.org>
AuthorDate: Wed Sep 29 17:47:17 2021 +0800
Fix reconnection logic when topic is deleted (#627)
Signed-off-by: xiaolongran <xi...@tencent.com>
Fixes #623
### Motivation
As #623 describes, when a topic is forcibly deleted, the client should not keep trying to reconnect; instead, it should give up reconnection.
### Modifications
- Fix producer reconnection logic
- Fix consumer reconnection logic
---
pulsar/consumer_partition.go | 7 +++++++
pulsar/producer_partition.go | 9 +++++++++
2 files changed, 16 insertions(+)
diff --git a/pulsar/consumer_partition.go b/pulsar/consumer_partition.go
index cf92949..5f74bcf 100644
--- a/pulsar/consumer_partition.go
+++ b/pulsar/consumer_partition.go
@@ -20,6 +20,7 @@ package pulsar
import (
"fmt"
"math"
+ "strings"
"sync"
"time"
@@ -893,6 +894,12 @@ func (pc *partitionConsumer) reconnectToBroker() {
pc.log.Info("Reconnected consumer to broker")
return
}
+ errMsg := err.Error()
+ if strings.Contains(errMsg, errTopicNotFount) {
+ // when topic is deleted, we should give up reconnection.
+ pc.log.Warn("Topic Not Found.")
+ break
+ }
if maxRetry > 0 {
maxRetry--
diff --git a/pulsar/producer_partition.go b/pulsar/producer_partition.go
index 4ae4e00..e273021 100644
--- a/pulsar/producer_partition.go
+++ b/pulsar/producer_partition.go
@@ -20,6 +20,7 @@ package pulsar
import (
"context"
"fmt"
+ "strings"
"sync"
"sync/atomic"
"time"
@@ -58,6 +59,8 @@ var (
buffersPool sync.Pool
)
+var errTopicNotFount = "TopicNotFound"
+
type partitionProducer struct {
state ua.Int32
client *client
@@ -350,6 +353,12 @@ func (p *partitionProducer) reconnectToBroker() {
p.log.WithField("cnx", p.cnx.ID()).Info("Reconnected producer to broker")
return
}
+ errMsg := err.Error()
+ if strings.Contains(errMsg, errTopicNotFount) {
+ // when topic is deleted, we should give up reconnection.
+ p.log.Warn("Topic Not Found.")
+ break
+ }
if maxRetry > 0 {
maxRetry--