You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by GitBox <gi...@apache.org> on 2020/06/16 03:59:07 UTC

[GitHub] [pulsar-client-go] cgfork opened a new issue #282: The client still receives the message after close consumer or close client in Shared subscription

cgfork opened a new issue #282:
URL: https://github.com/apache/pulsar-client-go/issues/282


   #### Expected behavior
   
   Stop consuming the messages after invoking the consumer.Close()
   
   #### Actual behavior
   
   Still receiving the messages
   
   #### Steps to reproduce
   
   ```go
   func main() {
   	client, err := pulsar.NewClient(pulsar.ClientOptions{
   		URL: "pulsar://<Your Pulsar Domain>:6650",
   	})
   	checkError(err)
   	consumer, err := client.Subscribe(pulsar.ConsumerOptions{
   		Topic:            "test",
   		Name:             "consumer-test-client",
   		SubscriptionName: "consumer-test",
   		Type:             pulsar.Shared,
   	})
   	checkError(err)
   	if os.Args[1] == "block" {
   		time.Sleep(1 * time.Hour)
   	}
   	// will return an error
   	err = consumer.Unsubscribe()
   	fmt.Println("Unsubscribe", err)
   	consumer.Close()
   	fmt.Println("Consumer Closed")
   	client.Close()
   	fmt.Println("Client Closed")
   	time.Sleep(1 * time.Hour)
   }
   ```
   then, build `go build -o pulsar` and run the command in two terminal:
   terminal 1:
   ```shell
   $ ./pulsar block
   INFO[0000] Connecting to broker                          remote_addr="pulsar://172.23.34.15:6650"
   INFO[0000] TCP connection established                    local_addr="10.23.67.12:53964" remote_addr="pulsar://172.23.34.15:6650"
   INFO[0000] Connection is ready                           local_addr="10.23.67.12:53964" remote_addr="pulsar://172.23.34.15:6650"
   INFO[0000] Connected consumer                            name=consumer-test-client subscription=consumer-test topic="persistent://public/default/test"
   INFO[0000] Created consumer                              name=consumer-test-client subscription=consumer-test topic="persistent://public/default/test"
   ```
   terminal 2:
   ```shell
   $ ./pulsar unblock                                                                                                                                              130 ↵
   INFO[0000] Connecting to broker                          remote_addr="pulsar://172.23.34.15:6650"
   INFO[0000] TCP connection established                    local_addr="10.23.67.12:53850" remote_addr="pulsar://172.23.34.15:6650"
   INFO[0000] Connection is ready                           local_addr="10.23.67.12:53850" remote_addr="pulsar://172.23.34.15:6650"
   INFO[0000] Connected consumer                            name=consumer-test-client subscription=consumer-test topic="persistent://public/default/test"
   INFO[0000] Created consumer                              name=consumer-test-client subscription=consumer-test topic="persistent://public/default/test"
   ERRO[0000] Failed to unsubscribe consumer                error="server error: MetadataError: Unconnected or shared consumer attempting to unsubscribe" name=consumer-test-client subscription=consumer-test topic="persistent://public/default/test"
   INFO[0000] The consumer[1] successfully unsubscribed     name=consumer-test-client subscription=consumer-test topic="persistent://public/default/test"
   Unsubscribe topic persistent://public/default/test, subscription consumer-test: server error: MetadataError: Unconnected or shared consumer attempting to unsubscribe
   Consumer Closed
   Client Closed
   WARN[0046] Got unexpected message: ledgerId:796 entryId:20 partition:-1   consumerID=1 local_addr="10.23.67.12:53850" remote_addr="pulsar://172.23.34.15:6650"
   WARN[0046] Got unexpected message: ledgerId:796 entryId:22 partition:-1   consumerID=1 local_addr="10.23.67.12:53850" remote_addr="pulsar://172.23.34.15:6650"
   WARN[0046] Got unexpected message: ledgerId:796 entryId:24 partition:-1   consumerID=1 local_addr="10.23.67.12:53850" remote_addr="pulsar://172.23.34.15:6650"
   WARN[0046] Got unexpected message: ledgerId:796 entryId:26 partition:-1   consumerID=1 local_addr="10.23.67.12:53850" remote_addr="pulsar://172.23.34.15:6650"
   WARN[0046] Got unexpected message: ledgerId:796 entryId:28 partition:-1   consumerID=1 local_addr="10.23.67.12:53850" remote_addr="pulsar://172.23.34.15:6650"
   WARN[0046] Got unexpected message: ledgerId:796 entryId:19 partition:-1   consumerID=1 local_addr="10.23.67.12:53850" remote_addr="pulsar://172.23.34.15:6650"
   WARN[0046] Got unexpected message: ledgerId:796 entryId:30 partition:-1   consumerID=1 local_addr="10.23.67.12:53850" remote_addr="pulsar://172.23.34.15:6650"
   WARN[0046] Got unexpected message: ledgerId:796 entryId:32 partition:-1   consumerID=1 local_addr="10.23.67.12:53850" remote_addr="pulsar://172.23.34.15:6650"
   ```
   
   You can also run the follow command with pulsar-admin:
   ```shell
   # bin/pulsar-admin topics stats persistent://public/default/test
   {
     "msgRateIn" : 0.0,
     "msgThroughputIn" : 0.0,
     "msgRateOut" : 10.721512347506462,
     "msgThroughputOut" : 739.7843519779458,
     "averageMsgSize" : 0.0,
     "storageSize" : 3036,
     "backlogSize" : 1725,
     "publishers" : [ ],
     "subscriptions" : {
       "consumer-test" : {
         "msgRateOut" : 10.721512347506462,
         "msgThroughputOut" : 739.7843519779458,
         "msgRateRedeliver" : 0.0,
         "msgBacklog" : 25,
         "blockedSubscriptionOnUnackedMsgs" : false,
         "msgDelayed" : 0,
         "unackedMessages" : 25,
         "type" : "Shared",
         "msgRateExpired" : 0.0,
         "lastExpireTimestamp" : 0,
         "consumers" : [ {
           "msgRateOut" : 10.721512347506462,
           "msgThroughputOut" : 739.7843519779458,
           "msgRateRedeliver" : 0.0,
           "consumerName" : "consumer-test-client",
           "availablePermits" : 975,
           "unackedMessages" : 25,
           "blockedConsumerOnUnackedMsgs" : false,
           "metadata" : { },
           "connectedSince" : "2020-06-16T11:53:30.052+08:00",
           "address" : "/10.23.67.12:53964"
         }, {
           "msgRateOut" : 0.0,
           "msgThroughputOut" : 0.0,
           "msgRateRedeliver" : 0.0,
           "consumerName" : "consumer-test-client",
           "availablePermits" : 1000,
           "unackedMessages" : 0,
           "blockedConsumerOnUnackedMsgs" : false,
           "metadata" : { },
           "connectedSince" : "2020-06-16T11:53:33.851+08:00",
           "address" : "/10.23.67.12:53969"
         } ],
         "isReplicated" : false
       }
     },
     "replication" : { },
     "deduplicationStatus" : "Disabled",
     "bytesInCounter" : 42367,
     "msgInCounter" : 612
   }
   ```
   
   #### System configuration
   **Pulsar version**: 2.5.0
   


