You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by mm...@apache.org on 2019/05/14 16:33:44 UTC
[pulsar-client-go] 07/38: Removed internalClose method
This is an automated email from the ASF dual-hosted git repository.
mmerli pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar-client-go.git
commit d8a1dccf72b6ebfa47d1f12b129096eb42418d6a
Author: Matteo Merli <mm...@apache.org>
AuthorDate: Sat Mar 30 11:41:13 2019 -0700
Removed internalClose method
---
pulsar/impl/connection.go | 34 ++++++++++++++++------------------
pulsar/impl/connection_reader.go | 8 ++++----
2 files changed, 20 insertions(+), 22 deletions(-)
diff --git a/pulsar/impl/connection.go b/pulsar/impl/connection.go
index 2f09690..365f1b3 100644
--- a/pulsar/impl/connection.go
+++ b/pulsar/impl/connection.go
@@ -95,7 +95,7 @@ func (c *connection) connect() (ok bool) {
c.cnx, err = net.Dial("tcp", c.physicalAddr)
if err != nil {
c.log.WithError(err).Warn("Failed to connect to broker.")
- c.internalClose()
+ c.Close()
return false
} else {
c.log = c.log.WithField("laddr", c.cnx.LocalAddr())
@@ -193,7 +193,7 @@ func (c *connection) writeCommand(cmd proto.Message) {
if _, err := c.cnx.Write(c.writeBuffer.ReadableSlice()); err != nil {
c.log.WithError(err).Warn("Failed to write on connection")
- c.internalClose()
+ c.Close()
}
}
@@ -271,7 +271,7 @@ func (c *connection) sendPing() {
// We have not received a response to the previous Ping request, the
// connection to broker is stale
c.log.Info("Detected stale connection to broker")
- c.internalClose()
+ c.Close()
return
}
@@ -288,21 +288,6 @@ func (c *connection) handlePing() {
}
func (c *connection) Close() {
- // TODO
-}
-
-func (c *connection) changeState(state connectionState) {
- c.Lock()
- c.state = state
- c.cond.Broadcast()
- c.Unlock()
-}
-
-func (c *connection) newRequestId() uint64 {
- return atomic.AddUint64(&c.requestIdGenerator, 1)
-}
-
-func (c *connection) internalClose() {
c.Lock()
defer c.Unlock()
@@ -313,6 +298,19 @@ func (c *connection) internalClose() {
c.log.Info("Connection closed")
c.cnx.Close()
c.pingTicker.Stop()
+ close(c.incomingRequests)
c.cnx = nil
}
}
+
+func (c *connection) changeState(state connectionState) {
+ c.Lock()
+ c.state = state
+ c.cond.Broadcast()
+ c.Unlock()
+}
+
+func (c *connection) newRequestId() uint64 {
+ return atomic.AddUint64(&c.requestIdGenerator, 1)
+}
+
diff --git a/pulsar/impl/connection_reader.go b/pulsar/impl/connection_reader.go
index 5bb87c6..3b45dc4 100644
--- a/pulsar/impl/connection_reader.go
+++ b/pulsar/impl/connection_reader.go
@@ -27,7 +27,7 @@ func (r *connectionReader) readFromConnection() {
cmd, headersAndPayload, err := r.readSingleCommand()
if err != nil {
r.cnx.log.WithError(err).Info("Error reading from connection")
- r.cnx.internalClose()
+ r.cnx.Close()
break
}
@@ -53,7 +53,7 @@ func (r *connectionReader) readSingleCommand() (cmd *pb.BaseCommand, headersAndP
frameSize := r.buffer.ReadUint32()
if frameSize > MaxFrameSize {
r.cnx.log.Warnf("Received too big frame size. size=%d", frameSize)
- r.cnx.internalClose()
+ r.cnx.Close()
return nil, nil, errors.New("Frame size too big")
}
@@ -96,7 +96,7 @@ func (r *connectionReader) readAtLeast(size uint32) (ok bool) {
n, err := io.ReadAtLeast(r.cnx.cnx, r.buffer.WritableSlice(), int(size))
if err != nil {
- r.cnx.internalClose()
+ r.cnx.Close()
return false
}
@@ -109,7 +109,7 @@ func (r *connectionReader) deserializeCmd(data []byte) (*pb.BaseCommand, error)
err := proto.Unmarshal(data, cmd)
if err != nil {
r.cnx.log.WithError(err).Warn("Failed to parse protobuf command")
- r.cnx.internalClose()
+ r.cnx.Close()
return nil, err
} else {
return cmd, nil