Posted to users@pulsar.apache.org by Apache Pulsar Slack <ap...@gmail.com> on 2018/07/17 09:11:02 UTC

Slack digest for #general - 2018-07-17

2018-07-16 11:26:00 UTC - Tomer Lev: Hi, I'm trying to purge one of my topics using pulsar-admin: `sudo ./bin/pulsar-admin topics clear-backlog -s "subscr-name" "persistent://pulsar-pulsar-cluster/public/default/QueueName"` but I got: ```Failed to perform http post request: javax.ws.rs.NotFoundException: HTTP 404 Not Found
HTTP 404 Not Found

Reason: Tenant does not exist```

In addition when trying to fetch tenants i get: ```curl http://localhost:8080/admin/tenants
<html>
<head>
<meta http-equiv="Content-Type" content="text/html;charset=ISO-8859-1"/>
<title>Error 404 </title>
</head>
<body>
<h2>HTTP ERROR: 404</h2>
<p>Problem accessing /admin/tenants. Reason:
<pre>    Not Found</pre></p>
<hr /><a href="http://eclipse.org/jetty">Powered by Jetty:// 9.3.11.v20160721</a><hr/>
</body>
</html>```
----
2018-07-16 13:17:39 UTC - Idan: @Tomer Lev make sure subscr-name is the right name:)
----
2018-07-16 13:19:12 UTC - Tomer Lev: it is the right name
----
2018-07-16 13:20:29 UTC - Idan: perhaps public/default isn't the exact tenant/namespace ?
----
2018-07-16 13:22:02 UTC - Idan: or maybe the order matters? ./pulsar-admin topics clear-backlog <queue-name> -s <subscribe-address>
----
2018-07-16 13:44:43 UTC - Karthik Palanivelu: Team, If I apply deduplication on a namespace, is it applicable for a broker or for a cluster of brokers within a region with same cluster name? I see duplicate messages sent over.
----
2018-07-16 14:38:41 UTC - Daniel Ferreira Jorge: How can I read a specific message property using the python client? (`msg.properties()`)
----
2018-07-16 14:56:01 UTC - Sijie Guo: @Idan your topic name is not correct. It should be `persistent://public/default/<topic-name>`
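The "Tenant does not exist" error above is consistent with how a four-segment name gets read. A minimal sketch, assuming the Pulsar 2.0-era convention that a three-segment name is tenant/namespace/topic while a four-segment name is interpreted in the legacy tenant/cluster/namespace/topic layout; `parse_topic` is a hypothetical helper for illustration, not part of any Pulsar client:

```python
def parse_topic(name):
    """Split a topic name into its parts (illustrative helper, not a Pulsar API)."""
    domain, sep, rest = name.partition("://")
    if not sep:
        raise ValueError("expected <domain>://<path>, got %r" % name)
    parts = rest.split("/")
    if len(parts) == 3:
        tenant, namespace, topic = parts
        return {"domain": domain, "tenant": tenant,
                "namespace": namespace, "topic": topic}
    if len(parts) == 4:
        # Assumed legacy layout: the second segment is taken as a cluster name.
        tenant, cluster, namespace, topic = parts
        return {"domain": domain, "tenant": tenant, "cluster": cluster,
                "namespace": namespace, "topic": topic}
    raise ValueError("unexpected number of segments in %r" % name)


# The name from the failing command: the first segment ends up as the tenant.
bad = parse_topic("persistent://pulsar-pulsar-cluster/public/default/QueueName")
good = parse_topic("persistent://public/default/QueueName")
print(bad["tenant"], "vs", good["tenant"])
```

Under that assumption the broker would look for a tenant named `pulsar-pulsar-cluster`, which matches the error message.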
----
2018-07-16 14:57:43 UTC - Sijie Guo: @Karthik Palanivelu it is applicable for all brokers. Did you set the sequence id for messages when you produce the messages? Deduplication uses the sequence id to dedup.
----
2018-07-16 15:02:48 UTC - Sijie Guo: @Daniel Ferreira Jorge I am not sure if there is a method for reading a specific property. You can probably get properties and then get the specific property from the properties 
----
2018-07-16 15:05:09 UTC - Daniel Ferreira Jorge: @Sijie Guo ok, do you know how I can get it from the `msg.properties()`?
----
2018-07-16 15:06:34 UTC - Sijie Guo: I think it returns a python map for properties, no? 
----
2018-07-16 15:10:11 UTC - Daniel Ferreira Jorge: actually it does... I tried getting an inexistent key... OMG :face_with_rolling_eyes:
----
2018-07-16 15:10:25 UTC - Daniel Ferreira Jorge: thanks @Sijie Guo
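Since `msg.properties()` returns a plain Python dict, a missing key raises `KeyError` with bracket lookup. A minimal sketch of a tolerant lookup, using an ordinary dict as a stand-in for the return value; the key names and the `get_property` helper are made up for illustration:

```python
def get_property(properties, key, default=None):
    """Return one property value, or `default` when the key is absent."""
    return properties.get(key, default)


# Stand-in for the dict returned by msg.properties(); keys are hypothetical.
props = {"trace-id": "abc123"}

present = get_property(props, "trace-id")
absent = get_property(props, "no-such-key", "n/a")
print(present, absent)
```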
----
2018-07-16 15:22:29 UTC - Karthik Palanivelu: @Sijie Guo I did not set the sequence id. The docs say to set the producer name and the timeout to zero. Can you please let me know how to do it?
----
2018-07-16 15:26:39 UTC - Sijie Guo: <https://streaml.io/blog/pulsar-effectively-once/>
----
2018-07-16 15:26:52 UTC - Sijie Guo: Matteo wrote a blog post
----
2018-07-16 15:26:58 UTC - Sijie Guo: It has an example 
----
2018-07-16 15:45:31 UTC - Matteo Merli: Yes, you don't necessarily need to explicitly set the sequence ids. How are you checking that you have received duplicates?
----
2018-07-16 16:25:36 UTC - Karthik Palanivelu: @Matteo Merli I publish the same messages from the producer say "messages - i" where i is 0..9 in three iterations. On the consumer side, I get the same messages/count.
----
2018-07-16 17:03:09 UTC - Matteo Merli: That is correct, the deduplication is based on the messages' sequence ids. If you publish different messages with the same payload they would not be deduplicated. The dedup is applied to internal retransmissions due to failures, or when you explicitly set the sequence id to hint to the system that your message is logically the same as the one before. Otherwise, if you don't provide a sequence id, the client will attach a sequential id to each message that you attempt to publish.
----
2018-07-16 17:43:22 UTC - Karthik Palanivelu: Got it, My understanding was that we cannot publish same message again and again like SQS deduplication. Do you have this feature in your roadmap? Can we update the documentation to reflect this scenario? Just a feedback from user.
----
2018-07-16 17:46:08 UTC - Grant Wu: ~<https://docs.aws.amazon.com/AWSSimpleQueueService/latest/SQSDeveloperGuide/using-messagededuplicationid-property.html> seems to indicate that they use a similar ID based approach…~
----
2018-07-16 17:49:11 UTC - Grant Wu: Oh, never mind, there is content-based deduplication
----
2018-07-16 17:54:48 UTC - Matteo Merli: > any messages sent with the same message deduplication ID are accepted successfully but aren’t delivered during the 5-minute deduplication interval.

The problem with doing deduplication based on content is that it has to be “best-effort” based on a small time window. Doing it in an exact form would require keeping infinite state to track contents. In this case, the dedup is better done in a downstream DB system where you can have a “unique” index on some message property.

For the previous example, in which you were publishing messages multiple times, that could be handled like:

```
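// Publish the same 10 messages three times; the explicit sequence ids
// mark the repeats as logically identical.
for (int i = 0; i < 3; i++) {
    for (int j = 0; j < 10; j++) {
        producer.newMessage().value("hello-" + j).sequenceId(j).send();
    }
}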
```

By setting the `sequenceId` explicitly, the dedup will work in this case.
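A toy model of why the loop above ends up with only ten stored messages. This is a sketch under the assumption that the broker keeps the highest sequence id accepted per producer name and drops anything at or below it; `ToyDedupTopic` is a made-up class and not Pulsar's actual implementation:

```python
class ToyDedupTopic:
    """Illustrative stand-in for a topic with sequence-id deduplication."""

    def __init__(self):
        self.last_seq = {}  # producer name -> highest sequence id accepted
        self.log = []       # payloads that were actually stored

    def publish(self, producer, seq_id, payload):
        # Drop anything whose sequence id is not beyond the last accepted one.
        if seq_id <= self.last_seq.get(producer, -1):
            return False
        self.last_seq[producer] = seq_id
        self.log.append(payload)
        return True


topic = ToyDedupTopic()
for i in range(3):
    for j in range(10):
        topic.publish("producer-a", j, "hello-%d" % j)

print(len(topic.log))  # 10: the second and third rounds are dropped
```

The payload is never inspected here, which is why identical payloads sent with fresh sequence ids would all be stored.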
----
2018-07-16 17:56:39 UTC - Grant Wu: You could possibly hack it based on Pulsar functions, although I have no experience in this area and no idea how that performs
----
2018-07-16 18:00:38 UTC - Karthik Palanivelu: That's a cool hack @Matteo Merli. I was thinking of persisting the message along with a hash in Bookie to validate before we persist.
----
2018-07-16 18:36:31 UTC - Karthik Palanivelu: This works @Matteo Merli.
----
2018-07-16 18:37:04 UTC - Karthik Palanivelu: Will you be considering the Content Based Deduplication feature for future roadmap?
----
2018-07-16 18:59:07 UTC - Daniel Ferreira Jorge: When I create a namespace (`@PUT @Path("/{tenant}/{namespace}")`) with the Admin REST API, it creates (code 204) but when I try to "use" it, I get code 412 `Namespace does not have any clusters configured : local_cluster=standalone ns=tenant/namespace`... When I use the java admin client, I create the namespace using this signature `void createNamespace(String var1, Set<String> var2)` with the `var2` as the cluster and it works. How can I specify the cluster using the REST API when creating a namespace?
----
2018-07-16 19:26:18 UTC - Grant Wu: Uh, is it just me, or is the fact that you need to put a `v2` in the URL for Pulsar not mentioned in the Websocket API docs?
----
2018-07-16 19:27:11 UTC - Grant Wu: e.g. `ws://broker-service-url:8080/ws/consumer/persistent/:tenant/:namespace/:topic/:subscription`
----
2018-07-16 19:27:13 UTC - Daniel Ferreira Jorge: I use the v2
----
2018-07-16 19:27:15 UTC - Grant Wu: doesn’t mention “v2” anywhere
----
2018-07-16 19:27:21 UTC - Matteo Merli: @Daniel Ferreira Jorge When you create the namespace through the REST API, you can pass a “policies” object with the initial configuration for the namespace, which can be changed later with subsequent REST calls.

This is where the POJO is defined: <https://github.com/apache/incubator-pulsar/blob/master/pulsar-common/src/main/java/org/apache/pulsar/common/policies/data/Policies.java#L31> 

When creating the namespace, you can just pass the fields you want to initialize, eg: 

```
curl -XPUT -H'Content-Type: application/json' -d '{"replication_clusters":["standalone"]}' http://localhost:8080/admin/v2/namespaces/public/my-test
```
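The same request can be put together from Python with only the standard library. A minimal sketch, reusing the URL and body from the curl command above; `build_create_namespace_request` is a hypothetical helper, and the request is only built here, not sent:

```python
import json
import urllib.request


def build_create_namespace_request(base_url, tenant, namespace, clusters):
    """Build (but do not send) a PUT that creates a namespace with initial policies."""
    url = "%s/admin/v2/namespaces/%s/%s" % (base_url.rstrip("/"), tenant, namespace)
    body = json.dumps({"replication_clusters": list(clusters)}).encode("utf-8")
    return urllib.request.Request(
        url,
        data=body,
        method="PUT",
        headers={"Content-Type": "application/json"},
    )


req = build_create_namespace_request(
    "http://localhost:8080", "public", "my-test", ["standalone"]
)
print(req.get_method(), req.full_url)
# To send it against a running broker: urllib.request.urlopen(req)
```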
----
2018-07-16 19:27:22 UTC - Grant Wu: afaict
----
2018-07-16 19:27:25 UTC - Daniel Ferreira Jorge: I forgot to mention
----
2018-07-16 19:27:34 UTC - Grant Wu: sorry @Daniel Ferreira Jorge this is completely unrelated to your question.
----
2018-07-16 19:27:41 UTC - Grant Wu: I’m just asking in general
----
2018-07-16 19:28:13 UTC - Daniel Ferreira Jorge: @Grant Wu Sorry too! I misunderstood!
----
2018-07-16 19:28:54 UTC - Daniel Ferreira Jorge: @Matteo Merli Thank you! Just what I needed. It worked.
----
2018-07-16 19:29:36 UTC - Matteo Merli: > Uh, is it just me, or is the fact that you need to put a `v2` in the URL for Pulsar not mentioned in the Websocket API docs?

It’s possible some of the docs might not have been updated :confused:
----
2018-07-16 19:29:58 UTC - Grant Wu: :anger:
----
2018-07-16 19:35:49 UTC - Grant Wu: That’s pretty bad.
----
2018-07-16 19:36:33 UTC - Grant Wu: I notice the Admin API also needs a v2?
----
2018-07-16 19:38:35 UTC - Grant Wu: I started a branch to work on this.
+1 : Matteo Merli
----
2018-07-16 19:39:03 UTC - Grant Wu: Is everything versioned under v2?
----
2018-07-16 19:43:09 UTC - Matteo Merli: The v2 REST API was introduced with Pulsar 2.0. Older API handlers are still available for compatibility
----
2018-07-16 19:43:18 UTC - Daniel Ferreira Jorge: Is it possible that `message_listener` is not working with the python client? I can only consume with the "regular" `while True: consumer.receive()`... If I do a `client.subscribe('topic', 'sub', message_listener=listener)` it does not work...
----
2018-07-16 19:45:10 UTC - Matteo Merli: When using listener, you're not supposed to call `receive()` anymore. The messages will be passed to the listener function only
----
2018-07-16 19:45:38 UTC - Grant Wu: right, but… it’s not clear that to use the v2 REST API, you need a `/v2/`
----
2018-07-16 19:45:40 UTC - Daniel Ferreira Jorge: I comment the code out before using the listener
----
2018-07-16 19:45:53 UTC - Daniel Ferreira Jorge: the messages are not being passed to the listener
----
2018-07-16 19:48:54 UTC - Matteo Merli: We have a unit/integration test for the Python message listener: <https://github.com/apache/incubator-pulsar/blob/master/pulsar-client-cpp/python/pulsar_test.py>
----
2018-07-16 19:49:08 UTC - Matteo Merli: Line 155
----
2018-07-16 19:49:08 UTC - Daniel Ferreira Jorge: Yes, I already saw that...
----
2018-07-16 19:49:27 UTC - Daniel Ferreira Jorge: this is exactly as I did
----
2018-07-16 19:50:55 UTC - Daniel Ferreira Jorge: the only difference is that I'm subscribing without specifying the cluster
----
2018-07-16 19:51:38 UTC - Matteo Merli: Yes, that doesn't matter, from the client's perspective it is just a string
----
2018-07-16 19:54:01 UTC - Daniel Ferreira Jorge: with the EXACT same code, if I use a listener instead `consumer.receive()`, it does not work. My listener is defined as `def listener(consumer, msg): print("Got message: %s" % msg)`
----
2018-07-16 20:57:08 UTC - Daniel Ferreira Jorge: @Matteo Merli I figured it out... when using the listener, after the `client.subscribe` I had to put a `while True: sleep(1)` for the messages to be consumed...
----
2018-07-16 20:57:20 UTC - Daniel Ferreira Jorge: not sure why
sweat_smile : Grant Wu
----
2018-07-16 20:58:19 UTC - Matteo Merli: ok, once the subscribe call returns, messages will start flowing into the listener, but the thread then exits
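The effect can be reproduced without a broker. A minimal sketch, assuming delivery happens on a background thread that does not keep the process alive; `deliver_in_background` is a made-up stand-in for the client's internals, not the Pulsar API, and only the `listener(consumer, msg)` shape is taken from the thread above:

```python
import threading


def deliver_in_background(messages, listener, done):
    """Stand-in for a client's delivery thread (hypothetical, not Pulsar code)."""
    def run():
        for msg in messages:
            listener(None, msg)  # same (consumer, msg) shape as a message listener
        done.set()
    # Daemon thread: it dies with the main thread, like a client's native threads.
    threading.Thread(target=run, daemon=True).start()


received = []


def listener(consumer, msg):
    received.append(msg)


done = threading.Event()
deliver_in_background(["m-0", "m-1", "m-2"], listener, done)

# Without this wait, the script could reach its end and take the delivery
# thread down with it before any message is handled.
done.wait(timeout=5)
print(received)
```

Blocking on an event plays the same role as the `while True: sleep(1)` loop; with a real consumer the wait would typically last until the application decides to shut down.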
----
2018-07-16 21:01:08 UTC - Daniel Ferreira Jorge: Let me ask one more thing. The multi topic subscription for the python client will be available in 2.1.0 or 2.2.0?
----
2018-07-16 21:01:27 UTC - Matteo Merli: 2.2
----
2018-07-16 21:01:55 UTC - Matteo Merli: we plan to fill most of the c++ feature gaps in 2.2
----
2018-07-16 21:01:56 UTC - Daniel Ferreira Jorge: ok
----
2018-07-16 21:02:05 UTC - Daniel Ferreira Jorge: great
----
2018-07-16 21:02:18 UTC - Daniel Ferreira Jorge: thx
----