You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by GitBox <gi...@apache.org> on 2020/06/16 03:59:07 UTC
[GitHub] [pulsar-client-go] cgfork opened a new issue #282: The client still receives the message after close consumer or close client in Shared subscription
cgfork opened a new issue #282:
URL: https://github.com/apache/pulsar-client-go/issues/282
#### Expected behavior
Stop consuming the messages after invoking the consumer.Close()
#### Actual behavior
Still receiving the messages
#### Steps to reproduce
```go
func main() {
client, err := pulsar.NewClient(pulsar.ClientOptions{
URL: "pulsar://<Your Pulsar Domain>:6650",
})
checkError(err)
consumer, err := client.Subscribe(pulsar.ConsumerOptions{
Topic: "test",
Name: "consumer-test-client",
SubscriptionName: "consumer-test",
Type: pulsar.Shared,
})
checkError(err)
if os.Args[1] == "block" {
time.Sleep(1 * time.Hour)
}
// will return an error
err = consumer.Unsubscribe()
fmt.Println("Unsubscribe", err)
consumer.Close()
fmt.Println("Consumer Closed")
client.Close()
fmt.Println("Client Closed")
time.Sleep(1 * time.Hour)
}
```
then, build `go build -o pulsar` and run the command in two terminal:
terminal 1:
```shell
$ ./pulsar block
INFO[0000] Connecting to broker remote_addr="pulsar://172.23.34.15:6650"
INFO[0000] TCP connection established local_addr="10.23.67.12:53964" remote_addr="pulsar://172.23.34.15:6650"
INFO[0000] Connection is ready local_addr="10.23.67.12:53964" remote_addr="pulsar://172.23.34.15:6650"
INFO[0000] Connected consumer name=consumer-test-client subscription=consumer-test topic="persistent://public/default/test"
INFO[0000] Created consumer name=consumer-test-client subscription=consumer-test topic="persistent://public/default/test"
```
terminal 2:
```shell
$ ./pulsar unblock 130 ↵
INFO[0000] Connecting to broker remote_addr="pulsar://172.23.34.15:6650"
INFO[0000] TCP connection established local_addr="10.23.67.12:53850" remote_addr="pulsar://172.23.34.15:6650"
INFO[0000] Connection is ready local_addr="10.23.67.12:53850" remote_addr="pulsar://172.23.34.15:6650"
INFO[0000] Connected consumer name=consumer-test-client subscription=consumer-test topic="persistent://public/default/test"
INFO[0000] Created consumer name=consumer-test-client subscription=consumer-test topic="persistent://public/default/test"
ERRO[0000] Failed to unsubscribe consumer error="server error: MetadataError: Unconnected or shared consumer attempting to unsubscribe" name=consumer-test-client subscription=consumer-test topic="persistent://public/default/test"
INFO[0000] The consumer[1] successfully unsubscribed name=consumer-test-client subscription=consumer-test topic="persistent://public/default/test"
Unsubscribe topic persistent://public/default/test, subscription consumer-test: server error: MetadataError: Unconnected or shared consumer attempting to unsubscribe
Consumer Closed
Client Closed
WARN[0046] Got unexpected message: ledgerId:796 entryId:20 partition:-1 consumerID=1 local_addr="10.23.67.12:53850" remote_addr="pulsar://172.23.34.15:6650"
WARN[0046] Got unexpected message: ledgerId:796 entryId:22 partition:-1 consumerID=1 local_addr="10.23.67.12:53850" remote_addr="pulsar://172.23.34.15:6650"
WARN[0046] Got unexpected message: ledgerId:796 entryId:24 partition:-1 consumerID=1 local_addr="10.23.67.12:53850" remote_addr="pulsar://172.23.34.15:6650"
WARN[0046] Got unexpected message: ledgerId:796 entryId:26 partition:-1 consumerID=1 local_addr="10.23.67.12:53850" remote_addr="pulsar://172.23.34.15:6650"
WARN[0046] Got unexpected message: ledgerId:796 entryId:28 partition:-1 consumerID=1 local_addr="10.23.67.12:53850" remote_addr="pulsar://172.23.34.15:6650"
WARN[0046] Got unexpected message: ledgerId:796 entryId:19 partition:-1 consumerID=1 local_addr="10.23.67.12:53850" remote_addr="pulsar://172.23.34.15:6650"
WARN[0046] Got unexpected message: ledgerId:796 entryId:30 partition:-1 consumerID=1 local_addr="10.23.67.12:53850" remote_addr="pulsar://172.23.34.15:6650"
WARN[0046] Got unexpected message: ledgerId:796 entryId:32 partition:-1 consumerID=1 local_addr="10.23.67.12:53850" remote_addr="pulsar://172.23.34.15:6650"
```
You can also run the follow command with pulsar-admin:
```shell
# bin/pulsar-admin topics stats persistent://public/default/test
{
"msgRateIn" : 0.0,
"msgThroughputIn" : 0.0,
"msgRateOut" : 10.721512347506462,
"msgThroughputOut" : 739.7843519779458,
"averageMsgSize" : 0.0,
"storageSize" : 3036,
"backlogSize" : 1725,
"publishers" : [ ],
"subscriptions" : {
"consumer-test" : {
"msgRateOut" : 10.721512347506462,
"msgThroughputOut" : 739.7843519779458,
"msgRateRedeliver" : 0.0,
"msgBacklog" : 25,
"blockedSubscriptionOnUnackedMsgs" : false,
"msgDelayed" : 0,
"unackedMessages" : 25,
"type" : "Shared",
"msgRateExpired" : 0.0,
"lastExpireTimestamp" : 0,
"consumers" : [ {
"msgRateOut" : 10.721512347506462,
"msgThroughputOut" : 739.7843519779458,
"msgRateRedeliver" : 0.0,
"consumerName" : "consumer-test-client",
"availablePermits" : 975,
"unackedMessages" : 25,
"blockedConsumerOnUnackedMsgs" : false,
"metadata" : { },
"connectedSince" : "2020-06-16T11:53:30.052+08:00",
"address" : "/10.23.67.12:53964"
}, {
"msgRateOut" : 0.0,
"msgThroughputOut" : 0.0,
"msgRateRedeliver" : 0.0,
"consumerName" : "consumer-test-client",
"availablePermits" : 1000,
"unackedMessages" : 0,
"blockedConsumerOnUnackedMsgs" : false,
"metadata" : { },
"connectedSince" : "2020-06-16T11:53:33.851+08:00",
"address" : "/10.23.67.12:53969"
} ],
"isReplicated" : false
}
},
"replication" : { },
"deduplicationStatus" : "Disabled",
"bytesInCounter" : 42367,
"msgInCounter" : 612
}
```
#### System configuration
**Pulsar version**: 2.5.0
----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
For queries about this service, please contact Infrastructure at:
users@infra.apache.org
[GitHub] [pulsar-client-go] merlimat closed issue #282: The client still receives the message after close consumer or close client in Shared subscription
Posted by GitBox <gi...@apache.org>.
merlimat closed issue #282:
URL: https://github.com/apache/pulsar-client-go/issues/282
----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
For queries about this service, please contact Infrastructure at:
users@infra.apache.org
[GitHub] [pulsar-client-go] wolfstudy commented on issue #282: The client still receives the message after close consumer or close client in Shared subscription
Posted by GitBox <gi...@apache.org>.
wolfstudy commented on issue #282:
URL: https://github.com/apache/pulsar-client-go/issues/282#issuecomment-644529674
Thanks @cgfork feedback, this looks like a bug, will fix it.
----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
For queries about this service, please contact Infrastructure at:
users@infra.apache.org
[GitHub] [pulsar-client-go] cgfork edited a comment on issue #282: The client still receives the message after close consumer or close client in Shared subscription
Posted by GitBox <gi...@apache.org>.
cgfork edited a comment on issue #282:
URL: https://github.com/apache/pulsar-client-go/issues/282#issuecomment-644521918
Probably shouldn't set the connection state to closed after unsubscribe:
```go
func (pc *partitionConsumer) internalUnsubscribe(unsub *unsubscribeRequest) {
defer close(unsub.doneCh)
if pc.state == consumerClosed || pc.state == consumerClosing {
pc.log.Error("Failed to unsubscribe consumer, the consumer is closing or consumer has been closed")
return
}
pc.state = consumerClosing
requestID := pc.client.rpcClient.NewRequestID()
cmdUnsubscribe := &pb.CommandUnsubscribe{
RequestId: proto.Uint64(requestID),
ConsumerId: proto.Uint64(pc.consumerID),
}
_, err := pc.client.rpcClient.RequestOnCnx(pc.conn, requestID, pb.BaseCommand_UNSUBSCRIBE, cmdUnsubscribe)
if err != nil {
pc.log.WithError(err).Error("Failed to unsubscribe consumer")
unsub.err = err
}
pc.conn.DeleteConsumeHandler(pc.consumerID)
if pc.nackTracker != nil {
pc.nackTracker.Close()
}
pc.log.Infof("The consumer[%d] successfully unsubscribed", pc.consumerID)
pc.state = consumerClosed
}
...
func (pc *partitionConsumer) Close() {
if pc.state != consumerReady {
return
}
req := &closeRequest{doneCh: make(chan struct{})}
pc.eventsCh <- req
// wait for request to finish
<-req.doneCh
}
```
The closeRequest was'nt sent after unsubscribe
----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
For queries about this service, please contact Infrastructure at:
users@infra.apache.org
[GitHub] [pulsar-client-go] cgfork commented on issue #282: The client still receives the message after close consumer or close client in Shared subscription
Posted by GitBox <gi...@apache.org>.
cgfork commented on issue #282:
URL: https://github.com/apache/pulsar-client-go/issues/282#issuecomment-644521918
Probably shouldn't set the connection state to closed after unsubscribe:
```go
func (pc *partitionConsumer) internalUnsubscribe(unsub *unsubscribeRequest) {
defer close(unsub.doneCh)
if pc.state == consumerClosed || pc.state == consumerClosing {
pc.log.Error("Failed to unsubscribe consumer, the consumer is closing or consumer has been closed")
return
}
pc.state = consumerClosing
requestID := pc.client.rpcClient.NewRequestID()
cmdUnsubscribe := &pb.CommandUnsubscribe{
RequestId: proto.Uint64(requestID),
ConsumerId: proto.Uint64(pc.consumerID),
}
_, err := pc.client.rpcClient.RequestOnCnx(pc.conn, requestID, pb.BaseCommand_UNSUBSCRIBE, cmdUnsubscribe)
if err != nil {
pc.log.WithError(err).Error("Failed to unsubscribe consumer")
unsub.err = err
}
pc.conn.DeleteConsumeHandler(pc.consumerID)
if pc.nackTracker != nil {
pc.nackTracker.Close()
}
pc.log.Infof("The consumer[%d] successfully unsubscribed", pc.consumerID)
pc.state = consumerClosed
}
...
func (pc *partitionConsumer) Close() {
if pc.state != consumerReady {
return
}
req := &closeRequest{doneCh: make(chan struct{})}
pc.eventsCh <- req
// wait for request to finish
<-req.doneCh
}
```
----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
For queries about this service, please contact Infrastructure at:
users@infra.apache.org
[GitHub] [pulsar-client-go] jiazhai commented on issue #282: The client still receives the message after close consumer or close client in Shared subscription
Posted by GitBox <gi...@apache.org>.
jiazhai commented on issue #282:
URL: https://github.com/apache/pulsar-client-go/issues/282#issuecomment-645095354
@cgfork Seems you already find the root cause, Would you like to provide a PR to fix it?
----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
For queries about this service, please contact Infrastructure at:
users@infra.apache.org