Posted to users@pulsar.apache.org by Apache Pulsar Slack <ap...@gmail.com> on 2020/02/19 09:11:03 UTC

Slack digest for #general - 2020-02-19

2020-02-18 09:24:57 UTC - Rahul: @Rahul has joined the channel
----
2020-02-18 09:58:56 UTC - Lars Norén: @Lars Norén has joined the channel
----
2020-02-18 11:15:48 UTC - Rolf Arne Corneliussen: @Rolf Arne Corneliussen has joined the channel
----
2020-02-18 11:52:23 UTC - Rolf Arne Corneliussen: Hello everyone,

I am new to Pulsar, and I am interested to see how it can solve a specific use case - *detection of*
*the absence of messages* from different sources. Let us say we have a substantial number of devices
that emit heartbeat messages at certain intervals, translated into a stream of heartbeat messages,
and we want to detect missing heartbeats. The devices may have individual heartbeat intervals.

Using _Kafka and Kafka Streams_, the problem could be solved using key value stores and setting up
_timer wheels_ for the devices belonging to a partition - in memory. However, this requires iterating
over the complete key value store when a partition is assigned to a consumer.

I have been looking at the Pulsar Functions API, and while you can get and put table entries, I could
not find any way to get all entries (nor the approximate size of a table). From the table service
API in BookKeeper, I can see there are methods to get a range, that is, iterate over a table given
the partition key. Have I missed something, or are there plans to _expose *ranges*_ in the Functions API?

Alternatively, maybe I could try to write a micro service that uses a Pulsar client to consume from a topic containing heartbeats,
in combination with using a BookKeeper client to access the table service? Would that be a viable solution? Or would it be an anti-pattern (to access Pulsar and BookKeeper in the same application)?
Or will the 'transactional' aspects be too complicated?

Any ideas or input are very welcome!
----
2020-02-18 16:51:15 UTC - Sree Vaddi: You are all cordially invited to our first meetup at a new company, and this year’s first meetup in SF. There will be raffle prizes.
<https://www.meetup.com/Apache-Heron-Bay-Area/events/nglzdrybcdbwb/>
----
2020-02-18 16:52:28 UTC - Sijie Guo: Currently the ranges API is not exposed to functions yet. Feel free to create a GitHub issue requesting this feature.

I don’t think this requires a table or a persistent key/value store. You can just maintain an in-memory map, since heartbeats are bound to time. So you just need to keep a time window for it, no?
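A minimal sketch of that idea in Python (illustrative only; the device ids, the fixed one-minute window, and the function names are assumptions, not anything from Pulsar itself):
```import time

# Hypothetical in-memory tracker: device id -> last heartbeat timestamp.
last_seen = {}
TIMEOUT_SECONDS = 60  # assumed window for this sketch

def on_heartbeat(device_id, ts):
    # Record the latest heartbeat for a device.
    last_seen[device_id] = ts

def sweep(now=None):
    # Return devices whose last heartbeat fell outside the window.
    now = now if now is not None else time.time()
    return [d for d, ts in last_seen.items() if now - ts > TIMEOUT_SECONDS]```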
----
2020-02-18 16:53:27 UTC - Sijie Guo: What command are you using?
----
2020-02-18 17:01:06 UTC - Atif: Thanks for responding, sijieg!
----
2020-02-18 17:02:46 UTC - Atif: I can see the company banners listed in the documentation, but I would like to understand more about the use cases, the volume of data, the business SLAs, and the feature sets these companies are using.
----
2020-02-18 17:09:52 UTC - Sijie Guo: @Atif for the use cases, you can check - <https://streamnative.io/success-stories/>

for “volume of data / business SLAs / feature sets”, I don’t think most of the companies will share that publicly, especially “volume of data” and “business SLAs”.
+1 : Atif
----
2020-02-18 17:10:07 UTC - Atif: I can't talk a lot about our use case, unfortunately, but we're looking to use Pulsar at a very large scale - 1000-2000 namespaces and topics. As of now we don't have a huge volume of data, but there may be spikes in volume and we want to be able to scale up and down dynamically. We personally like Pulsar a lot and are inclined to use it, but the team would like to hear success stories if they're there.
----
2020-02-18 17:10:21 UTC - Atif: thanks
----
2020-02-18 17:10:46 UTC - Atif: this should do!
----
2020-02-18 17:12:34 UTC - Sijie Guo: >  a very large scale 1000-2000 namespaces and topics.
that’s not a lot. I know of a lot of companies running at a higher scale than this. The data volume is also higher.

The publicly available numbers that I have seen:

• Yahoo! - millions of topics/partitions, 100 billion messages / day
• Yahoo! JAPAN - 100 billion messages / day
• Tencent - 10 billion messages per day for their billing platform.
ok_hand : Atif
----
2020-02-18 17:15:10 UTC - Atif: thanks! this is extremely positive to hear
----
2020-02-18 17:25:45 UTC - Cody Poll: @Cody Poll has joined the channel
+1 : David Kjerrumgaard
----
2020-02-18 17:47:07 UTC - Sijie Guo: No problem. Happy to help! Feel free to reach out if you need anything more.
----
2020-02-18 17:58:49 UTC - Cody Poll: Good day :wave:

tl;dr - Does creating/deleting a subscription require writes to zookeeper?
---
I'm trying to understand the ways that Pulsar makes use of Zookeeper. My understanding from the documentation is that zookeeper is used for:

1. Information that must be globally consistent (namespaces, tenants, etc.) goes in the instance-level zookeeper
2. Cluster-level metadata (broker ownership of topics, load reports, and bookie management) goes in each cluster's own zookeeper
In the "Understanding How Apache Pulsar Works" blog post (<https://jack-vanlightly.com/blog/2018/10/2/understanding-how-apache-pulsar-works>), it says:
> Each subscription stores a cursor. The cursor is the current offset in the log. Subscriptions store their cursor in BookKeeper in ledgers. This makes cursor tracking scalable just like topics.
It sounds like there are a number of ways this could be implemented. My first reaction is to think that there is some log of cursor updates, meaning creating/deleting a subscription wouldn't require metadata writes in Zookeeper, only a lookup for the bookies to send it to. Is this correct?
----
2020-02-18 18:10:42 UTC - Addison Higham: @Cody Poll <https://streaml.io/blog/cursors-in-pulsar> <- walks through it
----
2020-02-18 18:12:58 UTC - Cody Poll: Thank you :bow:
----
2020-02-18 18:30:28 UTC - Cody Poll: OK - that makes sense.

1. For each subscription, a ledger is created to hold its current offset
2. Ledger location and replication metadata are managed in zookeeper
So creating/deleting a subscription will have metadata effects in zookeeper. :thumbsup_all:
+1 : Sijie Guo
----
2020-02-18 20:12:28 UTC - Mikhail Veygman: Hi.
wave : Cody Poll
----
2020-02-18 20:28:19 UTC - Mikhail Veygman: Sorry.  Got stuck.
----
2020-02-18 20:28:41 UTC - Mikhail Veygman: I am doing an async send to a topic but I am getting this:

Feb 18, 2020 3:22:55 PM org.apache.pulsar.shade.io.netty.util.HashedWheelTimer reportTooManyInstances
SEVERE: You are creating too many HashedWheelTimer instances. HashedWheelTimer is a shared resource that must be reused across the JVM, so that only a few instances are created.
----
2020-02-18 20:29:31 UTC - Mikhail Veygman: Is this because I am checking whether or not the Future is done?
----
2020-02-18 20:35:26 UTC - Sijie Guo: Are you creating too many clients?
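For reference, the usual pattern is a single client per process, reused by all producers and consumers; each client owns its own connection pool and timers, which is what the HashedWheelTimer warning is about. The warning above is from the Java client, but the idea is the same in every client. A sketch with the Python client (topic names are placeholders):
```import pulsar

# One client per process; all producers share its connections and timers.
client = pulsar.Client('pulsar://localhost:6650')

# Reuse the same client for every producer rather than creating
# a new client per topic or per send.
producer_a = client.create_producer('persistent://public/default/topic-a')
producer_b = client.create_producer('persistent://public/default/topic-b')

producer_a.send_async(b'payload', callback=lambda res, msg_id: None)

producer_a.flush()  # make sure async sends complete before closing
client.close()```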
----
2020-02-18 20:35:42 UTC - Mikhail Veygman: Quite possibly
----
2020-02-18 20:35:52 UTC - Mikhail Veygman: Let me try without it.
----
2020-02-18 21:29:29 UTC - Rolf Arne Corneliussen: Thanks for your input,

Interesting thoughts, which led me to read the 'Event Processing Design Patterns with Pulsar Functions' blog post on the streaml.io website, and also to look at some window functions in the examples on GitHub. However, I am not sure this will provide a solution for this specific use case.

So I will try to be more specific about the requirements:
• The input topic contains records that identify the device and the heartbeat timestamp
• Each device has an individual configured heartbeat timeout interval, which could be anywhere between 10 seconds and 24 hours.
• Each device may emit a heartbeat when it wants to, so it may for example produce two heartbeats within the timeout window.
• The number of devices may be around 1M, and the total number of heartbeats per second could be 10K+
• The function needs to detect missed heartbeats, and produce a message for that on an output topic. It should also detect missed heartbeats within one second. After detecting a missed heartbeat, the device status is set to 'dead' and we should not detect missed heartbeats for dead devices. If we receive a heartbeat for a dead device, we should produce a message on an output topic, and expect regular heartbeats.
• The function needs to work correctly after a machine/function instance crash, or spinning up a new instance of the function
My idea is to store, for each device, the timestamp of the last heartbeat, the device status (dead/alive) and the timestamp the device must send the next heartbeat (if alive). This is done both in memory and in some persistent way, so the process could be resumed in another process, on the same host or another host.

Using Kafka Streams, we can build the in-memory data structure in the life cycle method callbacks (i.e. `init()` when a partition is assigned) by iterating over the key value store, and this is very efficient when the store is persisted with RocksDB. The structure is maintained by processing each inbound message, one at a time. Devices that have timed out are detected in a _punctuation_ (a scheduled task).

In principle, if the heartbeat topic were compacted, we could create the in-memory structures by reading the topic from the beginning when a function is started, but this would take some time; besides, I do not know how to initiate that in Pulsar Functions.

If the timeout for each device were the same, for example one minute, and if we could get a list of devices from another source, I can see that one could use a time window of one minute to decide which devices had emitted heartbeats and which devices were dead (albeit the size of the collection argument to the `process` method invocation would be huge?). However, given the above requirements, it is not clear to me how to use a time window.

If access to table ranges were to be implemented for functions, I am not sure how to leverage that in an initialization method. Would one have to maintain a `boolean` in the
`process` method for example? Could the Function interface be extended with life cycle methods?

Also, would it be a good idea to try writing a micro-service that has a Pulsar client (to consume the heartbeat topic) and a BookKeeper client to access a state store?
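For the in-memory side, independent of where the state is persisted, here is a sketch of one possible shape: a min-heap of deadlines over a per-device dict, swept on a one-second tick. All names are illustrative, not a Pulsar API, and the persistence/recovery part is deliberately left out:
```import heapq
import time

devices = {}    # device id -> (last_heartbeat_ts, deadline, alive)
deadlines = []  # min-heap of (deadline, device id); stale entries skipped lazily

def on_heartbeat(device_id, ts, timeout):
    # Returns True if the device was 'dead', so the caller can emit a revival message.
    rec = devices.get(device_id)
    revived = rec is not None and not rec[2]
    deadline = ts + timeout
    devices[device_id] = (ts, deadline, True)
    heapq.heappush(deadlines, (deadline, device_id))
    return revived

def tick(now=None):
    # Run roughly once per second; returns newly dead devices so the
    # caller can emit 'missed heartbeat' messages on the output topic.
    now = now if now is not None else time.time()
    dead = []
    while deadlines and deadlines[0][0] <= now:
        deadline, device_id = heapq.heappop(deadlines)
        rec = devices.get(device_id)
        if rec and rec[2] and rec[1] == deadline:  # entry still current and alive
            devices[device_id] = (rec[0], rec[1], False)
            dead.append(device_id)
    return dead```
Rebuilding `devices` and `deadlines` after a crash is exactly the part that needs either ranges over a state store or a replay of a compacted topic.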
----
2020-02-18 22:31:41 UTC - Tanner Nilsson: @Tanner Nilsson has joined the channel
----
2020-02-18 22:50:32 UTC - Kirill Podkov: @Kirill Podkov has joined the channel
----
2020-02-19 01:08:45 UTC - Sérgio Silveira: @Sérgio Silveira has joined the channel
----
2020-02-19 02:55:56 UTC - Rattanjot Singh: @Rattanjot Singh has joined the channel
----
2020-02-19 03:00:40 UTC - Billy Yuan: @Billy Yuan has joined the channel
----
2020-02-19 03:01:15 UTC - Rattanjot Singh: Getting this error when trying to produce a message in pulsar broker

Python Code
```import pulsar

# Create a Pulsar client instance. The instance can be shared across multiple
# producers and consumers
client = pulsar.Client('pulsar://localhost:6650')

# Create a producer on the topic. If the topic doesn't exist
# it will be automatically created
producer = client.create_producer(
                'persistent://sample/standalone/ns1/my-topic')

for i in range(10):
    content = 'hello-pulsar-%d' % i
    # Publish a message and wait until it is persisted
    producer.send(content.encode('utf-8'))
    print('Published message: "%s"' % content)

client.close()```
ERROR
```2020-02-19 08:23:48.021 INFO  ConnectionPool:72 | Created connection for pulsar://localhost:6650
2020-02-19 08:23:48.024 ERROR ClientConnection:374 | [<none> -> pulsar://localhost:6650] Failed to establish connection: Connection refused
2020-02-19 08:23:48.024 INFO  ClientConnection:1337 | [<none> -> pulsar://localhost:6650] Connection closed
2020-02-19 08:23:48.024 ERROR ClientImpl:182 | Error Checking/Getting Partition Metadata while creating producer on persistent://sample/standalone/ns1/my-topic -- 5
2020-02-19 08:23:48.024 INFO  ClientConnection:229 | [<none> -> pulsar://localhost:6650] Destroyed connection
Traceback (most recent call last):
  File "produce_message.py", line 10, in <module>
    'persistent://sample/standalone/ns1/my-topic').enableTls(true)
  File "/usr/local/lib/python3.7/site-packages/pulsar/__init__.py", line 476, in create_producer
    p._producer = self._client.create_producer(topic, conf)
Exception: Pulsar error: ConnectError```
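No reply appears in this digest; for what it is worth, "Connection refused" usually means nothing is listening on localhost:6650 - the broker is not running, is still starting, or is running in a container without that port published. A quick, client-independent probe (host and port are assumptions matching the snippet above):
```import socket

def broker_reachable(host='localhost', port=6650, timeout=3):
    # Try a plain TCP connection to the broker port.
    try:
        with socket.create_connection((host, port), timeout=timeout):
            return True
    except OSError:
        return False

print(broker_reachable())```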

----
2020-02-19 05:46:35 UTC - Alexander Ursu: what do the `triage/week-X` issue labels mean on the apache/pulsar github?
----
2020-02-19 05:50:04 UTC - Manju Priya A R: What is the maximum acknowledgment timeout that can be set for a Consumer? I tried setting the max possible long value "9223372036854775807" and am observing the below exception:

11:18:29.613 [pulsar-timer-4-1] WARN org.apache.pulsar.shade.io.netty.util.HashedWheelTimer - An exception was thrown by TimerTask.
java.util.NoSuchElementException: null
	at java.util.ArrayDeque.removeFirst(ArrayDeque.java:285)
	at org.apache.pulsar.client.impl.UnAckedMessageTracker$2.run(UnAckedMessageTracker.java:130)
	at org.apache.pulsar.shade.io.netty.util.HashedWheelTimer$HashedWheelTimeout.expire(HashedWheelTimer.java:680)
	at org.apache.pulsar.shade.io.netty.util.HashedWheelTimer$HashedWheelBucket.expireTimeouts(HashedWheelTimer.java:755)
	at org.apache.pulsar.shade.io.netty.util.HashedWheelTimer$Worker.run(HashedWheelTimer.java:483)
	at org.apache.pulsar.shade.io.netty.util.concurrent.FastThreadLocalRunnable.run(FastThreadLocalRunnable.java:30)
	at java.lang.Thread.run(Thread.java:748)
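No reply appears in this digest. The stack trace is from the Java client; whatever the exact arithmetic failure with Long.MAX_VALUE, a bounded timeout sidesteps it. A sketch with the Python client used elsewhere in this digest (the topic, subscription name, and the five-minute value are placeholders; the client requires the timeout to be at least 10 seconds):
```import pulsar

client = pulsar.Client('pulsar://localhost:6650')

# A bounded redelivery timeout instead of Long.MAX_VALUE;
# 300000 ms (5 minutes) is an arbitrary illustrative choice.
consumer = client.subscribe(
    'persistent://public/default/my-topic',
    subscription_name='my-sub',
    unacked_messages_timeout_ms=300000,
)```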
----
2020-02-19 05:57:11 UTC - Sijie Guo: the label is used for triaging issues. so we know there are people looking into this issues at some extends
----
2020-02-19 06:19:52 UTC - Gautam: @Gautam has joined the channel
----
2020-02-19 06:24:45 UTC - Ken Huang: @Ken Huang has joined the channel
----
2020-02-19 06:57:14 UTC - Ken Huang: Hi all, I want to enable TLS in Pulsar. I am referencing this: <http://pulsar.apache.org/docs/en/security-tls-transport/>
```client = Client("pulsar+ssl://localhost:6651/",
                tls_trust_certs_file_path="/pulsar/my-ca/certs/ca.cert.pem",
                tls_allow_insecure_connection=False)
producer = client.create_producer('persistent://public/default/topic', schema=StringSchema())```
I get the exception: Pulsar error: ConnectError
I use docker to run Pulsar:
```docker run -d -it -p 6650:6650 -p 8080:8080 -p 6651:6651 -p 8081:8081 -p 8443:8443 -v D:/Documents/Docker/pulsar_manager/data:/pulsar/data --name pulsar-manager-standalone1 pulsar bin/pulsar standalone```
What should I do? Thanks a lot.
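No reply appears in this digest. One thing to note: the docker command starts a plain standalone, and nothing in it enables TLS on the broker side, so nothing is listening with TLS on 6651. Per the security-tls-transport page linked above, the broker/standalone configuration needs roughly the following settings; the exact cert paths here are assumptions matching the client snippet, not a verified setup:
```brokerServicePortTls=6651
webServicePortTls=8443
tlsEnabled=true
tlsCertificateFilePath=/pulsar/my-ca/certs/broker.cert.pem
tlsKeyFilePath=/pulsar/my-ca/certs/broker.key-pk8.pem
tlsTrustCertsFilePath=/pulsar/my-ca/certs/ca.cert.pem```
The cert directory would also need to be mounted into the container for those paths to exist.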
----
2020-02-19 08:02:04 UTC - Ismail Hassan: @Ismail Hassan has joined the channel
----
2020-02-19 08:39:53 UTC - Manuel Mueller: @Manuel Mueller has joined the channel
----