You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by rx...@apache.org on 2020/06/11 05:38:33 UTC

[pulsar-client-go] branch master updated: Allow to have multiple connections per broker (#276)

This is an automated email from the ASF dual-hosted git repository.

rxl pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar-client-go.git


The following commit(s) were added to refs/heads/master by this push:
     new c979046  Allow to have multiple connections per broker (#276)
c979046 is described below

commit c979046238a71838ee7bf5a6ff6f74a725d8f6b7
Author: Matteo Merli <mm...@apache.org>
AuthorDate: Wed Jun 10 22:38:27 2020 -0700

    Allow to have multiple connections per broker (#276)
    
    - Allow to have multiple connections per broker
---
 pulsar/client.go                   |  3 +++
 pulsar/client_impl.go              |  7 ++++++-
 pulsar/internal/connection_pool.go | 42 ++++++++++++++++++++++++++++----------
 3 files changed, 40 insertions(+), 12 deletions(-)

diff --git a/pulsar/client.go b/pulsar/client.go
index 53de468..d4af906 100644
--- a/pulsar/client.go
+++ b/pulsar/client.go
@@ -88,6 +88,9 @@ type ClientOptions struct {
 
 	// Configure whether the Pulsar client verify the validity of the host name from broker (default: false)
 	TLSValidateHostname bool
+
+	// Max number of connections to a single broker that will be kept in the pool. (Default: 1 connection)
+	MaxConnectionsPerBroker int
 }
 
 type Client interface {
diff --git a/pulsar/client_impl.go b/pulsar/client_impl.go
index 02d9883..d731183 100644
--- a/pulsar/client_impl.go
+++ b/pulsar/client_impl.go
@@ -95,8 +95,13 @@ func newClient(options ClientOptions) (Client, error) {
 		operationTimeout = defaultOperationTimeout
 	}
 
+	maxConnectionsPerHost := options.MaxConnectionsPerBroker
+	if maxConnectionsPerHost <= 0 {
+		maxConnectionsPerHost = 1
+	}
+
 	c := &client{
-		cnxPool: internal.NewConnectionPool(tlsConfig, authProvider, connectionTimeout),
+		cnxPool: internal.NewConnectionPool(tlsConfig, authProvider, connectionTimeout, maxConnectionsPerHost),
 	}
 	c.rpcClient = internal.NewRPCClient(url, c.cnxPool, operationTimeout)
 	c.lookupService = internal.NewLookupService(c.rpcClient, url, tlsConfig != nil)
diff --git a/pulsar/internal/connection_pool.go b/pulsar/internal/connection_pool.go
index b91324a..a90ac42 100644
--- a/pulsar/internal/connection_pool.go
+++ b/pulsar/internal/connection_pool.go
@@ -18,8 +18,10 @@
 package internal
 
 import (
+	"fmt"
 	"net/url"
 	"sync"
+	"sync/atomic"
 	"time"
 
 	"github.com/apache/pulsar-client-go/pulsar/internal/auth"
@@ -37,23 +39,31 @@ type ConnectionPool interface {
 }
 
 type connectionPool struct {
-	pool              sync.Map
-	connectionTimeout time.Duration
-	tlsOptions        *TLSOptions
-	auth              auth.Provider
+	pool                  sync.Map
+	connectionTimeout     time.Duration
+	tlsOptions            *TLSOptions
+	auth                  auth.Provider
+	maxConnectionsPerHost int32
+	roundRobinCnt         int32
 }
 
 // NewConnectionPool init connection pool.
-func NewConnectionPool(tlsOptions *TLSOptions, auth auth.Provider, connectionTimeout time.Duration) ConnectionPool {
+func NewConnectionPool(
+	tlsOptions *TLSOptions,
+	auth auth.Provider,
+	connectionTimeout time.Duration,
+	maxConnectionsPerHost int) ConnectionPool {
 	return &connectionPool{
-		tlsOptions:        tlsOptions,
-		auth:              auth,
-		connectionTimeout: connectionTimeout,
+		tlsOptions:            tlsOptions,
+		auth:                  auth,
+		connectionTimeout:     connectionTimeout,
+		maxConnectionsPerHost: int32(maxConnectionsPerHost),
 	}
 }
 
 func (p *connectionPool) GetConnection(logicalAddr *url.URL, physicalAddr *url.URL) (Connection, error) {
-	cachedCnx, found := p.pool.Load(logicalAddr.Host)
+	key := p.getMapKey(logicalAddr)
+	cachedCnx, found := p.pool.Load(key)
 	if found {
 		cnx := cachedCnx.(*connection)
 		log.Debug("Found connection in cache:", cnx.logicalAddr, cnx.physicalAddr)
@@ -63,14 +73,15 @@ func (p *connectionPool) GetConnection(logicalAddr *url.URL, physicalAddr *url.U
 			return cnx, nil
 		}
 		// The cached connection is failed
-		p.pool.Delete(logicalAddr.Host)
+		p.pool.Delete(key)
 		log.Debug("Removed failed connection from pool:", cnx.logicalAddr, cnx.physicalAddr)
 	}
 
 	// Try to create a new connection
 	newConnection := newConnection(logicalAddr, physicalAddr, p.tlsOptions, p.connectionTimeout, p.auth)
-	newCnx, wasCached := p.pool.LoadOrStore(logicalAddr.Host, newConnection)
+	newCnx, wasCached := p.pool.LoadOrStore(key, newConnection)
 	cnx := newCnx.(*connection)
+
 	if !wasCached {
 		cnx.start()
 	} else {
@@ -89,3 +100,12 @@ func (p *connectionPool) Close() {
 		return true
 	})
 }
+
+// getMapKey returns the pool key "<host>-<slot>", cycling slot round-robin over [0, maxConnectionsPerHost).
+func (p *connectionPool) getMapKey(addr *url.URL) string {
+	// Reduce as uint32: after int32 wrap-around, negating math.MinInt32 would stay negative.
+	cnt := uint32(atomic.AddInt32(&p.roundRobinCnt, 1))
+	idx := cnt % uint32(p.maxConnectionsPerHost)
+	// "-" must be a string: fmt.Sprint formats the rune '-' as the number 45.
+	return fmt.Sprint(addr.Host, "-", idx)
+}