Posted to users@pulsar.apache.org by Apache Pulsar Slack <ap...@gmail.com> on 2019/09/21 09:11:01 UTC

Slack digest for #general - 2019-09-21

2019-09-20 10:12:53 UTC - Diego Salvi: Hello! I'm looking for a way to set a TTL for unacknowledged messages (<https://pulsar.apache.org/docs/en/cookbooks-retention-expiry/#time-to-live-ttl>) cluster-wide, to avoid setting it for every namespace created. Is there a way? Additionally, is defaultRetentionTime in minutes, as shown in the documentation (<https://pulsar.apache.org/docs/en/reference-configuration/#broker-defaultRetentionTimeInMinutes>), or in seconds, as seen in ServiceConfiguration (private int ttlDurationDefaultInSeconds = 0)?
----
2019-09-20 10:15:33 UTC - Penghui Li: @Diego Salvi I think you can set it in broker.conf:
```
# Default ttl for namespaces if ttl is not already configured at namespace policies. (disable default-ttl with value 0)
ttlDurationDefaultInSeconds=0
```
----
2019-09-20 10:19:26 UTC - Diego Salvi: @Penghui Li I presume that one still retains data even if acknowledged, configured as a default at broker level rather than namespace level, right? I'm looking for a similar configuration, but for the unacknowledged-data TTL as in <https://pulsar.apache.org/docs/en/cookbooks-retention-expiry/#time-to-live-ttl>, still configurable at broker level. Is it possible?
----
2019-09-20 10:23:57 UTC - Penghui Li: I have checked the logic in PersistentTopic:
```
public void checkMessageExpiry() {
    TopicName name = TopicName.get(topic);
    Policies policies;
    try {
        policies = brokerService.pulsar().getConfigurationCache().policiesCache()
                .get(AdminResource.path(POLICIES, name.getNamespace()))
                .orElseThrow(() -> new KeeperException.NoNodeException());
        int defaultTTL = brokerService.pulsar().getConfiguration().getTtlDurationDefaultInSeconds();
        int message_ttl_in_seconds = (policies.message_ttl_in_seconds <= 0 && defaultTTL > 0) ? defaultTTL
                : policies.message_ttl_in_seconds;
        if (message_ttl_in_seconds != 0) {
            subscriptions.forEach((subName, sub) -> sub.expireMessages(message_ttl_in_seconds));
            replicators.forEach((region, replicator) -> ((PersistentReplicator) replicator).expireMessages(message_ttl_in_seconds));
        }
    } catch (Exception e) {
        if (log.isDebugEnabled()) {
            log.debug("[{}] Error getting policies", topic);
        }
    }
}
```
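The fallback between the namespace policy and the broker default boils down to one ternary. A minimal standalone sketch of just that selection (the `effectiveTtl` helper name is mine, not from the Pulsar codebase):

```java
public class TtlFallback {
    // Mirrors the selection in checkMessageExpiry(): the namespace policy wins
    // unless it is unset (<= 0) and a positive broker default exists.
    static int effectiveTtl(int namespaceTtlSeconds, int brokerDefaultTtlSeconds) {
        return (namespaceTtlSeconds <= 0 && brokerDefaultTtlSeconds > 0)
                ? brokerDefaultTtlSeconds
                : namespaceTtlSeconds;
    }

    public static void main(String[] args) {
        System.out.println(effectiveTtl(0, 60));   // namespace unset -> broker default: 60
        System.out.println(effectiveTtl(120, 60)); // namespace set -> 120
        System.out.println(effectiveTtl(0, 0));    // both unset -> 0, expiry disabled
    }
}
```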
----
2019-09-20 10:24:51 UTC - Penghui Li: If message ttl of namespace is not set, ttlDurationDefaultInSeconds will be used
----
2019-09-20 10:27:04 UTC - Penghui Li: In broker.conf, if you want to retain data even after it is acknowledged, the configs would be:
```
# Default retention time (in minutes) for acknowledged messages; 0 disables retention
defaultRetentionTimeInMinutes=0

# Default retention size (in MB); 0 disables size-based retention
defaultRetentionSizeInMB=0
```
----
2019-09-20 11:29:34 UTC - Diego Salvi: @Penghui Li Thank you for the answer. I'm still a little confused. Is defaultRetentionTimeInMinutes the default value for time-based retention at the namespace level, or for the message TTL (also at namespace level)? Judging by the posted code I presume the second one. Just to be clearer: if I set configuration.setTtlDurationDefaultInSeconds to 60 seconds, will unacknowledged messages be deleted after 60 seconds?
----
2019-09-20 11:45:44 UTC - Ming Fang: When trying to start an entire cluster consisting of zookeepers, bookkeepers, and pulsar, there seems to be a race condition causing one of the pulsar nodes to enter a crash loop and fail to start. This is the error:
```
10:19:35.098 [main] INFO  org.apache.pulsar.broker.PulsarService - Starting load management service ...
10:19:35.130 [main] ERROR org.apache.pulsar.broker.loadbalance.impl.ModularLoadManagerImpl - Broker znode - [/loadbalance/brokers/pulsar-0.pulsar.pulsar-example:8080] is own by different zookeeper-session 144156858040267453
10:19:35.131 [main] ERROR org.apache.pulsar.broker.loadbalance.impl.ModularLoadManagerImpl - Unable to create znode - [/loadbalance/brokers/pulsar-0.pulsar.pulsar-example:8080] for load balance on zookeeper
org.apache.pulsar.broker.PulsarServerException: Broker-znode owned by different zk-session 144156858040267453
```
----
2019-09-20 12:00:23 UTC - Penghui Li: @Diego Salvi yes, unack messages will be auto acked. 
----
2019-09-20 12:01:39 UTC - Penghui Li: I think "auto acked" is the more suitable term here
----
2019-09-20 12:04:21 UTC - Nicolas Ha: What is the difference between the Consumer close and unsubscribe methods? It's not clear from the javadoc <https://pulsar.apache.org/api/client/>
----
2019-09-20 12:07:53 UTC - Penghui Li: Unsubscribe will delete the subscription; closing a consumer just closes (removes) that consumer from the subscription
+1 : Nicolas Ha
----
2019-09-20 13:06:53 UTC - Junli Antolovich: @Vladimir Shchur appreciate the info. I have not worked with F#, and neither have our teams (as far as I know). With this client written in F#, we would have to either learn F# or depend on you guys for any new features we need.
----
2019-09-20 13:15:56 UTC - Vladimir Shchur: I see, but I will be glad to guide anyone on your team who wants to implement a feature. It's not as scary as most people think. Here is the presentation with some implementation details that I gave 2 weeks ago.
<https://www.slideshare.net/Odin_cool/fsharp-goodness-for-everyday-work>
+1 : Matteo Merli
----
2019-09-20 13:30:32 UTC - Junli Antolovich: Thanks much for the offer - it's not that we are unwilling to learn a new language, but we are under rather tight time constraints. If time permits after the tasks ahead of us, I would be glad to pick up F# or Go.
----
2019-09-20 13:35:13 UTC - Tarek Shaar: @David Kjerrumgaard I attached one listener to the consumer
----
2019-09-20 14:07:24 UTC - Tarek Shaar: @David Kjerrumgaard I am also assuming the thread is returned to the pool when the message is acked?
----
2019-09-20 15:40:24 UTC - Diego Salvi: @Penghui Li I attempted to remove unacknowledged messages but with no success:
1) I set the TTL to 5 seconds and wrote 1000+ messages (actually 1200) with a Producer
2) waited 15 seconds just to be safe
3) invoked pulsarService.getBrokerService().checkMessageExpiry()
4) attempted to read the events with a Reader, expecting none, but I still find 1200 messages
I attempted to debug the code and noticed that `subscriptions.forEach((subName, sub) -> sub.expireMessages(message_ttl_in_seconds));` in your posted code is never invoked because "subscriptions" is empty. Am I missing something?
----
2019-09-20 15:41:25 UTC - Matteo Merli: @Diego Salvi TTL is enforced in batches for efficiency reasons. There’s a thread that checks TTL every 5 mins
+1 : Penghui Li
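For reference, that check interval is itself a broker.conf setting (the default should be 5 minutes; verify the name and default against your broker version):
```
# How frequently to proactively check and purge expired messages (in minutes)
messageExpiryCheckIntervalInMinutes=5
```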
----
2019-09-20 16:19:10 UTC - Julien Lechalupé: @Julien Lechalupé has joined the channel
----
2019-09-20 18:14:13 UTC - Nitin Mahadik: @Nitin Mahadik has joined the channel
----
2019-09-20 20:17:44 UTC - Badrul Chowdhury: @Badrul Chowdhury has joined the channel
----
2019-09-20 20:45:10 UTC - Tarek Shaar: I just want to confirm: if I want to send and receive short String messages in Java, will I need to send and receive byte[] and convert back and forth each time?
----
2019-09-20 20:57:36 UTC - Matteo Merli: You can declare the schema of the topic as `String`.
----
2019-09-20 20:58:22 UTC - Matteo Merli: eg.

```
Producer<String> producer = client.newProducer(Schema.STRING)
     .topic("my-topic")
     .create();
```
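For what it's worth, the conversion you would otherwise do by hand is just UTF-8 encoding, which is what Schema.STRING handles for you. A minimal sketch using only JDK classes, no broker involved (class and method names here are illustrative, not Pulsar API):

```java
import java.nio.charset.StandardCharsets;

public class StringBytes {
    // UTF-8 encode on send (what a byte[]-based producer would do manually)...
    static byte[] encode(String message) {
        return message.getBytes(StandardCharsets.UTF_8);
    }

    // ...and UTF-8 decode on receive.
    static String decode(byte[] payload) {
        return new String(payload, StandardCharsets.UTF_8);
    }

    public static void main(String[] args) {
        byte[] payload = encode("hello pulsar");
        System.out.println(decode(payload)); // round-trips to the original string
    }
}
```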
----
2019-09-20 22:10:33 UTC - Addison Higham: :thinking_face: hrm... another question: what happens if a bookie starts back up without its journal? Toying with the idea of using ephemeral storage (specifically a k8s local persistent volume) for the journal FS and wondering what that would mean in terms of recovery of a lost node. The ledger storage will still be there, just the journal missing.
----
2019-09-20 22:11:59 UTC - Matteo Merli: Current behavior is the bookie will fail to start up. There's a "cookie" validation for the bookie identity that has to match against what's in ZK, on both the journal and the ledger storage directories.
----
2019-09-20 22:13:11 UTC - Matteo Merli: There was a proposal by @Rajan Dhabalia to allow bypassing the journal entirely. That would be a better option when that durability is not required
----
2019-09-20 22:15:13 UTC - Addison Higham: okay, so there isn't real handling of that case by BK, so it probably isn't advised to, say, take that cookie value and just copy it to the journal storage and restart? It seems like that would obviously invalidate any segments it had that were open, but for any closed segments...
----
2019-09-20 22:17:52 UTC - Matteo Merli: sure, you could automate that
----
2019-09-20 22:18:14 UTC - Matteo Merli: the downside is just that whatever was in the journal might get lost
----
2019-09-20 22:21:51 UTC - Addison Higham: but assuming a Qs of 3, the other members would still have those messages, I am just wondering if that inconsistency would be noticed and fixed
----
2019-09-20 22:22:47 UTC - Addison Higham: like do bookies checksum closed segments and then repair any segments that don't match the quorum?
----
2019-09-20 22:25:06 UTC - Matteo Merli: yes, the client might get EntryNotFound from one bookie, and will retry to read from the other 2 bookies
----
2019-09-20 22:25:26 UTC - Matteo Merli: that also happens if one entry is corrupted on one bookie
----
2019-09-20 22:31:03 UTC - Rajiv Abraham: Hi, for the file io source connector, for the `inputDirectory` variable, I'm guessing the file path is local to the worker node? If so, is there a way of using the REST API to upload a file? If there are examples online of how to do this, that would be great.
----
2019-09-20 23:30:54 UTC - Rajiv Abraham: Do I have to publish the file outside of the Pulsar API (e.g. through scp), and how does it work for multiple nodes in the cluster?
----
2019-09-21 00:14:14 UTC - Rajiv Abraham: Hi, another question: is there a python client to create a source/sink, or do I have to call the REST API? I looked at the Python API and didn't see it but just wanted to double-check.
----