You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by mm...@apache.org on 2019/05/14 16:33:47 UTC
[pulsar-client-go] 10/38: Reconnection logic
This is an automated email from the ASF dual-hosted git repository.
mmerli pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar-client-go.git
commit 2e74112c2e6162bfdf702ed03af2550d0d3ddd49
Author: Matteo Merli <mm...@apache.org>
AuthorDate: Sat Apr 6 08:35:50 2019 -0700
Reconnection logic
---
pulsar/impl/backoff.go | 24 ++++++++++++++++++++
pulsar/impl/batch_builder.go | 2 +-
pulsar/impl/connection.go | 48 ++++++++++++++++++++++++++++++++-------
pulsar/impl/connection_pool.go | 3 ++-
pulsar/impl_partition_producer.go | 28 +++++++++++++++++++++++
5 files changed, 95 insertions(+), 10 deletions(-)
diff --git a/pulsar/impl/backoff.go b/pulsar/impl/backoff.go
new file mode 100644
index 0000000..451509f
--- /dev/null
+++ b/pulsar/impl/backoff.go
@@ -0,0 +1,24 @@
+package impl
+
+import (
+ "time"
+)
+
+// Backoff computes exponentially growing retry delays. The zero value is
+// ready to use: the first call to Next returns minBackoff.
+type Backoff struct {
+	backoff time.Duration
+}
+
+const (
+	minBackoff = 100 * time.Millisecond
+	maxBackoff = 60 * time.Second
+)
+
+// Next returns the delay to wait before the next attempt. Each call doubles
+// the previous delay, clamped to the range [minBackoff, maxBackoff].
+func (b *Backoff) Next() time.Duration {
+	// Doubling the zero value yields zero, which the lower clamp turns into
+	// minBackoff on the very first call.
+	b.backoff *= 2
+	if b.backoff < minBackoff {
+		b.backoff = minBackoff
+	} else if b.backoff > maxBackoff {
+		b.backoff = maxBackoff
+	}
+	return b.backoff
+}
diff --git a/pulsar/impl/batch_builder.go b/pulsar/impl/batch_builder.go
index 6b0df8e..53bf081 100644
--- a/pulsar/impl/batch_builder.go
+++ b/pulsar/impl/batch_builder.go
@@ -84,7 +84,7 @@ func (bb *BatchBuilder) reset() {
}
func (bb *BatchBuilder) Flush() []byte {
- log.Info("BatchBuilder flush: messages: ", bb.numMessages)
+ log.Debug("BatchBuilder flush: messages: ", bb.numMessages)
if bb.numMessages == 0 {
// No-Op for empty batch
return nil
diff --git a/pulsar/impl/connection.go b/pulsar/impl/connection.go
index 04fa10f..18a3b63 100644
--- a/pulsar/impl/connection.go
+++ b/pulsar/impl/connection.go
@@ -12,9 +12,17 @@ import (
"time"
)
+// ConnectionListener is implemented by users of a connection, such as
+// producers and consumers, that want to be told when the connection is
+// closed. Register one with Connection.RegisterListener.
+type ConnectionListener interface {
+	// ConnectionClosed is called when the connection has been closed.
+	ConnectionClosed()
+}
+
+// Connection is a broker connection (handed out by the connection pool)
+// used by producers and consumers to exchange commands with the broker.
type Connection interface {
+	// SendRequest sends req tagged with requestId; callback is presumably
+	// invoked with the matching response command (dispatch code not shown
+	// here — confirm).
SendRequest(requestId uint64, req *pb.BaseCommand, callback func(command *pb.BaseCommand))
+	// WriteData writes data to the connection (presumably already-encoded
+	// frames — confirm against callers).
WriteData(data []byte)
+	// RegisterListener subscribes listener to be notified when the
+	// connection is closed.
+ RegisterListener(listener ConnectionListener)
+	// Close closes the connection.
Close()
}
@@ -57,6 +65,7 @@ type connection struct {
incomingRequests chan *request
writeRequests chan []byte
pendingReqs map[uint64]*request
+ listeners []ConnectionListener
}
func newConnection(logicalAddr *url.URL, physicalAddr *url.URL) *connection {
@@ -72,6 +81,7 @@ func newConnection(logicalAddr *url.URL, physicalAddr *url.URL) *connection {
incomingRequests: make(chan *request),
writeRequests: make(chan []byte),
+ listeners: make([]ConnectionListener, 0),
}
cnx.reader = newConnectionReader(cnx)
cnx.cond = sync.NewCond(cnx)
@@ -142,13 +152,15 @@ func (c *connection) waitUntilReady() error {
defer c.Unlock()
for {
+ c.log.Debug("Wait until connection is ready. State: ", c.state)
switch c.state {
case connectionInit:
+ fallthrough
case connectionConnecting:
+ fallthrough
case connectionTcpConnected:
// Wait for the state to change
c.cond.Wait()
- break
case connectionReady:
return nil
@@ -166,10 +178,16 @@ func (c *connection) run() {
for {
select {
case req := <-c.incomingRequests:
+ if req == nil {
+ return
+ }
c.pendingReqs[req.id] = req
c.writeCommand(req.cmd)
case data := <-c.writeRequests:
+ if data == nil {
+ return
+ }
c.internalWriteData(data)
case _ = <-c.pingTicker.C:
@@ -183,7 +201,7 @@ func (c *connection) WriteData(data []byte) {
}
func (c *connection) internalWriteData(data []byte) {
- c.log.Info("Write data: ", len(data))
+ c.log.Debug("Write data: ", len(data))
if _, err := c.cnx.Write(data); err != nil {
c.log.WithError(err).Warn("Failed to write on connection")
c.Close()
@@ -313,20 +331,34 @@ func (c *connection) handlePing() {
c.writeCommand(baseCommand(pb.BaseCommand_PONG, &pb.CommandPong{}))
}
+// RegisterListener adds listener to the set that Close notifies. A listener
+// registered after the connection has already been closed is never notified,
+// because Close only walks the listeners on its first call.
+func (c *connection) RegisterListener(listener ConnectionListener) {
+	c.Lock()
+	c.listeners = append(c.listeners, listener)
+	c.Unlock()
+}
+
func (c *connection) Close() {
c.Lock()
defer c.Unlock()
- c.state = connectionClosed
c.cond.Broadcast()
+ if c.state == connectionClosed {
+ return
+ }
+
+ c.log.Info("Connection closed")
+ c.state = connectionClosed
if c.cnx != nil {
- c.log.Info("Connection closed")
c.cnx.Close()
- c.pingTicker.Stop()
- close(c.incomingRequests)
- close(c.writeRequests)
- c.cnx = nil
+ }
+ c.pingTicker.Stop()
+ close(c.incomingRequests)
+ close(c.writeRequests)
+ //c.cnx = nil
+ for _, listener := range c.listeners {
+ listener.ConnectionClosed()
}
}
diff --git a/pulsar/impl/connection_pool.go b/pulsar/impl/connection_pool.go
index 1fa647f..ef5ca8b 100644
--- a/pulsar/impl/connection_pool.go
+++ b/pulsar/impl/connection_pool.go
@@ -32,7 +32,8 @@ func (p *connectionPool) GetConnection(logicalAddr *url.URL, physicalAddr *url.U
return cnx, nil
} else {
// The cached connection is failed
- p.pool.Delete(logicalAddr)
+ p.pool.Delete(logicalAddr.Host)
+ log.Debug("Removed failed connection from pool:", cnx.logicalAddr, cnx.physicalAddr)
}
}
diff --git a/pulsar/impl_partition_producer.go b/pulsar/impl_partition_producer.go
index 2677770..60ae76c 100644
--- a/pulsar/impl_partition_producer.go
+++ b/pulsar/impl_partition_producer.go
@@ -99,10 +99,36 @@ func (p *partitionProducer) grabCnx() error {
p.sequenceIdGenerator = &nextSequenceId
}
p.cnx = res.Cnx
+ p.cnx.RegisterListener(p)
p.log.WithField("cnx", res.Cnx).Debug("Connected producer")
return nil
}
+// connectionClosed is the event queued on eventsChan to tell the producer's
+// run loop that its connection went away.
+type connectionClosed struct{}
+
+// ConnectionClosed is the connection-listener callback. Instead of
+// reconnecting on the caller's goroutine, it hands the work to the
+// producer's run loop by queueing a connectionClosed event; the send blocks
+// until eventsChan accepts it.
+func (p *partitionProducer) ConnectionClosed() {
+	p.eventsChan <- &connectionClosed{}
+}
+
+// reconnectToBroker re-establishes the producer's connection after the
+// previous one was closed. It is invoked from the producer's run loop (the
+// connectionClosed case), so no other events are processed until it returns.
+//
+// It retries grabCnx with exponential backoff until it succeeds; there is no
+// cap on the number of attempts and no way to abort.
+// NOTE(review): if the producer is closed while the broker is unreachable,
+// this loop never exits — consider checking the producer state here.
+func (p *partitionProducer) reconnectToBroker() {
+	backoff := impl.Backoff{}
+	for {
+		err := p.grabCnx()
+		if err == nil {
+			// Successfully reconnected
+			return
+		}
+
+		// Surface why the attempt failed instead of discarding err.
+		d := backoff.Next()
+		p.log.WithError(err).Info("Retrying reconnection after ", d)
+		time.Sleep(d)
+	}
+}
+
func (p *partitionProducer) run() {
for {
select {
@@ -110,6 +136,8 @@ func (p *partitionProducer) run() {
switch v := i.(type) {
case *sendRequest:
p.internalSend(v)
+ case *connectionClosed:
+ p.reconnectToBroker()
}
case _ = <-p.batchFlushTicker.C: