You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by mm...@apache.org on 2022/09/02 17:48:02 UTC

[pulsar-client-go] branch master updated: Make keepalive interval configurable (#838)

This is an automated email from the ASF dual-hosted git repository.

mmerli pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar-client-go.git


The following commit(s) were added to refs/heads/master by this push:
     new f8dc88e  Make keepalive interval configurable (#838)
f8dc88e is described below

commit f8dc88e9a9eb070ea8d9f3d6c31b2b00e600567c
Author: Zixuan Liu <no...@gmail.com>
AuthorDate: Sat Sep 3 01:47:57 2022 +0800

    Make keepalive interval configurable (#838)
    
    * Make keepalive interval configurable
    
    Signed-off-by: Zixuan Liu <no...@gmail.com>
    
    * Fix style
    
    Signed-off-by: Zixuan Liu <no...@gmail.com>
    
    Signed-off-by: Zixuan Liu <no...@gmail.com>
---
 pulsar/client.go                   |  3 +++
 pulsar/client_impl.go              | 10 ++++++++--
 pulsar/internal/connection.go      | 14 ++++++++------
 pulsar/internal/connection_pool.go |  8 ++++++--
 4 files changed, 25 insertions(+), 10 deletions(-)

diff --git a/pulsar/client.go b/pulsar/client.go
index 9a860ce..22b12ef 100644
--- a/pulsar/client.go
+++ b/pulsar/client.go
@@ -98,6 +98,9 @@ type ClientOptions struct {
 	// operation will be marked as failed
 	OperationTimeout time.Duration
 
+	// Configure the ping send and check interval, default to 30 seconds.
+	KeepAliveInterval time.Duration
+
 	// Configure the authentication provider. (default: no authentication)
 	// Example: `Authentication: NewAuthenticationTLS("my-cert.pem", "my-key.pem")`
 	Authentication
diff --git a/pulsar/client_impl.go b/pulsar/client_impl.go
index 1bf661e..e7fa642 100644
--- a/pulsar/client_impl.go
+++ b/pulsar/client_impl.go
@@ -33,6 +33,7 @@ import (
 const (
 	defaultConnectionTimeout = 10 * time.Second
 	defaultOperationTimeout  = 30 * time.Second
+	defaultKeepAliveInterval = 30 * time.Second
 )
 
 type client struct {
@@ -125,9 +126,14 @@ func newClient(options ClientOptions) (Client, error) {
 			int(options.MetricsCardinality), map[string]string{}, options.MetricsRegisterer)
 	}
 
+	keepAliveInterval := options.KeepAliveInterval
+	if keepAliveInterval.Nanoseconds() == 0 {
+		keepAliveInterval = defaultKeepAliveInterval
+	}
+
 	c := &client{
-		cnxPool: internal.NewConnectionPool(tlsConfig, authProvider, connectionTimeout, maxConnectionsPerHost, logger,
-			metrics),
+		cnxPool: internal.NewConnectionPool(tlsConfig, authProvider, connectionTimeout, keepAliveInterval,
+			maxConnectionsPerHost, logger, metrics),
 		log:     logger,
 		metrics: metrics,
 	}
diff --git a/pulsar/internal/connection.go b/pulsar/internal/connection.go
index 48dfd8f..8ddbc04 100644
--- a/pulsar/internal/connection.go
+++ b/pulsar/internal/connection.go
@@ -114,8 +114,6 @@ func (s connectionState) String() string {
 	}
 }
 
-const keepAliveInterval = 30 * time.Second
-
 type request struct {
 	id       *uint64
 	cmd      *pb.BaseCommand
@@ -168,6 +166,8 @@ type connection struct {
 
 	maxMessageSize int32
 	metrics        *Metrics
+
+	keepAliveInterval time.Duration
 }
 
 // connectionOptions defines configurations for creating connection.
@@ -179,11 +179,13 @@ type connectionOptions struct {
 	auth              auth.Provider
 	logger            log.Logger
 	metrics           *Metrics
+	keepAliveInterval time.Duration
 }
 
 func newConnection(opts connectionOptions) *connection {
 	cnx := &connection{
 		connectionTimeout:    opts.connectionTimeout,
+		keepAliveInterval:    opts.keepAliveInterval,
 		logicalAddr:          opts.logicalAddr,
 		physicalAddr:         opts.physicalAddr,
 		writeBuffer:          NewBuffer(4096),
@@ -285,7 +287,7 @@ func (c *connection) doHandshake() bool {
 
 	// During the initial handshake, the internal keep alive is not
 	// active yet, so we need to timeout write and read requests
-	c.cnx.SetDeadline(time.Now().Add(keepAliveInterval))
+	c.cnx.SetDeadline(time.Now().Add(c.keepAliveInterval))
 	cmdConnect := &pb.CommandConnect{
 		ProtocolVersion: proto.Int32(PulsarProtocolVersion),
 		ClientVersion:   proto.String(ClientVersionString),
@@ -369,8 +371,8 @@ func (c *connection) failLeftRequestsWhenClose() {
 }
 
 func (c *connection) run() {
-	pingSendTicker := time.NewTicker(keepAliveInterval)
-	pingCheckTicker := time.NewTicker(keepAliveInterval)
+	pingSendTicker := time.NewTicker(c.keepAliveInterval)
+	pingCheckTicker := time.NewTicker(c.keepAliveInterval)
 
 	defer func() {
 		// stop tickers
@@ -432,7 +434,7 @@ func (c *connection) runPingCheck(pingCheckTicker *time.Ticker) {
 		case <-c.closeCh:
 			return
 		case <-pingCheckTicker.C:
-			if c.lastDataReceived().Add(2 * keepAliveInterval).Before(time.Now()) {
+			if c.lastDataReceived().Add(2 * c.keepAliveInterval).Before(time.Now()) {
 				// We have not received a response to the previous Ping request, the
 				// connection to broker is stale
 				c.log.Warn("Detected stale connection to broker")
diff --git a/pulsar/internal/connection_pool.go b/pulsar/internal/connection_pool.go
index 5ec457e..cb172a1 100644
--- a/pulsar/internal/connection_pool.go
+++ b/pulsar/internal/connection_pool.go
@@ -45,9 +45,10 @@ type connectionPool struct {
 	auth                  auth.Provider
 	maxConnectionsPerHost int32
 	roundRobinCnt         int32
-	metrics               *Metrics
+	keepAliveInterval     time.Duration
 
-	log log.Logger
+	metrics *Metrics
+	log     log.Logger
 }
 
 // NewConnectionPool init connection pool.
@@ -55,6 +56,7 @@ func NewConnectionPool(
 	tlsOptions *TLSOptions,
 	auth auth.Provider,
 	connectionTimeout time.Duration,
+	keepAliveInterval time.Duration,
 	maxConnectionsPerHost int,
 	logger log.Logger,
 	metrics *Metrics) ConnectionPool {
@@ -64,6 +66,7 @@ func NewConnectionPool(
 		auth:                  auth,
 		connectionTimeout:     connectionTimeout,
 		maxConnectionsPerHost: int32(maxConnectionsPerHost),
+		keepAliveInterval:     keepAliveInterval,
 		log:                   logger,
 		metrics:               metrics,
 	}
@@ -97,6 +100,7 @@ func (p *connectionPool) GetConnection(logicalAddr *url.URL, physicalAddr *url.U
 			tls:               p.tlsOptions,
 			connectionTimeout: p.connectionTimeout,
 			auth:              p.auth,
+			keepAliveInterval: p.keepAliveInterval,
 			logger:            p.log,
 			metrics:           p.metrics,
 		})