You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by rx...@apache.org on 2020/06/11 05:38:33 UTC
[pulsar-client-go] branch master updated: Allow to have multiple
connections per broker (#276)
This is an automated email from the ASF dual-hosted git repository.
rxl pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar-client-go.git
The following commit(s) were added to refs/heads/master by this push:
new c979046 Allow to have multiple connections per broker (#276)
c979046 is described below
commit c979046238a71838ee7bf5a6ff6f74a725d8f6b7
Author: Matteo Merli <mm...@apache.org>
AuthorDate: Wed Jun 10 22:38:27 2020 -0700
Allow to have multiple connections per broker (#276)
- Allow to have multiple connections per broker
---
pulsar/client.go | 3 +++
pulsar/client_impl.go | 7 ++++++-
pulsar/internal/connection_pool.go | 42 ++++++++++++++++++++++++++++----------
3 files changed, 40 insertions(+), 12 deletions(-)
diff --git a/pulsar/client.go b/pulsar/client.go
index 53de468..d4af906 100644
--- a/pulsar/client.go
+++ b/pulsar/client.go
@@ -88,6 +88,9 @@ type ClientOptions struct {
// Configure whether the Pulsar client verify the validity of the host name from broker (default: false)
TLSValidateHostname bool
+
+ // Max number of connections to a single broker that will be kept in the pool. (Default: 1 connection)
+ MaxConnectionsPerBroker int
}
type Client interface {
diff --git a/pulsar/client_impl.go b/pulsar/client_impl.go
index 02d9883..d731183 100644
--- a/pulsar/client_impl.go
+++ b/pulsar/client_impl.go
@@ -95,8 +95,13 @@ func newClient(options ClientOptions) (Client, error) {
operationTimeout = defaultOperationTimeout
}
+ maxConnectionsPerHost := options.MaxConnectionsPerBroker
+ if maxConnectionsPerHost <= 0 {
+ maxConnectionsPerHost = 1
+ }
+
c := &client{
- cnxPool: internal.NewConnectionPool(tlsConfig, authProvider, connectionTimeout),
+ cnxPool: internal.NewConnectionPool(tlsConfig, authProvider, connectionTimeout, maxConnectionsPerHost),
}
c.rpcClient = internal.NewRPCClient(url, c.cnxPool, operationTimeout)
c.lookupService = internal.NewLookupService(c.rpcClient, url, tlsConfig != nil)
diff --git a/pulsar/internal/connection_pool.go b/pulsar/internal/connection_pool.go
index b91324a..a90ac42 100644
--- a/pulsar/internal/connection_pool.go
+++ b/pulsar/internal/connection_pool.go
@@ -18,8 +18,10 @@
package internal
import (
+ "fmt"
"net/url"
"sync"
+ "sync/atomic"
"time"
"github.com/apache/pulsar-client-go/pulsar/internal/auth"
@@ -37,23 +39,31 @@ type ConnectionPool interface {
}
type connectionPool struct {
- pool sync.Map
- connectionTimeout time.Duration
- tlsOptions *TLSOptions
- auth auth.Provider
+ pool sync.Map
+ connectionTimeout time.Duration
+ tlsOptions *TLSOptions
+ auth auth.Provider
+ maxConnectionsPerHost int32
+ roundRobinCnt int32
}
// NewConnectionPool init connection pool.
-func NewConnectionPool(tlsOptions *TLSOptions, auth auth.Provider, connectionTimeout time.Duration) ConnectionPool {
+func NewConnectionPool(
+ tlsOptions *TLSOptions,
+ auth auth.Provider,
+ connectionTimeout time.Duration,
+ maxConnectionsPerHost int) ConnectionPool {
return &connectionPool{
- tlsOptions: tlsOptions,
- auth: auth,
- connectionTimeout: connectionTimeout,
+ tlsOptions: tlsOptions,
+ auth: auth,
+ connectionTimeout: connectionTimeout,
+ maxConnectionsPerHost: int32(maxConnectionsPerHost),
}
}
func (p *connectionPool) GetConnection(logicalAddr *url.URL, physicalAddr *url.URL) (Connection, error) {
- cachedCnx, found := p.pool.Load(logicalAddr.Host)
+ key := p.getMapKey(logicalAddr)
+ cachedCnx, found := p.pool.Load(key)
if found {
cnx := cachedCnx.(*connection)
log.Debug("Found connection in cache:", cnx.logicalAddr, cnx.physicalAddr)
@@ -63,14 +73,15 @@ func (p *connectionPool) GetConnection(logicalAddr *url.URL, physicalAddr *url.U
return cnx, nil
}
// The cached connection is failed
- p.pool.Delete(logicalAddr.Host)
+ p.pool.Delete(key)
log.Debug("Removed failed connection from pool:", cnx.logicalAddr, cnx.physicalAddr)
}
// Try to create a new connection
newConnection := newConnection(logicalAddr, physicalAddr, p.tlsOptions, p.connectionTimeout, p.auth)
- newCnx, wasCached := p.pool.LoadOrStore(logicalAddr.Host, newConnection)
+ newCnx, wasCached := p.pool.LoadOrStore(key, newConnection)
cnx := newCnx.(*connection)
+
if !wasCached {
cnx.start()
} else {
@@ -89,3 +100,12 @@ func (p *connectionPool) Close() {
return true
})
}
+
+// getMapKey returns the pool key for addr: its host plus a round-robin slot index, so that
+// successive calls spread over up to maxConnectionsPerHost (assumed > 0) connections per host.
+func (p *connectionPool) getMapKey(addr *url.URL) string {
+	// Use the counter as unsigned: negating it breaks at MinInt32 and yields a negative slot.
+	idx := uint32(atomic.AddInt32(&p.roundRobinCnt, 1)) % uint32(p.maxConnectionsPerHost)
+	// Sprintf, not Sprint: Sprint renders the rune '-' as 45 and space-separates the numbers.
+	return fmt.Sprintf("%s-%d", addr.Host, idx)
+}