You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by mm...@apache.org on 2019/05/14 16:33:47 UTC

[pulsar-client-go] 10/38: Reconnection logic

This is an automated email from the ASF dual-hosted git repository.

mmerli pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar-client-go.git

commit 2e74112c2e6162bfdf702ed03af2550d0d3ddd49
Author: Matteo Merli <mm...@apache.org>
AuthorDate: Sat Apr 6 08:35:50 2019 -0700

    Reconnection logic
---
 pulsar/impl/backoff.go            | 24 ++++++++++++++++++++
 pulsar/impl/batch_builder.go      |  2 +-
 pulsar/impl/connection.go         | 48 ++++++++++++++++++++++++++++++++-------
 pulsar/impl/connection_pool.go    |  3 ++-
 pulsar/impl_partition_producer.go | 28 +++++++++++++++++++++++
 5 files changed, 95 insertions(+), 10 deletions(-)

diff --git a/pulsar/impl/backoff.go b/pulsar/impl/backoff.go
new file mode 100644
index 0000000..451509f
--- /dev/null
+++ b/pulsar/impl/backoff.go
@@ -0,0 +1,24 @@
+package impl
+
+import "time"
+
+// Backoff yields exponentially growing retry delays; its zero value is ready to use.
+type Backoff struct{ backoff time.Duration }
+
+const (
+	minBackoff = 100 * time.Millisecond
+	maxBackoff = 60 * time.Second
+)
+
+// Next doubles the previous delay, clamped to [minBackoff, maxBackoff].
+func (b *Backoff) Next() time.Duration {
+	switch next := 2 * b.backoff; {
+	case next < minBackoff:
+		b.backoff = minBackoff
+	case next > maxBackoff:
+		b.backoff = maxBackoff
+	default:
+		b.backoff = next
+	}
+	return b.backoff
+}
diff --git a/pulsar/impl/batch_builder.go b/pulsar/impl/batch_builder.go
index 6b0df8e..53bf081 100644
--- a/pulsar/impl/batch_builder.go
+++ b/pulsar/impl/batch_builder.go
@@ -84,7 +84,7 @@ func (bb *BatchBuilder) reset() {
 }
 
 func (bb *BatchBuilder) Flush() []byte {
-	log.Info("BatchBuilder flush: messages: ", bb.numMessages)
+	log.Debug("BatchBuilder flush: messages: ", bb.numMessages)
 	if bb.numMessages == 0 {
 		// No-Op for empty batch
 		return nil
diff --git a/pulsar/impl/connection.go b/pulsar/impl/connection.go
index 04fa10f..18a3b63 100644
--- a/pulsar/impl/connection.go
+++ b/pulsar/impl/connection.go
@@ -12,9 +12,17 @@ import (
 	"time"
 )
 
+// ConnectionListener is a user of a connection (e.g. a producer or a
+// consumer) that registers itself to be notified when the connection is
+// closed. ConnectionClosed is triggered by Close and should return promptly.
+type ConnectionListener interface {
+	ConnectionClosed()
+}
+
 type Connection interface {
 	SendRequest(requestId uint64, req *pb.BaseCommand, callback func(command *pb.BaseCommand))
 	WriteData(data []byte)
+	RegisterListener(listener ConnectionListener) // listener is notified when the connection is closed
 	Close()
 }
 
@@ -57,6 +65,7 @@ type connection struct {
 	incomingRequests chan *request
 	writeRequests    chan []byte
 	pendingReqs      map[uint64]*request
+	listeners        []ConnectionListener
 }
 
 func newConnection(logicalAddr *url.URL, physicalAddr *url.URL) *connection {
@@ -72,6 +81,7 @@ func newConnection(logicalAddr *url.URL, physicalAddr *url.URL) *connection {
 
 		incomingRequests: make(chan *request),
 		writeRequests:    make(chan []byte),
+		listeners:        make([]ConnectionListener, 0),
 	}
 	cnx.reader = newConnectionReader(cnx)
 	cnx.cond = sync.NewCond(cnx)
@@ -142,13 +152,15 @@ func (c *connection) waitUntilReady() error {
 	defer c.Unlock()
 
 	for {
+		c.log.Debug("Wait until connection is ready. State: ", c.state)
 		switch c.state {
 		case connectionInit:
+			fallthrough
 		case connectionConnecting:
+			fallthrough
 		case connectionTcpConnected:
 			// Wait for the state to change
 			c.cond.Wait()
-			break
 
 		case connectionReady:
 			return nil
@@ -166,10 +178,16 @@ func (c *connection) run() {
 	for {
 		select {
 		case req := <-c.incomingRequests:
+			if req == nil {
+				return
+			}
 			c.pendingReqs[req.id] = req
 			c.writeCommand(req.cmd)
 
 		case data := <-c.writeRequests:
+			if data == nil {
+				return
+			}
 			c.internalWriteData(data)
 
 		case _ = <-c.pingTicker.C:
@@ -183,7 +201,7 @@ func (c *connection) WriteData(data []byte) {
 }
 
 func (c *connection) internalWriteData(data []byte) {
-	c.log.Info("Write data: ", len(data))
+	c.log.Debug("Write data: ", len(data))
 	if _, err := c.cnx.Write(data); err != nil {
 		c.log.WithError(err).Warn("Failed to write on connection")
 		c.Close()
@@ -313,20 +331,34 @@ func (c *connection) handlePing() {
 	c.writeCommand(baseCommand(pb.BaseCommand_PONG, &pb.CommandPong{}))
 }
 
+// RegisterListener adds listener to the set notified by Close. Listeners added after Close has run are never called.
+func (c *connection) RegisterListener(listener ConnectionListener) {
+	c.Lock()
+	c.listeners = append(c.listeners, listener)
+	c.Unlock()
+}
+
 func (c *connection) Close() {
 	c.Lock()
 	defer c.Unlock()
 
-	c.state = connectionClosed
 	c.cond.Broadcast()
 
+	if c.state == connectionClosed {
+		return
+	}
+	// First Close only: tear down the socket, the ping ticker and the run loop.
+	c.log.Info("Connection closed")
+	c.state = connectionClosed
 	if c.cnx != nil {
-		c.log.Info("Connection closed")
 		c.cnx.Close()
-		c.pingTicker.Stop()
-		close(c.incomingRequests)
-		close(c.writeRequests)
-		c.cnx = nil
+	}
+	c.pingTicker.Stop()
+	close(c.incomingRequests)
+	close(c.writeRequests)
+	// Notify asynchronously: c's lock is held here, so a blocking listener must not stall Close.
+	for _, listener := range c.listeners {
+		go listener.ConnectionClosed()
 	}
 }
 
diff --git a/pulsar/impl/connection_pool.go b/pulsar/impl/connection_pool.go
index 1fa647f..ef5ca8b 100644
--- a/pulsar/impl/connection_pool.go
+++ b/pulsar/impl/connection_pool.go
@@ -32,7 +32,8 @@ func (p *connectionPool) GetConnection(logicalAddr *url.URL, physicalAddr *url.U
 			return cnx, nil
 		} else {
 			// The cached connection is failed
-			p.pool.Delete(logicalAddr)
+			p.pool.Delete(logicalAddr.Host)
+			log.Debug("Removed failed connection from pool:", cnx.logicalAddr, cnx.physicalAddr)
 		}
 	}
 
diff --git a/pulsar/impl_partition_producer.go b/pulsar/impl_partition_producer.go
index 2677770..60ae76c 100644
--- a/pulsar/impl_partition_producer.go
+++ b/pulsar/impl_partition_producer.go
@@ -99,10 +99,36 @@ func (p *partitionProducer) grabCnx() error {
 		p.sequenceIdGenerator = &nextSequenceId
 	}
 	p.cnx = res.Cnx
+	p.cnx.RegisterListener(p)
 	p.log.WithField("cnx", res.Cnx).Debug("Connected producer")
 	return nil
 }
 
+// connectionClosed is the event queued on eventsChan when the broker connection is closed.
+type connectionClosed struct{}
+
+// ConnectionClosed implements impl.ConnectionListener. It only enqueues an
+// event, so the reconnection itself runs on the producer's run goroutine.
+func (p *partitionProducer) ConnectionClosed() {
+	p.eventsChan <- &connectionClosed{}
+}
+
+// reconnectToBroker retries grabCnx with exponential backoff until it succeeds,
+// blocking the calling (run) goroutine in the meantime.
+func (p *partitionProducer) reconnectToBroker() {
+	backoff := impl.Backoff{}
+	for {
+		err := p.grabCnx()
+		if err == nil {
+			return // successfully reconnected
+		}
+		d := backoff.Next()
+		// Keep the cause of the failed attempt instead of silently dropping it.
+		p.log.WithError(err).Info("Retrying reconnection after ", d)
+		time.Sleep(d)
+	}
+}
+
 func (p *partitionProducer) run() {
 	for {
 		select {
@@ -110,6 +136,8 @@ func (p *partitionProducer) run() {
 			switch v := i.(type) {
 			case *sendRequest:
 				p.internalSend(v)
+			case *connectionClosed:
+				p.reconnectToBroker()
 			}
 
 		case _ = <-p.batchFlushTicker.C: