You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by GitBox <gi...@apache.org> on 2021/09/28 03:56:13 UTC

[GitHub] [pulsar-client-go] wolfstudy opened a new pull request #622: Fix logic of command for sendError

wolfstudy opened a new pull request #622:
URL: https://github.com/apache/pulsar-client-go/pull/622


   
   ### Motivation
   
   
   ![image](https://user-images.githubusercontent.com/20965307/135020293-06cb72cc-5ed9-4bc5-a7ba-3909da57a8a6.png)
   
   
   As shown in the figure above, the `ServerError` returned by the broker appears as `UnknownError` by the time the client receives it. In fact, we were handling the wrong command here: we should handle `CommandSendError` instead of `CommandError`. Correspondingly, we should operate on the `listener` map used to cache the producer rather than on the `pendingRequest` map.
   
   ### Modifications
   
   
   
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@pulsar.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [pulsar-client-go] wolfstudy commented on a change in pull request #622: Fix logic of command for sendError

Posted by GitBox <gi...@apache.org>.
wolfstudy commented on a change in pull request #622:
URL: https://github.com/apache/pulsar-client-go/pull/622#discussion_r717645039



##########
File path: pulsar/internal/connection.go
##########
@@ -752,38 +752,51 @@ func (c *connection) handleAuthChallenge(authChallenge *pb.CommandAuthChallenge)
 	c.writeCommand(baseCommand(pb.BaseCommand_AUTH_RESPONSE, cmdAuthResponse))
 }
 
