You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by GitBox <gi...@apache.org> on 2020/11/20 11:55:38 UTC

[GitHub] [pulsar-client-go] wonko opened a new issue #398: ReceiverQueueSize of 0 doesn't work as intended

wonko opened a new issue #398:
URL: https://github.com/apache/pulsar-client-go/issues/398


   (This was already reported against the old driver here (by me): https://github.com/apache/pulsar/issues/6496 but it was noted this was fixed with the new go implementation, which doesn't seem the case. The case is identical.)
   
   #### Expected behavior
   
   When running pulsar as a queue system, with long/varying processing times per message, a client setting ReceiverQueueSize to 0 should allow a client to only request a new message when it's ready for new work.
   
   When specifying a ReceiverQueueSize of 0, no messages should be pre-fetched, and the client should only request a message when it's ready with the previous message. This is also how it's documented at https://pulsar.apache.org/docs/en/cookbooks-message-queue/#client-configuration-changes.
   
   I also tried a python implementation, and that actually works as it should be.
   
   #### Actual behavior
   
   Setting a ReceiverQueueSize of 0 sets the defaultQueueSize of 1000, which is not intended.
   
   #### Steps to reproduce
   
   Consumer and producer code below. Run 3 consumers: two at a 1 second sleeptime and one at a 10 second sleep. When injecting 10 messages, it should give you 1 message on the 10 second consumer, and 4 and 5 on both 1 second sleeptime consumers.
   
   How I did it: Run a standalone docker pulsar, and open 4 terminal windows. In 2, `go run consumer.go`, in 1 `go run consumer.go --sleeptime=10` and once they're all connected, run `go run producer.go` in the last window.
   
   Simplified output:
   
   Expected:
   - consumer_10_seconds: consumes message 0
   - consumer_1_second: consumes messages 1,3,5,7,9
   - consumer_1_second: consumes messages 2,4,6,8
   total runtime: 10 seconds
   
   Actual:
   - consumer_10_seconds: consumes message 0,5
   - consumer_1_second: consumes messages 1,3,7,9
   - consumer_1_second: consumes messages 2,4,6,8
   total runtime: 20 seconds
   
   #### System configuration
   **Pulsar version**: 2.6.2 (docker, standalone)
   Latest pulsar-client-go driver (v0.3.0)
   
   #### Things I already tried
   
   I fiddled a bit in the pulsar-client-go, hoping to resolve this. 
   
   Made sure that it accepts 0 by changing the check here: https://github.com/apache/pulsar-client-go/blob/master/pulsar/consumer_impl.go#L94-L96, and made sure that the initialPermits was at least 1. This didn't yield the expected result.
   
   #### Code: 
   
   Producer:
   
   ```
   package main
   
   import (
   	"context"
   	"flag"
   	"fmt"
   	"log"
   	"time"
   
   	"github.com/apache/pulsar-client-go/pulsar"
   )
   
   func main() {
   	var sleeptime int
   
   	flag.IntVar(&sleeptime, "sleeptime", 1, "sleeptime")
   	flag.Parse()
   
   	client, err := pulsar.NewClient(pulsar.ClientOptions{URL: "pulsar://localhost:6650"})
   	if err != nil {
   		log.Fatal(err)
   	}
   
   	defer client.Close()
   
   	consumer, err := client.Subscribe(pulsar.ConsumerOptions{
   		Topic:             "testtopic",
   		SubscriptionName:  "sharedtest",
   		Type:              pulsar.Shared,
   		ReceiverQueueSize: 0,
   	})
   	if err != nil {
   		log.Fatal(err)
   	}
   
   	defer consumer.Close()
   
   	for {
   		msg, err := consumer.Receive(context.Background())
   		if err != nil {
   			log.Fatal(err)
   		}
   
   		fmt.Printf("Received message  msgId: %s -- content: '%s' - will now sleep for %d seconds\n", msg.ID(), string(msg.Payload()), sleeptime)
   
   		time.Sleep(time.Duration(sleeptime) * time.Second)
   		consumer.Ack(msg)
   	}
   }
   
   ```
   
   Producer:
   
   ```
   package main
   
   import (
   	"context"
   	"fmt"
   
   	log "github.com/apache/pulsar/pulsar-client-go/logutil"
   	"github.com/apache/pulsar/pulsar-client-go/pulsar"
   )
   
   func main() {
   	fmt.Printf("Started...")
   	client, err := pulsar.NewClient(pulsar.ClientOptions{
   		URL: "pulsar://localhost:6650",
   	})
   
   	if err != nil {
   		log.Fatal(err)
   	}
   
   	fmt.Printf("got Client")
   
   	defer client.Close()
   
   	producer, err := client.CreateProducer(pulsar.ProducerOptions{
   		Topic: "testtopic",
   	})
   	if err != nil {
   		log.Fatal(err)
   	}
   
   	defer producer.Close()
   
   	ctx := context.Background()
   
   	fmt.Printf("Ready for sending")
   	for i := 0; i < 10; i++ {
   		msgID, err := producer.SendAndGetMsgID(ctx, pulsar.ProducerMessage{
   			Payload: []byte(fmt.Sprintf("hello-%d", i)),
   		})
   
   		if err != nil {
   			log.Fatal(err)
   		}
   		fmt.Printf("The message Id value is: [%v] \n", msgID)
   	}
   }
   
   ```
   
   


----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [pulsar-client-go] liangyuanpeng commented on issue #398: ReceiverQueueSize of 0 doesn't work as intended

Posted by GitBox <gi...@apache.org>.
liangyuanpeng commented on issue #398:
URL: https://github.com/apache/pulsar-client-go/issues/398#issuecomment-902825377


   I think i will work for it when i have time,receive any talk about it.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@pulsar.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [pulsar-client-go] liangyuanpeng edited a comment on issue #398: ReceiverQueueSize of 0 doesn't work as intended

Posted by GitBox <gi...@apache.org>.
liangyuanpeng edited a comment on issue #398:
URL: https://github.com/apache/pulsar-client-go/issues/398#issuecomment-902825377


   I think i will try to work for it when i have time,receive any talk about it.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@pulsar.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [pulsar-client-go] wonko commented on issue #398: ReceiverQueueSize of 0 doesn't work as intended

Posted by GitBox <gi...@apache.org>.
wonko commented on issue #398:
URL: https://github.com/apache/pulsar-client-go/issues/398#issuecomment-1025583648


   Hi, I'd like to bring this ticket back to the attention. This keeps hurting simple queue-based processing of messages when there is a longer processing time per message combined with high parallelism. It's also a different behaviour compared to the Java and C-based drivers.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@pulsar.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [pulsar-client-go] wonko commented on issue #398: ReceiverQueueSize of 0 doesn't work as intended

Posted by GitBox <gi...@apache.org>.
wonko commented on issue #398:
URL: https://github.com/apache/pulsar-client-go/issues/398#issuecomment-860635070


   Hi, any update or timeline on this? We have to work with our own patched version of the driver for now, which is far from ideal ... This still seems like a basic functionality which isn't working as it should.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [pulsar-client-go] wonko commented on issue #398: ReceiverQueueSize of 0 doesn't work as intended

Posted by GitBox <gi...@apache.org>.
wonko commented on issue #398:
URL: https://github.com/apache/pulsar-client-go/issues/398#issuecomment-731319556


   I managed to get it to work in my case, but I don't think my PR is complete ... anyhow, submitting as reference.


----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org