You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by rx...@apache.org on 2019/11/03 05:47:08 UTC

[pulsar-client-go] branch master updated: Handle RPC errors in reconnections (#83)

This is an automated email from the ASF dual-hosted git repository.

rxl pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar-client-go.git


The following commit(s) were added to refs/heads/master by this push:
     new 77dccea  Handle RPC errors in reconnections (#83)
77dccea is described below

commit 77dccea9cb9f33a0975d7fda42eb44fa60031fb3
Author: Matteo Merli <mm...@apache.org>
AuthorDate: Sat Nov 2 22:47:00 2019 -0700

    Handle RPC errors in reconnections (#83)
    
    ### Motivation
    
    We're not currently handling the ServerError responses in the RpcClient. That leaves the caller hanging whenever a request fails on the broker side.
---
 pulsar/impl_partition_consumer.go | 14 ++++++--------
 pulsar/impl_partition_producer.go | 11 +++++------
 pulsar/internal/connection.go     | 39 +++++++++++++++++++++++++++------------
 pulsar/internal/rpc_client.go     | 15 ++++++++++-----
 4 files changed, 48 insertions(+), 31 deletions(-)

diff --git a/pulsar/impl_partition_consumer.go b/pulsar/impl_partition_consumer.go
index 5ebbb06..2d8800d 100644
--- a/pulsar/impl_partition_consumer.go
+++ b/pulsar/impl_partition_consumer.go
@@ -693,24 +693,22 @@ func (pc *partitionConsumer) ConnectionClosed() {
 }
 
 func (pc *partitionConsumer) reconnectToBroker() {
-	pc.log.Info("Reconnecting to broker")
-	backoff := new(internal.Backoff)
+	backoff := internal.Backoff{}
 	for {
 		if pc.state != consumerReady {
 			// Consumer is already closing
 			return
 		}
 
+		d := backoff.Next()
+		pc.log.Info("Reconnecting to broker in ", d)
+		time.Sleep(d)
+
 		err := pc.grabCnx()
 		if err == nil {
 			// Successfully reconnected
-			pc.log.Info("Successfully reconnected")
+			pc.log.Info("Reconnected consumer to broker")
 			return
 		}
-
-		d := backoff.Next()
-		pc.log.Info("Retrying reconnection after ", d)
-
-		time.Sleep(d)
 	}
 }
diff --git a/pulsar/impl_partition_producer.go b/pulsar/impl_partition_producer.go
index 196dbbc..56fbb6c 100644
--- a/pulsar/impl_partition_producer.go
+++ b/pulsar/impl_partition_producer.go
@@ -174,7 +174,6 @@ func (p *partitionProducer) ConnectionClosed() {
 }
 
 func (p *partitionProducer) reconnectToBroker() {
-	p.log.Info("Reconnecting to broker")
 	backoff := internal.Backoff{}
 	for {
 		if p.state != producerReady {
@@ -182,16 +181,16 @@ func (p *partitionProducer) reconnectToBroker() {
 			return
 		}
 
+		d := backoff.Next()
+		p.log.Info("Reconnecting to broker in ", d)
+		time.Sleep(d)
+
 		err := p.grabCnx()
 		if err == nil {
 			// Successfully reconnected
+			p.log.Info("Reconnected producer to broker")
 			return
 		}
-
-		d := backoff.Next()
-		p.log.Info("Retrying reconnection after ", d)
-
-		time.Sleep(d)
 	}
 }
 
diff --git a/pulsar/internal/connection.go b/pulsar/internal/connection.go
index 592beca..2e933b8 100644
--- a/pulsar/internal/connection.go
+++ b/pulsar/internal/connection.go
@@ -21,6 +21,7 @@ import (
 	"crypto/tls"
 	"crypto/x509"
 	"errors"
+	"fmt"
 	"io/ioutil"
 	"net"
 	"net/url"
@@ -55,7 +56,7 @@ type ConnectionListener interface {
 
 // Connection is a interface of client cnx.
 type Connection interface {
-	SendRequest(requestID uint64, req *pb.BaseCommand, callback func(command *pb.BaseCommand))
+	SendRequest(requestID uint64, req *pb.BaseCommand, callback func(*pb.BaseCommand, error))
 	WriteData(data []byte)
 	RegisterListener(id uint64, listener ConnectionListener)
 	UnregisterListener(id uint64)
@@ -103,7 +104,7 @@ const keepAliveInterval = 30 * time.Second
 type request struct {
 	id       uint64
 	cmd      *pb.BaseCommand
-	callback func(command *pb.BaseCommand)
+	callback func(command *pb.BaseCommand, err error)
 }
 
 type connection struct {
@@ -116,8 +117,8 @@ type connection struct {
 	cnx          net.Conn
 
 	writeBufferLock sync.Mutex
-	writeBuffer          Buffer
-	reader               *connectionReader
+	writeBuffer     Buffer
+	reader          *connectionReader
 
 	lastDataReceivedLock sync.Mutex
 	lastDataReceivedTime time.Time
@@ -135,7 +136,7 @@ type connection struct {
 	listeners   map[uint64]ConnectionListener
 
 	consumerHandlersLock sync.RWMutex
-	consumerHandlers map[uint64]ConsumerHandler
+	consumerHandlers     map[uint64]ConsumerHandler
 
 	tlsOptions *TLSOptions
 	auth       auth.Provider
@@ -362,11 +363,8 @@ func (c *connection) receivedCommand(cmd *pb.BaseCommand, headersAndPayload []by
 		c.handleResponse(cmd.GetSchemaResponse.GetRequestId(), cmd)
 
 	case pb.BaseCommand_ERROR:
-		if cmd.Error != nil {
-			c.log.Errorf("Error: %s, Error Message: %s", cmd.Error.GetError(), cmd.Error.GetMessage())
-			c.Close()
-			return
-		}
+		c.handleResponseError(cmd.GetError())
+
 	case pb.BaseCommand_CLOSE_PRODUCER:
 		c.handleCloseProducer(cmd.GetCloseProducer())
 	case pb.BaseCommand_CLOSE_CONSUMER:
@@ -399,7 +397,7 @@ func (c *connection) Write(data []byte) {
 	c.writeRequestsCh <- data
 }
 
-func (c *connection) SendRequest(requestID uint64, req *pb.BaseCommand, callback func(command *pb.BaseCommand)) {
+func (c *connection) SendRequest(requestID uint64, req *pb.BaseCommand, callback func(command *pb.BaseCommand, err error)) {
 	c.incomingRequestsCh <- &request{
 		id:       requestID,
 		cmd:      req,
@@ -424,7 +422,24 @@ func (c *connection) handleResponse(requestID uint64, response *pb.BaseCommand)
 
 	delete(c.pendingReqs, requestID)
 	c.mapMutex.RUnlock()
-	request.callback(response)
+	request.callback(response, nil)
+}
+
+func (c *connection) handleResponseError(serverError *pb.CommandError) {
+	requestID := serverError.GetRequestId()
+	c.mapMutex.RLock()
+	request, ok := c.pendingReqs[requestID]
+	if !ok {
+		c.log.Warnf("Received unexpected error response for request %d of type %s",
+			requestID, serverError.GetError())
+		return
+	}
+
+	delete(c.pendingReqs, requestID)
+	c.mapMutex.RUnlock()
+
+	request.callback(nil,
+		errors.New(fmt.Sprintf("server error: %s: %s", serverError.GetError(), serverError.GetMessage())))
 }
 
 func (c *connection) handleSendReceipt(response *pb.CommandSendReceipt) {
diff --git a/pulsar/internal/rpc_client.go b/pulsar/internal/rpc_client.go
index 66e8a79..35d8b61 100644
--- a/pulsar/internal/rpc_client.go
+++ b/pulsar/internal/rpc_client.go
@@ -84,14 +84,17 @@ func (c *rpcClient) Request(logicalAddr *url.URL, physicalAddr *url.URL, request
 		Cnx: cnx,
 	}
 
+	var rpcErr error = nil
+
 	// TODO: Handle errors with disconnections
-	cnx.SendRequest(requestID, baseCommand(cmdType, message), func(response *pb.BaseCommand) {
+	cnx.SendRequest(requestID, baseCommand(cmdType, message), func(response *pb.BaseCommand, err error) {
 		rpcResult.Response = response
+		rpcErr = err
 		wg.Done()
 	})
 
 	wg.Wait()
-	return rpcResult, nil
+	return rpcResult, rpcErr
 }
 
 func (c *rpcClient) RequestOnCnx(cnx Connection, requestID uint64, cmdType pb.BaseCommand_Type,
@@ -103,13 +106,15 @@ func (c *rpcClient) RequestOnCnx(cnx Connection, requestID uint64, cmdType pb.Ba
 		Cnx: cnx,
 	}
 
-	cnx.SendRequest(requestID, baseCommand(cmdType, message), func(response *pb.BaseCommand) {
+	var rpcErr error = nil
+	cnx.SendRequest(requestID, baseCommand(cmdType, message), func(response *pb.BaseCommand, err error) {
 		rpcResult.Response = response
+		rpcErr = err
 		wg.Done()
 	})
 
 	wg.Wait()
-	return rpcResult, nil
+	return rpcResult, rpcErr
 }
 
 func (c *rpcClient) RequestOnCnxNoWait(cnx Connection, requestID uint64, cmdType pb.BaseCommand_Type,
@@ -118,7 +123,7 @@ func (c *rpcClient) RequestOnCnxNoWait(cnx Connection, requestID uint64, cmdType
 		Cnx: cnx,
 	}
 
-	cnx.SendRequest(requestID, baseCommand(cmdType, message), func(response *pb.BaseCommand) {
+	cnx.SendRequest(requestID, baseCommand(cmdType, message), func(response *pb.BaseCommand, err error) {
 		rpcResult.Response = response
 	})