You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by rx...@apache.org on 2019/11/03 05:47:08 UTC
[pulsar-client-go] branch master updated: Handle RPC errors in reconnections (#83)
This is an automated email from the ASF dual-hosted git repository.
rxl pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar-client-go.git
The following commit(s) were added to refs/heads/master by this push:
new 77dccea Handle RPC errors in reconnections (#83)
77dccea is described below
commit 77dccea9cb9f33a0975d7fda42eb44fa60031fb3
Author: Matteo Merli <mm...@apache.org>
AuthorDate: Sat Nov 2 22:47:00 2019 -0700
Handle RPC errors in reconnections (#83)
### Motivation
We're not currently handling the ServerError responses in the RpcClient. That leaves the caller hanging when any request fails on the broker side.
---
pulsar/impl_partition_consumer.go | 14 ++++++--------
pulsar/impl_partition_producer.go | 11 +++++------
pulsar/internal/connection.go | 39 +++++++++++++++++++++++++++------------
pulsar/internal/rpc_client.go | 15 ++++++++++-----
4 files changed, 48 insertions(+), 31 deletions(-)
diff --git a/pulsar/impl_partition_consumer.go b/pulsar/impl_partition_consumer.go
index 5ebbb06..2d8800d 100644
--- a/pulsar/impl_partition_consumer.go
+++ b/pulsar/impl_partition_consumer.go
@@ -693,24 +693,22 @@ func (pc *partitionConsumer) ConnectionClosed() {
}
func (pc *partitionConsumer) reconnectToBroker() {
- pc.log.Info("Reconnecting to broker")
- backoff := new(internal.Backoff)
+ backoff := internal.Backoff{}
for {
if pc.state != consumerReady {
// Consumer is already closing
return
}
+ d := backoff.Next()
+ pc.log.Info("Reconnecting to broker in ", d)
+ time.Sleep(d)
+
err := pc.grabCnx()
if err == nil {
// Successfully reconnected
- pc.log.Info("Successfully reconnected")
+ pc.log.Info("Reconnected consumer to broker")
return
}
-
- d := backoff.Next()
- pc.log.Info("Retrying reconnection after ", d)
-
- time.Sleep(d)
}
}
diff --git a/pulsar/impl_partition_producer.go b/pulsar/impl_partition_producer.go
index 196dbbc..56fbb6c 100644
--- a/pulsar/impl_partition_producer.go
+++ b/pulsar/impl_partition_producer.go
@@ -174,7 +174,6 @@ func (p *partitionProducer) ConnectionClosed() {
}
func (p *partitionProducer) reconnectToBroker() {
- p.log.Info("Reconnecting to broker")
backoff := internal.Backoff{}
for {
if p.state != producerReady {
@@ -182,16 +181,16 @@ func (p *partitionProducer) reconnectToBroker() {
return
}
+ d := backoff.Next()
+ p.log.Info("Reconnecting to broker in ", d)
+ time.Sleep(d)
+
err := p.grabCnx()
if err == nil {
// Successfully reconnected
+ p.log.Info("Reconnected producer to broker")
return
}
-
- d := backoff.Next()
- p.log.Info("Retrying reconnection after ", d)
-
- time.Sleep(d)
}
}
diff --git a/pulsar/internal/connection.go b/pulsar/internal/connection.go
index 592beca..2e933b8 100644
--- a/pulsar/internal/connection.go
+++ b/pulsar/internal/connection.go
@@ -21,6 +21,7 @@ import (
"crypto/tls"
"crypto/x509"
"errors"
+ "fmt"
"io/ioutil"
"net"
"net/url"
@@ -55,7 +56,7 @@ type ConnectionListener interface {
// Connection is a interface of client cnx.
type Connection interface {
- SendRequest(requestID uint64, req *pb.BaseCommand, callback func(command *pb.BaseCommand))
+ SendRequest(requestID uint64, req *pb.BaseCommand, callback func(*pb.BaseCommand, error))
WriteData(data []byte)
RegisterListener(id uint64, listener ConnectionListener)
UnregisterListener(id uint64)
@@ -103,7 +104,7 @@ const keepAliveInterval = 30 * time.Second
type request struct {
id uint64
cmd *pb.BaseCommand
- callback func(command *pb.BaseCommand)
+ callback func(command *pb.BaseCommand, err error)
}
type connection struct {
@@ -116,8 +117,8 @@ type connection struct {
cnx net.Conn
writeBufferLock sync.Mutex
- writeBuffer Buffer
- reader *connectionReader
+ writeBuffer Buffer
+ reader *connectionReader
lastDataReceivedLock sync.Mutex
lastDataReceivedTime time.Time
@@ -135,7 +136,7 @@ type connection struct {
listeners map[uint64]ConnectionListener
consumerHandlersLock sync.RWMutex
- consumerHandlers map[uint64]ConsumerHandler
+ consumerHandlers map[uint64]ConsumerHandler
tlsOptions *TLSOptions
auth auth.Provider
@@ -362,11 +363,8 @@ func (c *connection) receivedCommand(cmd *pb.BaseCommand, headersAndPayload []by
c.handleResponse(cmd.GetSchemaResponse.GetRequestId(), cmd)
case pb.BaseCommand_ERROR:
- if cmd.Error != nil {
- c.log.Errorf("Error: %s, Error Message: %s", cmd.Error.GetError(), cmd.Error.GetMessage())
- c.Close()
- return
- }
+ c.handleResponseError(cmd.GetError())
+
case pb.BaseCommand_CLOSE_PRODUCER:
c.handleCloseProducer(cmd.GetCloseProducer())
case pb.BaseCommand_CLOSE_CONSUMER:
@@ -399,7 +397,7 @@ func (c *connection) Write(data []byte) {
c.writeRequestsCh <- data
}
-func (c *connection) SendRequest(requestID uint64, req *pb.BaseCommand, callback func(command *pb.BaseCommand)) {
+func (c *connection) SendRequest(requestID uint64, req *pb.BaseCommand, callback func(command *pb.BaseCommand, err error)) {
c.incomingRequestsCh <- &request{
id: requestID,
cmd: req,
@@ -424,7 +422,24 @@ func (c *connection) handleResponse(requestID uint64, response *pb.BaseCommand)
delete(c.pendingReqs, requestID)
c.mapMutex.RUnlock()
- request.callback(response)
+ request.callback(response, nil)
+}
+
+func (c *connection) handleResponseError(serverError *pb.CommandError) {
+ requestID := serverError.GetRequestId()
+ c.mapMutex.RLock()
+ request, ok := c.pendingReqs[requestID]
+ if !ok {
+ c.log.Warnf("Received unexpected error response for request %d of type %s",
+ requestID, serverError.GetError())
+ return
+ }
+
+ delete(c.pendingReqs, requestID)
+ c.mapMutex.RUnlock()
+
+ request.callback(nil,
+ errors.New(fmt.Sprintf("server error: %s: %s", serverError.GetError(), serverError.GetMessage())))
}
func (c *connection) handleSendReceipt(response *pb.CommandSendReceipt) {
diff --git a/pulsar/internal/rpc_client.go b/pulsar/internal/rpc_client.go
index 66e8a79..35d8b61 100644
--- a/pulsar/internal/rpc_client.go
+++ b/pulsar/internal/rpc_client.go
@@ -84,14 +84,17 @@ func (c *rpcClient) Request(logicalAddr *url.URL, physicalAddr *url.URL, request
Cnx: cnx,
}
+ var rpcErr error = nil
+
// TODO: Handle errors with disconnections
- cnx.SendRequest(requestID, baseCommand(cmdType, message), func(response *pb.BaseCommand) {
+ cnx.SendRequest(requestID, baseCommand(cmdType, message), func(response *pb.BaseCommand, err error) {
rpcResult.Response = response
+ rpcErr = err
wg.Done()
})
wg.Wait()
- return rpcResult, nil
+ return rpcResult, rpcErr
}
func (c *rpcClient) RequestOnCnx(cnx Connection, requestID uint64, cmdType pb.BaseCommand_Type,
@@ -103,13 +106,15 @@ func (c *rpcClient) RequestOnCnx(cnx Connection, requestID uint64, cmdType pb.Ba
Cnx: cnx,
}
- cnx.SendRequest(requestID, baseCommand(cmdType, message), func(response *pb.BaseCommand) {
+ var rpcErr error = nil
+ cnx.SendRequest(requestID, baseCommand(cmdType, message), func(response *pb.BaseCommand, err error) {
rpcResult.Response = response
+ rpcErr = err
wg.Done()
})
wg.Wait()
- return rpcResult, nil
+ return rpcResult, rpcErr
}
func (c *rpcClient) RequestOnCnxNoWait(cnx Connection, requestID uint64, cmdType pb.BaseCommand_Type,
@@ -118,7 +123,7 @@ func (c *rpcClient) RequestOnCnxNoWait(cnx Connection, requestID uint64, cmdType
Cnx: cnx,
}
- cnx.SendRequest(requestID, baseCommand(cmdType, message), func(response *pb.BaseCommand) {
+ cnx.SendRequest(requestID, baseCommand(cmdType, message), func(response *pb.BaseCommand, err error) {
rpcResult.Response = response
})