You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by GitBox <gi...@apache.org> on 2021/02/16 21:57:41 UTC

[GitHub] [pulsar-client-go] frankjkelly opened a new issue #468: Add Support for NonDurable subscriptions

frankjkelly opened a new issue #468:
URL: https://github.com/apache/pulsar-client-go/issues/468


   **Is your feature request related to a problem? Please describe.**
   The Java API supports Non-Durable Subscriptions via SubscriptionMode
   https://github.com/apache/pulsar/blob/master/pulsar-client-api/src/main/java/org/apache/pulsar/client/api/ConsumerBuilder.java#L243-L256
   
   but it appears that the Go client does not
   https://github.com/apache/pulsar-client-go/blob/25f3075176f853f5f09d89b3b625e0646432735f/pulsar/consumer.go#L76
   
   This is useful in our use case where we have lots of topics with short-lived subscriptions that last minutes to hours and then once data is read the topic is no longer needed.
   To ensure that data is compacted in the bookies currently we have to set the subscription expiration time at the namespace level
   
   **Describe the solution you'd like**
   Please add support for Subscription Mode
   
   **Describe alternatives you've considered**
   Continue to use subscription expiration time
   
   **Additional context**
   Add any other context or screenshots about the feature request here.
   


----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [pulsar-client-go] flowchartsman commented on issue #468: Add Support for NonDurable subscriptions

Posted by GitBox <gi...@apache.org>.
flowchartsman commented on issue #468:
URL: https://github.com/apache/pulsar-client-go/issues/468#issuecomment-1011537633


   `HasNext()` gives you the ability to bail when you're caught up. If you just loop on `reader.Next(ctx)`, you should get the behavior you desire: the client should block until it gets new messages and then process them.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@pulsar.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [pulsar-client-go] frankjkelly commented on issue #468: Add Support for NonDurable subscriptions

