You are viewing a plain-text version of this content. The canonical link to the original message was not preserved in this plain-text copy.
Posted to commits@pulsar.apache.org by mm...@apache.org on 2019/05/14 16:33:44 UTC

[pulsar-client-go] 07/38: Removed internalClose method

This is an automated email from the ASF dual-hosted git repository.

mmerli pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar-client-go.git

commit d8a1dccf72b6ebfa47d1f12b129096eb42418d6a
Author: Matteo Merli <mm...@apache.org>
AuthorDate: Sat Mar 30 11:41:13 2019 -0700

    Removed internalClose method
---
 pulsar/impl/connection.go        | 34 ++++++++++++++++------------------
 pulsar/impl/connection_reader.go |  8 ++++----
 2 files changed, 20 insertions(+), 22 deletions(-)

diff --git a/pulsar/impl/connection.go b/pulsar/impl/connection.go
index 2f09690..365f1b3 100644
--- a/pulsar/impl/connection.go
+++ b/pulsar/impl/connection.go
@@ -95,7 +95,7 @@ func (c *connection) connect() (ok bool) {
 	c.cnx, err = net.Dial("tcp", c.physicalAddr)
 	if err != nil {
 		c.log.WithError(err).Warn("Failed to connect to broker.")
-		c.internalClose()
+		c.Close()
 		return false
 	} else {
 		c.log = c.log.WithField("laddr", c.cnx.LocalAddr())
@@ -193,7 +193,7 @@ func (c *connection) writeCommand(cmd proto.Message) {
 
 	if _, err := c.cnx.Write(c.writeBuffer.ReadableSlice()); err != nil {
 		c.log.WithError(err).Warn("Failed to write on connection")
-		c.internalClose()
+		c.Close()
 	}
 }
 
@@ -271,7 +271,7 @@ func (c *connection) sendPing() {
 		// We have not received a response to the previous Ping request, the
 		// connection to broker is stale
 		c.log.Info("Detected stale connection to broker")
-		c.internalClose()
+		c.Close()
 		return
 	}
 
@@ -288,21 +288,6 @@ func (c *connection) handlePing() {
 }
 
 func (c *connection) Close() {
-	// TODO
-}
-
-func (c *connection) changeState(state connectionState) {
-	c.Lock()
-	c.state = state
-	c.cond.Broadcast()
-	c.Unlock()
-}
-
-func (c *connection) newRequestId() uint64 {
-	return atomic.AddUint64(&c.requestIdGenerator, 1)
-}
-
-func (c *connection) internalClose() {
 	c.Lock()
 	defer c.Unlock()
 
@@ -313,6 +298,19 @@ func (c *connection) internalClose() {
 		c.log.Info("Connection closed")
 		c.cnx.Close()
 		c.pingTicker.Stop()
+		close(c.incomingRequests)
 		c.cnx = nil
 	}
 }
+
+func (c *connection) changeState(state connectionState) {
+	c.Lock()
+	c.state = state
+	c.cond.Broadcast()
+	c.Unlock()
+}
+
+func (c *connection) newRequestId() uint64 {
+	return atomic.AddUint64(&c.requestIdGenerator, 1)
+}
+
diff --git a/pulsar/impl/connection_reader.go b/pulsar/impl/connection_reader.go
index 5bb87c6..3b45dc4 100644
--- a/pulsar/impl/connection_reader.go
+++ b/pulsar/impl/connection_reader.go
@@ -27,7 +27,7 @@ func (r *connectionReader) readFromConnection() {
 		cmd, headersAndPayload, err := r.readSingleCommand()
 		if err != nil {
 			r.cnx.log.WithError(err).Info("Error reading from connection")
-			r.cnx.internalClose()
+			r.cnx.Close()
 			break
 		}
 
@@ -53,7 +53,7 @@ func (r *connectionReader) readSingleCommand() (cmd *pb.BaseCommand, headersAndP
 	frameSize := r.buffer.ReadUint32()
 	if frameSize > MaxFrameSize {
 		r.cnx.log.Warnf("Received too big frame size. size=%d", frameSize)
-		r.cnx.internalClose()
+		r.cnx.Close()
 		return nil, nil, errors.New("Frame size too big")
 	}
 
@@ -96,7 +96,7 @@ func (r *connectionReader) readAtLeast(size uint32) (ok bool) {
 
 	n, err := io.ReadAtLeast(r.cnx.cnx, r.buffer.WritableSlice(), int(size))
 	if err != nil {
-		r.cnx.internalClose()
+		r.cnx.Close()
 		return false
 	}
 
@@ -109,7 +109,7 @@ func (r *connectionReader) deserializeCmd(data []byte) (*pb.BaseCommand, error)
 	err := proto.Unmarshal(data, cmd)
 	if err != nil {
 		r.cnx.log.WithError(err).Warn("Failed to parse protobuf command")
-		r.cnx.internalClose()
+		r.cnx.Close()
 		return nil, err
 	} else {
 		return cmd, nil