Posted to commits@pulsar.apache.org by GitBox <gi...@apache.org> on 2018/09/05 09:28:11 UTC

[GitHub] blutack opened a new issue #2521: Go client reader hang when processing backlog of ~6M messages

URL: https://github.com/apache/incubator-pulsar/issues/2521
 
 
   #### Expected behavior
   
   The reader should read all topic messages and then continue to follow the topic
   
   #### Actual behavior
   
   The reader reads approximately 230K messages, after which the next call to `Next` blocks forever (with the channel-based code below, no further messages arrive on `MessageChannel`).
   
   #### Steps to reproduce
   
   I have a topic configured with infinite retention, into which I've loaded approximately 6M messages of ~60 bytes each. A producer also publishes a new test message to this topic every second.
   
   I've used the following Python code, which gives the result I expect:
   ```python
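   import time
   
   import pulsar
   
   client = pulsar.Client('pulsar://localhost:6650')
   # Start reading from the earliest message retained on the topic
   reader = client.create_reader('positions', pulsar.MessageId.earliest)
   
   i = 0
   
   while True:
       reader.read_next()  # blocks until the next message is available
       i += 1
       print(f"{time.time()}: {i}")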
   ```
   This outputs the following: the whole backlog is read, and then new messages are read as they arrive (about one per second):
   ```
   <snip>
   1536138110.039305: 6332371
   1536138110.0393229: 6332372
   1536138110.0393403: 6332373
   1536138110.0393577: 6332374
   1536138110.039375: 6332375
   1536138110.0393927: 6332376
   1536138110.114043: 6332377
   1536138111.1111479: 6332378
   1536138112.1161785: 6332379
   1536138113.1167316: 6332380
   1536138114.1186855: 6332381
   1536138115.1202247: 6332382
   ```
   
   The problematic Go code is:
   ```golang
   package main
   
   import (
   	"log"
   
   	"github.com/apache/incubator-pulsar/pulsar-client-go/pulsar"
   )
   
   func main() {
   	log.SetFlags(log.LstdFlags | log.Lmicroseconds)
   	client, err := pulsar.NewClient(pulsar.ClientOptions{
   		URL:                     "pulsar://localhost:6650",
   		OperationTimeoutSeconds: 120,
   	})
   
   	if err != nil {
   		log.Fatalf("Could not instantiate Pulsar client: %v", err)
   	}
   	defer client.Close()
   
   	msgChannel := make(chan pulsar.ReaderMessage)
   
   	reader, err := client.CreateReader(pulsar.ReaderOptions{
   		Topic:          "positions",
   		StartMessageID: pulsar.EarliestMessage,
   		Name:           "go_reader",
   		MessageChannel: msgChannel,
   	})
   
   	if err != nil {
   		log.Fatalf("Could not create Pulsar reader: %v", err)
   	}
   
   	defer reader.Close()
   
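   	i := 0
   	// Each ReaderMessage pushed onto the channel carries one received message
   	for cm := range msgChannel {
   		i++
   		_ = cm.Message // the payload is not inspected; messages are only counted
   
   		log.Printf("%d", i) // log appends the trailing newline itself
   	}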
   }
   ```
   
   It outputs the following. Note that most of the messages are never received and the process hangs:
   ```
   <snip>
   2018/09/05 10:08:04.674907 153036
   2018/09/05 10:08:04.674931 153037
   2018/09/05 10:08:04.674936 153038
   2018/09/05 10:08:04.675011 153039
   2018/09/05 10:08:04.675016 153040
   2018/09/05 10:08:04.675044 153041
   2018/09/05 10:08:04.675048 153042
   2018/09/05 10:08:04.675070 153043
   2018/09/05 10:08:04.675079 153044
   2018/09/05 10:08:04.675102 153045
   2018/09/05 10:08:04.675110 153046
   2018/09/05 10:08:04.675137 153047
   2018/09/05 10:08:04.675141 153048
   <hangs here>
   ```
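   For scale, a quick calculation with the counts above shows how little of the backlog the Go reader got through. This assumes the Python reader's last backlog count (6332376) roughly equals the backlog size; the Go count comes from a separate run, so the result is approximate. `receivedPercent` is just an illustrative helper:
   
   ```go
   package main
   
   import "fmt"
   
   // Counts copied from the outputs above (approximate; the runs were separate).
   const (
   	backlogRead = 6332376 // last backlog count printed by the Python reader
   	goReceived  = 153048  // last count logged by the Go reader before the hang
   )
   
   // receivedPercent is a hypothetical helper returning received/total as a percentage.
   func receivedPercent(received, total int) float64 {
   	return 100 * float64(received) / float64(total)
   }
   
   func main() {
   	fmt.Printf("Go reader received %.1f%% of the backlog\n", receivedPercent(goReceived, backlogRead))
   }
   ```
   
   This prints roughly 2.4%, i.e. the Go reader stops well under a tenth of the way through the backlog.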
   The number of messages received before the hang seems to depend on wall-clock time: if I remove the log calls, I get about 250K.
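   Since the count at the hang seems to track wall-clock time, it may help if the receive loop reports how many messages it got, and how long it ran, when it goes quiet, instead of blocking forever. Below is a minimal sketch using only the standard library. `drainWithIdleTimeout` and `stallReport` are hypothetical names, and a plain `chan int` stands in for the reader's `chan pulsar.ReaderMessage`:
   
   ```go
   package main
   
   import (
   	"fmt"
   	"time"
   )
   
   // stallReport summarizes how far a receive loop got before it returned.
   type stallReport struct {
   	Received int           // messages received before returning
   	Elapsed  time.Duration // time from start until the last message arrived
   	Stalled  bool          // true if nothing arrived within the idle timeout
   }
   
   // drainWithIdleTimeout (hypothetical helper) counts values received from ch.
   // It returns when ch is closed, or when no value arrives for idle, which is
   // reported as a stall instead of blocking forever.
   func drainWithIdleTimeout(ch <-chan int, idle time.Duration) stallReport {
   	start := time.Now()
   	last := start
   	n := 0
   	for {
   		// A fresh timer per iteration keeps the logic simple; it is
   		// stopped as soon as a value arrives.
   		timer := time.NewTimer(idle)
   		select {
   		case _, ok := <-ch:
   			timer.Stop()
   			if !ok {
   				// Channel closed: the loop finished normally.
   				return stallReport{Received: n, Elapsed: last.Sub(start)}
   			}
   			n++
   			last = time.Now()
   		case <-timer.C:
   			// No value within idle: treat it as a stall.
   			return stallReport{Received: n, Elapsed: last.Sub(start), Stalled: true}
   		}
   	}
   }
   
   func main() {
   	ch := make(chan int)
   	// Simulate a reader that delivers 5 messages and then goes quiet
   	// without closing its channel, like the hang described above.
   	go func() {
   		for i := 0; i < 5; i++ {
   			ch <- i
   		}
   	}()
   	r := drainWithIdleTimeout(ch, 200*time.Millisecond)
   	fmt.Printf("received=%d stalled=%v after %v\n", r.Received, r.Stalled, r.Elapsed)
   }
   ```
   
   With the real reader, the same `select`-with-timeout loop would read from `msgChannel`. The idle timeout should be comfortably longer than the producer's one-second interval, so that a reader that has simply caught up is not mistaken for a stalled one.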
   
   While the Go code is running, the following eventually appears in my Pulsar log:
   ```
   10:09:01.340 [pulsar-io-48-11] WARN  org.apache.pulsar.common.api.PulsarHandler - [[id: 0xcb26b3aa, L:/127.0.0.1:6650 - R:/127.0.0.1:35682]] Forcing connection to close after keep-alive timeout
   10:09:01.341 [pulsar-io-48-11] INFO  org.apache.pulsar.broker.service.ServerCnx - Closed connection from /127.0.0.1:35682
   10:09:01.342 [pulsar-io-48-12] WARN  org.apache.pulsar.common.api.PulsarHandler - [[id: 0x894e4202, L:/127.0.0.1:6650 - R:/127.0.0.1:35684]] Forcing connection to close after keep-alive timeout
   10:09:01.343 [pulsar-io-48-12] INFO  org.apache.pulsar.broker.service.ServerCnx - Closed connection from /127.0.0.1:35684
   10:09:01.343 [pulsar-io-48-12] INFO  org.apache.pulsar.broker.service.AbstractDispatcherSingleActiveConsumer - Removing consumer Consumer{subscription=PersistentSubscription{topic=persistent://public/default/positions, name=reader-4a7656}, consumerId=0, consumerName=go_reader, address=/127.0.0.1:35684}
   ```
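   The broker closes the connection roughly a minute after the Go reader's last logged message. Assuming both logs use the same local clock (everything runs on localhost), the gap can be computed directly from the two timestamps. `gapBetween` is an illustrative helper; it relies on `time.Parse` accepting a fractional-seconds field after the seconds even when the layout omits one:
   
   ```go
   package main
   
   import (
   	"fmt"
   	"time"
   )
   
   // gapBetween (hypothetical helper) parses two same-day clock times of the
   // form HH:MM:SS[.fraction] and returns the interval between them.
   func gapBetween(from, to string) (time.Duration, error) {
   	a, err := time.Parse("15:04:05", from)
   	if err != nil {
   		return 0, err
   	}
   	b, err := time.Parse("15:04:05", to)
   	if err != nil {
   		return 0, err
   	}
   	return b.Sub(a), nil
   }
   
   func main() {
   	// Last line logged by the Go reader vs. the broker's keep-alive close.
   	d, err := gapBetween("10:08:04.675141", "10:09:01.340")
   	if err != nil {
   		panic(err)
   	}
   	fmt.Println(d) // 56.664859s
   }
   ```
   
   So the keep-alive timeout fired about 57 seconds after the last message was received. This suggests, though it does not prove, that the client side had stopped responding on the connection by then, not merely stopped delivering messages to the channel.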
   
   I hope all this is helpful. I'll keep looking into it on my end, but I'm unfamiliar with Go wrapping C++, so progress will be slow.
   
   #### System configuration
   **Pulsar version**: 2.1.0-incubating w/ standard standalone config and operating mode
   **OS**: Ubuntu 18.04 w/ Oracle Java 8
   

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
users@infra.apache.org


With regards,
Apache Git Services