Posted to commits@pulsar.apache.org by GitBox <gi...@apache.org> on 2020/03/06 09:03:31 UTC

[GitHub] [pulsar] wonko opened a new issue #6496: ReceiverQueueSize: -1 prefetches one message

wonko opened a new issue #6496: ReceiverQueueSize: -1 prefetches one message
URL: https://github.com/apache/pulsar/issues/6496
 
 
   **Describe the bug**
   When consuming from a topic with a shared subscription and pulling messages one by one with `ReceiverQueueSize: -1` (Go client), the consumer still seems to prefetch one message.
   
   **To Reproduce**
   I created a simple Go client based on the example and added a `sleeptime` flag, which simulates the working time (code below).
   
   Run the setup as follows:
   - Make sure Pulsar is running (I run the Docker example)
   - Open 4 shells
   - Start 2 consumers with a sleeptime of 1 second (`./client -sleeptime=1`) (lower two on the screenshot)
   - Start 1 consumer with a sleeptime of 10 seconds (`./client -sleeptime=10`) (top left)
   - Inject 10 messages into the queue (`./producer`) (top right)
   
   I would expect to see one message consumed by the 10-second consumer, and 4 or 5 by each of the 1-second consumers. However, I see 4 messages in each 1-second consumer and 2 in the 10-second consumer.
   
   This is wrong for a work queue: the sleeptime simulates long- and short-lived async jobs, and each job should be assigned to the next idle consumer.
   
   It feels like one message is either already assigned to the 10-second consumer in advance, or prefetched by that consumer.
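   
   The two distributions can be reproduced with a small toy model. This is only a sketch under stated assumptions, not Pulsar's actual dispatcher: the hypothetical `simulate` function assumes messages are handed out round-robin to any consumer with a free slot, that each consumer has one slot for the message in progress plus `prefetch` buffered slots, and that time advances in whole seconds. With `prefetch` set to 0 it yields the expected counts, and with `prefetch` set to 1 it yields the counts seen in the screenshot.
   
   ```go
   package main

   import "fmt"

   // simulate is a toy model (an assumption, not Pulsar's real dispatch logic).
   // Messages go round-robin to any consumer that has a free slot. A consumer
   // holds at most 1+prefetch messages: one in progress plus prefetch buffered.
   // It returns the number of messages each consumer received.
   func simulate(sleeptimes []int, messages, prefetch int) []int {
   	n := len(sleeptimes)
   	held := make([]int, n)      // messages held: in progress + buffered
   	busyUntil := make([]int, n) // second at which the current message finishes
   	counts := make([]int, n)
   	remaining := messages
   	next := 0 // round-robin cursor

   	for now := 0; remaining > 0; now++ {
   		// Complete work that finishes at this second and start the
   		// next buffered message, if there is one.
   		for i := range held {
   			if held[i] > 0 && busyUntil[i] == now {
   				held[i]--
   				if held[i] > 0 {
   					busyUntil[i] = now + sleeptimes[i]
   				}
   			}
   		}
   		// Dispatch until no consumer has a free slot.
   		for remaining > 0 {
   			dispatched := false
   			for k := 0; k < n; k++ {
   				i := (next + k) % n
   				if held[i] < 1+prefetch {
   					if held[i] == 0 {
   						// Idle consumer starts working immediately.
   						busyUntil[i] = now + sleeptimes[i]
   					}
   					held[i]++
   					counts[i]++
   					remaining--
   					next = (i + 1) % n
   					dispatched = true
   					break
   				}
   			}
   			if !dispatched {
   				break
   			}
   		}
   	}
   	return counts
   }

   func main() {
   	sleeptimes := []int{1, 1, 10} // two 1-second consumers, one 10-second consumer
   	fmt.Println("no prefetch:     ", simulate(sleeptimes, 10, 0)) // [5 4 1]
   	fmt.Println("one prefetched:  ", simulate(sleeptimes, 10, 1)) // [4 4 2]
   }
   ```
   
   In this model, the second message handed to the 10-second consumer sits in its buffer while the two fast consumers go idle, which is the behavior described above.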
   
   **Expected behavior**
   I expect that there is no prefetching / pre-queueing happening, so each message goes to the next available consumer.
   
   **Screenshots**
   ![Screenshot 2020-03-06 at 09 56 48](https://user-images.githubusercontent.com/3459/76068342-47ef8e80-5f91-11ea-9676-ea080e35946e.png)
   
   **Desktop (please complete the following information):**
   - macOS, Pulsar 2.5.0 running in Docker, all latest pulled
   
   **Additional context**
   Consumer code:
   ```
   package main
   
   import (
   	"context"
   	"flag"
   	"fmt"
   	log "github.com/apache/pulsar/pulsar-client-go/logutil"
   	"github.com/apache/pulsar/pulsar-client-go/pulsar"
   	"time"
   )
   
   func main() {
   	var sleeptime int
   
   	flag.IntVar(&sleeptime, "sleeptime", 1, "sleeptime")
   	flag.Parse()
   
   	client, err := pulsar.NewClient(pulsar.ClientOptions{URL: "pulsar://localhost:6650"})
   	if err != nil {
   		log.Fatal(err)
   	}
   
   	defer client.Close()
   
   	consumer, err := client.Subscribe(pulsar.ConsumerOptions{
   		Topic:             "testtopic",
   		SubscriptionName:  "sharedtest",
   		Type:              pulsar.Shared,
   		ReceiverQueueSize: -1,
   	})
   	if err != nil {
   		log.Fatal(err)
   	}
   
   	defer consumer.Close()
   
   	for {
   		msg, err := consumer.Receive(context.Background())
   		if err != nil {
   			log.Fatal(err)
   		}
   
   		fmt.Printf("Received message  msgId: %s -- content: '%s' - will now sleep for %d seconds\n", msg.ID(), string(msg.Payload()), sleeptime)
   
   		// Simulate the job's working time, then acknowledge the message.
   		time.Sleep(time.Duration(sleeptime) * time.Second)
   		consumer.Ack(msg)
   	}
   }
   ```
   
   Producer code:
   ```
   package main
   
   import (
   	"context"
   	"fmt"
   
   	log "github.com/apache/pulsar/pulsar-client-go/logutil"
   	"github.com/apache/pulsar/pulsar-client-go/pulsar"
   )
   
   func main() {
   	client, err := pulsar.NewClient(pulsar.ClientOptions{
   		URL:       "pulsar://localhost:6650",
   		IOThreads: 5,
   	})
   
   	if err != nil {
   		log.Fatal(err)
   	}
   
   	defer client.Close()
   
   	producer, err := client.CreateProducer(pulsar.ProducerOptions{
   		Topic: "testtopic",
   	})
   	if err != nil {
   		log.Fatal(err)
   	}
   
   	defer producer.Close()
   
   	ctx := context.Background()
   
   	for i := 0; i < 10; i++ {
   		msgID, err := producer.SendAndGetMsgID(ctx, pulsar.ProducerMessage{
   			Payload: []byte(fmt.Sprintf("hello-%d", i)),
   		})
   
   		if err != nil {
   			log.Fatal(err)
   		}
   		fmt.Printf("The message Id value is: [%v] \n", msgID)
   	}
   }
   ```
   
   
   

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
users@infra.apache.org


With regards,
Apache Git Services

[GitHub] [pulsar] codelipenghui commented on issue #6496: ReceiverQueueSize: -1 prefetches one message

Posted by GitBox <gi...@apache.org>.
codelipenghui commented on issue #6496: ReceiverQueueSize: -1 prefetches one message
URL: https://github.com/apache/pulsar/issues/6496#issuecomment-601986207
 
 
   @wolfstudy Can you please help take a look at this issue?


[GitHub] [pulsar] wonko commented on issue #6496: ReceiverQueueSize: -1 prefetches one message

Posted by GitBox <gi...@apache.org>.
wonko commented on issue #6496: ReceiverQueueSize: -1 prefetches one message
URL: https://github.com/apache/pulsar/issues/6496#issuecomment-601670449
 
 
   Is there anything I can do to get feedback on this (is this intended, is this a bug, ...)? Is more info needed?
