Posted to commits@pulsar.apache.org by GitBox <gi...@apache.org> on 2018/09/05 09:28:11 UTC
[GitHub] blutack opened a new issue #2521: Go client reader hang when
processing backlog of ~6M messages
URL: https://github.com/apache/incubator-pulsar/issues/2521
#### Expected behavior
The reader should read all messages in the topic and then continue to follow the topic.
#### Actual behavior
The reader reads approximately 230K messages, and then the subsequent call to Next blocks forever.
#### Steps to reproduce
I have a topic configured with infinite retention, into which I've loaded approximately 6M messages of ~60 bytes each. A producer feeds this topic a new test message every second.
I've used the following Python code, which gives me the result I would expect.
```python
import pulsar, time
client = pulsar.Client('pulsar://localhost:6650')
reader = client.create_reader('positions', pulsar.MessageId.earliest)
i = 0
while True:
    reader.read_next()
    i += 1
    print(f"{time.time()}: {i}")
```
This outputs the following: the whole backlog is read, and then new messages are read as they come in:
```
<snip>
1536138110.039305: 6332371
1536138110.0393229: 6332372
1536138110.0393403: 6332373
1536138110.0393577: 6332374
1536138110.039375: 6332375
1536138110.0393927: 6332376
1536138110.114043: 6332377
1536138111.1111479: 6332378
1536138112.1161785: 6332379
1536138113.1167316: 6332380
1536138114.1186855: 6332381
1536138115.1202247: 6332382
```
The problematic Go code is:
```golang
package main

import (
	"log"

	"github.com/apache/incubator-pulsar/pulsar-client-go/pulsar"
)

func main() {
	log.SetFlags(log.LstdFlags | log.Lmicroseconds)

	client, err := pulsar.NewClient(pulsar.ClientOptions{
		URL:                     "pulsar://localhost:6650",
		OperationTimeoutSeconds: 120,
	})
	if err != nil {
		log.Fatalf("Could not instantiate Pulsar client: %v", err)
	}

	// Messages are delivered through this channel rather than via reader.Next.
	msgChannel := make(chan pulsar.ReaderMessage)

	reader, err := client.CreateReader(pulsar.ReaderOptions{
		Topic:          "positions",
		StartMessageID: pulsar.EarliestMessage,
		Name:           "go_reader",
		MessageChannel: msgChannel,
	})
	if err != nil {
		log.Fatalf("Could not create Pulsar reader: %v", err)
	}
	defer reader.Close()

	// Count every message received, from the earliest one onwards.
	i := 0
	for cm := range msgChannel {
		i++
		_ = cm.Message
		log.Printf("%d\n", i)
	}
}
```
It outputs the following. Note that most of the messages are never received and the process hangs.
```
<snip>
2018/09/05 10:08:04.674907 153036
2018/09/05 10:08:04.674931 153037
2018/09/05 10:08:04.674936 153038
2018/09/05 10:08:04.675011 153039
2018/09/05 10:08:04.675016 153040
2018/09/05 10:08:04.675044 153041
2018/09/05 10:08:04.675048 153042
2018/09/05 10:08:04.675070 153043
2018/09/05 10:08:04.675079 153044
2018/09/05 10:08:04.675102 153045
2018/09/05 10:08:04.675110 153046
2018/09/05 10:08:04.675137 153047
2018/09/05 10:08:04.675141 153048
<hangs here>
```
The number of messages received before hanging seems to depend on wall-clock time rather than on message count: if I remove the log calls I get about 250K.
When running the Go code, I eventually get the following in my Pulsar log:
```
10:09:01.340 [pulsar-io-48-11] WARN org.apache.pulsar.common.api.PulsarHandler - [[id: 0xcb26b3aa, L:/127.0.0.1:6650 - R:/127.0.0.1:35682]] Forcing connection to close after keep-alive timeout
10:09:01.341 [pulsar-io-48-11] INFO org.apache.pulsar.broker.service.ServerCnx - Closed connection from /127.0.0.1:35682
10:09:01.342 [pulsar-io-48-12] WARN org.apache.pulsar.common.api.PulsarHandler - [[id: 0x894e4202, L:/127.0.0.1:6650 - R:/127.0.0.1:35684]] Forcing connection to close after keep-alive timeout
10:09:01.343 [pulsar-io-48-12] INFO org.apache.pulsar.broker.service.ServerCnx - Closed connection from /127.0.0.1:35684
10:09:01.343 [pulsar-io-48-12] INFO org.apache.pulsar.broker.service.AbstractDispatcherSingleActiveConsumer - Removing consumer Consumer{subscription=PersistentSubscription{topic=persistent://public/default/positions, name=reader-4a7656}, consumerId=0, consumerName=go_reader, address=/127.0.0.1:35684}
```
I hope all this is helpful. I'll keep looking into it on my end, but I'm unfamiliar with how Go wraps C++, so progress will be slow.
#### System configuration
**Pulsar version**: 2.1.0-incubating w/ standard standalone config and operating mode
**OS**: Ubuntu 18.04 w/ Oracle Java 8