You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by yo...@apache.org on 2021/09/29 09:47:25 UTC

[pulsar-client-go] branch master updated: Fix reconnection logic when topic is deleted (#627)

This is an automated email from the ASF dual-hosted git repository.

yong pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar-client-go.git


The following commit(s) were added to refs/heads/master by this push:
     new fafa8c3  Fix reconnection logic when topic is deleted (#627)
fafa8c3 is described below

commit fafa8c37407ba2f3fb3066ec9b60124b8bb16847
Author: xiaolong ran <rx...@apache.org>
AuthorDate: Wed Sep 29 17:47:17 2021 +0800

    Fix reconnection logic when topic is deleted (#627)
    
    Signed-off-by: xiaolongran <xi...@tencent.com>
    
    
    Fixes #623
    
    
    ### Motivation
    
    As #623 describes, when a topic is forcibly deleted, we should not keep trying to reconnect; instead, we should give up reconnection.
    
    
    ### Modifications
    
    - Fix producer reconnection logic
    - Fix consumer reconnection logic
---
 pulsar/consumer_partition.go | 7 +++++++
 pulsar/producer_partition.go | 9 +++++++++
 2 files changed, 16 insertions(+)

diff --git a/pulsar/consumer_partition.go b/pulsar/consumer_partition.go
index cf92949..5f74bcf 100644
--- a/pulsar/consumer_partition.go
+++ b/pulsar/consumer_partition.go
@@ -20,6 +20,7 @@ package pulsar
 import (
 	"fmt"
 	"math"
+	"strings"
 	"sync"
 	"time"
 
@@ -893,6 +894,12 @@ func (pc *partitionConsumer) reconnectToBroker() {
 			pc.log.Info("Reconnected consumer to broker")
 			return
 		}
+		errMsg := err.Error()
+		if strings.Contains(errMsg, errTopicNotFount) {
+			// when topic is deleted, we should give up reconnection.
+			pc.log.Warn("Topic Not Found.")
+			break
+		}
 
 		if maxRetry > 0 {
 			maxRetry--
diff --git a/pulsar/producer_partition.go b/pulsar/producer_partition.go
index 4ae4e00..e273021 100644
--- a/pulsar/producer_partition.go
+++ b/pulsar/producer_partition.go
@@ -20,6 +20,7 @@ package pulsar
 import (
 	"context"
 	"fmt"
+	"strings"
 	"sync"
 	"sync/atomic"
 	"time"
@@ -58,6 +59,8 @@ var (
 	buffersPool sync.Pool
 )
 
+var errTopicNotFount = "TopicNotFound"
+
 type partitionProducer struct {
 	state  ua.Int32
 	client *client
@@ -350,6 +353,12 @@ func (p *partitionProducer) reconnectToBroker() {
 			p.log.WithField("cnx", p.cnx.ID()).Info("Reconnected producer to broker")
 			return
 		}
+		errMsg := err.Error()
+		if strings.Contains(errMsg, errTopicNotFount) {
+			// when topic is deleted, we should give up reconnection.
+			p.log.Warn("Topic Not Found.")
+			break
+		}
 
 		if maxRetry > 0 {
 			maxRetry--