Posted to users@pulsar.apache.org by Apache Pulsar Slack <ap...@gmail.com> on 2020/06/06 09:11:05 UTC

Slack digest for #general - 2020-06-06

2020-06-05 10:31:48 UTC - Konstantinos Papalias: are you running your code in a loop? e.g. <http://pulsar.apache.org/docs/en/client-libraries-java/#consumer>
```while (true) {
  // Wait for a message
  Message msg = consumer.receive();
  // ... process the message ...
  consumer.acknowledge(msg);
}```
----
2020-06-05 10:46:51 UTC - Damien Burke: @Damien Burke has joined the channel
----
2020-06-05 11:08:14 UTC - Oliver Fenton: @Oliver Fenton has joined the channel
----
2020-06-05 11:15:23 UTC - Damien Burke: Hi, I am currently designing a pipeline using pulsar functions and connectors. It's a straightforward use case, involving processing streaming log files and persisting some associated results. As a general question, is a sink always a "terminal operation"? The background to my question is that I wish to persist in 2 data stores: first update the SoR database and then write that data to a cache. One option that would work would be, from the (SoR) sink.write method, to invoke pulsar-client to publish to a "cache persist request topic". Does that seem a bad idea? I'm also trying to understand whether there is any hook I could code against when record.ack fires (i.e. when record.ack fires could it trigger another function/sink?). Any advice appreciated. Thanks
----
2020-06-05 11:33:44 UTC - Daniel Liu: @Daniel Liu has joined the channel
----
2020-06-05 11:48:40 UTC - Meyappan Ramasamy: hi team, we were planning to use apache pulsar as a unified solution for both event bus and event store with the method below
1. publish events to a dynamic topic created for an aggregate instance (aggregate can be a shopping-cart, product, order etc) 
2. topic subscription based on a regex pattern to subscribe to all topics of interest under same namespace. 
3. use pulsar reader API to read events from a topic created per aggregate instance to dehydrate events and restore aggregate state 
the issue was: once we subscribe to a topic list by specifying a regex pattern and start the consumer, and later publish events to new dynamic topics (under the same namespace), we find the new dynamic topics were not getting subscribed by the consumer. so the dynamic topic subscription feature was not fully supported in pulsar for consumers to subscribe to new dynamic topics.
we were only able to subscribe to a static list of topics which already exist and create message listeners for received events. please let me know if there is a plan for dynamic topic subscription in the next version of pulsar. hope my issue and question make sense.
----
2020-06-05 11:51:08 UTC - Ildefonso Junquero: Have you taken a look at multi-topic subscriptions? <https://pulsar.apache.org/docs/en/2.5.2/concepts-messaging/#multi-topic-subscriptions>
----
2020-06-05 11:52:00 UTC - Ildefonso Junquero: You can define a list of topics in two ways:
• On the basis of a <https://en.wikipedia.org/wiki/Regular_expression|regular expression> (regex), for example `<persistent://public/default/finance-.*>`
• By explicitly defining a list of topics
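As a minimal illustration of what such a regex pattern matches (a sketch using plain `java.util.regex` with the docs' `finance-.*` example; the Pulsar client performs the equivalent matching against topics that exist at subscribe time):

```java
import java.util.regex.Pattern;

public class TopicRegexDemo {
    // Mirrors the docs example pattern: persistent://public/default/finance-.*
    static final Pattern PATTERN =
            Pattern.compile("persistent://public/default/finance-.*");

    static boolean matches(String topic) {
        return PATTERN.matcher(topic).matches();
    }

    public static void main(String[] args) {
        System.out.println(matches("persistent://public/default/finance-q1")); // true
        System.out.println(matches("persistent://public/default/orders-q1"));  // false
    }
}
```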

----
2020-06-05 11:55:00 UTC - Meyappan Ramasamy: hello team, yes we have tried this regex approach. the issue we encountered is that we were only able to subscribe to the regex-matching topics if they already existed in pulsar. if new dynamic topics were created after the consumer subscribed based on the regex pattern, the consumer is unable to subscribe to the new topics created after subscription
----
2020-06-05 11:55:27 UTC - Meyappan Ramasamy: so i believe pulsar does not currently support dynamic topic subscription
----
2020-06-05 11:55:51 UTC - Meyappan Ramasamy: i think it was a feature being planned for the next version of pulsar
----
2020-06-05 11:56:04 UTC - Meyappan Ramasamy: so wanted to connect with pulsar development team to know
----
2020-06-05 11:56:25 UTC - Meyappan Ramasamy: i can share my pulsar code changes with the team to discuss further if required
----
2020-06-05 11:59:52 UTC - Rattanjot Singh: yes like this only
----
2020-06-05 12:03:43 UTC - Ebere Abanonu: Dynamic consumers will be the responsibility of the client library. The server cannot create consumers for the client library. You have to find a way to create a new consumer for the new topic.
----
2020-06-05 12:07:12 UTC - Ebere Abanonu: When producing, you just want the client to create a consumer. It could be doable if there is a way to subscribe to topic-created events, that is, if such a feature exists. All the client needs to do is listen for such an event and then create a consumer for you cc: @Sijie Guo
----
2020-06-05 12:10:37 UTC - Ebere Abanonu: Another option is to use the admin api to poll at regular intervals for topics, then create new consumers for new topics
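A rough sketch of the bookkeeping such a poller would need (the `NewTopicPoller` class and `newTopics` method are hypothetical names; in practice the current topic list would come from the admin API's topic listing for the namespace):

```java
import java.util.HashSet;
import java.util.List;
import java.util.Set;

public class NewTopicPoller {
    // Topics we have already created consumers for
    private final Set<String> known = new HashSet<>();

    // Given the current topic list for the namespace (e.g. as returned by an
    // admin-API topic listing), return only the topics seen for the first
    // time, so the caller can create a consumer for each of them.
    public Set<String> newTopics(List<String> current) {
        Set<String> fresh = new HashSet<>();
        for (String t : current) {
            if (known.add(t)) {
                fresh.add(t);
            }
        }
        return fresh;
    }

    public static void main(String[] args) {
        NewTopicPoller poller = new NewTopicPoller();
        System.out.println(poller.newTopics(List.of("shoppingcart-a")));
        System.out.println(poller.newTopics(List.of("shoppingcart-a", "shoppingcart-b")));
    }
}
```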
----
2020-06-05 12:22:44 UTC - Meyappan Ramasamy: here is my use case
1. subscribe consumer to a regex pattern of "aggregate_type-*" , for example something like <persistent://public/default/shoppingcart-.*>
2. every time we have a new event stream for an aggregate, publish event to a new topic of name "aggregate_type-stream_id"
3. but the new topics are not getting subscribed by the consumer; if the topic existed already, it will get subscribed and the message listener will receive events and we can write event handlers
----
2020-06-05 12:25:44 UTC - Ebere Abanonu: Yeah I understand your use case, that is why I suggested those two approaches... if the topic does not already exist at the time of subscribing then you need to listen for new topics like I said above
----
2020-06-05 12:28:48 UTC - Ebere Abanonu: What the client does with regex is scan for topics under the same tenant and namespace that start with, in your case, <persistent://public/default/shoppingcart-.*|shoppingcart-.*>
----
2020-06-05 12:29:30 UTC - Ebere Abanonu: It  only picks up existing topics and it does not listen for future topics
----
2020-06-05 12:41:26 UTC - Meyappan Ramasamy: yes, i am able to follow. just to reinforce the understanding, I did the below
1. subscribe of topics with regex pattern <persistent://public/default/shoppingcart-.*>
2. create a random event stream ID using the API UUID.randomUUID() for a new aggregate entity instance and publish events for the new instance using a dynamic topic of the format "shoppingcart-UUID1", "shoppingcart-UUID2", "shoppingcart-UUID3" etc
3. now if it is a future dynamic topic, we will miss events; but if the consumer has luckily subscribed to a topic after an event was published to it, then it will listen and receive events from the topic with a message listener callback function.
4. but listening for new topics and subscribing can add some delay if we need to do this outside the pulsar library; we need real-time dynamic topic subscription
----
2020-06-05 12:44:46 UTC - Ildefonso Junquero: You can always have a topic to "inform" about new topic creation so you don't need to poll periodically, but you'll have to implement the logic in your client and in the producer that creates the new topic.
heavy_plus_sign : Kirill Merkushev
----
2020-06-05 12:44:56 UTC - Meyappan Ramasamy: i think the idea of subscribing to topics based on a regex pattern is to enable dynamic topic subscription for future topics which will match this regex. let me know if there is a plan for supporting dynamic topic subscription in a future release version of pulsar, or what is the best way to handle this if we need real-time dynamic topic subscription without any delay. if we are going to poll at regular intervals for new topics matching the regex, then this does not look like an event-driven approach with real-time performance. let me know
----
2020-06-05 12:45:58 UTC - Ebere Abanonu: If you knew the topics ahead of time, you could already create subscriptions for them.
heavy_plus_sign : Kirill Merkushev
----
2020-06-05 12:46:08 UTC - Ildefonso Junquero: Regarding your question about dynamic topic subscription in future release, I have no idea.
----
2020-06-05 12:47:04 UTC - Ebere Abanonu: You won't lose events as long as the message is yet to be consumed and message retention is high enough
----
2020-06-05 12:48:07 UTC - Meyappan Ramasamy: 1. i have a producer which is going to publish events to topics of the form "shoppingcart_ID1", "shoppingcart_ID2", "shoppingcart_ID3" 
2. I believe the consumer will be started before the producer and subscribe to a regex pattern of the form "<persistent://public/default/shoppingcart-.*>"
3. there will be a message listener callback at the consumer to receive the published events and invoke corresponding event handlers 
4. what can be the best way to handle new events which are getting published to pulsar event bus in this way 
----
2020-06-05 12:48:52 UTC - Meyappan Ramasamy: with current pulsar library, i am unable to dynamically subscribe to new topics and receive events published to dynamic topics
----
2020-06-05 12:50:00 UTC - Meyappan Ramasamy: i tried to use the Reader API to read events from a dynamic topic and was able to fetch all the events with unlimited time retention policy configured
----
2020-06-05 12:50:29 UTC - Meyappan Ramasamy: the only new requirement is for dynamic topic subscription under a given namespace which matches the regex pattern
----
2020-06-05 12:50:42 UTC - Meyappan Ramasamy: hope the requirement is clear
----
2020-06-05 12:52:25 UTC - Meyappan Ramasamy: please let me know who to reach out to know if i can get a solution using pulsar for dynamic topic subscription
----
2020-06-05 12:59:08 UTC - Meyappan Ramasamy: please inform the best way to handle dynamic topics for my use case , should i just use admin API to be informed about new topic creation and then have the existing consumer subscribe to these new topics ? if there is any example code snippet documenting the same, please let me know . thanks
----
2020-06-05 16:06:19 UTC - Ankush: any suggestions guys? Is this necessary, can we improve this?
----
2020-06-05 16:43:58 UTC - slouie: Hi,
I have a question about an issue I saw with a namespace storage (`pulsar_storage_size`) filling up and hitting a ceiling (~21GB), blocking producers. I’m running version `2.3.1` and have the default backlog quota with size `10G` and policy `producer_request_hold`. The retention policy on the namespace is `defaultRetentionTimeInMinutes: 0` and `defaultRetentionSizeInMB: 0`. I suspect the garbage collector got into an odd state, even though I couldn’t find logs to indicate this. I can confirm the backlog was not growing, so it appears to me that acknowledged messages were not being GC’d. I was able to get messages flowing again by clearing the backlog.

I’m failing to reproduce this issue and could not find anything in the docs or github that seems related. Any ideas or thoughts on where to look to learn more? (I know upgrading may alleviate this and provide greater visibility with new metrics and features)
----
2020-06-05 16:44:34 UTC - Frank Kelly: Newbie Authorization Question - I wanted to test out the default Pulsar Authorization Plugin - so in my default `standalone.conf` I set `authorizationEnabled=true` without making any other changes (e.g. for authentication or roles etc.) but I get the following
```12:38:37.703 [pulsar-io-50-7] WARN  org.apache.pulsar.broker.web.PulsarWebResource - [<persistent://public/functions/assignments>] Role null is not allowed to lookup topic
12:38:37.703 [pulsar-io-50-7] WARN  org.apache.pulsar.broker.lookup.TopicLookupBase - Failed to authorized null on cluster <persistent://public/functions/assignments>
12:38:37.703 [pulsar-client-io-78-1] WARN  org.apache.pulsar.client.impl.BinaryProtoLookupService - [<persistent://public/functions/assignments>] failed to send lookup request : org.apache.pulsar.client.api.PulsarClientException$AuthorizationException: Don't have permission to connect to this namespace```
As a result Pulsar cannot start after 5 retries and it dies.
Any ideas what I can do to address this?
----
2020-06-05 16:51:29 UTC - Tamer: You can have an output topic for the pulsar function that writes to the SoR database when it succeeds. Then the cache function can listen to this topic and update the cache.

This will basically implement what you described as the record.ack scenario
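The two-stage chain described here could be sketched like this (all names below are hypothetical stand-ins, with plain maps in place of the real stores; in Pulsar, stage 1 would be a function whose return value goes to its output topic, and stage 2 a function or sink subscribed to that topic):

```java
import java.util.HashMap;
import java.util.Map;

public class SorThenCacheDemo {
    // Stand-ins for the system of record and the cache (hypothetical).
    final Map<String, String> sor = new HashMap<>();
    final Map<String, String> cache = new HashMap<>();

    // Stage 1: persist to the SoR; on success, "publish" the record onward.
    // In Pulsar, the returned value would go to the function's output topic.
    String persistToSor(String key, String value) {
        sor.put(key, value);
        return value;
    }

    // Stage 2: a second function/sink listening on that output topic
    // updates the cache, so the cache is only written after the SoR write.
    void updateCache(String key, String value) {
        cache.put(key, value);
    }

    public static void main(String[] args) {
        SorThenCacheDemo demo = new SorThenCacheDemo();
        String forwarded = demo.persistToSor("order-1", "paid");
        demo.updateCache("order-1", forwarded);
        System.out.println(demo.cache.get("order-1")); // paid
    }
}
```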
----
2020-06-05 18:36:12 UTC - Alan Broddle: I have a very basic question about infrastructure level of Pulsar:

We are just starting to use Pulsar and have to date tried out Pulsar in standalone, on-Prem VMWare VMs, and AWS EC2 instances using terraform.  So far the process has been ‘mostly’ manageable and we have working environments in all these spaces.

In our research, we have looked at the Pulsar on Kubernetes documentation and are considering testing this approach with AWS EKS to remove the Kubernetes service management requirements.  In our first attempts using the provided Helm code, we are seeing a lot of issues with even getting this environment to come up, let alone with proper configuration and running instances of the Pulsar stack.

Question:  Given that Pulsar is already a complex mix of Stateless and Stateful components, is the Kubernetes environment something this group would ‘Highly Recommend’ and why?

Our initial tests are not showing that there is a positive cost/benefit in comparison to an AWS EC2 based implementation.
----
2020-06-05 18:40:22 UTC - Tanner Nilsson: One of the positives for running pulsar in EKS for us is that our entire application stack runs in EKS, so with pulsar running in the same cluster it makes networking/connectivity very simple. We use the helm chart from <https://github.com/kafkaesque-io/pulsar-helm-chart> . We have also been using support from kafkaesque and they are great!
----
2020-06-05 22:15:02 UTC - Maxime Blanc: @Maxime Blanc has joined the channel
----
2020-06-06 00:58:49 UTC - Nicolas Ha: I am also looking for the function docs - the link above seems broken?
I get a 404 for <http://pulsar.apache.org/swagger/restApiVersions.json>

In particular I am looking for a curl POST to send a Java function
----
2020-06-06 04:15:34 UTC - Rakesh Arumalla: @Rakesh Arumalla has joined the channel
----
2020-06-06 08:08:50 UTC - Liam Clarke: Yeah, generally if you want to implement semantic based deduplication, you'd want to have a Flink / Spark / etc. app in your pipeline implementing it - the deduplication offered by Pulsar / Kafka etc. is to ensure effectively-once semantics on the part of the message broker.
----
2020-06-06 08:10:37 UTC - Liam Clarke: Hi all,

I'm using Pulsar standalone to poke about tiered storage, and have hit an interesting issue. In my `standalone.conf`  I've configured it as such:

```managedLedgerOffloadDriver=aws-s3
s3ManagedLedgerOffloadRegion=ap-southeast-2
s3ManagedLedgerOffloadBucket=test```
But when I start the broker I get the following exception twice

```08:20:01.790 [pulsar-ordered-OrderedExecutor-7-0] INFO org.apache.bookkeeper.mledger.offload.OffloaderUtils - Found and loaded 2 offloaders 
08:20:01.795 [pulsar-ordered-OrderedExecutor-7-0] ERROR org.apache.pulsar.broker.PulsarService - create ledgerOffloader failed for namespace public/functions 
org.apache.pulsar.broker.PulsarServerException: org.apache.pulsar.broker.PulsarServerException: Either s3ManagedLedgerOffloadRegion or s3ManagedLedgerOffloadServiceEndpoint must be set if s3 offload enabled 
       at org.apache.pulsar.broker.PulsarService.createManagedLedgerOffloader(PulsarService.java:836) ~[org.apache.pulsar-pulsar-broker-2.5.2.jar:2.5.2] 
       at org.apache.pulsar.broker.PulsarService.lambda$getManagedLedgerOffloader$4(PulsarService.java:800) ~[org.apache.pulsar-pulsar-broker-2.5.2.jar:2.5.2] 
       at java.util.concurrent.ConcurrentHashMap.compute(ConcurrentHashMap.java:1853) ~[?:1.8.0_252] 
       at org.apache.pulsar.broker.PulsarService.getManagedLedgerOffloader(PulsarService.java:792) ~[org.apache.pulsar-pulsar-broker-2.5.2.jar:2.5.2] 
       at org.apache.pulsar.broker.service.BrokerService.lambda$getManagedLedgerConfig$33(BrokerService.java:1065) ~[org.apache.pulsar-pulsar-broker-2.5.2.jar:2.5.2] 
       at org.apache.bookkeeper.mledger.util.SafeRun$2.safeRun(SafeRun.java:49) [org.apache.pulsar-managed-ledger-2.5.2.jar:2.5.2] 
       at org.apache.bookkeeper.common.util.SafeRunnable.run(SafeRunnable.java:36) [org.apache.bookkeeper-bookkeeper-common-4.10.0.jar:4.10.0] 
       at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149) [?:1.8.0_252] 
       at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624) [?:1.8.0_252] 
       at io.netty.util.concurrent.FastThreadLocalRunnable.run(FastThreadLocalRunnable.java:30) [io.netty-netty-common-4.1.48.Final.jar:4.1.48.Final] 
       at java.lang.Thread.run(Thread.java:748) [?:1.8.0_252] 
Caused by: org.apache.pulsar.broker.PulsarServerException: Either s3ManagedLedgerOffloadRegion or s3ManagedLedgerOffloadServiceEndpoint must be set if s3 offload enabled 
       at org.apache.pulsar.broker.PulsarService.createManagedLedgerOffloader(PulsarService.java:829) ~[org.apache.pulsar-pulsar-broker-2.5.2.jar:2.5.2] 
    ... 10 more   ```
It's obviously picked up the `managedLedgerOffloadDriver=aws-s3` setting, but not the next one? Or is this just a false positive I can ignore? I note it's for public/functions only; I'm about to test it for a topic with a manual offload.
----
2020-06-06 08:16:32 UTC - Damien Burke: Cool, TY
----
2020-06-06 08:56:35 UTC - Liam Clarke: Okay, so I tested with a topic; a couple of observations: 1) when I trigger offload, I have to specify it per partition for a partitioned topic?

```./pulsar-admin topics create-partitioned-topic <persistent://test-tenant/test-namespace/example-topic> --partitions 1
./pulsar-admin topics offload --size-threshold 1K test-tenant/test-namespace/example-topic
Topic not found

Reason: Topic not found

./pulsar-admin topics list test-tenant/test-namespace
"<persistent://test-tenant/test-namespace/example-topic-partition-0>"

./pulsar-admin topics offload --size-threshold 1K <persistent://test-tenant/test-namespace/example-topic-partition-0>
Offload triggered for <persistent://test-tenant/test-namespace/example-topic-partition-0> for messages before 363:0:-1```
The broker then throws the same exception:

```08:51:15.797 [pulsar-ordered-OrderedExecutor-2-0] ERROR org.apache.pulsar.broker.PulsarService - create ledgerOffloader failed for namespace test-tenant/test-namespace
org.apache.pulsar.broker.PulsarServerException: org.apache.pulsar.broker.PulsarServerException: Either s3ManagedLedgerOffloadRegion or s3ManagedLedgerOffloadServiceEndpoint must be set if s3 offload enabled
        at org.apache.pulsar.broker.PulsarService.createManagedLedgerOffloader(PulsarService.java:836) ~[org.apache.pulsar-pulsar-broker-2.5.2.jar:2.5.2]
        at org.apache.pulsar.broker.PulsarService.lambda$getManagedLedgerOffloader$4(PulsarService.java:800) ~[org.apache.pulsar-pulsar-broker-2.5.2.jar:2.5.2]
        at java.util.concurrent.ConcurrentHashMap.compute(ConcurrentHashMap.java:1877) ~[?:1.8.0_252]
        at org.apache.pulsar.broker.PulsarService.getManagedLedgerOffloader(PulsarService.java:792) ~[org.apache.pulsar-pulsar-broker-2.5.2.jar:2.5.2]
        at org.apache.pulsar.broker.service.BrokerService.lambda$getManagedLedgerConfig$33(BrokerService.java:1065) ~[org.apache.pulsar-pulsar-broker-2.5.2.jar:2.5.2]
        at org.apache.bookkeeper.mledger.util.SafeRun$2.safeRun(SafeRun.java:49) [org.apache.pulsar-managed-ledger-2.5.2.jar:2.5.2]
        at org.apache.bookkeeper.common.util.SafeRunnable.run(SafeRunnable.java:36) [org.apache.bookkeeper-bookkeeper-common-4.10.0.jar:4.10.0]
        at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149) [?:1.8.0_252]
        at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624) [?:1.8.0_252]
        at io.netty.util.concurrent.FastThreadLocalRunnable.run(FastThreadLocalRunnable.java:30) [io.netty-netty-common-4.1.48.Final.jar:4.1.48.Final]
        at java.lang.Thread.run(Thread.java:748) [?:1.8.0_252]
Caused by: org.apache.pulsar.broker.PulsarServerException: Either s3ManagedLedgerOffloadRegion or s3ManagedLedgerOffloadServiceEndpoint must be set if s3 offload enabled
        at org.apache.pulsar.broker.PulsarService.createManagedLedgerOffloader(PulsarService.java:829) ~[org.apache.pulsar-pulsar-broker-2.5.2.jar:2.5.2]
        ... 10 more```
However, my second observation is this:

```./pulsar-admin topics offload-status <persistent://test-tenant/test-namespace/example-topic-partition-0>
Offload was a success```
When it clearly wasn't.
----