You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by GitBox <gi...@apache.org> on 2020/06/16 04:21:37 UTC

[GitHub] [pulsar-client-go] cgfork edited a comment on issue #282: The client still receives the message after close consumer or close client in Shared subscription

cgfork edited a comment on issue #282:
URL: https://github.com/apache/pulsar-client-go/issues/282#issuecomment-644521918


   Probably shouldn't set the connection state to closed after unsubscribe:
   ```go
   func (pc *partitionConsumer) internalUnsubscribe(unsub *unsubscribeRequest) {
   	defer close(unsub.doneCh)
   
   	if pc.state == consumerClosed || pc.state == consumerClosing {
   		pc.log.Error("Failed to unsubscribe consumer, the consumer is closing or consumer has been closed")
   		return
   	}
   
   	pc.state = consumerClosing
   	requestID := pc.client.rpcClient.NewRequestID()
   	cmdUnsubscribe := &pb.CommandUnsubscribe{
   		RequestId:  proto.Uint64(requestID),
   		ConsumerId: proto.Uint64(pc.consumerID),
   	}
   	_, err := pc.client.rpcClient.RequestOnCnx(pc.conn, requestID, pb.BaseCommand_UNSUBSCRIBE, cmdUnsubscribe)
   	if err != nil {
   		pc.log.WithError(err).Error("Failed to unsubscribe consumer")
   		unsub.err = err
   	}
   
   	pc.conn.DeleteConsumeHandler(pc.consumerID)
   	if pc.nackTracker != nil {
   		pc.nackTracker.Close()
   	}
   	pc.log.Infof("The consumer[%d] successfully unsubscribed", pc.consumerID)
   	pc.state = consumerClosed
   }
   ...
   func (pc *partitionConsumer) Close() {
   	if pc.state != consumerReady {
   		return
   	}
   
   	req := &closeRequest{doneCh: make(chan struct{})}
   	pc.eventsCh <- req
   
   	// wait for request to finish
   	<-req.doneCh
   }
   ```
   
   The closeRequest was'nt sent after unsubscribe


----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org