Posted to user@beam.apache.org by Cristian Constantinescu <ze...@gmail.com> on 2021/09/10 07:14:54 UTC

Re: Using Beam to generate unique ids with unbounded sources

Hi Jan and Luke,

Sorry for the late reply.

@Jan
I ended up implementing a timestamp policy like the one below:

    // Imports needed by this snippet:
    // import java.util.Optional;
    // import org.apache.avro.generic.GenericRecord;
    // import org.apache.beam.sdk.io.kafka.KafkaRecord;
    // import org.apache.beam.sdk.io.kafka.TimestampPolicy;
    // import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
    // import org.joda.time.Instant;

    private class AdvanceWatermarkToInfinityAtEndOfTopicTimePolicy
            extends TimestampPolicy<String, GenericRecord> {
        protected Instant currentWatermark;

        public AdvanceWatermarkToInfinityAtEndOfTopicTimePolicy(
                Optional<Instant> previousWatermark) {
            currentWatermark =
                previousWatermark.orElse(BoundedWindow.TIMESTAMP_MIN_VALUE);
        }

        @Override
        public Instant getTimestampForRecord(
                TimestampPolicy.PartitionContext ctx,
                KafkaRecord<String, GenericRecord> record) {
            currentWatermark = new Instant(record.getTimestamp());
            return currentWatermark;
        }

        @Override
        public Instant getWatermark(PartitionContext ctx) {
            if (ctx.getMessageBacklog() == 0) {
                // The reader is caught up; advance the watermark to the end
                // of time so downstream transforms see a bounded input.
                return BoundedWindow.TIMESTAMP_MAX_VALUE;
            }
            // There is backlog (or it is unknown); do not advance the
            // watermark past the last record's timestamp.
            return currentWatermark;
        }
    }
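
For completeness, a sketch of how such a policy plugs into KafkaIO via
withTimestampPolicyFactory (the bootstrap servers and topic are
placeholders, and GenericRecordDeserializer is hypothetical, standing in
for whatever Deserializer<GenericRecord> you use):

    KafkaIO.<String, GenericRecord>read()
        .withBootstrapServers("localhost:9092")     // placeholder
        .withTopic("output-topic")                  // placeholder
        .withKeyDeserializer(StringDeserializer.class)
        // Hypothetical; any Deserializer<GenericRecord> works here.
        .withValueDeserializer(GenericRecordDeserializer.class)
        .withTimestampPolicyFactory(
            (topicPartition, previousWatermark) ->
                new AdvanceWatermarkToInfinityAtEndOfTopicTimePolicy(
                    previousWatermark));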

This seems to work most of the time, but sometimes, when using
exactly-once semantics with KafkaIO and Flink, ctx.getMessageBacklog()
is always > 0, so the watermark is never advanced to
BoundedWindow.TIMESTAMP_MAX_VALUE.

I'll have to find a reliable way to reproduce that, though, and put
together a sample project.

@Luke
That could be a solution; however, I prefer to keep state in Kafka
instead of hashing, because there are cases where the id of (A1, B1, C1)
must be the same as the id for (A2, B1, C1), because A1 became A2 after
some time (e.g. a person legally changed their name, or a financial
security changed its symbol or CUSIP, etc.).
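
For reference, a minimal sketch of the hashing approach Luke suggests
below (the field types and the separator-based encoding of the record
are my assumptions):

    import java.nio.charset.StandardCharsets;
    import java.security.MessageDigest;
    import java.security.NoSuchAlgorithmException;

    static String deterministicId(String a, String b, String c)
            throws NoSuchAlgorithmException {
        // Join the record's properties with a separator that cannot appear
        // in the values, so that ("A1", "B1") and ("A", "1B1") do not
        // encode to the same bytes.
        byte[] encoded =
            String.join("\u0000", a, b, c).getBytes(StandardCharsets.UTF_8);
        byte[] hash = MessageDigest.getInstance("SHA-512").digest(encoded);
        // Hex-encode the 64-byte digest into a 128-character id.
        StringBuilder hex = new StringBuilder(hash.length * 2);
        for (byte x : hash) {
            hex.append(String.format("%02x", x));
        }
        return hex.toString();
    }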

Thank you both for your suggestions and guidance.

Cheers,
Cristian

On Mon, Aug 9, 2021 at 6:32 PM Luke Cwik <lc...@google.com> wrote:

> You could look at using a cryptographic hashing function such as
> SHA-512 [1].
>
> You would take the record (A1, B1, C1), encode it, and pass it to the
> hashing function to generate a binary hash, which you could then
> convert back to a string via an encoding such as hex. The odds of
> getting a collision are astronomically small (you are more likely to
> see a random bit flip due to a cosmic ray than a collision).
>
> This way you would never need to restore the ids from a previous run by
> looking them up from an external source.
>
> 1:
> https://stackoverflow.com/questions/33085493/how-to-hash-a-password-with-sha-512-in-java
>
> On Thu, Jul 22, 2021 at 7:40 AM Jan Lukavský <je...@seznam.cz> wrote:
>
>> Hi Cristian,
>>
>> I didn't try that, so I'm not 100% sure it would work, but you could
>> probably try using a custom timestamp policy for KafkaIO [1], which
>> shifts the timestamp to BoundedWindow.TIMESTAMP_MAX_VALUE once you
>> know you have reached the head of the state topic. That would probably
>> require reading the end offsets before running the pipeline. This
>> should effectively turn the source into a bounded source.
>>
>>   Jan
>>
>> [1]
>>
>> https://beam.apache.org/releases/javadoc/2.31.0/org/apache/beam/sdk/io/kafka/KafkaIO.Read.html#withTimestampPolicyFactory-org.apache.beam.sdk.io.kafka.TimestampPolicyFactory-
>>
>> On 7/22/21 2:14 PM, Cristian Constantinescu wrote:
>> > Hi All,
>> >
>> > I would like to know if there's a suggested pattern for the below
>> > scenario. TL;DR: reading state from Kafka.
>> >
>> > I have a scenario where I'm listening to a Kafka topic and generating
>> > a unique id based on the properties of each incoming item. I then
>> > output the result to another Kafka topic. The tricky part is that
>> > when the pipeline is restarted, I have to read the output topic and
>> > rebuild the id state, so that if I see an item that was already given
>> > an id, I give the same id back and do not generate a new one.
>> >
>> > For example:
>> > Input topic -> Output topic
>> > (A1, B1, C1) -> (A1, B1, C1, Random string "ID 1")
>> > (A1, B1, C2) -> (A1, B1, C2, Random string "ID 2")
>> > pipeline is restarted
>> > (A3, B3, C3) -> (A3, B3, C3, Random string "ID 3")
>> > (A1, B1, C1) -> (A1, B1, C1, Random string "ID 1") <-- because we've
>> > already seen (A1, B1, C1) before
>> >
>> > I can't really use any type of window except the global one, as I
>> > need to join on all the items of the output topic (the one with the
>> > already-generated ids).
>> >
>> > Right now, I flatten both input and output topics and use a trigger
>> > on the global window,
>> >
>> > AfterProcessingTime.pastFirstElementInPane().plusDelayOf(Duration.standardSeconds(10)),
>> >
>> > then group by properties (A, B, C). Once that is done, I look through
>> > the grouped rows and see if any one of them already has an id. If
>> > yes, all the other rows get this id and the id is saved in the
>> > ParDo's state for future messages. If not, I generate a new id.
>> >
>> > My solution seems to work. Kind of...
>> >
>> > This puts a delay of 10s on all incoming messages, which I'd prefer
>> > to avoid. I would like to read the output topic at the start of the
>> > pipeline, build the state, and then start processing the input topic.
>> > Since the output topic will be stale until I start processing the
>> > input topic again, it is effectively a bounded collection.
>> > Unfortunately, because it's KafkaIO, it's still considered an
>> > unbounded source, which means that Wait.on() this collection waits
>> > forever. (Note: I've read the notes in the documentation [1] but
>> > either do not understand them or didn't take the appropriate steps
>> > for Wait.on() to trigger properly.)
>> >
>> > I have also tried windowing the output topic in a session window with
>> > a one-second gap. Basically, if I don't get any item for 1 second, it
>> > means that I have finished reading the output topic and can start
>> > processing the input topic. Unfortunately, Wait.on() doesn't work for
>> > session windows.
>> >
>> > Furthermore, I don't think side inputs work for this problem: first,
>> > because I'm not sure how to create the side input from an unbounded
>> > source; second, because the side input needs to be updated whenever a
>> > new id is generated.
>> >
>> > I would appreciate any thoughts or ideas to elegantly solve this
>> problem.
>> >
>> > Thanks,
>> > Cristian
>> >
>> > [1]
>> >
>> https://beam.apache.org/releases/javadoc/2.29.0/org/apache/beam/sdk/transforms/Wait.html
>> > <
>> https://beam.apache.org/releases/javadoc/2.29.0/org/apache/beam/sdk/transforms/Wait.html
>> >
>>
>

Re: Using Beam to generate unique ids with unbounded sources

Posted by Jan Lukavský <je...@seznam.cz>.
Hi Cristian,

I think the backlog is not going to be 100% reliable for this 
use case. A more robust approach would probably be to fetch the 
endOffsets from Kafka when submitting the job (or at an appropriate 
time, depending on how you get updates to the state topic, to make sure 
you cannot miss anything) and then compare these against the offset in 
the KafkaRecord in getTimestampForRecord. Once all partitions have 
reached their end offsets, you can advance the watermark to infinity.
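
A sketch of that approach (the consumer configuration, topic name, and
class names are mine; KafkaIO creates one TimestampPolicy per partition,
so each instance can capture its partition's end offset in the
constructor):

    // Uses org.apache.kafka.clients.consumer.KafkaConsumer and
    // org.apache.kafka.common.TopicPartition, java.util.{List, Map,
    // Optional}, java.util.stream.Collectors, plus the Beam and Joda
    // imports from the policy above.

    // Before building the pipeline: fetch the end offsets of the topic.
    Map<TopicPartition, Long> endOffsets;
    try (KafkaConsumer<String, GenericRecord> consumer =
            new KafkaConsumer<>(consumerConfig)) {          // assumption
        List<TopicPartition> partitions =
            consumer.partitionsFor("state-topic").stream()  // assumption
                .map(p -> new TopicPartition(p.topic(), p.partition()))
                .collect(Collectors.toList());
        endOffsets = consumer.endOffsets(partitions);
    }

    class EndOffsetTimestampPolicy
            extends TimestampPolicy<String, GenericRecord> {
        private final long endOffset;
        private Instant currentWatermark;
        private boolean caughtUp;

        EndOffsetTimestampPolicy(long endOffset,
                Optional<Instant> previousWatermark) {
            this.endOffset = endOffset;
            this.currentWatermark =
                previousWatermark.orElse(BoundedWindow.TIMESTAMP_MIN_VALUE);
            // An empty partition is trivially caught up.
            this.caughtUp = endOffset == 0;
        }

        @Override
        public Instant getTimestampForRecord(PartitionContext ctx,
                KafkaRecord<String, GenericRecord> record) {
            currentWatermark = new Instant(record.getTimestamp());
            // The end offset is the offset of the next record to be
            // written, so the last pre-existing record is at endOffset - 1.
            if (record.getOffset() >= endOffset - 1) {
                caughtUp = true;
            }
            return currentWatermark;
        }

        @Override
        public Instant getWatermark(PartitionContext ctx) {
            return caughtUp
                ? BoundedWindow.TIMESTAMP_MAX_VALUE
                : currentWatermark;
        }
    }

    // Wiring: .withTimestampPolicyFactory(
    //     (tp, prev) -> new EndOffsetTimestampPolicy(endOffsets.get(tp), prev))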

  Jan
