Posted to dev@beam.apache.org by Wei Hsia ☁ <we...@google.com> on 2022/02/01 23:10:10 UTC

[BEAM-13298] Question Regarding Exactly Once Sink

Hi All,

For BEAM-13298 <https://issues.apache.org/jira/browse/BEAM-13298>, I've
been trying to alter KafkaIO to output a PCollection.

The idea is to output the input object, but technically the exactly-once sink
(EOS) re-windows the object
<https://github.com/weifonghsia/beam/blob/KafkaIOWithOutput/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaExactlyOnceSink.java#L171>,
which would mean the output is no longer the original object?

Would it be more correct to simply output the input as is, or should I go
into the EOS code and return the post-windowed object?

Thanks,

Wei


Wei Hsia

Developer Advocate

Google Cloud

weihsia@google.com

949.794.2004

Re: [BEAM-13298] Question Regarding Exactly Once Sink

Posted by Wei Hsia ☁ <we...@google.com>.
Hi again,

I have a quick question; I don't think my understanding of window objects
is deep enough to answer it myself.
I had some help from Reza, and here's what I have so far.

In KafkaExactlyOnceSink:

Get the WindowingStrategy of the original input

WindowingStrategy<?, ?> originalWindowStrategy = input.getWindowingStrategy();

return input
    .apply(
        Window.<ProducerRecord<K, V>>into(new GlobalWindows()) // Everything into global window.
            .triggering(Repeatedly.forever(AfterPane.elementCountAtLeast(1)))
            .discardingFiredPanes())
    .apply(
        String.format("Shuffle across %d shards", numShards),
        ParDo.of(new Reshard<>(numShards)))
    .apply("Persist sharding", GroupByKey.create())
    .apply("Assign sequential ids", ParDo.of(new Sequencer<>()))
    .apply("Persist ids", GroupByKey.create())
    .apply(
        String.format("Write to Kafka topic '%s'", spec.getTopic()),
        ParDo.of(new ExactlyOnceWriter<>(spec, input.getCoder())))
    .apply(Flatten.iterables())
    .apply("Extract Timestamp", Reify.extractTimestampsFromValues())
    .apply("Extract ProducerRecord",
        ParDo.of(
            new DoFn<KV<Long, ProducerRecord<K, V>>, ProducerRecord<K, V>>(){
              @ProcessElement
              public void processElement(ProcessContext c) {
                c.output(c.element().getValue());
              }}))
    .apply("Reapply Window",
        Window.into(originalWindowStrategy.withWindowFn()));

So I return the input Iterable from ExactlyOnceWriter and then flatten it.
I use Reify to reattach the timestamps, then pull out the ProducerRecord
(there's probably a helper function that would make this code cleaner).
Finally, I reapply the window; however, I'm not sure what object should go
into .withWindowFn( OBJECT ).

Apologies if that doesn't make a whole heck of a lot of sense.

Thanks,

Wei


On Tue, Feb 1, 2022 at 3:28 PM Reuven Lax <re...@google.com> wrote:

> Correct, it should have the original windowing. You can do this by
> capturing the original WindowFn and rewindowing the output with that
> WindowFn.

Re: [BEAM-13298] Question Regarding Exactly Once Sink

Posted by Reuven Lax <re...@google.com>.
Correct, it should have the original windowing. You can do this by
capturing the original WindowFn and rewindowing the output with that
WindowFn.
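Reuven's suggestion can be sketched as follows, under a few assumptions: the sink's output elements already carry their original event timestamps (a WindowFn assigns windows from element timestamps, which is why the Reify step in Wei's code matters), and `String` elements stand in for `ProducerRecord<K, V>`. Instead of `withWindowFn(...)`, which returns a new WindowingStrategy rather than a WindowFn, the sketch reads the captured WindowFn with `getWindowFn()` and passes it to `Window.into`. Because the sink's global-window step sets a trigger, Beam's `Window` validation may reject a bare `Window.into(windowFn)` with non-global windows unless allowed lateness is also given, so the sketch copies the original trigger, allowed lateness and accumulation mode as well. `RewindowSketch` and its methods are hypothetical names, and other strategy fields (such as the timestamp combiner) are not restored here.

```java
import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.transforms.Create;
import org.apache.beam.sdk.transforms.windowing.AfterPane;
import org.apache.beam.sdk.transforms.windowing.FixedWindows;
import org.apache.beam.sdk.transforms.windowing.GlobalWindows;
import org.apache.beam.sdk.transforms.windowing.Repeatedly;
import org.apache.beam.sdk.transforms.windowing.Window;
import org.apache.beam.sdk.transforms.windowing.WindowFn;
import org.apache.beam.sdk.values.PCollection;
import org.apache.beam.sdk.values.TimestampedValue;
import org.apache.beam.sdk.values.WindowingStrategy;
import org.apache.beam.sdk.values.WindowingStrategy.AccumulationMode;
import org.joda.time.Duration;
import org.joda.time.Instant;

public class RewindowSketch {

  /** Hypothetical helper: put elements back into the windowing captured before the sink. */
  static <T> PCollection<T> reapplyOriginalWindowing(
      PCollection<T> output, WindowingStrategy<?, ?> original) {
    // The captured strategy is wildcard-typed, so Window.into needs an unchecked cast.
    @SuppressWarnings("unchecked")
    WindowFn<? super T, ?> originalWindowFn = (WindowFn<? super T, ?>) original.getWindowFn();

    Window<T> rewindow =
        Window.<T>into(originalWindowFn)
            .triggering(original.getTrigger())
            .withAllowedLateness(original.getAllowedLateness());
    rewindow =
        original.getMode() == AccumulationMode.ACCUMULATING_FIRED_PANES
            ? rewindow.accumulatingFiredPanes()
            : rewindow.discardingFiredPanes();
    return output.apply("Reapply original windowing", rewindow);
  }

  static PCollection<String> buildExample(Pipeline p) {
    PCollection<String> input =
        p.apply(
                Create.timestamped(
                    TimestampedValue.of("a", new Instant(0L)),
                    TimestampedValue.of("b", new Instant(90_000L))))
            .apply(Window.<String>into(FixedWindows.of(Duration.standardMinutes(1))));

    // Capture the strategy before the sink rewindows into the global window.
    WindowingStrategy<?, ?> originalStrategy = input.getWindowingStrategy();

    // Stand-in for the sink's global-window step (sharding and GroupByKeys omitted).
    PCollection<String> sinkOutput =
        input.apply(
            "Everything into global window",
            Window.<String>into(new GlobalWindows())
                .triggering(Repeatedly.forever(AfterPane.elementCountAtLeast(1)))
                .discardingFiredPanes());

    return reapplyOriginalWindowing(sinkOutput, originalStrategy);
  }

  public static void main(String[] args) {
    Pipeline p = Pipeline.create();
    PCollection<String> result = buildExample(p);
    // Prints the WindowFn on the output's windowing strategy (the one-minute FixedWindows).
    System.out.println(result.getWindowingStrategy().getWindowFn());
  }
}
```

`main` only constructs the pipeline and inspects the output's windowing strategy; executing it needs the Beam Java SDK and a runner on the classpath. If the original input was never triggered, `getTrigger()` returns the default trigger, so copying it is harmless.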

On Tue, Feb 1, 2022 at 3:16 PM Robert Bradshaw <ro...@google.com> wrote:

> I think if we output the original data, it should have the original
> windowing. The thing that is waited on to complete the write may have
> alternative windowing though.

Re: [BEAM-13298] Question Regarding Exactly Once Sink

Posted by Robert Bradshaw <ro...@google.com>.
I think if we output the original data, it should have the original
windowing. The thing that is waited on to complete the write may have
alternative windowing though.