----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [pulsar-client-go] merlimat closed issue #282: The client still receives the message after close consumer or close client in Shared subscription

Posted by GitBox <gi...@apache.org>.
merlimat closed issue #282:
URL: https://github.com/apache/pulsar-client-go/issues/282


   


----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [pulsar-client-go] wolfstudy commented on issue #282: The client still receives the message after close consumer or close client in Shared subscription

Posted by GitBox <gi...@apache.org>.
wolfstudy commented on issue #282:
URL: https://github.com/apache/pulsar-client-go/issues/282#issuecomment-644529674


   Thanks @cgfork feedback, this looks like a bug, will fix it.


----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [pulsar-client-go] cgfork edited a comment on issue #282: The client still receives the message after close consumer or close client in Shared subscription

Posted by GitBox <gi...@apache.org>.
cgfork edited a comment on issue #282:
URL: https://github.com/apache/pulsar-client-go/issues/282#issuecomment-644521918


   Probably shouldn't set the connection state to closed after unsubscribe:
   ```go
   func (pc *partitionConsumer) internalUnsubscribe(unsub *unsubscribeRequest) {
   	defer close(unsub.doneCh)
   
   	if pc.state == consumerClosed || pc.state == consumerClosing {
   		pc.log.Error("Failed to unsubscribe consumer, the consumer is closing or consumer has been closed")
   		return
   	}
   
   	pc.state = consumerClosing
   	requestID := pc.client.rpcClient.NewRequestID()
   	cmdUnsubscribe := &pb.CommandUnsubscribe{
   		RequestId:  proto.Uint64(requestID),
   		ConsumerId: proto.Uint64(pc.consumerID),
   	}
   	_, err := pc.client.rpcClient.RequestOnCnx(pc.conn, requestID, pb.BaseCommand_UNSUBSCRIBE, cmdUnsubscribe)
   	if err != nil {
   		pc.log.WithError(err).Error("Failed to unsubscribe consumer")
   		unsub.err = err
   	}
   
   	pc.conn.DeleteConsumeHandler(pc.consumerID)
   	if pc.nackTracker != nil {
   		pc.nackTracker.Close()
   	}
   	pc.log.Infof("The consumer[%d] successfully unsubscribed", pc.consumerID)
   	pc.state = consumerClosed
   }
   ...
   func (pc *partitionConsumer) Close() {
   	if pc.state != consumerReady {
   		return
   	}
   
   	req := &closeRequest{doneCh: make(chan struct{})}
   	pc.eventsCh <- req
   
   	// wait for request to finish
   	<-req.doneCh
   }
   ```
   
   The closeRequest was'nt sent after unsubscribe


