Posted to users@pulsar.apache.org by Apache Pulsar Slack <ap...@gmail.com> on 2018/06/10 09:11:02 UTC

Slack digest for #general - 2018-06-10

2018-06-09 11:23:47 UTC - Idan: Morning guys. I was trying to build our consumers' dedup logic on top of Pulsar's message sequenceId; we use Redis for that. But I just found out that every time I reconnect the producer client, the sequenceId restarts (from 0), so using the key topicName + ":" + sequenceId
----
2018-06-09 11:23:49 UTC - Idan: isn't good
----
2018-06-09 11:32:30 UTC - Idan: I need a suggestion for how I can ensure dedup on the consumer side. We do have our own unique eventId, but that's not enough as a dedup key, because when there is a redelivery we do want the same eventIds to pass our dedup (as they are re-delivered, they should not be considered duplicates). So we need a way to distinguish, on the consumer side, between a message received for the first time and one that is re-delivered.
----
2018-06-09 17:36:29 UTC - Sijie Guo: Does using message id work for you? @Idan 
----
2018-06-09 17:42:21 UTC - Idan: I'm not sure. I think when a re-delivery happens you get the same messageID. I'll check
----
2018-06-09 17:44:23 UTC - Sijie Guo: Yes the message id will not change 
----
2018-06-09 17:45:26 UTC - Sijie Guo: The sequence id is scoped to publishers, the message id is globally unique
----
2018-06-09 19:19:52 UTC - Idan: so how will I know if it was re-delivered?
----
2018-06-09 19:20:07 UTC - Idan: if the messageID is the same for re-deliveries, the re-delivered message will get caught by the dedup
----
2018-06-09 19:25:42 UTC - Sijie Guo: Currently there is no way to tell if it is a redelivered message. The MessageID will be unchanged for a message, so it can be used as a unique id for deduplication
----
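For reference, a minimal sketch of using the message id (rather than the producer-scoped sequence id) as a dedup key, assuming the Pulsar Java client; the broker URL, topic, and subscription names are placeholders:

    import org.apache.pulsar.client.api.Consumer;
    import org.apache.pulsar.client.api.Message;
    import org.apache.pulsar.client.api.MessageId;
    import org.apache.pulsar.client.api.PulsarClient;

    public class MessageIdExample {
        public static void main(String[] args) throws Exception {
            PulsarClient client = PulsarClient.builder()
                    .serviceUrl("pulsar://localhost:6650")    // assumed broker URL
                    .build();
            Consumer<byte[]> consumer = client.newConsumer()
                    .topic("my-topic")                        // assumed topic
                    .subscriptionName("my-sub")               // assumed subscription
                    .subscribe();

            Message<byte[]> msg = consumer.receive();
            MessageId id = msg.getMessageId();    // unchanged if this message is redelivered
            long seq = msg.getSequenceId();       // scoped to the producer; restarts on reconnect
            String dedupKey = msg.getTopicName() + ":" + id;  // stable key for an external store

            System.out.println("dedup key: " + dedupKey + " (sequence id was " + seq + ")");
            consumer.acknowledge(msg);
            client.close();
        }
    }
----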
2018-06-10 05:19:03 UTC - Idan: but @Sijie Guo if I can't know whether a message is redelivered, how can I apply any dedup mechanism (or DLQ mechanism)? We need our dedup not to apply to redelivered messages, as by nature they are redelivered and need to continue through the process
----
2018-06-10 05:25:05 UTC - Sijie Guo: I am not sure we are talking about the same thing here. When checking whether a message is duplicated or not, you need a unique id (e.g. message id or offset) to identify whether this message is redelivered or not. Am I missing anything?
----
2018-06-10 05:38:55 UTC - Idan: ok, I'll try to rephrase my question
----
2018-06-10 05:40:22 UTC - Idan: we are trying to implement a dedup mechanism on the consumer side to ensure exactly-once. The dedup should block messages that are consumed twice, but on the other hand, if a message is retried (because it was never acked) it should pass our dedup process. I need to understand how I can leverage the Pulsar messaging system for that purpose
----
2018-06-10 05:45:09 UTC - Sijie Guo: so a *Message* in Pulsar has a unique message id, which you can use for deduplication.

so the part I don't understand is: why do you need to distinguish "if a message is retried (because it was never acked) it should pass our dedup process"? If a message is never acked, it will not be recorded in your "dedup" system, so when it is redelivered, it will not be deduplicated, right?
----
2018-06-10 05:58:11 UTC - Idan: Ahh, I get the confusion now
----
2018-06-10 05:58:37 UTC - Idan: We ack messages only after we finish all our processing.
----
2018-06-10 05:59:08 UTC - Idan: Meaning when a message arrives, it first hits the dedup mechanism, then processing; if everything goes well, we ack
----
2018-06-10 06:00:57 UTC - Idan: This flow pushes all the retry logic to the messaging framework: the application only acks when the flow succeeds, at one point (at the end of the flow), and if something bad happens in the middle (before the ack), a redelivery will be triggered
----
2018-06-10 06:01:16 UTC - Idan: And the flow will start all over again (retry)
----
2018-06-10 06:05:52 UTC - Sijie Guo: in your case, what do you do when it hits the dedup mechanism?
----
2018-06-10 06:06:01 UTC - Sijie Guo: are you recording this message id, or...?
----
2018-06-10 06:08:17 UTC - Idan: Heh, I am writing the dedup mechanism as we talk.. that's why I am asking you for a hint. How can I tell if it's a retry or the first time? In other queue systems like SQS it's fairly easy, as they (AWS) populate the message with a retry-counter property, helping our application determine whether it's a first-time arrival or a retry (and if a retry, how many times it was retried already)
----
2018-06-10 06:16:24 UTC - Sijie Guo: in the earlier conversation you had with Matteo, he pointed out that we don't have such a retry counter. So when a message is redelivered, it has exactly the same information as before, and you don't have any retry information to tell whether it is a retry or the first time.

but my point is: why do you need this retry information for deduplication? isn't deduplication checking whether the message has been processed or not?

there are a couple of ways to do it:

1) if you are using failover/exclusive, you are consuming messages in order, so you just need to keep the last processed message id and compare the incoming message id against it to know whether it is a duplicate or not.

2) if you can't use failover/exclusive, i.e. you are using shared, you can use a key/value cache/store: put <messageid, State.Processing> in the cache to mean you are processing the message identified by <messageid>, and when you are done processing, change the pair to <messageid, State.Processed>. When you receive a redelivered message, you then know whether another process is processing the same message.
----
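For reference, a minimal sketch of option 2 above, assuming a shared subscription and the Pulsar Java client; a ConcurrentHashMap stands in for the external key/value store (e.g. Redis) mentioned earlier, and the broker URL, topic, and subscription names are placeholders:

    import org.apache.pulsar.client.api.Consumer;
    import org.apache.pulsar.client.api.Message;
    import org.apache.pulsar.client.api.PulsarClient;
    import org.apache.pulsar.client.api.SubscriptionType;

    import java.util.concurrent.ConcurrentHashMap;
    import java.util.concurrent.ConcurrentMap;

    public class StateMapDedup {
        enum State { PROCESSING, PROCESSED }

        // Stand-in for a shared key/value store such as Redis: dedup key -> state.
        private static final ConcurrentMap<String, State> dedupStore = new ConcurrentHashMap<>();

        public static void main(String[] args) throws Exception {
            PulsarClient client = PulsarClient.builder()
                    .serviceUrl("pulsar://localhost:6650")   // assumed broker URL
                    .build();

            Consumer<byte[]> consumer = client.newConsumer()
                    .topic("my-topic")                        // assumed topic
                    .subscriptionName("my-shared-sub")        // assumed subscription
                    .subscriptionType(SubscriptionType.Shared)
                    .subscribe();

            while (true) {
                Message<byte[]> msg = consumer.receive();
                String key = msg.getTopicName() + ":" + msg.getMessageId();

                // Atomically record <messageid, PROCESSING>; a non-null return means
                // another delivery of the same message id was already seen.
                State previous = dedupStore.putIfAbsent(key, State.PROCESSING);

                if (previous == null) {
                    process(msg);                             // the business-logic flow
                    dedupStore.put(key, State.PROCESSED);     // <messageid, PROCESSED>
                    consumer.acknowledge(msg);                // ack only after processing succeeds
                } else if (previous == State.PROCESSED) {
                    // Already fully processed (e.g. the ack was lost): just ack and skip.
                    consumer.acknowledge(msg);
                }
                // previous == PROCESSING: another copy is still in flight; do not ack,
                // and let the ack timeout trigger a redelivery if that copy fails.
            }
        }

        private static void process(Message<byte[]> msg) {
            // business logic: apply the flow, write to datasources, etc.
        }
    }

With a failover/exclusive subscription (option 1), the store shrinks to the last processed MessageId, which is Comparable in the Java client, so a compareTo against each incoming id is enough to spot a duplicate.
----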
2018-06-10 06:19:09 UTC - Idan: > but my point is: why do you need this retry information for deduplication? isn't deduplication checking whether the message has been processed or not?

Because dedup checks whether the message was consumed.. not processed
----
2018-06-10 06:19:17 UTC - Idan: I mean we use dedup just to ensure exactly-once
----
2018-06-10 06:19:31 UTC - Idan: but if a message passes our dedup, it won't get acked yet, because we want to make sure the flow completed correctly; then we ack
----
2018-06-10 06:20:21 UTC - Idan: we leverage the ‘ack’ mechanism: consuming a message successfully is not enough for us to ack it. We consume it, process it, then ack
----
2018-06-10 06:20:57 UTC - Sijie Guo: > dedup checks whether the message was consumed.. not processed

if dedup is checking that a message was consumed, not processed, does that mean you allow processing it multiple times?
----
2018-06-10 06:21:29 UTC - Sijie Guo: what is the difference between “consumed” and “processed”?
----
2018-06-10 06:22:18 UTC - Idan: “consumed” means we got the message from Pulsar
----
2018-06-10 06:22:38 UTC - Idan: “processed” means it went successfully through our flow (e.g. all business logic applied to it, added into datasources, etc.)
----
2018-06-10 06:24:00 UTC - Sijie Guo: do you allow processing multiple times?
----
2018-06-10 06:28:39 UTC - Sijie Guo: > I mean we use dedup just to ensure exactly-once

I am assuming by "exactly-once" you mean you want to process a message exactly once. So you have to put the message into some de-dup store/cache before you process it; otherwise Pulsar might redeliver the message if you don't ack.

if that's the case, while a message is being processed, you still don't want a redelivered copy going through your processing flow, no?

I think that's the confusion I have here.
----
2018-06-10 06:37:24 UTC - Idan: > if that's the case, while a message is being processed, you still don't want a redelivered copy going through your processing flow, no?

True. That's why I'll give the ack timeout enough grace
----
2018-06-10 06:37:41 UTC - Idan: to let the first copy of the message go through
----
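For reference, a minimal sketch of giving the ack timeout "enough grace" on the Pulsar Java consumer builder; the 5-minute value and the topic/subscription names are only illustrative:

    import org.apache.pulsar.client.api.Consumer;
    import org.apache.pulsar.client.api.Message;
    import org.apache.pulsar.client.api.PulsarClient;

    import java.util.concurrent.TimeUnit;

    public class AckTimeoutExample {
        public static void main(String[] args) throws Exception {
            PulsarClient client = PulsarClient.builder()
                    .serviceUrl("pulsar://localhost:6650")   // assumed broker URL
                    .build();

            // The ack timeout should comfortably exceed the slowest expected flow,
            // so the first copy has time to finish before the broker redelivers.
            Consumer<byte[]> consumer = client.newConsumer()
                    .topic("my-topic")                        // assumed topic
                    .subscriptionName("my-sub")               // assumed subscription
                    .ackTimeout(5, TimeUnit.MINUTES)          // illustrative grace period
                    .subscribe();

            Message<byte[]> msg = consumer.receive();
            // ... run the full processing flow here ...
            consumer.acknowledge(msg);                        // ack only at the end of the flow
            client.close();
        }
    }
----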
2018-06-10 06:41:30 UTC - Sijie Guo: if that's the case, when you receive a message, update the map with <messageid, State.Processing> and process it; after it's processed, update the map with <messageid, State.Processed>. That would address your case, no?
----
2018-06-10 06:46:45 UTC - Idan: yes, but maybe I can replace State.Processing with a counter
----
2018-06-10 06:47:47 UTC - Sijie Guo: yes
----
2018-06-10 06:48:21 UTC - Idan: and if the key is there I can just increment it
----
2018-06-10 06:48:32 UTC - Idan: so I can even know how many times
----
2018-06-10 06:48:46 UTC - Idan: the only thing I'm afraid of is how performance will be affected by that
----
2018-06-10 06:52:17 UTC - Sijie Guo: what was your original de-dup logic with SQS? I don't think there will be a difference, right?
----
2018-06-10 06:54:18 UTC - Idan: in SQS it was easier, as I said
----
2018-06-10 06:54:26 UTC - Idan: I didn't need to maintain retry counters in Redis
----
2018-06-10 06:54:30 UTC - Idan: as they arrived on the message
----
2018-06-10 06:54:55 UTC - Sijie Guo: you still maintain a map, right?
----
2018-06-10 06:54:58 UTC - Idan: yes
----
2018-06-10 06:55:08 UTC - Idan: almost the same. you're right
----
2018-06-10 06:55:14 UTC - Idan: also, with SQS I had a DLQ automatically
----
2018-06-10 06:55:23 UTC - Idan: here I'll need to count retries in my own map
----
2018-06-10 06:55:31 UTC - Sijie Guo: yeah. that’s true
----
2018-06-10 06:55:31 UTC - Idan: and dispatch to a DLQ after X retries
----
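For reference, a minimal sketch of the retry-counter-plus-DLQ idea, assuming the Pulsar Java client; the in-memory map stands in for the Redis counters discussed above, and "my-topic-dlq" is a hypothetical dead-letter topic that the application produces to itself:

    import org.apache.pulsar.client.api.Consumer;
    import org.apache.pulsar.client.api.Message;
    import org.apache.pulsar.client.api.Producer;
    import org.apache.pulsar.client.api.PulsarClient;

    import java.util.concurrent.ConcurrentHashMap;
    import java.util.concurrent.ConcurrentMap;

    public class RetryCounterDlq {
        private static final int MAX_RETRIES = 3;   // dispatch to the DLQ after this many deliveries

        // Stand-in for the Redis counters discussed above: dedup key -> delivery count.
        private static final ConcurrentMap<String, Integer> deliveries = new ConcurrentHashMap<>();

        public static void main(String[] args) throws Exception {
            PulsarClient client = PulsarClient.builder()
                    .serviceUrl("pulsar://localhost:6650")   // assumed broker URL
                    .build();

            Consumer<byte[]> consumer = client.newConsumer()
                    .topic("my-topic")                        // assumed topic
                    .subscriptionName("my-sub")               // assumed subscription
                    .subscribe();

            Producer<byte[]> dlq = client.newProducer()
                    .topic("my-topic-dlq")                    // hypothetical DLQ topic
                    .create();

            while (true) {
                Message<byte[]> msg = consumer.receive();
                String key = msg.getTopicName() + ":" + msg.getMessageId();

                // Count this delivery; merge() increments the counter atomically.
                int attempt = deliveries.merge(key, 1, Integer::sum);

                if (attempt > MAX_RETRIES) {
                    // Too many retries: park the payload on the DLQ topic, ack the original.
                    dlq.send(msg.getData());
                    consumer.acknowledge(msg);
                    deliveries.remove(key);
                    continue;
                }

                try {
                    process(msg);                             // the business-logic flow
                    consumer.acknowledge(msg);                // ack only after success
                    deliveries.remove(key);
                } catch (Exception e) {
                    // Do not ack: the ack timeout will trigger the next redelivery (retry).
                }
            }
        }

        private static void process(Message<byte[]> msg) throws Exception {
            // business logic: apply the flow, write to datasources, etc.
        }
    }
----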
2018-06-10 06:56:56 UTC - Sijie Guo: yeah. we might consider supporting DLQ natively in Pulsar; that can probably simplify your logic.
----
2018-06-10 06:58:28 UTC - Idan: yes, of course.. but until you get there, I am planning to get this to prod within 1 month
----
2018-06-10 06:58:43 UTC - Idan: so if I could at least get the retry counter on the broker side, it would ease our pain
----
2018-06-10 06:59:21 UTC - Sijie Guo: sure
----
2018-06-10 06:59:47 UTC - Sijie Guo: hope we are now clear about the message id and dedup part :slightly_smiling_face:
----
2018-06-10 07:59:35 UTC - Idan: yes. you do! thank you
----