Posted to commits@pulsar.apache.org by GitBox <gi...@apache.org> on 2021/06/30 20:32:07 UTC

[GitHub] [pulsar-client-go] cckellogg opened a new pull request #559: Simplify connection close logic

cckellogg opened a new pull request #559:
URL: https://github.com/apache/pulsar-client-go/pull/559


   ### Motivation
   
   Simplify the connection close logic into one function. I think this makes the code easier to reason about and maintain.
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@pulsar.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [pulsar-client-go] wolfstudy merged pull request #559: Simplify connection close logic

Posted by GitBox <gi...@apache.org>.
wolfstudy merged pull request #559:
URL: https://github.com/apache/pulsar-client-go/pull/559


   





[GitHub] [pulsar-client-go] cckellogg commented on a change in pull request #559: Simplify connection close logic

Posted by GitBox <gi...@apache.org>.
cckellogg commented on a change in pull request #559:
URL: https://github.com/apache/pulsar-client-go/pull/559#discussion_r661916928



##########
File path: pulsar/internal/rpc_client.go
##########
@@ -88,37 +87,30 @@ func NewRPCClient(serviceURL *url.URL, serviceNameResolver ServiceNameResolver,
 
 func (c *rpcClient) RequestToAnyBroker(requestID uint64, cmdType pb.BaseCommand_Type,
 	message proto.Message) (*RPCResult, error) {
-	host, err := c.serviceNameResolver.ResolveHost()
-	if err != nil {
-		c.log.Errorf("request host resolve failed with error: {%v}", err)
-		return nil, err
-	}
-	rpcResult, err := c.Request(host, host, requestID, cmdType, message)
-	if _, ok := err.(net.Error); ok || (err != nil && err.Error() == "connection error") {
-		// We can retry this kind of requests over a connection error because they're
-		// not specific to a particular broker.
-		backoff := Backoff{100 * time.Millisecond}
-		startTime := time.Now()
-		var retryTime time.Duration
-
-		for time.Since(startTime) < c.requestTimeout {
-			retryTime = backoff.Next()
-			c.log.Debugf("Retrying request in {%v} with timeout in {%v}", retryTime, c.requestTimeout)
-			time.Sleep(retryTime)
-			host, err = c.serviceNameResolver.ResolveHost()
-			if err != nil {
-				c.log.Errorf("Retrying request host resolve failed with error: {%v}", err)
-				continue
-			}
-			rpcResult, err = c.Request(host, host, requestID, cmdType, message)
-			if _, ok := err.(net.Error); ok || (err != nil && err.Error() == "connection error") {
-				continue
-			} else {
-				// We either succeeded or encountered a non connection error
-				break
-			}
+	var err error
+	var host *url.URL
+	var rpcResult *RPCResult
+	startTime := time.Now()
+	backoff := Backoff{100 * time.Millisecond}
+	// we can retry these requests because this kind of request is
+	// not specific to any particular broker
+	for time.Since(startTime) < c.requestTimeout {
+		host, err = c.serviceNameResolver.ResolveHost()
+		if err != nil {
+			c.log.WithError(err).Errorf("rpc client failed to resolve host")
+			return nil, err
 		}
+		rpcResult, err = c.Request(host, host, requestID, cmdType, message)
+		// success we got a response
+		if err == nil {
+			break
+		}
+
+		retryTime := backoff.Next()
+		c.log.Debugf("Retrying request in {%v} with timeout in {%v}", retryTime, c.requestTimeout)
+		time.Sleep(retryTime)
 	}
+

Review comment:
       We could also do something like
   ```
   	var err error
   	var host *url.URL
   	var rpcResult *RPCResult
   	backoff := Backoff{100 * time.Millisecond}
   	var timeoutCh <-chan time.Time
   	for {
   		host, err = c.serviceNameResolver.ResolveHost()
   		if err != nil {
   			c.log.WithError(err).Errorf("rpc client failed to resolve host")
   			return nil, err
   		}
   		rpcResult, err = c.Request(host, host, requestID, cmdType, message)
   		// success we got a response
   		if err == nil {
   			break
   		}
   
   		retryTime := backoff.Next()
   		sleepCh := time.After(retryTime)
   		if timeoutCh == nil {
   			timeoutCh = time.After(c.requestTimeout)
   		}
   		c.log.Debugf("Retrying request in {%v} with timeout in {%v}", retryTime, c.requestTimeout)
   		select {
   		case <-sleepCh:
   			// backoff elapsed; loop around and retry
   		case <-timeoutCh:
   			return rpcResult, err
   		}
   	}
   ```
   
   We may also want to think about adding a context (in another PR), in case we need to cancel this when shutting down a producer or consumer during a reconnect.
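
To make the context idea concrete, here is a minimal, self-contained sketch of a retry loop that stops on success, on an overall timeout, or when the caller cancels. It is not the code in this PR: `retryWithBackoff`, `retryDemo`, and the doubling delay are hypothetical stand-ins, and the behavior of the client's own `Backoff` type is not shown in this thread.

```go
package main

import (
	"context"
	"errors"
	"fmt"
	"time"
)

// retryWithBackoff is a hypothetical helper, not part of pulsar-client-go.
// It calls op until op succeeds, the overall timeout elapses, or ctx is
// cancelled, doubling the delay between attempts up to maxDelay.
func retryWithBackoff(ctx context.Context, timeout, initial, maxDelay time.Duration, op func() error) error {
	ctx, cancel := context.WithTimeout(ctx, timeout)
	defer cancel()

	delay := initial
	for {
		err := op()
		if err == nil {
			return nil
		}

		timer := time.NewTimer(delay)
		select {
		case <-timer.C:
			// Backoff elapsed; fall through and try again.
		case <-ctx.Done():
			timer.Stop()
			// Keep the context error matchable with errors.Is and
			// include the last operation error for diagnostics.
			return fmt.Errorf("%w (last error: %v)", ctx.Err(), err)
		}

		delay *= 2
		if delay > maxDelay {
			delay = maxDelay
		}
	}
}

// retryDemo runs the helper against an operation that fails twice and then
// succeeds. With cancelFirst set, the context is cancelled before the call.
func retryDemo(cancelFirst bool) (attempts int, err error) {
	ctx, cancel := context.WithCancel(context.Background())
	defer cancel()
	if cancelFirst {
		cancel()
	}
	err = retryWithBackoff(ctx, time.Second, time.Millisecond, 10*time.Millisecond, func() error {
		attempts++
		if attempts < 3 {
			return errors.New("connection error")
		}
		return nil
	})
	return attempts, err
}

func main() {
	fmt.Println(retryDemo(false))
	fmt.Println(retryDemo(true))
}
```

Deriving the deadline from the context means a single `select` covers both the request timeout and a shutdown signal, so a producer or consumer that is closing would not have to wait out the remaining backoff.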
   







[GitHub] [pulsar-client-go] wolfstudy commented on pull request #559: Simplify connection close logic

Posted by GitBox <gi...@apache.org>.
wolfstudy commented on pull request #559:
URL: https://github.com/apache/pulsar-client-go/pull/559#issuecomment-871972017


   cc / @freeznet @zymap @merlimat 





[GitHub] [pulsar-client-go] wolfstudy commented on pull request #559: Simplify connection close logic

Posted by GitBox <gi...@apache.org>.
wolfstudy commented on pull request #559:
URL: https://github.com/apache/pulsar-client-go/pull/559#issuecomment-871971746


   Hello @cckellogg, when a connection starts, we first check whether the TCP connection was established successfully.
   
   ```
   if c.connect(){}
   ```
   
   If the connection is established successfully but the handshake fails, we take the else branch, but there we only change the connection state to `connectionClosed` without actually closing the TCP connection.
   
   ```
   else {
   	c.metrics.ConnectionsHandshakeErrors.Inc()
   	c.changeState(connectionClosed)
   }
   ```
   
   But in the Close function, when the connection is already in the `connectionClosed` state, it returns immediately and does not close the TCP connection:
   
   ```
   func (c *connection) Close() {
   	c.Lock()
   	defer c.Unlock()
   
   	c.cond.Broadcast()
   
   	if c.getState() == connectionClosed {
   		return
   	}
   ```
   
   
   So the reconnect process causes multiple TCP connections to be created locally, leaking TCP connection resources.
   
   ![image](https://user-images.githubusercontent.com/20965307/124078688-c4f22180-da7a-11eb-8e43-2867d6a74313.png)
   
   Here, for process `515`, only one pulsar Client object is created. In theory, only one TCP connection should be established, but during the reconnect process multiple TCP connections were created locally and leaked.





[GitHub] [pulsar-client-go] wolfstudy edited a comment on pull request #559: Simplify connection close logic

Posted by GitBox <gi...@apache.org>.
wolfstudy edited a comment on pull request #559:
URL: https://github.com/apache/pulsar-client-go/pull/559#issuecomment-871971746


   Hello @cckellogg, when a connection starts, we first check whether the TCP connection was established successfully.
   
   ```
   if c.connect(){}
   ```
   
   If the connection is established successfully but the handshake fails, we take the else branch, but there we only change the connection state to `connectionClosed` without actually closing the TCP connection.
   
   ```
   else {
   	c.metrics.ConnectionsHandshakeErrors.Inc()
   	c.changeState(connectionClosed)
   }
   ```
   
   But in the Close function, when the connection is already in the `connectionClosed` state, it returns immediately and does not close the TCP connection:
   
   ```
   func (c *connection) Close() {
   	if c.getState() == connectionClosed {
   		return
   	}
   ```
   
   
   So the reconnect process causes multiple TCP connections to be created locally, leaking TCP connection resources.
   
   ![image](https://user-images.githubusercontent.com/20965307/124078688-c4f22180-da7a-11eb-8e43-2867d6a74313.png)
   
   Here, for process `515`, only one pulsar Client object is created. In theory, only one TCP connection should be established, but during the reconnect process multiple TCP connections were created locally and leaked.





[GitHub] [pulsar-client-go] cckellogg commented on pull request #559: Simplify connection close logic

Posted by GitBox <gi...@apache.org>.
cckellogg commented on pull request #559:
URL: https://github.com/apache/pulsar-client-go/pull/559#issuecomment-872387568


   
   
   
   > For example, in https://github.com/apache/pulsar-client-go/blob/master/pulsar/internal/connection.go#L222-L224, when the handshake fails we have already created a new connection, but we only change the state to `connectionClosed` and do not actually close the connection.
   
   Changed the code to call Close() instead of setting the state to `connectionClosed`. This should clean up and close the connection.
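
The leak and the fix discussed in this thread can be sketched in isolation. The types below (`conn`, `fakeSocket`) are hypothetical stand-ins, not the pulsar-client-go `connection` type, and using `sync.Once` is only one way to make `Close` safe to call repeatedly; the thread does not say how the PR itself guards against a double close.

```go
package main

import (
	"fmt"
	"sync"
)

type state int

const (
	stateReady state = iota
	stateClosed
)

// fakeSocket stands in for the net.Conn held by a connection.
type fakeSocket struct{ closed bool }

func (s *fakeSocket) Close() error { s.closed = true; return nil }

// conn is a hypothetical, stripped-down connection used only for this sketch.
type conn struct {
	mu        sync.Mutex
	state     state
	sock      *fakeSocket
	closeOnce sync.Once
}

// leakyHandshakeFailure mirrors the pattern described in the thread:
// only the state changes, the socket stays open.
func (c *conn) leakyHandshakeFailure() {
	c.mu.Lock()
	c.state = stateClosed
	c.mu.Unlock()
}

// leakyClose returns early when the state is already closed, so a socket
// left open by leakyHandshakeFailure is never released.
func (c *conn) leakyClose() {
	c.mu.Lock()
	defer c.mu.Unlock()
	if c.state == stateClosed {
		return
	}
	c.state = stateClosed
	c.sock.Close()
}

// Close is the single place that both changes the state and releases the
// socket. sync.Once makes repeated calls harmless.
func (c *conn) Close() {
	c.closeOnce.Do(func() {
		c.mu.Lock()
		c.state = stateClosed
		c.mu.Unlock()
		c.sock.Close()
	})
}

// leakyVariantLeaks reports whether the socket is still open after the
// leaky handshake-failure path followed by leakyClose.
func leakyVariantLeaks() bool {
	c := &conn{sock: &fakeSocket{}}
	c.leakyHandshakeFailure()
	c.leakyClose()
	return !c.sock.closed
}

// fixedVariantLeaks reports whether the socket is still open when the
// handshake-failure path calls Close and a later Close follows.
func fixedVariantLeaks() bool {
	c := &conn{sock: &fakeSocket{}}
	c.Close() // handshake failure now goes through Close
	c.Close() // a later Close is a no-op
	return !c.sock.closed
}

func main() {
	fmt.Println("leaky variant leaves socket open:", leakyVariantLeaks())
	fmt.Println("fixed variant leaves socket open:", fixedVariantLeaks())
}
```

Routing every shutdown path through one function means no caller can mark the connection closed while skipping the socket cleanup.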





[GitHub] [pulsar-client-go] wolfstudy commented on pull request #559: Simplify connection close logic

Posted by GitBox <gi...@apache.org>.
wolfstudy commented on pull request #559:
URL: https://github.com/apache/pulsar-client-go/pull/559#issuecomment-872083412


   For example, in https://github.com/apache/pulsar-client-go/blob/master/pulsar/internal/connection.go#L222-L224, when the handshake fails we have already created a new connection, but we only change the state to `connectionClosed` and do not actually close the connection.

