You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by rx...@apache.org on 2019/12/31 05:01:46 UTC
[pulsar-client-go] branch master updated: [Issue #145] fix producer
cannot connect to broker through pulsar proxy (#146)
This is an automated email from the ASF dual-hosted git repository.
rxl pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar-client-go.git
The following commit(s) were added to refs/heads/master by this push:
new 12a6a03 [Issue #145] fix producer cannot connect to broker through pulsar proxy (#146)
12a6a03 is described below
commit 12a6a03e9c31fa6892f4219e5850b5d7d298e889
Author: Rui Fu <fr...@users.noreply.github.com>
AuthorDate: Tue Dec 31 13:01:38 2019 +0800
[Issue #145] fix producer cannot connect to broker through pulsar proxy (#146)
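When a producer connects to a broker through the Pulsar proxy, it fails with a "Short read when reading frame size" error because the proxy does not pass the client's commands on to the broker. This change sets ProxyToBrokerUrl in the CONNECT command whenever the lookup response indicates that the connection must go through the proxy's service URL.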
---
pulsar/consumer_partition.go | 2 +-
pulsar/internal/connection.go | 44 ++++++++++++++++++----------------
pulsar/internal/connection_pool.go | 12 ++++++----
pulsar/internal/lookup_service.go | 12 ++++++----
pulsar/internal/lookup_service_test.go | 2 +-
pulsar/internal/rpc_client.go | 8 +++----
pulsar/producer_partition.go | 4 ++--
7 files changed, 46 insertions(+), 38 deletions(-)
diff --git a/pulsar/consumer_partition.go b/pulsar/consumer_partition.go
index 55d1318..9d47581 100644
--- a/pulsar/consumer_partition.go
+++ b/pulsar/consumer_partition.go
@@ -530,7 +530,7 @@ func (pc *partitionConsumer) grabConn() error {
}
res, err := pc.client.rpcClient.Request(lr.LogicalAddr, lr.PhysicalAddr, requestID,
- pb.BaseCommand_SUBSCRIBE, cmdSubscribe)
+ pb.BaseCommand_SUBSCRIBE, cmdSubscribe, lr.ConnectingThroughProxy)
if err != nil {
pc.log.WithError(err).Error("Failed to create consumer")
diff --git a/pulsar/internal/connection.go b/pulsar/internal/connection.go
index 7a7cd5f..590b92f 100644
--- a/pulsar/internal/connection.go
+++ b/pulsar/internal/connection.go
@@ -119,9 +119,10 @@ type connection struct {
state connectionState
connectionTimeout time.Duration
- logicalAddr *url.URL
- physicalAddr *url.URL
- cnx net.Conn
+ logicalAddr *url.URL
+ physicalAddr *url.URL
+ connectingThroughProxy bool
+ cnx net.Conn
writeBufferLock sync.Mutex
writeBuffer Buffer
@@ -152,20 +153,21 @@ type connection struct {
}
func newConnection(logicalAddr *url.URL, physicalAddr *url.URL, tlsOptions *TLSOptions,
- connectionTimeout time.Duration, auth auth.Provider) *connection {
+ connectionTimeout time.Duration, auth auth.Provider, connectingThroughProxy bool) *connection {
cnx := &connection{
- state: connectionInit,
- connectionTimeout: connectionTimeout,
- logicalAddr: logicalAddr,
- physicalAddr: physicalAddr,
- writeBuffer: NewBuffer(4096),
- log: log.WithField("remote_addr", physicalAddr),
- pendingReqs: make(map[uint64]*request),
- lastDataReceivedTime: time.Now(),
- pingTicker: time.NewTicker(keepAliveInterval),
- pingCheckTicker: time.NewTicker(keepAliveInterval),
- tlsOptions: tlsOptions,
- auth: auth,
+ state: connectionInit,
+ connectionTimeout: connectionTimeout,
+ logicalAddr: logicalAddr,
+ physicalAddr: physicalAddr,
+ connectingThroughProxy: connectingThroughProxy,
+ writeBuffer: NewBuffer(4096),
+ log: log.WithField("remote_addr", physicalAddr),
+ pendingReqs: make(map[uint64]*request),
+ lastDataReceivedTime: time.Now(),
+ pingTicker: time.NewTicker(keepAliveInterval),
+ pingCheckTicker: time.NewTicker(keepAliveInterval),
+ tlsOptions: tlsOptions,
+ auth: auth,
closeCh: make(chan interface{}),
incomingRequestsCh: make(chan *request, 10),
@@ -248,14 +250,16 @@ func (c *connection) doHandshake() bool {
// During the initial handshake, the internal keep alive is not
// active yet, so we need to timeout write and read requests
c.cnx.SetDeadline(time.Now().Add(keepAliveInterval))
-
- c.writeCommand(baseCommand(pb.BaseCommand_CONNECT, &pb.CommandConnect{
+ cmdConnect := &pb.CommandConnect{
ProtocolVersion: &version,
ClientVersion: proto.String("Pulsar Go 0.1"),
AuthMethodName: proto.String(c.auth.Name()),
AuthData: authData,
- }))
-
+ }
+ if c.connectingThroughProxy {
+ cmdConnect.ProxyToBrokerUrl = proto.String(c.logicalAddr.Host)
+ }
+ c.writeCommand(baseCommand(pb.BaseCommand_CONNECT, cmdConnect))
cmd, _, err := c.reader.readSingleCommand()
if err != nil {
c.log.WithError(err).Warn("Failed to perform initial handshake")
diff --git a/pulsar/internal/connection_pool.go b/pulsar/internal/connection_pool.go
index 14f7753..efb5cf1 100644
--- a/pulsar/internal/connection_pool.go
+++ b/pulsar/internal/connection_pool.go
@@ -18,6 +18,7 @@
package internal
import (
+ "fmt"
"net/url"
"sync"
"time"
@@ -30,7 +31,7 @@ import (
// ConnectionPool is a interface of connection pool.
type ConnectionPool interface {
// GetConnection get a connection from ConnectionPool.
- GetConnection(logicalAddr *url.URL, physicalAddr *url.URL) (Connection, error)
+ GetConnection(logicalAddr *url.URL, physicalAddr *url.URL, connectingThroughProxy bool) (Connection, error)
// Close all the connections in the pool
Close()
@@ -52,8 +53,9 @@ func NewConnectionPool(tlsOptions *TLSOptions, auth auth.Provider, connectionTim
}
}
-func (p *connectionPool) GetConnection(logicalAddr *url.URL, physicalAddr *url.URL) (Connection, error) {
- cachedCnx, found := p.pool.Load(logicalAddr.Host)
+func (p *connectionPool) GetConnection(logicalAddr *url.URL, physicalAddr *url.URL,
+ connectingThroughProxy bool) (Connection, error) {
+ cachedCnx, found := p.pool.Load(fmt.Sprintf("%s:%v", logicalAddr.Host, connectingThroughProxy))
if found {
cnx := cachedCnx.(*connection)
log.Debug("Found connection in cache:", cnx.logicalAddr, cnx.physicalAddr)
@@ -68,8 +70,8 @@ func (p *connectionPool) GetConnection(logicalAddr *url.URL, physicalAddr *url.U
}
// Try to create a new connection
- newCnx, wasCached := p.pool.LoadOrStore(logicalAddr.Host,
- newConnection(logicalAddr, physicalAddr, p.tlsOptions, p.connectionTimeout, p.auth))
+ newCnx, wasCached := p.pool.LoadOrStore(fmt.Sprintf("%s:%v", logicalAddr.Host, connectingThroughProxy),
+ newConnection(logicalAddr, physicalAddr, p.tlsOptions, p.connectionTimeout, p.auth, connectingThroughProxy))
cnx := newCnx.(*connection)
if !wasCached {
cnx.start()
diff --git a/pulsar/internal/lookup_service.go b/pulsar/internal/lookup_service.go
index ee6dffe..e056ea5 100644
--- a/pulsar/internal/lookup_service.go
+++ b/pulsar/internal/lookup_service.go
@@ -30,8 +30,9 @@ import (
// LookupResult encapsulates a struct for lookup a request, containing two parts: LogicalAddr, PhysicalAddr.
type LookupResult struct {
- LogicalAddr *url.URL
- PhysicalAddr *url.URL
+ LogicalAddr *url.URL
+ PhysicalAddr *url.URL
+ ConnectingThroughProxy bool
}
// LookupService is a interface of lookup service.
@@ -105,7 +106,7 @@ func (ls *lookupService) Lookup(topic string) (*LookupResult, error) {
RequestId: &id,
Topic: &topic,
Authoritative: lr.Authoritative,
- })
+ }, false)
if err != nil {
return nil, err
}
@@ -123,8 +124,9 @@ func (ls *lookupService) Lookup(topic string) (*LookupResult, error) {
}
return &LookupResult{
- LogicalAddr: logicalAddress,
- PhysicalAddr: physicalAddress,
+ LogicalAddr: logicalAddress,
+ PhysicalAddr: physicalAddress,
+ ConnectingThroughProxy: lr.GetProxyThroughServiceUrl(),
}, nil
case pb.CommandLookupTopicResponse_Failed:
diff --git a/pulsar/internal/lookup_service_test.go b/pulsar/internal/lookup_service_test.go
index 5bb1724..cce6198 100644
--- a/pulsar/internal/lookup_service_test.go
+++ b/pulsar/internal/lookup_service_test.go
@@ -70,7 +70,7 @@ func (c *mockedRPCClient) RequestToAnyBroker(requestID uint64, cmdType pb.BaseCo
}
func (c *mockedRPCClient) Request(logicalAddr *url.URL, physicalAddr *url.URL, requestID uint64,
- cmdType pb.BaseCommand_Type, message proto.Message) (*RPCResult, error) {
+ cmdType pb.BaseCommand_Type, message proto.Message, connectingThroughProxy bool) (*RPCResult, error) {
assert.Equal(c.t, cmdType, pb.BaseCommand_LOOKUP)
expectedRequest := &c.expectedRequests[0]
c.expectedRequests = c.expectedRequests[1:]
diff --git a/pulsar/internal/rpc_client.go b/pulsar/internal/rpc_client.go
index 23c3b61..3b5e622 100644
--- a/pulsar/internal/rpc_client.go
+++ b/pulsar/internal/rpc_client.go
@@ -45,7 +45,7 @@ type RPCClient interface {
RequestToAnyBroker(requestID uint64, cmdType pb.BaseCommand_Type, message proto.Message) (*RPCResult, error)
Request(logicalAddr *url.URL, physicalAddr *url.URL, requestID uint64,
- cmdType pb.BaseCommand_Type, message proto.Message) (*RPCResult, error)
+ cmdType pb.BaseCommand_Type, message proto.Message, connectingThroughProxy bool) (*RPCResult, error)
RequestOnCnxNoWait(cnx Connection, cmdType pb.BaseCommand_Type, message proto.Message)
@@ -71,13 +71,13 @@ func NewRPCClient(serviceURL *url.URL, pool ConnectionPool, requestTimeout time.
func (c *rpcClient) RequestToAnyBroker(requestID uint64, cmdType pb.BaseCommand_Type,
message proto.Message) (*RPCResult, error) {
- return c.Request(c.serviceURL, c.serviceURL, requestID, cmdType, message)
+ return c.Request(c.serviceURL, c.serviceURL, requestID, cmdType, message, false)
}
func (c *rpcClient) Request(logicalAddr *url.URL, physicalAddr *url.URL, requestID uint64,
- cmdType pb.BaseCommand_Type, message proto.Message) (*RPCResult, error) {
+ cmdType pb.BaseCommand_Type, message proto.Message, connectingThroughProxy bool) (*RPCResult, error) {
// TODO: Add retry logic in case of connection issues
- cnx, err := c.pool.GetConnection(logicalAddr, physicalAddr)
+ cnx, err := c.pool.GetConnection(logicalAddr, physicalAddr, connectingThroughProxy)
if err != nil {
return nil, err
}
diff --git a/pulsar/producer_partition.go b/pulsar/producer_partition.go
index f96a706..0963570 100644
--- a/pulsar/producer_partition.go
+++ b/pulsar/producer_partition.go
@@ -140,8 +140,8 @@ func (p *partitionProducer) grabCnx() error {
if len(p.options.Properties) > 0 {
cmdProducer.Metadata = toKeyValues(p.options.Properties)
}
-
- res, err := p.client.rpcClient.Request(lr.LogicalAddr, lr.PhysicalAddr, id, pb.BaseCommand_PRODUCER, cmdProducer)
+ res, err := p.client.rpcClient.Request(lr.LogicalAddr, lr.PhysicalAddr, id, pb.BaseCommand_PRODUCER, cmdProducer,
+ lr.ConnectingThroughProxy)
if err != nil {
p.log.WithError(err).Error("Failed to create producer")
return err