----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [pulsar-client-go] cgfork commented on issue #282: The client still receives the message after close consumer or close client in Shared subscription

Posted by GitBox <gi...@apache.org>.
cgfork commented on issue #282:
URL: https://github.com/apache/pulsar-client-go/issues/282#issuecomment-644521918


   Probably shouldn't set the connection state to closed after unsubscribe:
   ```go
   func (pc *partitionConsumer) internalUnsubscribe(unsub *unsubscribeRequest) {
   	defer close(unsub.doneCh)
   
   	if pc.state == consumerClosed || pc.state == consumerClosing {
   		pc.log.Error("Failed to unsubscribe consumer, the consumer is closing or consumer has been closed")
   		return
   	}
   
   	pc.state = consumerClosing
   	requestID := pc.client.rpcClient.NewRequestID()
   	cmdUnsubscribe := &pb.CommandUnsubscribe{
   		RequestId:  proto.Uint64(requestID),
   		ConsumerId: proto.Uint64(pc.consumerID),
   	}
   	_, err := pc.client.rpcClient.RequestOnCnx(pc.conn, requestID, pb.BaseCommand_UNSUBSCRIBE, cmdUnsubscribe)
   	if err != nil {
   		pc.log.WithError(err).Error("Failed to unsubscribe consumer")
   		unsub.err = err
   	}
   
   	pc.conn.DeleteConsumeHandler(pc.consumerID)
   	if pc.nackTracker != nil {
   		pc.nackTracker.Close()
   	}
   	pc.log.Infof("The consumer[%d] successfully unsubscribed", pc.consumerID)
   	pc.state = consumerClosed
   }
   ...
   func (pc *partitionConsumer) Close() {
   	if pc.state != consumerReady {
   		return
   	}
   
   	req := &closeRequest{doneCh: make(chan struct{})}
   	pc.eventsCh <- req
   
   	// wait for request to finish
   	<-req.doneCh
   }
   ```
   
   


----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [pulsar-client-go] jiazhai commented on issue #282: The client still receives the message after close consumer or close client in Shared subscription

Posted by GitBox <gi...@apache.org>.
jiazhai commented on issue #282:
URL: https://github.com/apache/pulsar-client-go/issues/282#issuecomment-645095354


   @cgfork Seems you already find the root cause, Would you like to provide a PR to fix it?


----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org