You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by rx...@apache.org on 2020/12/12 08:02:24 UTC

[pulsar-client-go] branch master updated: Fixed logic to attempt reconnections to same broker (#414)

This is an automated email from the ASF dual-hosted git repository.

rxl pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar-client-go.git


The following commit(s) were added to refs/heads/master by this push:
     new 0296890  Fixed logic to attempt reconnections to same broker (#414)
0296890 is described below

commit 0296890a9136a0921da19442cf713a4ebb31f9b0
Author: Matteo Merli <mm...@apache.org>
AuthorDate: Sat Dec 12 00:02:17 2020 -0800

    Fixed logic to attempt reconnections to same broker (#414)
    
    ### Motivation
    
    There is a problem with the re-connection logic introduced in #157.
    
    The change added logic to keep retrying to establish a TCP connection with the broker up to the "operation timeout" (default 30 seconds).
    
    There are a few issues with it:
     1. (minor) It does not check that the error is actually a TCP error (e.g., it would retry on auth failures too).
     2. (major) After a TCP connection failure, reconnecting to the same broker is always the wrong approach: the most likely outcome is that the next attempt will also fail and, worse, the IP might simply be unresponsive, in which case we have to wait for the full connection timeout.
    
    The correct solution after a connection failure is to redo the topic lookup, since the topic will be moving to a different broker and we need to reconnect to the new broker as soon as possible.
    
    The only time we can apply this connection-retry logic is for requests that are not tied to a particular broker (e.g., lookup operations). In that case, a quick retry after a connection failure will probably land the request on a different, healthy broker.
---
 pulsar/internal/rpc_client.go | 50 +++++++++++++++++++++++--------------------
 1 file changed, 27 insertions(+), 23 deletions(-)

diff --git a/pulsar/internal/rpc_client.go b/pulsar/internal/rpc_client.go
index ee5d2b7..fbd9492 100644
--- a/pulsar/internal/rpc_client.go
+++ b/pulsar/internal/rpc_client.go
@@ -19,6 +19,7 @@ package internal
 
 import (
 	"errors"
+	"net"
 	"net/url"
 	"sync"
 	"sync/atomic"
@@ -78,13 +79,37 @@ func NewRPCClient(serviceURL *url.URL, pool ConnectionPool,
 
 func (c *rpcClient) RequestToAnyBroker(requestID uint64, cmdType pb.BaseCommand_Type,
 	message proto.Message) (*RPCResult, error) {
-	return c.Request(c.serviceURL, c.serviceURL, requestID, cmdType, message)
+
+	rpcResult, err := c.Request(c.serviceURL, c.serviceURL, requestID, cmdType, message)
+	if _, ok := err.(net.Error); ok {
+		// We can retry this kind of requests over a connection error because they're
+		// not specific to a particular broker.
+		backoff := Backoff{100 * time.Millisecond}
+		startTime := time.Now()
+		var retryTime time.Duration
+
+		for time.Since(startTime) < c.requestTimeout {
+			retryTime = backoff.Next()
+			c.log.Debugf("Retrying request in {%v} with timeout in {%v}", retryTime, c.requestTimeout)
+			time.Sleep(retryTime)
+
+			rpcResult, err = c.Request(c.serviceURL, c.serviceURL, requestID, cmdType, message)
+			if _, ok := err.(net.Error); ok {
+				continue
+			} else {
+				// We either succeeded or encountered a non connection error
+				break
+			}
+		}
+	}
+
+	return rpcResult, err
 }
 
 func (c *rpcClient) Request(logicalAddr *url.URL, physicalAddr *url.URL, requestID uint64,
 	cmdType pb.BaseCommand_Type, message proto.Message) (*RPCResult, error) {
 	c.metrics.RpcRequestCount.Inc()
-	cnx, err := c.getConn(logicalAddr, physicalAddr)
+	cnx, err := c.pool.GetConnection(logicalAddr, physicalAddr)
 	if err != nil {
 		return nil, err
 	}
@@ -111,27 +136,6 @@ func (c *rpcClient) Request(logicalAddr *url.URL, physicalAddr *url.URL, request
 	}
 }
 
-func (c *rpcClient) getConn(logicalAddr *url.URL, physicalAddr *url.URL) (Connection, error) {
-	cnx, err := c.pool.GetConnection(logicalAddr, physicalAddr)
-	backoff := Backoff{1 * time.Second}
-	startTime := time.Now()
-	var retryTime time.Duration
-	if err != nil {
-		for time.Since(startTime) < c.requestTimeout {
-			retryTime = backoff.Next()
-			c.log.Debugf("Reconnecting to broker in {%v} with timeout in {%v}", retryTime, c.requestTimeout)
-			time.Sleep(retryTime)
-			cnx, err = c.pool.GetConnection(logicalAddr, physicalAddr)
-			if err == nil {
-				c.log.Debugf("retry connection success")
-				return cnx, nil
-			}
-		}
-		return nil, err
-	}
-	return cnx, nil
-}
-
 func (c *rpcClient) RequestOnCnx(cnx Connection, requestID uint64, cmdType pb.BaseCommand_Type,
 	message proto.Message) (*RPCResult, error) {
 	c.metrics.RpcRequestCount.Inc()