You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by mm...@apache.org on 2022/09/02 17:48:02 UTC
[pulsar-client-go] branch master updated: Make keepalive interval configurable (#838)
This is an automated email from the ASF dual-hosted git repository.
mmerli pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar-client-go.git
The following commit(s) were added to refs/heads/master by this push:
new f8dc88e Make keepalive interval configurable (#838)
f8dc88e is described below
commit f8dc88e9a9eb070ea8d9f3d6c31b2b00e600567c
Author: Zixuan Liu <no...@gmail.com>
AuthorDate: Sat Sep 3 01:47:57 2022 +0800
Make keepalive interval configurable (#838)
* Make keepalive interval configurable
Signed-off-by: Zixuan Liu <no...@gmail.com>
* Fix style
Signed-off-by: Zixuan Liu <no...@gmail.com>
Signed-off-by: Zixuan Liu <no...@gmail.com>
---
pulsar/client.go | 3 +++
pulsar/client_impl.go | 10 ++++++++--
pulsar/internal/connection.go | 14 ++++++++------
pulsar/internal/connection_pool.go | 8 ++++++--
4 files changed, 25 insertions(+), 10 deletions(-)
diff --git a/pulsar/client.go b/pulsar/client.go
index 9a860ce..22b12ef 100644
--- a/pulsar/client.go
+++ b/pulsar/client.go
@@ -98,6 +98,9 @@ type ClientOptions struct {
// operation will be marked as failed
OperationTimeout time.Duration
+ // Configure the ping send and check interval, default to 30 seconds.
+ KeepAliveInterval time.Duration
+
// Configure the authentication provider. (default: no authentication)
// Example: `Authentication: NewAuthenticationTLS("my-cert.pem", "my-key.pem")`
Authentication
diff --git a/pulsar/client_impl.go b/pulsar/client_impl.go
index 1bf661e..e7fa642 100644
--- a/pulsar/client_impl.go
+++ b/pulsar/client_impl.go
@@ -33,6 +33,7 @@ import (
const (
defaultConnectionTimeout = 10 * time.Second
defaultOperationTimeout = 30 * time.Second
+ defaultKeepAliveInterval = 30 * time.Second
)
type client struct {
@@ -125,9 +126,14 @@ func newClient(options ClientOptions) (Client, error) {
int(options.MetricsCardinality), map[string]string{}, options.MetricsRegisterer)
}
+ keepAliveInterval := options.KeepAliveInterval
+ if keepAliveInterval.Nanoseconds() == 0 {
+ keepAliveInterval = defaultKeepAliveInterval
+ }
+
c := &client{
- cnxPool: internal.NewConnectionPool(tlsConfig, authProvider, connectionTimeout, maxConnectionsPerHost, logger,
- metrics),
+ cnxPool: internal.NewConnectionPool(tlsConfig, authProvider, connectionTimeout, keepAliveInterval,
+ maxConnectionsPerHost, logger, metrics),
log: logger,
metrics: metrics,
}
diff --git a/pulsar/internal/connection.go b/pulsar/internal/connection.go
index 48dfd8f..8ddbc04 100644
--- a/pulsar/internal/connection.go
+++ b/pulsar/internal/connection.go
@@ -114,8 +114,6 @@ func (s connectionState) String() string {
}
}
-const keepAliveInterval = 30 * time.Second
-
type request struct {
id *uint64
cmd *pb.BaseCommand
@@ -168,6 +166,8 @@ type connection struct {
maxMessageSize int32
metrics *Metrics
+
+ keepAliveInterval time.Duration
}
// connectionOptions defines configurations for creating connection.
@@ -179,11 +179,13 @@ type connectionOptions struct {
auth auth.Provider
logger log.Logger
metrics *Metrics
+ keepAliveInterval time.Duration
}
func newConnection(opts connectionOptions) *connection {
cnx := &connection{
connectionTimeout: opts.connectionTimeout,
+ keepAliveInterval: opts.keepAliveInterval,
logicalAddr: opts.logicalAddr,
physicalAddr: opts.physicalAddr,
writeBuffer: NewBuffer(4096),
@@ -285,7 +287,7 @@ func (c *connection) doHandshake() bool {
// During the initial handshake, the internal keep alive is not
// active yet, so we need to timeout write and read requests
- c.cnx.SetDeadline(time.Now().Add(keepAliveInterval))
+ c.cnx.SetDeadline(time.Now().Add(c.keepAliveInterval))
cmdConnect := &pb.CommandConnect{
ProtocolVersion: proto.Int32(PulsarProtocolVersion),
ClientVersion: proto.String(ClientVersionString),
@@ -369,8 +371,8 @@ func (c *connection) failLeftRequestsWhenClose() {
}
func (c *connection) run() {
- pingSendTicker := time.NewTicker(keepAliveInterval)
- pingCheckTicker := time.NewTicker(keepAliveInterval)
+ pingSendTicker := time.NewTicker(c.keepAliveInterval)
+ pingCheckTicker := time.NewTicker(c.keepAliveInterval)
defer func() {
// stop tickers
@@ -432,7 +434,7 @@ func (c *connection) runPingCheck(pingCheckTicker *time.Ticker) {
case <-c.closeCh:
return
case <-pingCheckTicker.C:
- if c.lastDataReceived().Add(2 * keepAliveInterval).Before(time.Now()) {
+ if c.lastDataReceived().Add(2 * c.keepAliveInterval).Before(time.Now()) {
// We have not received a response to the previous Ping request, the
// connection to broker is stale
c.log.Warn("Detected stale connection to broker")
diff --git a/pulsar/internal/connection_pool.go b/pulsar/internal/connection_pool.go
index 5ec457e..cb172a1 100644
--- a/pulsar/internal/connection_pool.go
+++ b/pulsar/internal/connection_pool.go
@@ -45,9 +45,10 @@ type connectionPool struct {
auth auth.Provider
maxConnectionsPerHost int32
roundRobinCnt int32
- metrics *Metrics
+ keepAliveInterval time.Duration
- log log.Logger
+ metrics *Metrics
+ log log.Logger
}
// NewConnectionPool init connection pool.
@@ -55,6 +56,7 @@ func NewConnectionPool(
tlsOptions *TLSOptions,
auth auth.Provider,
connectionTimeout time.Duration,
+ keepAliveInterval time.Duration,
maxConnectionsPerHost int,
logger log.Logger,
metrics *Metrics) ConnectionPool {
@@ -64,6 +66,7 @@ func NewConnectionPool(
auth: auth,
connectionTimeout: connectionTimeout,
maxConnectionsPerHost: int32(maxConnectionsPerHost),
+ keepAliveInterval: keepAliveInterval,
log: logger,
metrics: metrics,
}
@@ -97,6 +100,7 @@ func (p *connectionPool) GetConnection(logicalAddr *url.URL, physicalAddr *url.U
tls: p.tlsOptions,
connectionTimeout: p.connectionTimeout,
auth: p.auth,
+ keepAliveInterval: p.keepAliveInterval,
logger: p.log,
metrics: p.metrics,
})