Posted to users@pulsar.apache.org by Apache Pulsar Slack <ap...@gmail.com> on 2020/08/02 09:11:03 UTC

Slack digest for #general - 2020-08-02

2020-08-01 18:19:47 UTC - Adriaan de Haan: wouldn't ackTimeout then result in re-processing of already processed messages?
----
2020-08-01 18:21:02 UTC - Adriaan de Haan: or would the client recognize that it has already acked the message and ignore it the second time around?
----
2020-08-01 18:21:37 UTC - Adriaan de Haan: or does that mean I need to keep track of the past few minutes' worth of messages myself and prevent re-processing of already processed messages?
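Whether already-acked messages come back after ackTimeout is not settled in this excerpt. If application-side deduplication does turn out to be needed, the "remember the last few minutes of messages" idea could be sketched roughly as below. `RecentMessageFilter` and `firstSeen` are hypothetical names (not part of the Pulsar client API); message IDs are assumed to be available as strings, and timestamps are assumed to be non-decreasing.

```java
import java.util.Iterator;
import java.util.LinkedHashMap;
import java.util.Map;

/**
 * Hypothetical helper, not a Pulsar API: remembers message IDs seen within a
 * sliding time window so a consumer can skip duplicates that arrive again.
 */
final class RecentMessageFilter {
    private final long windowMillis;
    // Insertion-ordered, so the oldest entries are always at the front.
    private final LinkedHashMap<String, Long> seen = new LinkedHashMap<>();

    RecentMessageFilter(long windowMillis) {
        this.windowMillis = windowMillis;
    }

    /** Returns true the first time an ID is seen within the window, false for a repeat. */
    synchronized boolean firstSeen(String messageId, long nowMillis) {
        evictOlderThan(nowMillis - windowMillis);
        if (seen.containsKey(messageId)) {
            return false; // already processed recently: skip it
        }
        seen.put(messageId, nowMillis);
        return true;
    }

    private void evictOlderThan(long cutoffMillis) {
        Iterator<Map.Entry<String, Long>> it = seen.entrySet().iterator();
        while (it.hasNext()) {
            if (it.next().getValue() >= cutoffMillis) {
                break; // every remaining entry is newer than the cutoff
            }
            it.remove();
        }
    }

    public static void main(String[] args) {
        RecentMessageFilter filter = new RecentMessageFilter(5 * 60_000L); // 5-minute window
        System.out.println(filter.firstSeen("msg-1", 0));           // true
        System.out.println(filter.firstSeen("msg-1", 1_000));       // false
        System.out.println(filter.firstSeen("msg-1", 6 * 60_000L)); // true: old entry evicted
    }
}
```

Such a filter is in-memory only, so it forgets everything on restart and its size grows with message throughput over the window.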
----
2020-08-01 19:15:34 UTC - Allen ONeill: Hi all - has anyone got a link to documentation on "Metrics for Pulsar Functions" ... the current docs are empty on the subject.
I am wondering if they would be good for storing counts of the different types of messages my Pulsar function examines in its stream, so I can batch these and send them off as an aggregate message every few seconds to another topic...
----
2020-08-02 01:33:55 UTC - Axel Sirota: Hi All! How are you? Sorry to bother you; I am trying to show Pulsar in a class, but I am having trouble with a dummy example. I have a standalone deployment and I wanted a simple routing function based on some key (btw please update the docs that still say `context.publish` is valid in the Java API :slightly_smiling_face: ) and to compute some averages. However, when fetching the state it is always “empty/null”.

Any help would be super welcome!

This is with Pulsar 2.6.0 running standalone on macOS (in a Docker deployment I couldn’t create the function at all since the stateStorageService was not exposed :confused:) Thanks!!
----
2020-08-02 01:40:20 UTC - Axel Sirota: My code to fetch the state from the context:

```ByteBuffer old_state = context.getState(key);
old_average = StandardCharsets.UTF_8.decode(old_state).toString();```
To set:

```context.putState(key, ByteBuffer.wrap(new_average.getBytes(charset)));```

----
2020-08-02 01:46:29 UTC - Jerry Peng: > btw please update the docs that still say `context.publish`  is valid in the Java API :slightly_smiling_face:
Thanks for pointing that out! Feel free to also file a PR to fix the docs. Contributions welcome :slightly_smiling_face:

> This is with Pulsar 2.6.0 in MacOS on a standalone server (in Docker deployment I couldn’t create the function at all since the stateStorageService was not exposed :confused:) Thanks!!
You might need to configure Docker to expose the state storage service. Have you tried running Pulsar standalone directly on your Mac rather than in a container? That should work.
----
2020-08-02 01:53:41 UTC - Axel Sirota: I am running standalone on my Mac :slightly_smiling_face: Docker was another set of troubles hahaha
----
2020-08-02 01:53:58 UTC - Axel Sirota: I will send the PR for the docs later :slightly_smiling_face: but yeah!
----
2020-08-02 02:03:01 UTC - Axel Sirota: btw, this is both in `localrun` deploy as well as `create` deploy
----
2020-08-02 02:05:34 UTC - Jerry Peng: I just ran standalone Pulsar and a function that uses state on my Mac, and everything worked. I suspect that some network settings may be causing this issue for you. The state service is exposed on 127.0.0.1:4181. Can you check /etc/hosts to see if the loopback is configured correctly?
----
2020-08-02 02:07:09 UTC - Jerry Peng: How are you starting the pulsar standalone?
----
2020-08-02 02:10:04 UTC - Axel Sirota: ```cat /etc/hosts
##
# Host Database
#
# localhost is used to configure the loopback interface
# when the system is booting.  Do not change this entry.
##
127.0.0.1	localhost
255.255.255.255	broadcasthost
::1             localhost
# Added by Docker Desktop
# To allow the same kube context to work on the host and the container:
127.0.0.1 kubernetes.docker.internal
# End of section```
(seems fine, right?)

I am running as `bin/pulsar standalone`

For my function in localrun I do pass `--state-storage-service-url "bk://localhost:4181"` (which I will also add to the docs in a PR, since I found this in a thread here :joy:)
----
2020-08-02 02:14:10 UTC - Jerry Peng: Unfortunately that page is not written yet. If you would like to batch messages, consider using a window function. Batching messages by time is already implemented there.

Here is an example of a window function and its function config:

<https://github.com/apache/pulsar/blob/master/pulsar-functions/java-examples/src/main/java/org/apache/pulsar/functions/api/examples/AddWindowFunction.java>


<https://github.com/apache/pulsar/blob/master/pulsar-functions/java-examples/src/main/resources/example-window-function-config.yaml>
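For the aggregate-counts use case, the core of time-based batching is assigning each message to a fixed-length window and counting per type within it. The sketch below shows only that bucketing in plain Java; it is not the Pulsar window function API (the linked example shows that), and the `Event` type with a millisecond timestamp and a string type label is an assumption made for illustration.

```java
import java.util.Arrays;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

/** Hypothetical sketch of tumbling-window counting; plain Java, not a Pulsar API. */
final class TumblingTypeCounter {
    /** Assumed event shape: a timestamp in milliseconds and a message-type label. */
    static final class Event {
        final long timestampMillis;
        final String type;

        Event(long timestampMillis, String type) {
            this.timestampMillis = timestampMillis;
            this.type = type;
        }
    }

    /** Returns windowStart -> (type -> count); windows and types are sorted. */
    static Map<Long, Map<String, Integer>> countByWindow(List<Event> events, long windowMillis) {
        Map<Long, Map<String, Integer>> result = new TreeMap<>();
        for (Event e : events) {
            // Align each event to the start of its fixed-length window.
            long windowStart = Math.floorDiv(e.timestampMillis, windowMillis) * windowMillis;
            result.computeIfAbsent(windowStart, k -> new TreeMap<>())
                  .merge(e.type, 1, Integer::sum);
        }
        return result;
    }

    public static void main(String[] args) {
        List<Event> events = Arrays.asList(
                new Event(0, "a"), new Event(1_000, "b"),
                new Event(4_999, "a"), new Event(5_000, "a"));
        // Each inner map would become one aggregate message for its window.
        System.out.println(countByWindow(events, 5_000)); // {0={a=2, b=1}, 5000={a=1}}
    }
}
```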
----
2020-08-02 02:17:16 UTC - Jerry Peng: Are there any errors in the standalone logs? Can you try to do

```telnet 127.0.0.1 4181```
to see if the service is reachable?
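A rough Java equivalent of that telnet check, as a sketch: like telnet, it only confirms that a TCP connection to the port can be opened; it does not verify that the state storage service on 4181 is actually responding correctly. `PortCheck` and `isReachable` are hypothetical names.

```java
import java.io.IOException;
import java.net.InetSocketAddress;
import java.net.Socket;

/** Hypothetical helper: a TCP reachability check similar to `telnet host port`. */
final class PortCheck {
    static boolean isReachable(String host, int port, int timeoutMillis) {
        try (Socket socket = new Socket()) {
            socket.connect(new InetSocketAddress(host, port), timeoutMillis);
            return true;  // handshake succeeded; says nothing about the protocol behind it
        } catch (IOException e) {
            return false; // refused, timed out, or host could not be resolved
        }
    }

    public static void main(String[] args) {
        // 4181 is the state storage port mentioned in this thread.
        System.out.println(isReachable("127.0.0.1", 4181, 2_000));
    }
}
```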
----
2020-08-02 02:18:23 UTC - Axel Sirota: I think you hit it!

```➜  ~ telnet 127.0.0.1 4181
Trying 127.0.0.1...
Connected to localhost.
Escape character is '^]'.
� (hangs)```

----
2020-08-02 02:18:44 UTC - Axel Sirota: so the bookie is not starting the state service?
----
2020-08-02 02:19:05 UTC - Axel Sirota: what type of logs should i look for?
----
2020-08-02 02:19:09 UTC - Jerry Peng: FYI there were some problems with using localrun with the state service.  I am not sure if they were fixed in 2.6.  I would suggest deploying the function using create
----
2020-08-02 02:19:18 UTC - Jerry Peng: That means it connected successfully
----
2020-08-02 02:19:33 UTC - Jerry Peng: at least the port is open
----
2020-08-02 02:20:26 UTC - Axel Sirota: ahh ok haha my infra is terrible
----
2020-08-02 02:20:34 UTC - Axel Sirota: yeah i think there are some issues :slightly_smiling_face:
----
2020-08-02 02:20:39 UTC - Axel Sirota: but with create it is the same…
----
2020-08-02 02:20:40 UTC - Axel Sirota: :disappointed:
----
2020-08-02 02:20:59 UTC - Axel Sirota: (adding to the problem: I only have log topics and no IDE to debug easily)
----
2020-08-02 02:21:46 UTC - Jerry Peng: Are there any errors in the function logs?
----
2020-08-02 02:24:10 UTC - Jerry Peng: Also if you don't want to figure out how to expose all the endpoints when running in docker, just exec into the container and create the function from within the container.  That should also work
----
2020-08-02 02:26:13 UTC - Axel Sirota: Yeah, but for that I assumed it should first work standalone on my Mac :joy:
who knows :confused:
----
2020-08-02 02:26:17 UTC - Axel Sirota: maybe we have a bug?
----
2020-08-02 02:26:22 UTC - Axel Sirota: the function logs are clean
----
2020-08-02 02:26:27 UTC - Axel Sirota: they only get a 0
----
2020-08-02 02:26:31 UTC - Axel Sirota: as value, always
----
2020-08-02 02:27:49 UTC - Jerry Peng: I just tested both directly on my mac and using 2.6 docker image
----
2020-08-02 02:29:44 UTC - Axel Sirota: right, but I just downloaded the Apache binary from the downloads page, not Docker
----
2020-08-02 02:29:51 UTC - Axel Sirota: if you want we can test with docker
----
2020-08-02 02:30:25 UTC - Axel Sirota: (btw, thank you man!!)
----
2020-08-02 02:35:22 UTC - Jerry Peng: np
----
2020-08-02 02:35:58 UTC - Jerry Peng: also if you run via docker you can expose the http admin port by starting standalone like this:

```docker run -d -p 8080:8080  apachepulsar/pulsar:2.6.0 ./bin/pulsar standalone```
----
2020-08-02 02:37:11 UTC - Axel Sirota: I will try (what is this admin port? for some rest api?)
----
2020-08-02 02:37:51 UTC - Jerry Peng: Btw function logs are under /tmp/functions for some reason now :face_with_rolling_eyes:
----
2020-08-02 02:38:08 UTC - Jerry Peng: Yes
----
2020-08-02 02:39:55 UTC - Axel Sirota: haha I will try exec-ing into the Docker container and I will let you know… afaik I haven't seen anything in the function logs :slightly_smiling_face:
----
2020-08-02 02:49:24 UTC - Axel Sirota: this is a function that calculates an average. Logs:

```02:45:55.587 [public/default/avg-2011-0] INFO  function-avg-2011 - The number of measurements is 1
02:45:55.708 [public/default/avg-2011-0] INFO  function-avg-2011 - The average for year-2011 is : 180.197
02:46:15.195 [public/default/avg-2011-0] INFO  function-avg-2011 - The number of measurements is 2
02:46:15.218 [public/default/avg-2011-0] INFO  function-avg-2011 - Got state for key year-2011 and the old average is
02:46:15.241 [public/default/avg-2011-0] INFO  function-avg-2011 - The average for year-2011 is : 105.0985```
Note that 105 is indeed the input of that message divided by 2: it tried to fetch the state, the state was null, and my fallback set it to 0. :disappointed:

This is on docker with a `create` deployment
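A sketch that reproduces the arithmetic in these logs, assuming the function keeps a measurement count and computes an incremental mean (the actual code is in the repository linked later in the thread and may differ). Under that assumption the second input would have been 210.197, i.e. 2 × 105.0985; that value is inferred, not shown in the logs. `RunningAverage.update` is a hypothetical name.

```java
/** Hypothetical reconstruction: incremental mean with a fallback when no state was found. */
final class RunningAverage {
    /** oldAverage is null when the state lookup returned nothing; count is the new total. */
    static double update(Double oldAverage, long count, double value) {
        if (count < 1) {
            throw new IllegalArgumentException("count must be at least 1");
        }
        double previous = (oldAverage == null) ? 0.0 : oldAverage; // the fallback to 0
        // Incremental mean: avg_n = avg_(n-1) + (x_n - avg_(n-1)) / n
        return previous + (value - previous) / count;
    }

    public static void main(String[] args) {
        System.out.println(update(null, 2, 210.197));    // state lost: about 105.0985
        System.out.println(update(180.197, 2, 210.197)); // state found: about 195.197
    }
}
```

With the state missing, the second measurement is averaged against 0 instead of 180.197, which matches the 105.0985 in the log.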
----
2020-08-02 02:49:33 UTC - Axel Sirota: In logs no error.
----
2020-08-02 02:50:27 UTC - Axel Sirota: On the output topic the function writes to, I see:

```----- got message -----
key:[null], properties:[__pfn_input_msg_id__=COUBEAAgAA==, __pfn_input_topic__=persistent://public/default/year-2011], content:180.197
02:45:56.746 [pulsar-timer-4-1] INFO  org.apache.pulsar.client.impl.ConsumerStatsRecorderImpl - [avg-2011] [test] [3f6e6] Prefetched messages: 0 --- Consume throughput received: 0.02 msgs/s --- 0.00 Mbit/s --- Ack sent rate: 0.02 ack/s --- Failed messages: 0 --- batch messages: 0 ---Failed acks: 0
----- got message -----
key:[null], properties:[__pfn_input_msg_id__=COUBEAEgAA==, __pfn_input_topic__=persistent://public/default/year-2011], content:105.0985
02:46:56.749 [pulsar-timer-4-1] INFO  org.apache.pulsar.client.impl.ConsumerStatsRecorderImpl - [avg-2011] [test] [3f6e6] Prefetched messages: 0 --- Consume throughput received: 0.02 msgs/s --- 0.00 Mbit/s --- Ack sent rate: 0.02 ack/s --- Failed messages: 0 --- batch messages: 0 ---Failed acks: 0```
So indeed the “calculation is wrong” but the “wiring” is correct
----
2020-08-02 02:52:11 UTC - Axel Sirota: I have no problem doing whatever you want here so we can debug, but the function is indeed seeing the state as 0. I am not ruling out that my method is wrong.

```ByteBuffer old_state = context.getState(key);
old_average = StandardCharsets.UTF_8.decode(old_state).toString();```
But isn’t this the standard Java way of getting the string value out of a normal ByteBuffer, unless you encode it in a weird way?
----
2020-08-02 03:22:01 UTC - Jerry Peng: Can you post the entire code?
----
2020-08-02 03:27:42 UTC - Axel Sirota: Sure thing!!
----
2020-08-02 03:27:51 UTC - Axel Sirota: <https://github.com/axel-sirota/handling-streaming-apache-pulsar/tree/module2/src/main/java/com/pluralsight/functions>
----
2020-08-02 03:42:00 UTC - Jerry Peng: I think I see the issue.  Using
```StandardCharsets.UTF_8.decode```
to decode somehow is not working. Not sure why, will have to investigate more, but converting the ByteBuffer to a String, e.g.:

```existingVal = new String(existingState.array());```
is working
----
2020-08-02 03:47:04 UTC - Axel Sirota: ok so you propose:

```ByteBuffer old_state = context.getState(key);
average_in_str = String(old_state.array());```
Will try it!!
----
2020-08-02 03:47:33 UTC - Axel Sirota: Odd that I cannot decode with the UTF-8 charset. Maybe the bookie uses a non-standard charset?
----
2020-08-02 03:47:34 UTC - Jerry Peng: ```average_in_str = new String(old_state.array(), charset);```
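A slightly more defensive variant of this workaround, as a sketch: decode the whole buffer through a `duplicate()` rewound to position 0, so it does not matter where the returned buffer's position was left, and fall back to a default when the state is null. `new String(buf.array(), charset)` decodes the entire backing array (ignoring `arrayOffset()` and `limit()`) and throws `UnsupportedOperationException` for direct buffers; the approach below avoids both and leaves the caller's buffer untouched. `StateStrings`, `encode` and `decode` are hypothetical helper names, not Pulsar APIs.

```java
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;

/** Hypothetical helpers for storing strings in function state as UTF-8 bytes. */
final class StateStrings {
    static ByteBuffer encode(String value) {
        return ByteBuffer.wrap(value.getBytes(StandardCharsets.UTF_8));
    }

    /** Decodes from index 0 to the limit, whatever the current position; null -> fallback. */
    static String decode(ByteBuffer state, String fallback) {
        if (state == null) {
            return fallback; // no value stored for this key yet
        }
        ByteBuffer view = state.duplicate(); // own position/limit; caller's buffer unchanged
        view.rewind();                       // position = 0, limit unchanged
        return StandardCharsets.UTF_8.decode(view).toString();
    }
}
```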
----
2020-08-02 03:48:03 UTC - Axel Sirota: right sorry, hah coding in slack is hard :joy:
----
2020-08-02 03:48:07 UTC - Axel Sirota: will test it!!
----
2020-08-02 03:55:22 UTC - Axel Sirota: success!!!
----
2020-08-02 03:55:30 UTC - Axel Sirota: thanks man!! this was a long thread hahaha
----
2020-08-02 03:55:48 UTC - Axel Sirota: I will make the PR for the docs tomorrow :slightly_smiling_face: actually I have been taking notes on some other stuff as well :slightly_smiling_face:
----
2020-08-02 04:00:32 UTC - Jerry Peng: :tada:
----
2020-08-02 04:00:48 UTC - Jerry Peng: Awesome thanks for contributing to the community!
----
2020-08-02 04:09:01 UTC - Jerry Peng: I see why StandardCharsets.UTF_8.decode doesn't work. The ByteBuffer returned from the state service has its position set to the end of the buffer, so hasRemaining() returns false and position() returns the length of the buffer. If I manually set the buffer position to 0, StandardCharsets.UTF_8.decode decodes properly. @Sijie Guo any insight into why the table service is doing this? Why is the position of the returned ByteBuffer set to the end?
----
2020-08-02 04:17:18 UTC - Jerry Peng: So it seems to be an unintended consequence of transferring the data from a Netty ByteBuf to an NIO ByteBuffer using the read() method:

<https://github.com/apache/pulsar/blob/master/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/state/StateContextImpl.java#L83>

The NIO ByteBuffer's position is left at the end after the transfer.
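The behaviour described here can be reproduced with plain NIO, no Pulsar needed. In this sketch, `put()` stands in for the transfer from the Netty buffer; that is an assumption about its effect on the destination, consistent with the explanation above. Filling a buffer advances its position to the end, so `decode` sees zero remaining bytes until the position is reset. `PositionDemo` and `transferred` are hypothetical names.

```java
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;

/** Hypothetical reproduction of "position left at the end after the transfer". */
final class PositionDemo {
    /** Copies the bytes into a fresh buffer and, like the transfer, does NOT reset position. */
    static ByteBuffer transferred(String value) {
        byte[] bytes = value.getBytes(StandardCharsets.UTF_8);
        ByteBuffer dst = ByteBuffer.allocate(bytes.length);
        dst.put(bytes); // position == limit == bytes.length now
        return dst;
    }

    public static void main(String[] args) {
        ByteBuffer state = transferred("180.197");
        System.out.println(state.hasRemaining());                             // false
        System.out.println("[" + StandardCharsets.UTF_8.decode(state) + "]"); // [] : nothing remaining
        state.position(0); // the manual fix from the thread
        System.out.println("[" + StandardCharsets.UTF_8.decode(state) + "]"); // [180.197]
    }
}
```

In NIO, calling `flip()` after writing into a buffer is the usual idiom for making the freshly written bytes readable.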
----