-func (c *connection) handleSendError(cmdError *pb.CommandError) {
-	c.log.Warnf("Received send error from server: [%v] : [%s]", cmdError.GetError(), cmdError.GetMessage())
+func (c *connection) handleSendError(sendError *pb.CommandSendError) {
+	c.log.Warnf("Received send error from server: [%v] : [%s]", sendError.GetError(), sendError.GetMessage())
 
-	requestID := cmdError.GetRequestId()
+	producerID := sendError.GetProducerId()
 
-	switch cmdError.GetError() {
+	c.listenersLock.RLock()
+	producerListener := c.listeners[producerID]
+	c.listenersLock.RUnlock()
+
+	switch sendError.GetError() {
 	case pb.ServerError_NotAllowedError:
-		request, ok := c.deletePendingRequest(requestID)
+		_, ok := c.deletePendingProducers(producerID)
 		if !ok {
 			c.log.Warnf("Received unexpected error response for request %d of type %s",
-				requestID, cmdError.GetError())
+				producerID, sendError.GetError())
 			return
 		}
 
-		errMsg := fmt.Sprintf("server error: %s: %s", cmdError.GetError(), cmdError.GetMessage())
-		request.callback(nil, errors.New(errMsg))
+		c.log.Warnf("server error: %s: %s", sendError.GetError(), sendError.GetMessage())
 	case pb.ServerError_TopicTerminatedError:
-		request, ok := c.deletePendingRequest(requestID)
+		_, ok := c.deletePendingProducers(producerID)

Review comment:
       The only identifiers in `CommandSendError` are `ProducerId` and `SequenceId`:
   
   ```
   type CommandSendError struct {
   	ProducerId           *uint64      `protobuf:"varint,1,req,name=producer_id,json=producerId" json:"producer_id,omitempty"`
   	SequenceId           *uint64      `protobuf:"varint,2,req,name=sequence_id,json=sequenceId" json:"sequence_id,omitempty"`
   	Error                *ServerError `protobuf:"varint,3,req,name=error,enum=pulsar.proto.ServerError" json:"error,omitempty"`
   	Message              *string      `protobuf:"bytes,4,req,name=message" json:"message,omitempty"`
   	XXX_NoUnkeyedLiteral struct{}     `json:"-"`
   	XXX_unrecognized     []byte       `json:"-"`
   	XXX_sizecache        int32        `json:"-"`
   }
   ```

##########
File path: pulsar/internal/connection.go
##########
@@ -752,38 +752,51 @@ func (c *connection) handleAuthChallenge(authChallenge *pb.CommandAuthChallenge)
 	c.writeCommand(baseCommand(pb.BaseCommand_AUTH_RESPONSE, cmdAuthResponse))
 }
 
-func (c *connection) handleSendError(cmdError *pb.CommandError) {
-	c.log.Warnf("Received send error from server: [%v] : [%s]", cmdError.GetError(), cmdError.GetMessage())
+func (c *connection) handleSendError(sendError *pb.CommandSendError) {
+	c.log.Warnf("Received send error from server: [%v] : [%s]", sendError.GetError(), sendError.GetMessage())
 
-	requestID := cmdError.GetRequestId()
+	producerID := sendError.GetProducerId()
 
-	switch cmdError.GetError() {
+	c.listenersLock.RLock()
+	producerListener := c.listeners[producerID]
+	c.listenersLock.RUnlock()
+
+	switch sendError.GetError() {
 	case pb.ServerError_NotAllowedError:
-		request, ok := c.deletePendingRequest(requestID)
+		_, ok := c.deletePendingProducers(producerID)
 		if !ok {
 			c.log.Warnf("Received unexpected error response for request %d of type %s",
-				requestID, cmdError.GetError())
+				producerID, sendError.GetError())
 			return
 		}
 
-		errMsg := fmt.Sprintf("server error: %s: %s", cmdError.GetError(), cmdError.GetMessage())
-		request.callback(nil, errors.New(errMsg))
+		c.log.Warnf("server error: %s: %s", sendError.GetError(), sendError.GetMessage())
 	case pb.ServerError_TopicTerminatedError:
-		request, ok := c.deletePendingRequest(requestID)
+		_, ok := c.deletePendingProducers(producerID)

Review comment:
       In fact, we only need to deal with the map of listeners responsible for managing the producer objects.




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@pulsar.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [pulsar-client-go] cckellogg commented on a change in pull request #622: Fix logic of command for sendError

Posted by GitBox <gi...@apache.org>.
cckellogg commented on a change in pull request #622:
URL: https://github.com/apache/pulsar-client-go/pull/622#discussion_r717609360



##########
File path: pulsar/internal/connection.go
##########
@@ -752,38 +752,51 @@ func (c *connection) handleAuthChallenge(authChallenge *pb.CommandAuthChallenge)
 	c.writeCommand(baseCommand(pb.BaseCommand_AUTH_RESPONSE, cmdAuthResponse))
 }
 
-func (c *connection) handleSendError(cmdError *pb.CommandError) {
-	c.log.Warnf("Received send error from server: [%v] : [%s]", cmdError.GetError(), cmdError.GetMessage())
+func (c *connection) handleSendError(sendError *pb.CommandSendError) {
+	c.log.Warnf("Received send error from server: [%v] : [%s]", sendError.GetError(), sendError.GetMessage())
 
-	requestID := cmdError.GetRequestId()
+	producerID := sendError.GetProducerId()
 
-	switch cmdError.GetError() {
+	c.listenersLock.RLock()
+	producerListener := c.listeners[producerID]
+	c.listenersLock.RUnlock()
+
+	switch sendError.GetError() {
 	case pb.ServerError_NotAllowedError:
-		request, ok := c.deletePendingRequest(requestID)
+		_, ok := c.deletePendingProducers(producerID)
 		if !ok {
 			c.log.Warnf("Received unexpected error response for request %d of type %s",
-				requestID, cmdError.GetError())
+				producerID, sendError.GetError())
 			return
 		}
 
-		errMsg := fmt.Sprintf("server error: %s: %s", cmdError.GetError(), cmdError.GetMessage())
-		request.callback(nil, errors.New(errMsg))
+		c.log.Warnf("server error: %s: %s", sendError.GetError(), sendError.GetMessage())
 	case pb.ServerError_TopicTerminatedError:
-		request, ok := c.deletePendingRequest(requestID)
+		_, ok := c.deletePendingProducers(producerID)

Review comment:
       I thought all commands sent to the broker have a request id? Do we still need to clean those up from the pending request queue?

##########
File path: pulsar/internal/connection.go
##########
@@ -752,38 +752,51 @@ func (c *connection) handleAuthChallenge(authChallenge *pb.CommandAuthChallenge)
 	c.writeCommand(baseCommand(pb.BaseCommand_AUTH_RESPONSE, cmdAuthResponse))
 }
 
-func (c *connection) handleSendError(cmdError *pb.CommandError) {
-	c.log.Warnf("Received send error from server: [%v] : [%s]", cmdError.GetError(), cmdError.GetMessage())
+func (c *connection) handleSendError(sendError *pb.CommandSendError) {
+	c.log.Warnf("Received send error from server: [%v] : [%s]", sendError.GetError(), sendError.GetMessage())
 
-	requestID := cmdError.GetRequestId()
+	producerID := sendError.GetProducerId()
 
-	switch cmdError.GetError() {
+	c.listenersLock.RLock()
+	producerListener := c.listeners[producerID]
+	c.listenersLock.RUnlock()
+
+	switch sendError.GetError() {
 	case pb.ServerError_NotAllowedError:
-		request, ok := c.deletePendingRequest(requestID)
+		_, ok := c.deletePendingProducers(producerID)
 		if !ok {
 			c.log.Warnf("Received unexpected error response for request %d of type %s",
-				requestID, cmdError.GetError())
+				producerID, sendError.GetError())
 			return
 		}
 
-		errMsg := fmt.Sprintf("server error: %s: %s", cmdError.GetError(), cmdError.GetMessage())
-		request.callback(nil, errors.New(errMsg))
+		c.log.Warnf("server error: %s: %s", sendError.GetError(), sendError.GetMessage())
 	case pb.ServerError_TopicTerminatedError:
-		request, ok := c.deletePendingRequest(requestID)
+		_, ok := c.deletePendingProducers(producerID)
 		if !ok {
-			c.log.Warnf("Received unexpected error response for request %d of type %s",
-				requestID, cmdError.GetError())
+			c.log.Warnf("Received unexpected error response for producer %d of type %s",
+				producerID, sendError.GetError())
 			return
 		}
-		errMsg := fmt.Sprintf("server error: %s: %s", cmdError.GetError(), cmdError.GetMessage())
-		request.callback(nil, errors.New(errMsg))
+		c.log.Warnf("server error: %s: %s", sendError.GetError(), sendError.GetMessage())
 	default:
 		// By default, for transient error, let the reconnection logic
 		// to take place and re-establish the produce again
-		c.Close()

Review comment:
       Why don't we need to close the connection here anymore?

##########
File path: pulsar/internal/connection.go
##########
@@ -752,38 +752,51 @@ func (c *connection) handleAuthChallenge(authChallenge *pb.CommandAuthChallenge)
 	c.writeCommand(baseCommand(pb.BaseCommand_AUTH_RESPONSE, cmdAuthResponse))
 }
 
-func (c *connection) handleSendError(cmdError *pb.CommandError) {
-	c.log.Warnf("Received send error from server: [%v] : [%s]", cmdError.GetError(), cmdError.GetMessage())
+func (c *connection) handleSendError(sendError *pb.CommandSendError) {
+	c.log.Warnf("Received send error from server: [%v] : [%s]", sendError.GetError(), sendError.GetMessage())
 
-	requestID := cmdError.GetRequestId()
+	producerID := sendError.GetProducerId()
 
-	switch cmdError.GetError() {
+	c.listenersLock.RLock()
+	producerListener := c.listeners[producerID]
+	c.listenersLock.RUnlock()
+
+	switch sendError.GetError() {
 	case pb.ServerError_NotAllowedError:
-		request, ok := c.deletePendingRequest(requestID)
+		_, ok := c.deletePendingProducers(producerID)
 		if !ok {
 			c.log.Warnf("Received unexpected error response for request %d of type %s",
-				requestID, cmdError.GetError())
+				producerID, sendError.GetError())
 			return
 		}
 
-		errMsg := fmt.Sprintf("server error: %s: %s", cmdError.GetError(), cmdError.GetMessage())
-		request.callback(nil, errors.New(errMsg))
+		c.log.Warnf("server error: %s: %s", sendError.GetError(), sendError.GetMessage())
 	case pb.ServerError_TopicTerminatedError:
-		request, ok := c.deletePendingRequest(requestID)
+		_, ok := c.deletePendingProducers(producerID)

Review comment:
       I see. The thing I'm still confused about is this: each request sent on the connection can get added to the pending request map, right?
   
   https://github.com/apache/pulsar-client-go/blob/master/pulsar/internal/connection.go#L620
   ```
   func (c *connection) internalSendRequest(req *request) {
   	if c.closed() {
   		c.log.Warnf("internalSendRequest failed for connectionClosed")
   		if req.callback != nil {
   			req.callback(req.cmd, ErrConnectionClosed)
   		}
   	} else {
   		c.pendingLock.Lock()
   		if req.id != nil {
   			c.pendingReqs[*req.id] = req
   		}
   		c.pendingLock.Unlock()
   		c.writeCommand(req.cmd)
   	}
   }
   ```
   
   If a command is sent and gets added to the pending requests map, and then we get the response `pb.ServerError_TopicTerminatedError` from the broker, will we end up leaving/leaking commands in the pending requests map? If there is no request id, maybe it can't be avoided. Am I missing something?
   
   
   

##########
File path: pulsar/internal/connection.go
##########
@@ -752,38 +752,51 @@ func (c *connection) handleAuthChallenge(authChallenge *pb.CommandAuthChallenge)
 	c.writeCommand(baseCommand(pb.BaseCommand_AUTH_RESPONSE, cmdAuthResponse))
 }
 
-func (c *connection) handleSendError(cmdError *pb.CommandError) {
-	c.log.Warnf("Received send error from server: [%v] : [%s]", cmdError.GetError(), cmdError.GetMessage())
+func (c *connection) handleSendError(sendError *pb.CommandSendError) {
+	c.log.Warnf("Received send error from server: [%v] : [%s]", sendError.GetError(), sendError.GetMessage())
 
-	requestID := cmdError.GetRequestId()
+	producerID := sendError.GetProducerId()
 
-	switch cmdError.GetError() {
+	c.listenersLock.RLock()
+	producerListener := c.listeners[producerID]
+	c.listenersLock.RUnlock()
+
+	switch sendError.GetError() {
 	case pb.ServerError_NotAllowedError:
-		request, ok := c.deletePendingRequest(requestID)
+		_, ok := c.deletePendingProducers(producerID)
 		if !ok {
 			c.log.Warnf("Received unexpected error response for request %d of type %s",
-				requestID, cmdError.GetError())
+				producerID, sendError.GetError())
 			return
 		}
 
-		errMsg := fmt.Sprintf("server error: %s: %s", cmdError.GetError(), cmdError.GetMessage())
-		request.callback(nil, errors.New(errMsg))

Review comment:
       Does the producer still need to be notified somehow?




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@pulsar.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [pulsar-client-go] wolfstudy merged pull request #622: Fix logic of command for sendError

Posted by GitBox <gi...@apache.org>.
wolfstudy merged pull request #622:
URL: https://github.com/apache/pulsar-client-go/pull/622


   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@pulsar.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [pulsar-client-go] wolfstudy commented on pull request #622: Fix logic of command for sendError

Posted by GitBox <gi...@apache.org>.
wolfstudy commented on pull request #622:
URL: https://github.com/apache/pulsar-client-go/pull/622#issuecomment-939646811


   As @cckellogg said, the current approach may leak pendingRequest resources. We will first merge the current pull request and then track that problem in a new issue: https://github.com/apache/pulsar-client-go/issues/636.
   
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@pulsar.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [pulsar-client-go] wolfstudy commented on a change in pull request #622: Fix logic of command for sendError

Posted by GitBox <gi...@apache.org>.
wolfstudy commented on a change in pull request #622:
URL: https://github.com/apache/pulsar-client-go/pull/622#discussion_r717645039



##########
File path: pulsar/internal/connection.go
##########
@@ -752,38 +752,51 @@ func (c *connection) handleAuthChallenge(authChallenge *pb.CommandAuthChallenge)
 	c.writeCommand(baseCommand(pb.BaseCommand_AUTH_RESPONSE, cmdAuthResponse))
 }
 
-func (c *connection) handleSendError(cmdError *pb.CommandError) {
-	c.log.Warnf("Received send error from server: [%v] : [%s]", cmdError.GetError(), cmdError.GetMessage())
+func (c *connection) handleSendError(sendError *pb.CommandSendError) {
+	c.log.Warnf("Received send error from server: [%v] : [%s]", sendError.GetError(), sendError.GetMessage())
 
-	requestID := cmdError.GetRequestId()
+	producerID := sendError.GetProducerId()
 
-	switch cmdError.GetError() {
+	c.listenersLock.RLock()
+	producerListener := c.listeners[producerID]
+	c.listenersLock.RUnlock()
+
+	switch sendError.GetError() {
 	case pb.ServerError_NotAllowedError:
-		request, ok := c.deletePendingRequest(requestID)
+		_, ok := c.deletePendingProducers(producerID)
 		if !ok {
 			c.log.Warnf("Received unexpected error response for request %d of type %s",
-				requestID, cmdError.GetError())
+				producerID, sendError.GetError())
 			return
 		}
 
-		errMsg := fmt.Sprintf("server error: %s: %s", cmdError.GetError(), cmdError.GetMessage())
-		request.callback(nil, errors.New(errMsg))
+		c.log.Warnf("server error: %s: %s", sendError.GetError(), sendError.GetMessage())
 	case pb.ServerError_TopicTerminatedError:
-		request, ok := c.deletePendingRequest(requestID)
+		_, ok := c.deletePendingProducers(producerID)

Review comment:
       The only identifiers in `CommandSendError` are `ProducerId` and `SequenceId`:
   
   ```
   type CommandSendError struct {
   	ProducerId           *uint64      `protobuf:"varint,1,req,name=producer_id,json=producerId" json:"producer_id,omitempty"`
   	SequenceId           *uint64      `protobuf:"varint,2,req,name=sequence_id,json=sequenceId" json:"sequence_id,omitempty"`
   	Error                *ServerError `protobuf:"varint,3,req,name=error,enum=pulsar.proto.ServerError" json:"error,omitempty"`
   	Message              *string      `protobuf:"bytes,4,req,name=message" json:"message,omitempty"`
   	XXX_NoUnkeyedLiteral struct{}     `json:"-"`
   	XXX_unrecognized     []byte       `json:"-"`
   	XXX_sizecache        int32        `json:"-"`
   }
   ```




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@pulsar.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [pulsar-client-go] cckellogg commented on a change in pull request #622: Fix logic of command for sendError

Posted by GitBox <gi...@apache.org>.
cckellogg commented on a change in pull request #622:
URL: https://github.com/apache/pulsar-client-go/pull/622#discussion_r719017299



##########
File path: pulsar/internal/connection.go
##########
@@ -752,38 +752,51 @@ func (c *connection) handleAuthChallenge(authChallenge *pb.CommandAuthChallenge)
 	c.writeCommand(baseCommand(pb.BaseCommand_AUTH_RESPONSE, cmdAuthResponse))
 }
 
-func (c *connection) handleSendError(cmdError *pb.CommandError) {
-	c.log.Warnf("Received send error from server: [%v] : [%s]", cmdError.GetError(), cmdError.GetMessage())
+func (c *connection) handleSendError(sendError *pb.CommandSendError) {
+	c.log.Warnf("Received send error from server: [%v] : [%s]", sendError.GetError(), sendError.GetMessage())
 
-	requestID := cmdError.GetRequestId()
+	producerID := sendError.GetProducerId()
 
-	switch cmdError.GetError() {
+	c.listenersLock.RLock()
+	producerListener := c.listeners[producerID]
+	c.listenersLock.RUnlock()
+
+	switch sendError.GetError() {
 	case pb.ServerError_NotAllowedError:
-		request, ok := c.deletePendingRequest(requestID)
+		_, ok := c.deletePendingProducers(producerID)
 		if !ok {
 			c.log.Warnf("Received unexpected error response for request %d of type %s",
-				requestID, cmdError.GetError())
+				producerID, sendError.GetError())
 			return
 		}
 
-		errMsg := fmt.Sprintf("server error: %s: %s", cmdError.GetError(), cmdError.GetMessage())
-		request.callback(nil, errors.New(errMsg))
+		c.log.Warnf("server error: %s: %s", sendError.GetError(), sendError.GetMessage())
 	case pb.ServerError_TopicTerminatedError:
-		request, ok := c.deletePendingRequest(requestID)
+		_, ok := c.deletePendingProducers(producerID)
 		if !ok {
-			c.log.Warnf("Received unexpected error response for request %d of type %s",
-				requestID, cmdError.GetError())
+			c.log.Warnf("Received unexpected error response for producer %d of type %s",
+				producerID, sendError.GetError())
 			return
 		}
-		errMsg := fmt.Sprintf("server error: %s: %s", cmdError.GetError(), cmdError.GetMessage())
-		request.callback(nil, errors.New(errMsg))
+		c.log.Warnf("server error: %s: %s", sendError.GetError(), sendError.GetMessage())
 	default:
 		// By default, for transient error, let the reconnection logic
 		// to take place and re-establish the produce again
-		c.Close()

Review comment:
       I think the java client closes the connection in this case? What does it do for the other cases above like `pb.ServerError_TopicTerminatedError`?




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@pulsar.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [pulsar-client-go] wolfstudy commented on a change in pull request #622: Fix logic of command for sendError

Posted by GitBox <gi...@apache.org>.
wolfstudy commented on a change in pull request #622:
URL: https://github.com/apache/pulsar-client-go/pull/622#discussion_r719027325



##########
File path: pulsar/internal/connection.go
##########
@@ -752,38 +752,51 @@ func (c *connection) handleAuthChallenge(authChallenge *pb.CommandAuthChallenge)
 	c.writeCommand(baseCommand(pb.BaseCommand_AUTH_RESPONSE, cmdAuthResponse))
 }
 
-func (c *connection) handleSendError(cmdError *pb.CommandError) {
-	c.log.Warnf("Received send error from server: [%v] : [%s]", cmdError.GetError(), cmdError.GetMessage())
+func (c *connection) handleSendError(sendError *pb.CommandSendError) {
+	c.log.Warnf("Received send error from server: [%v] : [%s]", sendError.GetError(), sendError.GetMessage())
 
-	requestID := cmdError.GetRequestId()
+	producerID := sendError.GetProducerId()
 
-	switch cmdError.GetError() {
+	c.listenersLock.RLock()
+	producerListener := c.listeners[producerID]
+	c.listenersLock.RUnlock()
+
+	switch sendError.GetError() {
 	case pb.ServerError_NotAllowedError:
-		request, ok := c.deletePendingRequest(requestID)
+		_, ok := c.deletePendingProducers(producerID)
 		if !ok {
 			c.log.Warnf("Received unexpected error response for request %d of type %s",
-				requestID, cmdError.GetError())
+				producerID, sendError.GetError())
 			return
 		}
 
-		errMsg := fmt.Sprintf("server error: %s: %s", cmdError.GetError(), cmdError.GetMessage())
-		request.callback(nil, errors.New(errMsg))
+		c.log.Warnf("server error: %s: %s", sendError.GetError(), sendError.GetMessage())
 	case pb.ServerError_TopicTerminatedError:
-		request, ok := c.deletePendingRequest(requestID)
+		_, ok := c.deletePendingProducers(producerID)
 		if !ok {
-			c.log.Warnf("Received unexpected error response for request %d of type %s",
-				requestID, cmdError.GetError())
+			c.log.Warnf("Received unexpected error response for producer %d of type %s",
+				producerID, sendError.GetError())
 			return
 		}
-		errMsg := fmt.Sprintf("server error: %s: %s", cmdError.GetError(), cmdError.GetMessage())
-		request.callback(nil, errors.New(errMsg))
+		c.log.Warnf("server error: %s: %s", sendError.GetError(), sendError.GetMessage())
 	default:
 		// By default, for transient error, let the reconnection logic
 		// to take place and re-establish the produce again
-		c.Close()

Review comment:
       OK, will fix this.




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@pulsar.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [pulsar-client-go] wolfstudy commented on a change in pull request #622: Fix logic of command for sendError

Posted by GitBox <gi...@apache.org>.
wolfstudy commented on a change in pull request #622:
URL: https://github.com/apache/pulsar-client-go/pull/622#discussion_r718151165



##########
File path: pulsar/internal/connection.go
##########
@@ -752,38 +752,51 @@ func (c *connection) handleAuthChallenge(authChallenge *pb.CommandAuthChallenge)
 	c.writeCommand(baseCommand(pb.BaseCommand_AUTH_RESPONSE, cmdAuthResponse))
 }
 
-func (c *connection) handleSendError(cmdError *pb.CommandError) {
-	c.log.Warnf("Received send error from server: [%v] : [%s]", cmdError.GetError(), cmdError.GetMessage())
+func (c *connection) handleSendError(sendError *pb.CommandSendError) {
+	c.log.Warnf("Received send error from server: [%v] : [%s]", sendError.GetError(), sendError.GetMessage())
 
-	requestID := cmdError.GetRequestId()
+	producerID := sendError.GetProducerId()
 
-	switch cmdError.GetError() {
+	c.listenersLock.RLock()
+	producerListener := c.listeners[producerID]
+	c.listenersLock.RUnlock()
+
+	switch sendError.GetError() {
 	case pb.ServerError_NotAllowedError:
-		request, ok := c.deletePendingRequest(requestID)
+		_, ok := c.deletePendingProducers(producerID)
 		if !ok {
 			c.log.Warnf("Received unexpected error response for request %d of type %s",
-				requestID, cmdError.GetError())
+				producerID, sendError.GetError())
 			return
 		}
 
-		errMsg := fmt.Sprintf("server error: %s: %s", cmdError.GetError(), cmdError.GetMessage())
-		request.callback(nil, errors.New(errMsg))
+		c.log.Warnf("server error: %s: %s", sendError.GetError(), sendError.GetMessage())
 	case pb.ServerError_TopicTerminatedError:
-		request, ok := c.deletePendingRequest(requestID)
+		_, ok := c.deletePendingProducers(producerID)

Review comment:
       Yes, I agree with your points.
   
   The first point: we really should be handling the SendError command here, not the Error command. That much is settled. However, the requestID is not included in SendError.
   
   The second point: the requestID has to be obtained from the Protobuf protocol, and processing PendingRequest relies on the requestID, so I am now a bit confused as well. After receiving a SendError, what should we do here?

##########
File path: pulsar/internal/connection.go
##########
@@ -752,38 +752,51 @@ func (c *connection) handleAuthChallenge(authChallenge *pb.CommandAuthChallenge)
 	c.writeCommand(baseCommand(pb.BaseCommand_AUTH_RESPONSE, cmdAuthResponse))
 }
 
-func (c *connection) handleSendError(cmdError *pb.CommandError) {
-	c.log.Warnf("Received send error from server: [%v] : [%s]", cmdError.GetError(), cmdError.GetMessage())
+func (c *connection) handleSendError(sendError *pb.CommandSendError) {
+	c.log.Warnf("Received send error from server: [%v] : [%s]", sendError.GetError(), sendError.GetMessage())
 
-	requestID := cmdError.GetRequestId()
+	producerID := sendError.GetProducerId()
 
-	switch cmdError.GetError() {
+	c.listenersLock.RLock()
+	producerListener := c.listeners[producerID]
+	c.listenersLock.RUnlock()
+
+	switch sendError.GetError() {
 	case pb.ServerError_NotAllowedError:
-		request, ok := c.deletePendingRequest(requestID)
+		_, ok := c.deletePendingProducers(producerID)
 		if !ok {
 			c.log.Warnf("Received unexpected error response for request %d of type %s",
-				requestID, cmdError.GetError())
+				producerID, sendError.GetError())
 			return
 		}
 
-		errMsg := fmt.Sprintf("server error: %s: %s", cmdError.GetError(), cmdError.GetMessage())
-		request.callback(nil, errors.New(errMsg))
+		c.log.Warnf("server error: %s: %s", sendError.GetError(), sendError.GetMessage())
 	case pb.ServerError_TopicTerminatedError:
-		request, ok := c.deletePendingRequest(requestID)
+		_, ok := c.deletePendingProducers(producerID)

Review comment:
       Yes, I agree with your points.
   
   The first point: we really should be handling the SendError command here, not the Error command. That much is settled. However, the requestID is not included in SendError.
   
   The second point: the requestID has to be obtained from the Protobuf protocol, and processing PendingRequest relies on the requestID, so I am now a bit confused as well. After receiving a SendError, what should we do here?




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@pulsar.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [pulsar-client-go] wolfstudy commented on pull request #622: Fix logic of command for sendError

Posted by GitBox <gi...@apache.org>.
wolfstudy commented on pull request #622:
URL: https://github.com/apache/pulsar-client-go/pull/622#issuecomment-928866518


   ```
   --- FAIL: TestNamespaceTopicsNamespaceDoesNotExit (54.17s)
       client_impl_test.go:387: 
           	Error Trace:	client_impl_test.go:387
           	Error:      	Expected nil, but got: &errors.errorString{s:"server error: AuthorizationError: Exception occurred while trying to authorize GetTopicsOfNamespace"}
           	Test:       	TestNamespaceTopicsNamespaceDoesNotExit
   ```


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@pulsar.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [pulsar-client-go] wolfstudy commented on a change in pull request #622: Fix logic of command for sendError

Posted by GitBox <gi...@apache.org>.
wolfstudy commented on a change in pull request #622:
URL: https://github.com/apache/pulsar-client-go/pull/622#discussion_r718156097



##########
File path: pulsar/internal/connection.go
##########
@@ -752,38 +752,51 @@ func (c *connection) handleAuthChallenge(authChallenge *pb.CommandAuthChallenge)
 	c.writeCommand(baseCommand(pb.BaseCommand_AUTH_RESPONSE, cmdAuthResponse))
 }
 
-func (c *connection) handleSendError(cmdError *pb.CommandError) {
-	c.log.Warnf("Received send error from server: [%v] : [%s]", cmdError.GetError(), cmdError.GetMessage())
+func (c *connection) handleSendError(sendError *pb.CommandSendError) {
+	c.log.Warnf("Received send error from server: [%v] : [%s]", sendError.GetError(), sendError.GetMessage())
 
-	requestID := cmdError.GetRequestId()
+	producerID := sendError.GetProducerId()
 
-	switch cmdError.GetError() {
+	c.listenersLock.RLock()
+	producerListener := c.listeners[producerID]
+	c.listenersLock.RUnlock()
+
+	switch sendError.GetError() {
 	case pb.ServerError_NotAllowedError:
-		request, ok := c.deletePendingRequest(requestID)
+		_, ok := c.deletePendingProducers(producerID)
 		if !ok {
 			c.log.Warnf("Received unexpected error response for request %d of type %s",
-				requestID, cmdError.GetError())
+				producerID, sendError.GetError())
 			return
 		}
 
-		errMsg := fmt.Sprintf("server error: %s: %s", cmdError.GetError(), cmdError.GetMessage())
-		request.callback(nil, errors.New(errMsg))
+		c.log.Warnf("server error: %s: %s", sendError.GetError(), sendError.GetMessage())
 	case pb.ServerError_TopicTerminatedError:
-		request, ok := c.deletePendingRequest(requestID)
+		_, ok := c.deletePendingProducers(producerID)
 		if !ok {
-			c.log.Warnf("Received unexpected error response for request %d of type %s",
-				requestID, cmdError.GetError())
+			c.log.Warnf("Received unexpected error response for producer %d of type %s",
+				producerID, sendError.GetError())
 			return
 		}
-		errMsg := fmt.Sprintf("server error: %s: %s", cmdError.GetError(), cmdError.GetMessage())
-		request.callback(nil, errors.New(errMsg))
+		c.log.Warnf("server error: %s: %s", sendError.GetError(), sendError.GetMessage())
 	default:
 		// By default, for transient error, let the reconnection logic
 		// to take place and re-establish the produce again
-		c.Close()

Review comment:
       Following the ideas above: if we need to clean up the pendingRequest cache, then we had better close the connection here. If we only need to clean up the producer's entry in the listeners map, then triggering the reconnection logic here should be enough, because there may be other producers using this connection.




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@pulsar.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [pulsar-client-go] wolfstudy commented on pull request #622: Fix logic of command for sendError

Posted by GitBox <gi...@apache.org>.
wolfstudy commented on pull request #622:
URL: https://github.com/apache/pulsar-client-go/pull/622#issuecomment-928866518


   ```
   --- FAIL: TestNamespaceTopicsNamespaceDoesNotExit (54.17s)
       client_impl_test.go:387: 
           	Error Trace:	client_impl_test.go:387
           	Error:      	Expected nil, but got: &errors.errorString{s:"server error: AuthorizationError: Exception occurred while trying to authorize GetTopicsOfNamespace"}
           	Test:       	TestNamespaceTopicsNamespaceDoesNotExit
   ```


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@pulsar.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [pulsar-client-go] wolfstudy commented on a change in pull request #622: Fix logic of command for sendError

Posted by GitBox <gi...@apache.org>.
wolfstudy commented on a change in pull request #622:
URL: https://github.com/apache/pulsar-client-go/pull/622#discussion_r718151165



##########
File path: pulsar/internal/connection.go
##########
@@ -752,38 +752,51 @@ func (c *connection) handleAuthChallenge(authChallenge *pb.CommandAuthChallenge)
 	c.writeCommand(baseCommand(pb.BaseCommand_AUTH_RESPONSE, cmdAuthResponse))
 }
 
-func (c *connection) handleSendError(cmdError *pb.CommandError) {
-	c.log.Warnf("Received send error from server: [%v] : [%s]", cmdError.GetError(), cmdError.GetMessage())
+func (c *connection) handleSendError(sendError *pb.CommandSendError) {
+	c.log.Warnf("Received send error from server: [%v] : [%s]", sendError.GetError(), sendError.GetMessage())
 
-	requestID := cmdError.GetRequestId()
+	producerID := sendError.GetProducerId()
 
-	switch cmdError.GetError() {
+	c.listenersLock.RLock()
+	producerListener := c.listeners[producerID]
+	c.listenersLock.RUnlock()
+
+	switch sendError.GetError() {
 	case pb.ServerError_NotAllowedError:
-		request, ok := c.deletePendingRequest(requestID)
+		_, ok := c.deletePendingProducers(producerID)
 		if !ok {
 			c.log.Warnf("Received unexpected error response for request %d of type %s",
-				requestID, cmdError.GetError())
+				producerID, sendError.GetError())
 			return
 		}
 
-		errMsg := fmt.Sprintf("server error: %s: %s", cmdError.GetError(), cmdError.GetMessage())
-		request.callback(nil, errors.New(errMsg))
+		c.log.Warnf("server error: %s: %s", sendError.GetError(), sendError.GetMessage())
 	case pb.ServerError_TopicTerminatedError:
-		request, ok := c.deletePendingRequest(requestID)
+		_, ok := c.deletePendingProducers(producerID)

Review comment:
       Yes, Agree with your ideas.
   
   The first point: here we really should handle the SendError command, not the Error command; that much is settled. However, the requestID is not included in SendError.
   
   The second point: the requestID has to be obtained from the Protobuf protocol message, and processing a PendingRequest relies on that requestID, so I am also a bit confused now. After receiving a SendError, what should we do here?

##########
File path: pulsar/internal/connection.go
##########
@@ -752,38 +752,51 @@ func (c *connection) handleAuthChallenge(authChallenge *pb.CommandAuthChallenge)
 	c.writeCommand(baseCommand(pb.BaseCommand_AUTH_RESPONSE, cmdAuthResponse))
 }
 
-func (c *connection) handleSendError(cmdError *pb.CommandError) {
-	c.log.Warnf("Received send error from server: [%v] : [%s]", cmdError.GetError(), cmdError.GetMessage())
+func (c *connection) handleSendError(sendError *pb.CommandSendError) {
+	c.log.Warnf("Received send error from server: [%v] : [%s]", sendError.GetError(), sendError.GetMessage())
 
-	requestID := cmdError.GetRequestId()
+	producerID := sendError.GetProducerId()
 
-	switch cmdError.GetError() {
+	c.listenersLock.RLock()
+	producerListener := c.listeners[producerID]
+	c.listenersLock.RUnlock()
+
+	switch sendError.GetError() {
 	case pb.ServerError_NotAllowedError:
-		request, ok := c.deletePendingRequest(requestID)
+		_, ok := c.deletePendingProducers(producerID)
 		if !ok {
 			c.log.Warnf("Received unexpected error response for request %d of type %s",
-				requestID, cmdError.GetError())
+				producerID, sendError.GetError())
 			return
 		}
 
-		errMsg := fmt.Sprintf("server error: %s: %s", cmdError.GetError(), cmdError.GetMessage())
-		request.callback(nil, errors.New(errMsg))
+		c.log.Warnf("server error: %s: %s", sendError.GetError(), sendError.GetMessage())
 	case pb.ServerError_TopicTerminatedError:
-		request, ok := c.deletePendingRequest(requestID)
+		_, ok := c.deletePendingProducers(producerID)

Review comment:
       Yes, Agree with your ideas.
   
   The first point: here we really should handle the SendError command, not the Error command; that much is settled. However, the requestID is not included in SendError.
   
   The second point: the requestID has to be obtained from the Protobuf protocol message, and processing a PendingRequest relies on that requestID, so I am also a bit confused now. After receiving a SendError, what should we do here?

##########
File path: pulsar/internal/connection.go
##########
@@ -752,38 +752,51 @@ func (c *connection) handleAuthChallenge(authChallenge *pb.CommandAuthChallenge)
 	c.writeCommand(baseCommand(pb.BaseCommand_AUTH_RESPONSE, cmdAuthResponse))
 }
 
-func (c *connection) handleSendError(cmdError *pb.CommandError) {
-	c.log.Warnf("Received send error from server: [%v] : [%s]", cmdError.GetError(), cmdError.GetMessage())
+func (c *connection) handleSendError(sendError *pb.CommandSendError) {
+	c.log.Warnf("Received send error from server: [%v] : [%s]", sendError.GetError(), sendError.GetMessage())
 
-	requestID := cmdError.GetRequestId()
+	producerID := sendError.GetProducerId()
 
-	switch cmdError.GetError() {
+	c.listenersLock.RLock()
+	producerListener := c.listeners[producerID]
+	c.listenersLock.RUnlock()
+
+	switch sendError.GetError() {
 	case pb.ServerError_NotAllowedError:
-		request, ok := c.deletePendingRequest(requestID)
+		_, ok := c.deletePendingProducers(producerID)
 		if !ok {
 			c.log.Warnf("Received unexpected error response for request %d of type %s",
-				requestID, cmdError.GetError())
+				producerID, sendError.GetError())
 			return
 		}
 
-		errMsg := fmt.Sprintf("server error: %s: %s", cmdError.GetError(), cmdError.GetMessage())
-		request.callback(nil, errors.New(errMsg))
+		c.log.Warnf("server error: %s: %s", sendError.GetError(), sendError.GetMessage())
 	case pb.ServerError_TopicTerminatedError:
-		request, ok := c.deletePendingRequest(requestID)
+		_, ok := c.deletePendingProducers(producerID)
 		if !ok {
-			c.log.Warnf("Received unexpected error response for request %d of type %s",
-				requestID, cmdError.GetError())
+			c.log.Warnf("Received unexpected error response for producer %d of type %s",
+				producerID, sendError.GetError())
 			return
 		}
-		errMsg := fmt.Sprintf("server error: %s: %s", cmdError.GetError(), cmdError.GetMessage())
-		request.callback(nil, errors.New(errMsg))
+		c.log.Warnf("server error: %s: %s", sendError.GetError(), sendError.GetMessage())
 	default:
 		// By default, for transient error, let the reconnection logic
 		// to take place and re-establish the produce again
-		c.Close()

Review comment:
       Following the ideas above: if we need to clean up the pendingRequest cache, then we had better close the connection here. If we only need to clean up the producer's entry in the listeners map, then triggering the reconnection logic here should be enough, because there may be other producers using this connection.




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@pulsar.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [pulsar-client-go] wolfstudy commented on a change in pull request #622: Fix logic of command for sendError

Posted by GitBox <gi...@apache.org>.
wolfstudy commented on a change in pull request #622:
URL: https://github.com/apache/pulsar-client-go/pull/622#discussion_r717646591



##########
File path: pulsar/internal/connection.go
##########
@@ -752,38 +752,51 @@ func (c *connection) handleAuthChallenge(authChallenge *pb.CommandAuthChallenge)
 	c.writeCommand(baseCommand(pb.BaseCommand_AUTH_RESPONSE, cmdAuthResponse))
 }
 
-func (c *connection) handleSendError(cmdError *pb.CommandError) {
-	c.log.Warnf("Received send error from server: [%v] : [%s]", cmdError.GetError(), cmdError.GetMessage())
+func (c *connection) handleSendError(sendError *pb.CommandSendError) {
+	c.log.Warnf("Received send error from server: [%v] : [%s]", sendError.GetError(), sendError.GetMessage())
 
-	requestID := cmdError.GetRequestId()
+	producerID := sendError.GetProducerId()
 
-	switch cmdError.GetError() {
+	c.listenersLock.RLock()
+	producerListener := c.listeners[producerID]
+	c.listenersLock.RUnlock()
+
+	switch sendError.GetError() {
 	case pb.ServerError_NotAllowedError:
-		request, ok := c.deletePendingRequest(requestID)
+		_, ok := c.deletePendingProducers(producerID)
 		if !ok {
 			c.log.Warnf("Received unexpected error response for request %d of type %s",
-				requestID, cmdError.GetError())
+				producerID, sendError.GetError())
 			return
 		}
 
-		errMsg := fmt.Sprintf("server error: %s: %s", cmdError.GetError(), cmdError.GetMessage())
-		request.callback(nil, errors.New(errMsg))
+		c.log.Warnf("server error: %s: %s", sendError.GetError(), sendError.GetMessage())
 	case pb.ServerError_TopicTerminatedError:
-		request, ok := c.deletePendingRequest(requestID)
+		_, ok := c.deletePendingProducers(producerID)

Review comment:
       In fact, we only need to deal with the map of listeners responsible for managing the producer objects.




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@pulsar.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [pulsar-client-go] cckellogg commented on a change in pull request #622: Fix logic of command for sendError

Posted by GitBox <gi...@apache.org>.
cckellogg commented on a change in pull request #622:
URL: https://github.com/apache/pulsar-client-go/pull/622#discussion_r717720592



##########
File path: pulsar/internal/connection.go
##########
@@ -752,38 +752,51 @@ func (c *connection) handleAuthChallenge(authChallenge *pb.CommandAuthChallenge)
 	c.writeCommand(baseCommand(pb.BaseCommand_AUTH_RESPONSE, cmdAuthResponse))
 }
 
-func (c *connection) handleSendError(cmdError *pb.CommandError) {
-	c.log.Warnf("Received send error from server: [%v] : [%s]", cmdError.GetError(), cmdError.GetMessage())
+func (c *connection) handleSendError(sendError *pb.CommandSendError) {
+	c.log.Warnf("Received send error from server: [%v] : [%s]", sendError.GetError(), sendError.GetMessage())
 
-	requestID := cmdError.GetRequestId()
+	producerID := sendError.GetProducerId()
 
-	switch cmdError.GetError() {
+	c.listenersLock.RLock()
+	producerListener := c.listeners[producerID]
+	c.listenersLock.RUnlock()
+
+	switch sendError.GetError() {
 	case pb.ServerError_NotAllowedError:
-		request, ok := c.deletePendingRequest(requestID)
+		_, ok := c.deletePendingProducers(producerID)
 		if !ok {
 			c.log.Warnf("Received unexpected error response for request %d of type %s",
-				requestID, cmdError.GetError())
+				producerID, sendError.GetError())
 			return
 		}
 
-		errMsg := fmt.Sprintf("server error: %s: %s", cmdError.GetError(), cmdError.GetMessage())
-		request.callback(nil, errors.New(errMsg))
+		c.log.Warnf("server error: %s: %s", sendError.GetError(), sendError.GetMessage())
 	case pb.ServerError_TopicTerminatedError:
-		request, ok := c.deletePendingRequest(requestID)
+		_, ok := c.deletePendingProducers(producerID)
 		if !ok {
-			c.log.Warnf("Received unexpected error response for request %d of type %s",
-				requestID, cmdError.GetError())
+			c.log.Warnf("Received unexpected error response for producer %d of type %s",
+				producerID, sendError.GetError())
 			return
 		}
-		errMsg := fmt.Sprintf("server error: %s: %s", cmdError.GetError(), cmdError.GetMessage())
-		request.callback(nil, errors.New(errMsg))
+		c.log.Warnf("server error: %s: %s", sendError.GetError(), sendError.GetMessage())
 	default:
 		// By default, for transient error, let the reconnection logic
 		// to take place and re-establish the produce again
-		c.Close()

Review comment:
       Why don't we need to close the connection here anymore?

##########
File path: pulsar/internal/connection.go
##########
@@ -752,38 +752,51 @@ func (c *connection) handleAuthChallenge(authChallenge *pb.CommandAuthChallenge)
 	c.writeCommand(baseCommand(pb.BaseCommand_AUTH_RESPONSE, cmdAuthResponse))
 }
 
-func (c *connection) handleSendError(cmdError *pb.CommandError) {
-	c.log.Warnf("Received send error from server: [%v] : [%s]", cmdError.GetError(), cmdError.GetMessage())
+func (c *connection) handleSendError(sendError *pb.CommandSendError) {
+	c.log.Warnf("Received send error from server: [%v] : [%s]", sendError.GetError(), sendError.GetMessage())
 
-	requestID := cmdError.GetRequestId()
+	producerID := sendError.GetProducerId()
 
-	switch cmdError.GetError() {
+	c.listenersLock.RLock()
+	producerListener := c.listeners[producerID]
+	c.listenersLock.RUnlock()
+
+	switch sendError.GetError() {
 	case pb.ServerError_NotAllowedError:
-		request, ok := c.deletePendingRequest(requestID)
+		_, ok := c.deletePendingProducers(producerID)
 		if !ok {
 			c.log.Warnf("Received unexpected error response for request %d of type %s",
-				requestID, cmdError.GetError())
+				producerID, sendError.GetError())
 			return
 		}
 
-		errMsg := fmt.Sprintf("server error: %s: %s", cmdError.GetError(), cmdError.GetMessage())
-		request.callback(nil, errors.New(errMsg))
+		c.log.Warnf("server error: %s: %s", sendError.GetError(), sendError.GetMessage())
 	case pb.ServerError_TopicTerminatedError:
-		request, ok := c.deletePendingRequest(requestID)
+		_, ok := c.deletePendingProducers(producerID)

Review comment:
       I see. The thing I'm still confused about is this: each request sent on the connection can get added to the pending request map, right?
   
   https://github.com/apache/pulsar-client-go/blob/master/pulsar/internal/connection.go#L620
   ```
   func (c *connection) internalSendRequest(req *request) {
   	if c.closed() {
   		c.log.Warnf("internalSendRequest failed for connectionClosed")
   		if req.callback != nil {
   			req.callback(req.cmd, ErrConnectionClosed)
   		}
   	} else {
   		c.pendingLock.Lock()
   		if req.id != nil {
   			c.pendingReqs[*req.id] = req
   		}
   		c.pendingLock.Unlock()
   		c.writeCommand(req.cmd)
   	}
   }
   ```
   
   If a command is sent and gets added to the pending requests map, and we then get the response `pb.ServerError_TopicTerminatedError` from the broker, will we end up leaving/leaking commands in the pending requests map? If there is no request id, maybe it can't be avoided. Am I missing something?
   
   
   

##########
File path: pulsar/internal/connection.go
##########
@@ -752,38 +752,51 @@ func (c *connection) handleAuthChallenge(authChallenge *pb.CommandAuthChallenge)
 	c.writeCommand(baseCommand(pb.BaseCommand_AUTH_RESPONSE, cmdAuthResponse))
 }
 
-func (c *connection) handleSendError(cmdError *pb.CommandError) {
-	c.log.Warnf("Received send error from server: [%v] : [%s]", cmdError.GetError(), cmdError.GetMessage())
+func (c *connection) handleSendError(sendError *pb.CommandSendError) {
+	c.log.Warnf("Received send error from server: [%v] : [%s]", sendError.GetError(), sendError.GetMessage())
 
-	requestID := cmdError.GetRequestId()
+	producerID := sendError.GetProducerId()
 
-	switch cmdError.GetError() {
+	c.listenersLock.RLock()
+	producerListener := c.listeners[producerID]
+	c.listenersLock.RUnlock()
+
+	switch sendError.GetError() {
 	case pb.ServerError_NotAllowedError:
-		request, ok := c.deletePendingRequest(requestID)
+		_, ok := c.deletePendingProducers(producerID)
 		if !ok {
 			c.log.Warnf("Received unexpected error response for request %d of type %s",
-				requestID, cmdError.GetError())
+				producerID, sendError.GetError())
 			return
 		}
 
-		errMsg := fmt.Sprintf("server error: %s: %s", cmdError.GetError(), cmdError.GetMessage())
-		request.callback(nil, errors.New(errMsg))

Review comment:
       Does the producer still need to be notified somehow?




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@pulsar.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [pulsar-client-go] cckellogg commented on a change in pull request #622: Fix logic of command for sendError

Posted by GitBox <gi...@apache.org>.
cckellogg commented on a change in pull request #622:
URL: https://github.com/apache/pulsar-client-go/pull/622#discussion_r717609360



##########
File path: pulsar/internal/connection.go
##########
@@ -752,38 +752,51 @@ func (c *connection) handleAuthChallenge(authChallenge *pb.CommandAuthChallenge)
 	c.writeCommand(baseCommand(pb.BaseCommand_AUTH_RESPONSE, cmdAuthResponse))
 }
 
-func (c *connection) handleSendError(cmdError *pb.CommandError) {
-	c.log.Warnf("Received send error from server: [%v] : [%s]", cmdError.GetError(), cmdError.GetMessage())
+func (c *connection) handleSendError(sendError *pb.CommandSendError) {
+	c.log.Warnf("Received send error from server: [%v] : [%s]", sendError.GetError(), sendError.GetMessage())
 
-	requestID := cmdError.GetRequestId()
+	producerID := sendError.GetProducerId()
 
-	switch cmdError.GetError() {
+	c.listenersLock.RLock()
+	producerListener := c.listeners[producerID]
+	c.listenersLock.RUnlock()
+
+	switch sendError.GetError() {
 	case pb.ServerError_NotAllowedError:
-		request, ok := c.deletePendingRequest(requestID)
+		_, ok := c.deletePendingProducers(producerID)
 		if !ok {
 			c.log.Warnf("Received unexpected error response for request %d of type %s",
-				requestID, cmdError.GetError())
+				producerID, sendError.GetError())
 			return
 		}
 
-		errMsg := fmt.Sprintf("server error: %s: %s", cmdError.GetError(), cmdError.GetMessage())
-		request.callback(nil, errors.New(errMsg))
+		c.log.Warnf("server error: %s: %s", sendError.GetError(), sendError.GetMessage())
 	case pb.ServerError_TopicTerminatedError:
-		request, ok := c.deletePendingRequest(requestID)
+		_, ok := c.deletePendingProducers(producerID)

Review comment:
       I thought all commands sent to the broker would have a request id? Do we still need to clean those up from the pending request queue?




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@pulsar.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org