You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by rx...@apache.org on 2019/12/31 05:01:46 UTC

[pulsar-client-go] branch master updated: [Issue #145] fix producer cannot connect to broker through pulsar proxy (#146)

This is an automated email from the ASF dual-hosted git repository.

rxl pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar-client-go.git


The following commit(s) were added to refs/heads/master by this push:
     new 12a6a03  [Issue #145] fix producer cannot connect to broker through pulsar proxy (#146)
12a6a03 is described below

commit 12a6a03e9c31fa6892f4219e5850b5d7d298e889
Author: Rui Fu <fr...@users.noreply.github.com>
AuthorDate: Tue Dec 31 13:01:38 2019 +0800

    [Issue #145] fix producer cannot connect to broker through pulsar proxy (#146)
    
    If a producer connects to a broker through the Pulsar proxy, the producer gets a "Short read when reading frame size" error, because the Pulsar proxy does not pass commands on to the broker.
---
 pulsar/consumer_partition.go           |  2 +-
 pulsar/internal/connection.go          | 44 ++++++++++++++++++----------------
 pulsar/internal/connection_pool.go     | 12 ++++++----
 pulsar/internal/lookup_service.go      | 12 ++++++----
 pulsar/internal/lookup_service_test.go |  2 +-
 pulsar/internal/rpc_client.go          |  8 +++----
 pulsar/producer_partition.go           |  4 ++--
 7 files changed, 46 insertions(+), 38 deletions(-)

diff --git a/pulsar/consumer_partition.go b/pulsar/consumer_partition.go
index 55d1318..9d47581 100644
--- a/pulsar/consumer_partition.go
+++ b/pulsar/consumer_partition.go
@@ -530,7 +530,7 @@ func (pc *partitionConsumer) grabConn() error {
 	}
 
 	res, err := pc.client.rpcClient.Request(lr.LogicalAddr, lr.PhysicalAddr, requestID,
-		pb.BaseCommand_SUBSCRIBE, cmdSubscribe)
+		pb.BaseCommand_SUBSCRIBE, cmdSubscribe, lr.ConnectingThroughProxy)
 
 	if err != nil {
 		pc.log.WithError(err).Error("Failed to create consumer")
diff --git a/pulsar/internal/connection.go b/pulsar/internal/connection.go
index 7a7cd5f..590b92f 100644
--- a/pulsar/internal/connection.go
+++ b/pulsar/internal/connection.go
@@ -119,9 +119,10 @@ type connection struct {
 	state             connectionState
 	connectionTimeout time.Duration
 
-	logicalAddr  *url.URL
-	physicalAddr *url.URL
-	cnx          net.Conn
+	logicalAddr            *url.URL
+	physicalAddr           *url.URL
+	connectingThroughProxy bool
+	cnx                    net.Conn
 
 	writeBufferLock sync.Mutex
 	writeBuffer     Buffer
@@ -152,20 +153,21 @@ type connection struct {
 }
 
 func newConnection(logicalAddr *url.URL, physicalAddr *url.URL, tlsOptions *TLSOptions,
-	connectionTimeout time.Duration, auth auth.Provider) *connection {
+	connectionTimeout time.Duration, auth auth.Provider, connectingThroughProxy bool) *connection {
 	cnx := &connection{
-		state:                connectionInit,
-		connectionTimeout:    connectionTimeout,
-		logicalAddr:          logicalAddr,
-		physicalAddr:         physicalAddr,
-		writeBuffer:          NewBuffer(4096),
-		log:                  log.WithField("remote_addr", physicalAddr),
-		pendingReqs:          make(map[uint64]*request),
-		lastDataReceivedTime: time.Now(),
-		pingTicker:           time.NewTicker(keepAliveInterval),
-		pingCheckTicker:      time.NewTicker(keepAliveInterval),
-		tlsOptions:           tlsOptions,
-		auth:                 auth,
+		state:                  connectionInit,
+		connectionTimeout:      connectionTimeout,
+		logicalAddr:            logicalAddr,
+		physicalAddr:           physicalAddr,
+		connectingThroughProxy: connectingThroughProxy,
+		writeBuffer:            NewBuffer(4096),
+		log:                    log.WithField("remote_addr", physicalAddr),
+		pendingReqs:            make(map[uint64]*request),
+		lastDataReceivedTime:   time.Now(),
+		pingTicker:             time.NewTicker(keepAliveInterval),
+		pingCheckTicker:        time.NewTicker(keepAliveInterval),
+		tlsOptions:             tlsOptions,
+		auth:                   auth,
 
 		closeCh:            make(chan interface{}),
 		incomingRequestsCh: make(chan *request, 10),
@@ -248,14 +250,16 @@ func (c *connection) doHandshake() bool {
 	// During the initial handshake, the internal keep alive is not
 	// active yet, so we need to timeout write and read requests
 	c.cnx.SetDeadline(time.Now().Add(keepAliveInterval))
-
-	c.writeCommand(baseCommand(pb.BaseCommand_CONNECT, &pb.CommandConnect{
+	cmdConnect := &pb.CommandConnect{
 		ProtocolVersion: &version,
 		ClientVersion:   proto.String("Pulsar Go 0.1"),
 		AuthMethodName:  proto.String(c.auth.Name()),
 		AuthData:        authData,
-	}))
-
+	}
+	if c.connectingThroughProxy {
+		cmdConnect.ProxyToBrokerUrl = proto.String(c.logicalAddr.Host)
+	}
+	c.writeCommand(baseCommand(pb.BaseCommand_CONNECT, cmdConnect))
 	cmd, _, err := c.reader.readSingleCommand()
 	if err != nil {
 		c.log.WithError(err).Warn("Failed to perform initial handshake")
diff --git a/pulsar/internal/connection_pool.go b/pulsar/internal/connection_pool.go
index 14f7753..efb5cf1 100644
--- a/pulsar/internal/connection_pool.go
+++ b/pulsar/internal/connection_pool.go
@@ -18,6 +18,7 @@
 package internal
 
 import (
+	"fmt"
 	"net/url"
 	"sync"
 	"time"
@@ -30,7 +31,7 @@ import (
 // ConnectionPool is a interface of connection pool.
 type ConnectionPool interface {
 	// GetConnection get a connection from ConnectionPool.
-	GetConnection(logicalAddr *url.URL, physicalAddr *url.URL) (Connection, error)
+	GetConnection(logicalAddr *url.URL, physicalAddr *url.URL, connectingThroughProxy bool) (Connection, error)
 
 	// Close all the connections in the pool
 	Close()
@@ -52,8 +53,9 @@ func NewConnectionPool(tlsOptions *TLSOptions, auth auth.Provider, connectionTim
 	}
 }
 
-func (p *connectionPool) GetConnection(logicalAddr *url.URL, physicalAddr *url.URL) (Connection, error) {
-	cachedCnx, found := p.pool.Load(logicalAddr.Host)
+func (p *connectionPool) GetConnection(logicalAddr *url.URL, physicalAddr *url.URL,
+	connectingThroughProxy bool) (Connection, error) {
+	cachedCnx, found := p.pool.Load(fmt.Sprintf("%s:%v", logicalAddr.Host, connectingThroughProxy))
 	if found {
 		cnx := cachedCnx.(*connection)
 		log.Debug("Found connection in cache:", cnx.logicalAddr, cnx.physicalAddr)
@@ -68,8 +70,8 @@ func (p *connectionPool) GetConnection(logicalAddr *url.URL, physicalAddr *url.U
 	}
 
 	// Try to create a new connection
-	newCnx, wasCached := p.pool.LoadOrStore(logicalAddr.Host,
-		newConnection(logicalAddr, physicalAddr, p.tlsOptions, p.connectionTimeout, p.auth))
+	newCnx, wasCached := p.pool.LoadOrStore(fmt.Sprintf("%s:%v", logicalAddr.Host, connectingThroughProxy),
+		newConnection(logicalAddr, physicalAddr, p.tlsOptions, p.connectionTimeout, p.auth, connectingThroughProxy))
 	cnx := newCnx.(*connection)
 	if !wasCached {
 		cnx.start()
diff --git a/pulsar/internal/lookup_service.go b/pulsar/internal/lookup_service.go
index ee6dffe..e056ea5 100644
--- a/pulsar/internal/lookup_service.go
+++ b/pulsar/internal/lookup_service.go
@@ -30,8 +30,9 @@ import (
 
 // LookupResult encapsulates a struct for lookup a request, containing two parts: LogicalAddr, PhysicalAddr.
 type LookupResult struct {
-	LogicalAddr  *url.URL
-	PhysicalAddr *url.URL
+	LogicalAddr            *url.URL
+	PhysicalAddr           *url.URL
+	ConnectingThroughProxy bool
 }
 
 // LookupService is a interface of lookup service.
@@ -105,7 +106,7 @@ func (ls *lookupService) Lookup(topic string) (*LookupResult, error) {
 				RequestId:     &id,
 				Topic:         &topic,
 				Authoritative: lr.Authoritative,
-			})
+			}, false)
 			if err != nil {
 				return nil, err
 			}
@@ -123,8 +124,9 @@ func (ls *lookupService) Lookup(topic string) (*LookupResult, error) {
 			}
 
 			return &LookupResult{
-				LogicalAddr:  logicalAddress,
-				PhysicalAddr: physicalAddress,
+				LogicalAddr:            logicalAddress,
+				PhysicalAddr:           physicalAddress,
+				ConnectingThroughProxy: lr.GetProxyThroughServiceUrl(),
 			}, nil
 
 		case pb.CommandLookupTopicResponse_Failed:
diff --git a/pulsar/internal/lookup_service_test.go b/pulsar/internal/lookup_service_test.go
index 5bb1724..cce6198 100644
--- a/pulsar/internal/lookup_service_test.go
+++ b/pulsar/internal/lookup_service_test.go
@@ -70,7 +70,7 @@ func (c *mockedRPCClient) RequestToAnyBroker(requestID uint64, cmdType pb.BaseCo
 }
 
 func (c *mockedRPCClient) Request(logicalAddr *url.URL, physicalAddr *url.URL, requestID uint64,
-	cmdType pb.BaseCommand_Type, message proto.Message) (*RPCResult, error) {
+	cmdType pb.BaseCommand_Type, message proto.Message, connectingThroughProxy bool) (*RPCResult, error) {
 	assert.Equal(c.t, cmdType, pb.BaseCommand_LOOKUP)
 	expectedRequest := &c.expectedRequests[0]
 	c.expectedRequests = c.expectedRequests[1:]
diff --git a/pulsar/internal/rpc_client.go b/pulsar/internal/rpc_client.go
index 23c3b61..3b5e622 100644
--- a/pulsar/internal/rpc_client.go
+++ b/pulsar/internal/rpc_client.go
@@ -45,7 +45,7 @@ type RPCClient interface {
 	RequestToAnyBroker(requestID uint64, cmdType pb.BaseCommand_Type, message proto.Message) (*RPCResult, error)
 
 	Request(logicalAddr *url.URL, physicalAddr *url.URL, requestID uint64,
-		cmdType pb.BaseCommand_Type, message proto.Message) (*RPCResult, error)
+		cmdType pb.BaseCommand_Type, message proto.Message, connectingThroughProxy bool) (*RPCResult, error)
 
 	RequestOnCnxNoWait(cnx Connection, cmdType pb.BaseCommand_Type, message proto.Message)
 
@@ -71,13 +71,13 @@ func NewRPCClient(serviceURL *url.URL, pool ConnectionPool, requestTimeout time.
 
 func (c *rpcClient) RequestToAnyBroker(requestID uint64, cmdType pb.BaseCommand_Type,
 	message proto.Message) (*RPCResult, error) {
-	return c.Request(c.serviceURL, c.serviceURL, requestID, cmdType, message)
+	return c.Request(c.serviceURL, c.serviceURL, requestID, cmdType, message, false)
 }
 
 func (c *rpcClient) Request(logicalAddr *url.URL, physicalAddr *url.URL, requestID uint64,
-	cmdType pb.BaseCommand_Type, message proto.Message) (*RPCResult, error) {
+	cmdType pb.BaseCommand_Type, message proto.Message, connectingThroughProxy bool) (*RPCResult, error) {
 	// TODO: Add retry logic in case of connection issues
-	cnx, err := c.pool.GetConnection(logicalAddr, physicalAddr)
+	cnx, err := c.pool.GetConnection(logicalAddr, physicalAddr, connectingThroughProxy)
 	if err != nil {
 		return nil, err
 	}
diff --git a/pulsar/producer_partition.go b/pulsar/producer_partition.go
index f96a706..0963570 100644
--- a/pulsar/producer_partition.go
+++ b/pulsar/producer_partition.go
@@ -140,8 +140,8 @@ func (p *partitionProducer) grabCnx() error {
 	if len(p.options.Properties) > 0 {
 		cmdProducer.Metadata = toKeyValues(p.options.Properties)
 	}
-
-	res, err := p.client.rpcClient.Request(lr.LogicalAddr, lr.PhysicalAddr, id, pb.BaseCommand_PRODUCER, cmdProducer)
+	res, err := p.client.rpcClient.Request(lr.LogicalAddr, lr.PhysicalAddr, id, pb.BaseCommand_PRODUCER, cmdProducer,
+		lr.ConnectingThroughProxy)
 	if err != nil {
 		p.log.WithError(err).Error("Failed to create producer")
 		return err