Posted by GitBox <gi...@apache.org>.
frankjkelly commented on issue #468:
URL: https://github.com/apache/pulsar-client-go/issues/468#issuecomment-781511179


   Thanks - I guess that's not what I think we're seeing using the 0.3.0 Go Client library with Pulsar 2.6.1
   
   In the image below taken from our Pulsar Grafana dashboard hopefully you can see our Consumers are short-lived but the count of subscriptions lasts much longer and the count of subscribers only goes down at the 1 hour mark since we have set a subscription expiration time on that namespace.
   
   ![image](https://user-images.githubusercontent.com/62910985/108396153-8901a280-71e4-11eb-86e1-cf09c03d14cf.png)
   
   Here is how we have configured our consumer
   ```
   	consumer, err := p.client.Subscribe(pulsar.ConsumerOptions{
   		Topic:                       topic,
   		SubscriptionName:            subscriptionName,
   		SubscriptionInitialPosition: pulsar.SubscriptionPositionEarliest,
   		Type:                        pulsar.Exclusive,
   	})
   ```
   


----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [pulsar-client-go] flowchartsman edited a comment on issue #468: Add Support for NonDurable subscriptions

Posted by GitBox <gi...@apache.org>.
flowchartsman edited a comment on issue #468:
URL: https://github.com/apache/pulsar-client-go/issues/468#issuecomment-1011537633


   `HasNext()` gives you the ability to bail when you're caught up. If you just loop on `reader.Next(ctx)`, you should get the behavior you desire: the client should block until it gets new messages and then process them. You can then choose to bail on whatever property you want. That said, reading until `HasNext()` is false, will also get you to the "end", though it sounds like what you're doing isn't looking for the last message, but a "last" message of your own designation, in which case your ending condition is up to you: you can inspect the messages, bail after a certain amount of time waiting (with a ctx timeout) or whatever you like.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@pulsar.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [pulsar-client-go] frankjkelly edited a comment on issue #468: Add Support for NonDurable subscriptions

Posted by GitBox <gi...@apache.org>.
frankjkelly edited a comment on issue #468:
URL: https://github.com/apache/pulsar-client-go/issues/468#issuecomment-781511179


   Thanks - I guess that's not what I think we're seeing using the 0.3.0 Go Client library with Pulsar 2.6.1
   
   In the image below taken from our Pulsar Grafana dashboard hopefully you can see our Consumers (on the `cogito-dialog\wav` namespace are short-lived) but the count of subscriptions on that namespace lasts much longer and the count of subscribers only goes down at the 1 hour mark since we have set a subscription expiration time on that namespace (largely so we can allow the ledger to rollover and disk to be reclaimed).
   
   ![image](https://user-images.githubusercontent.com/62910985/108396153-8901a280-71e4-11eb-86e1-cf09c03d14cf.png)
   
   Here is how we have configured our consumer
   ```
   	consumer, err := p.client.Subscribe(pulsar.ConsumerOptions{
   		Topic:                       topic,
   		SubscriptionName:            subscriptionName,
   		SubscriptionInitialPosition: pulsar.SubscriptionPositionEarliest,
   		Type:                        pulsar.Exclusive,
   	})
   ```
   


----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [pulsar-client-go] flowchartsman commented on issue #468: Add Support for NonDurable subscriptions

Posted by GitBox <gi...@apache.org>.
flowchartsman commented on issue #468:
URL: https://github.com/apache/pulsar-client-go/issues/468#issuecomment-973057844


   Feature parity alone is not a compelling reason for me, since it duplicates functionality. Re-reading your use-case, it seems that you might even benefit more from a traditional consumer with aggressive subscription/topic removal policies. This way, at least, you know that if your consumer has a hiccough and needs to restart it won't have to start over, but that the data will still be deleted in a timely manner once the subscription is caught up.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@pulsar.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [pulsar-client-go] apapia commented on issue #468: Add Support for NonDurable subscriptions

Posted by GitBox <gi...@apache.org>.
apapia commented on issue #468:
URL: https://github.com/apache/pulsar-client-go/issues/468#issuecomment-1011514612


   @flowchartsman I'm picking this up from @frankjkelly and trying to implement the Reader interface.  However, I'm unsure how to implement it for our use case.  Essentially, we just have a worker than wants to read a topic from the beginning to the "end" where the end is indicated by a message with an end-of-stream property.  If this worker dies, the "job" will just be picked up by another worker which will start reading from the beginning of the stream.  Now we need to handle reading the stream while a publisher writing to it in real-time and we may also connect to the topic to read before the publisher does.
   
   What I'm unclear on is how to use the `reader.HasNext()` and `reader.Next(ctx)` interface.  Under what conditions will `HasNext()` return false?  Presumably any time are no unread messages?  If `HasNext()` returns false, what should the reader routine do?  sleep for some amount of time?  We want to read with as minimal latency as possible so we'd like to avoid unnecessary sleeps.  
   
   With the Consumer interface, we were able to simply call `Receive(ctx)` and it would block until the next message was available.  Is there a way to achieve that while using the Reader for a non-durable subscription?
   
   Thanks!


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@pulsar.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [pulsar-client-go] apapia commented on issue #468: Add Support for NonDurable subscriptions

Posted by GitBox <gi...@apache.org>.
apapia commented on issue #468:
URL: https://github.com/apache/pulsar-client-go/issues/468#issuecomment-1011550925


   excellent thanks for the explainer @flowchartsman!


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@pulsar.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [pulsar-client-go] flowchartsman commented on issue #468: Add Support for NonDurable subscriptions

Posted by GitBox <gi...@apache.org>.
flowchartsman commented on issue #468:
URL: https://github.com/apache/pulsar-client-go/issues/468#issuecomment-1012543202


   @apapia no problem. If the issue is now resolved, please feel free to close the issue, thanks!


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@pulsar.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [pulsar-client-go] flowchartsman commented on issue #468: Add Support for NonDurable subscriptions

Posted by GitBox <gi...@apache.org>.
flowchartsman commented on issue #468:
URL: https://github.com/apache/pulsar-client-go/issues/468#issuecomment-819534635


   To add some context, @frankjkelly and I spoke in the Pulsar Slack, and he mentioned that he misunderstood @codelipenghui's recommendation and that a `Reader` might work for him. Whether there's still a need for a non-durable subscription with a plain consumer, I can't say. If there's effectively no difference between this and a reader, I'd probably say the issue should be closed.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [pulsar-client-go] frankjkelly commented on issue #468: Add Support for NonDurable subscriptions

Posted by GitBox <gi...@apache.org>.
frankjkelly commented on issue #468:
URL: https://github.com/apache/pulsar-client-go/issues/468#issuecomment-971971150


   I think it would be nice to have equivalence with the Java API which supports NonDurable subscriptions on both the Reader and Consumer interfaces 
   
   https://github.com/apache/pulsar/blob/6704f12104219611164aa2bb5bbdfc929613f1bf/pulsar-client-api/src/main/java/org/apache/pulsar/client/api/SubscriptionMode.java#L35


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@pulsar.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [pulsar-client-go] frankjkelly commented on issue #468: Add Support for NonDurable subscriptions

Posted by GitBox <gi...@apache.org>.
frankjkelly commented on issue #468:
URL: https://github.com/apache/pulsar-client-go/issues/468#issuecomment-791453350


   @codelipenghui @wolfstudy wondering if I am misunderstanding something with the Go client - perhaps my `ConsumerOptions `are wrong (see above) or there is a bug in the Broker side counts of open subscriptions but it does appear that the Go client uses Durable subscriptions? Thanks in advance! 


----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [pulsar-client-go] apapia edited a comment on issue #468: Add Support for NonDurable subscriptions

Posted by GitBox <gi...@apache.org>.
apapia edited a comment on issue #468:
URL: https://github.com/apache/pulsar-client-go/issues/468#issuecomment-1011514612


   @flowchartsman I'm picking this up from @frankjkelly and trying to implement the Reader interface.  However, I'm unsure how to implement it for our use case.  Essentially, we just have a worker than wants to read a topic from the beginning to the "end" where the end is indicated by a message with an end-of-stream property.  If this worker dies, the "job" will just be picked up by another worker which will start reading from the beginning of the stream.  Now we need to handle reading the stream while a publisher is writing to it in real-time and we may also connect to the topic to read before the publisher does.
   
   What I'm unclear on is how to use the `reader.HasNext()` and `reader.Next(ctx)` interface.  Under what conditions will `HasNext()` return false?  Presumably any time there are no unread messages?  If `HasNext()` returns false, what should the reader routine do?  sleep for some amount of time?  We want to read with as minimal latency as possible so we'd like to avoid unnecessary sleeps.  
   
   With the Consumer interface, we were able to simply call `consumer.Receive(ctx)` and it would block until the next message was available.  Is there a way to achieve that while using the Reader for a non-durable subscription?
   
   Thanks!


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@pulsar.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [pulsar-client-go] codelipenghui commented on issue #468: Add Support for NonDurable subscriptions

Posted by GitBox <gi...@apache.org>.
codelipenghui commented on issue #468:
URL: https://github.com/apache/pulsar-client-go/issues/468#issuecomment-781017795


   @frankjkelly The go client support reader API which is based on the non-durable subscription. Is it works for you?


----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org