Posted to user@beam.apache.org by Gaurav Nakum <ga...@oracle.com> on 2020/09/10 19:38:54 UTC

Output from Window not getting materialized

Hi everyone!

We are developing a new IO connector using the SDF API, and testing it with the following simple counting pipeline:
 
p.apply(MyIO.read()
        .withStream(inputStream)
        .withStreamPartitions(Arrays.asList(0))
        .withConsumerConfig(config)
    ) // gets a PCollection<KV<String, String>>

 .apply(Values.<String>create()) // PCollection<String>

 .apply(Window.<String>into(FixedWindows.of(Duration.standardSeconds(10)))
     .withAllowedLateness(Duration.standardDays(1))
     .accumulatingFiredPanes())

 .apply(Count.<String>perElement())

 // write PCollection<KV<String, Long>> to stream
 .apply(MyIO.write()
        .withStream(outputStream)
        .withConsumerConfig(config));
 
 

Without the window transform we can read from the stream and write to it; however, I don’t see any output after the Window transform. Could you please help pin down the issue?

Thank you,

Gaurav


Re: Output from Window not getting materialized

Posted by Praveen K Viswanathan <ha...@gmail.com>.
Hello Luke,

Thanks for the reference. The plan is to go with the "MonotonicallyIncreasing"
watermark estimator, but we are not sure how to implement it with our
source, which is Oracle Streaming Service (OSS). For other sources like
Kafka I can see a "TimestampPolicy" through which the watermark in the IO
talks natively to Kafka. I looked for something similar in OSS but did not
find anything in its SDK. I am planning to check with the OSS development
team on this, and it would help if you could share what exactly we would
need from the source to implement a watermark. If you have any other code
base that implements a monotonically increasing watermark, that would also
be helpful.
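
For context, this is roughly the skeleton we are starting from (only the
watermark-related methods are shown; OssPartition, the omitted restriction
methods, and the rest of the DoFn are placeholders for whatever the OSS SDK
actually gives us):

import org.apache.beam.sdk.transforms.DoFn;
import org.apache.beam.sdk.transforms.splittabledofn.WatermarkEstimators;
import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
import org.apache.beam.sdk.values.KV;
import org.joda.time.Instant;

public class ReadFromOssDoFn extends DoFn<OssPartition, KV<String, String>> {

  // ... @GetInitialRestriction / @NewTracker / @ProcessElement omitted ...

  @GetInitialWatermarkEstimatorState
  public Instant getInitialWatermarkEstimatorState() {
    // Start at the minimum timestamp until real record timestamps are observed.
    return BoundedWindow.TIMESTAMP_MIN_VALUE;
  }

  @NewWatermarkEstimator
  public WatermarkEstimators.MonotonicallyIncreasing newWatermarkEstimator(
      @WatermarkEstimatorState Instant state) {
    // Reports the largest timestamp seen so far among the records we output.
    return new WatermarkEstimators.MonotonicallyIncreasing(state);
  }
}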

Regards,
Praveen

On Mon, Sep 14, 2020 at 9:10 AM Luke Cwik <lc...@google.com> wrote:

> Is the watermark advancing[1, 2] for the SDF such that the windows can
> close allowing for the Count transform to produce output?
>
> 1: https://www.youtube.com/watch?v=TWxSLmkWPm4
> 2: https://beam.apache.org/documentation/programming-guide/#windowing
>
> On Thu, Sep 10, 2020 at 12:39 PM Gaurav Nakum <ga...@oracle.com>
> wrote:
>
>> Hi everyone!
>>
>> We are developing a new IO connector using the SDF API, and testing it
>> with the following simple counting pipeline:
>>
>>
>>
>> p.apply(MyIO.read()
>>
>>         .withStream(inputStream)
>>
>>         .withStreamPartitions(Arrays.asList(0))
>>
>>         .withConsumerConfig(config)
>>
>>     ) // gets a PCollection<KV<String, String>>
>>
>>
>>
>>
>>
>> .apply(Values.<String>*create*()) // PCollection<String>
>>
>>
>>
>> .apply(Window.<String>into(FixedWindows.of(Duration.standardSeconds(10)))
>>
>>     .withAllowedLateness(Duration.standardDays(1))
>>
>>     .accumulatingFiredPanes())
>>
>>
>>
>> .apply(Count.<String>perElement())
>>
>>
>>
>>
>>
>> // write PCollection<KV<String, Long>> to stream
>>
>> .apply(MyIO.write()
>>
>>         .withStream(outputStream)
>>
>>         .withConsumerConfig(config));
>>
>>
>>
>>
>>
>> Without the window transform, we can read from the stream and write to
>> it, however, I don’t see output after the Window transform. Could you
>> please help pin down the issue?
>>
>> Thank you,
>>
>> Gaurav
>>
>

-- 
Thanks,
Praveen K Viswanathan

Re: Output from Window not getting materialized

Posted by Praveen K Viswanathan <ha...@gmail.com>.
Thanks Luke. I will look into @RequiresTimeSortedInput for the sorting
requirement. In parallel, I will start on the monotonically increasing
watermark estimator and come back if I have any questions. Have a great day.
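
From a first read of the docs, my understanding is that it sits on a stateful
DoFn roughly like this (just a sketch; the element types and names are
placeholders):

import org.apache.beam.sdk.coders.InstantCoder;
import org.apache.beam.sdk.state.StateSpec;
import org.apache.beam.sdk.state.StateSpecs;
import org.apache.beam.sdk.state.ValueState;
import org.apache.beam.sdk.transforms.DoFn;
import org.apache.beam.sdk.values.KV;
import org.joda.time.Instant;

public class InOrderPerKeyDoFn extends DoFn<KV<String, String>, KV<String, String>> {

  // Having per-key state makes this a stateful DoFn, which is where
  // @RequiresTimeSortedInput applies.
  @StateId("lastTimestamp")
  private final StateSpec<ValueState<Instant>> lastTimestampSpec =
      StateSpecs.value(InstantCoder.of());

  @RequiresTimeSortedInput
  @ProcessElement
  public void processElement(
      @Element KV<String, String> element,
      @Timestamp Instant timestamp,
      @StateId("lastTimestamp") ValueState<Instant> lastTimestamp,
      OutputReceiver<KV<String, String>> receiver) {
    // The runner delivers elements for each key sorted by event-time timestamp,
    // so out-of-order records from the source are handled before we see them.
    lastTimestamp.write(timestamp);
    receiver.output(element);
  }
}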

On Mon, Sep 21, 2020 at 8:48 PM Luke Cwik <lc...@google.com> wrote:

>
>
> On Mon, Sep 21, 2020 at 5:02 PM Praveen K Viswanathan <
> harish.praveen@gmail.com> wrote:
>
>> Hi Luke, Thanks for the detailed explanation. This gives more insight to
>> new people like me trying to grok the whole concept.
>>
>> *1) What timestamp your going to output your records at*
>>
>> ** use upstream input elements timestamp: guidance use the default
>> implementation and to get the upstream watermark by default*
>> ** use data from within the record being output or external system state
>> via an API call: use a watermark estimator*
>>
>> In the above section, I do not think our source has a watermark concept
>> built-in to derive and use it in SDF so we will have to go with the second
>> option. If suppose we could extract a timestamp from the source message
>> then do we have to setWatermark with that extracted timestamp before each
>> output in @ProcessElement? And can we use the Manual Watermark Estimator
>> itself for this approach?
>>
>
> You want to use context.outputWithTimestamp when parsing your own
> timestamps and emitting records.
>
> Using the manual one works but also take a look at timestamp observing
> works since it will be told the timestamp of each element being produced.
> Using the timestamp observing ones (monotonically increasing or your own)
> allows you to decouple the watermark estimator logic from the SDF
> implementation.
>
>
>>
>>
>> *2) How you want to compute the watermark estimate (if at all)*
>> ** the choice here depends on how the elements timestamps progress, are
>> they in exactly sorted order, almost sorted order, completely unsorted,
>> ...?*
>>
>> Our elements are "almost sorted order" because of which we want to hold
>> off processing message_01 with timestamp 11:00:10 AM until we process
>> message-02 with timestamp 11:00:08 AM. How do we enable this ordering while
>> processing the messages?
>>
>> Based on your suggestion, I tried WallTime Estimator and it worked for
>> one of our many scenarios. I am planning to test it with a bunch of other
>> window types and use that till we get a solid hold on doing it in the above
>> mentioned way that can handle the unsorted messages.
>>
>
> If you're extracting the timestamps out of your data, it would likely be
> best to use the monotonically increasing timestamp estimator or write one
> that computes one using some statistical method appropriate to your source.
> If you think you have written one that is generally useful, feel free to
> contribute it to Beam.
>
> You'll want to look into @RequiresTimeSortedInput[1]. This allows you to
> produce the messages in any order and requires the runner to make sure they
> are sorted before passing to a downstream stateful DoFn.
>
> 1:
> https://lists.apache.org/thread.html/9cdac2a363e18be58fa1f14c838c61e8406ae3407e4e2d05e423234c%40%3Cdev.beam.apache.org%3E
>
>
>>
>> Regards,
>> Praveen
>>
>> On Fri, Sep 18, 2020 at 10:06 AM Luke Cwik <lc...@google.com> wrote:
>>
>>> To answer your specific question, you should create and return the
>>> WallTime estimator. You shouldn't need to interact with it from within
>>> your @ProcessElement call since your elements are using the current time
>>> for their timestamp.
>>>
>>> On Fri, Sep 18, 2020 at 10:04 AM Luke Cwik <lc...@google.com> wrote:
>>>
>>>> Kafka is a complex example because it is adapting code from before
>>>> there was an SDF implementation (namely the TimestampPolicy and the
>>>> TimestampFn/TimestampFnS/WatermarkFn/WatermarkFn2 functions).
>>>>
>>>> There are three types of watermark estimators that are in the Beam Java
>>>> SDK today:
>>>> Manual: Can be invoked from within your @ProcessElement method within
>>>> your SDF allowing you precise control over what the watermark is.
>>>> WallTime: Doesn't need to be interacted with, will report the current
>>>> time as the watermark time. Once it is instantiated and returned via the
>>>> @NewWatermarkEstimator method you don't need to do anything with it. This
>>>> is functionally equivalent to calling setWatermark(Instant.now()) right
>>>> before returning from the @ProcessElement method in the SplittableDoFn on a
>>>> Manual watermark.
>>>> TimestampObserving: Is invoked using the output timestamp for every
>>>> element that is output. This is functionally equivalent to calling
>>>> setWatermark after each output within your @ProcessElement method in the
>>>> SplittableDoFn. The MonotonicallyIncreasing implementation for
>>>> the TimestampObserving estimator ensures that the largest timestamp seen so
>>>> far will be reported for the watermark.
>>>>
>>>> The default is to not set any watermark estimate.
>>>>
>>>> For all watermark estimators you're allowed to set the watermark
>>>> estimate to anything as the runner will recompute the output watermark as:
>>>> new output watermark = max(previous output watermark, min(upstream
>>>> watermark, watermark estimates))
>>>> This effectively means that the watermark will never go backwards from
>>>> the runners point of view but that does mean that setting the watermark
>>>> estimate below the previous output watermark (which isn't observable) will
>>>> not do anything beyond holding the watermark at the previous output
>>>> watermark.
>>>>
>>>> Depending on the windowing strategy and allowed lateness, any records
>>>> that are output with a timestamp that is too early can be considered
>>>> droppably late, otherwise they will be late/ontime/early.
>>>>
>>>> So as an author for an SDF transform, you need to figure out:
>>>> 1) What timestamp your going to output your records at
>>>> * use upstream input elements timestamp: guidance use the default
>>>> implementation and to get the upstream watermark by default
>>>> * use data from within the record being output or external system state
>>>> via an API call: use a watermark estimator
>>>> 2) How you want to compute the watermark estimate (if at all)
>>>> * the choice here depends on how the elements timestamps progress, are
>>>> they in exactly sorted order, almost sorted order, completely unsorted, ...?
>>>>
>>>> For both of these it is upto you to choose how much flexibility in
>>>> these decisions you want to give to your users and that should guide what
>>>> you expose within the API (like how KafkaIO exposes a TimestampPolicy) or
>>>> how many other sources don't expose anything.
>>>>
>>>>
>>>> On Thu, Sep 17, 2020 at 8:43 AM Praveen K Viswanathan <
>>>> harish.praveen@gmail.com> wrote:
>>>>
>>>>> Hi Luke,
>>>>>
>>>>> I am also looking at the `WatermarkEstimators.manual` option, in
>>>>> parallel. Now we are getting data past our Fixed Window but the aggregation
>>>>> is not as expected.  The doc says setWatermark will "set timestamp
>>>>> before or at the timestamps of all future elements produced by the
>>>>> associated DoFn". If I output with a timestamp as below then could
>>>>> you please clarify on how we should set the watermark for this manual
>>>>> watermark estimator?
>>>>>
>>>>> receiver.outputWithTimestamp(ossRecord, Instant.now());
>>>>>
>>>>> Thanks,
>>>>> Praveen
>>>>>
>>>>> On Mon, Sep 14, 2020 at 9:10 AM Luke Cwik <lc...@google.com> wrote:
>>>>>
>>>>>> Is the watermark advancing[1, 2] for the SDF such that the windows
>>>>>> can close allowing for the Count transform to produce output?
>>>>>>
>>>>>> 1: https://www.youtube.com/watch?v=TWxSLmkWPm4
>>>>>> 2: https://beam.apache.org/documentation/programming-guide/#windowing
>>>>>>
>>>>>> On Thu, Sep 10, 2020 at 12:39 PM Gaurav Nakum <
>>>>>> gaurav.nakum@oracle.com> wrote:
>>>>>>
>>>>>>> Hi everyone!
>>>>>>>
>>>>>>> We are developing a new IO connector using the SDF API, and testing
>>>>>>> it with the following simple counting pipeline:
>>>>>>>
>>>>>>>
>>>>>>>
>>>>>>> p.apply(MyIO.read()
>>>>>>>
>>>>>>>         .withStream(inputStream)
>>>>>>>
>>>>>>>         .withStreamPartitions(Arrays.asList(0))
>>>>>>>
>>>>>>>         .withConsumerConfig(config)
>>>>>>>
>>>>>>>     ) // gets a PCollection<KV<String, String>>
>>>>>>>
>>>>>>>
>>>>>>>
>>>>>>>
>>>>>>>
>>>>>>> .apply(Values.<String>*create*()) // PCollection<String>
>>>>>>>
>>>>>>>
>>>>>>>
>>>>>>> .apply(Window.<String>into(FixedWindows.of(Duration.standardSeconds(10)))
>>>>>>>
>>>>>>>     .withAllowedLateness(Duration.standardDays(1))
>>>>>>>
>>>>>>>     .accumulatingFiredPanes())
>>>>>>>
>>>>>>>
>>>>>>>
>>>>>>> .apply(Count.<String>perElement())
>>>>>>>
>>>>>>>
>>>>>>>
>>>>>>>
>>>>>>>
>>>>>>> // write PCollection<KV<String, Long>> to stream
>>>>>>>
>>>>>>> .apply(MyIO.write()
>>>>>>>
>>>>>>>         .withStream(outputStream)
>>>>>>>
>>>>>>>         .withConsumerConfig(config));
>>>>>>>
>>>>>>>
>>>>>>>
>>>>>>>
>>>>>>>
>>>>>>> Without the window transform, we can read from the stream and write
>>>>>>> to it, however, I don’t see output after the Window transform. Could you
>>>>>>> please help pin down the issue?
>>>>>>>
>>>>>>> Thank you,
>>>>>>>
>>>>>>> Gaurav
>>>>>>>
>>>>>>
>>>>>
>>>>> --
>>>>> Thanks,
>>>>> Praveen K Viswanathan
>>>>>
>>>>
>>
>> --
>> Thanks,
>> Praveen K Viswanathan
>>
>

-- 
Thanks,
Praveen K Viswanathan

Re: Output from Window not getting materialized

Posted by Luke Cwik <lc...@google.com>.
On Mon, Sep 21, 2020 at 5:02 PM Praveen K Viswanathan <
harish.praveen@gmail.com> wrote:

> Hi Luke, Thanks for the detailed explanation. This gives more insight to
> new people like me trying to grok the whole concept.
>
> *1) What timestamp your going to output your records at*
>
> ** use upstream input elements timestamp: guidance use the default
> implementation and to get the upstream watermark by default*
> ** use data from within the record being output or external system state
> via an API call: use a watermark estimator*
>
> In the above section, I do not think our source has a watermark concept
> built-in to derive and use it in SDF so we will have to go with the second
> option. If suppose we could extract a timestamp from the source message
> then do we have to setWatermark with that extracted timestamp before each
> output in @ProcessElement? And can we use the Manual Watermark Estimator
> itself for this approach?
>

You want to use context.outputWithTimestamp when parsing your own
timestamps and emitting records.

Using the manual one works, but also take a look at how the timestamp-observing
ones work, since they are told the timestamp of each element being produced.
Using a timestamp-observing one (monotonically increasing or your own)
allows you to decouple the watermark estimator logic from the SDF
implementation.
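
Roughly, the processing side of that looks like this (a sketch; MyRecord,
readNextBatch and parseTimestamp are placeholders for your connector's own
code):

@ProcessElement
public ProcessContinuation processElement(
    RestrictionTracker<OffsetRange, Long> tracker,
    OutputReceiver<KV<String, String>> receiver) {
  for (MyRecord record : readNextBatch()) {
    if (!tracker.tryClaim(record.getOffset())) {
      return ProcessContinuation.stop();
    }
    // Timestamp parsed out of the record itself.
    Instant eventTime = parseTimestamp(record);
    // A timestamp-observing estimator returned from @NewWatermarkEstimator is
    // told about eventTime automatically; nothing else to call here.
    receiver.outputWithTimestamp(KV.of(record.getKey(), record.getValue()), eventTime);
  }
  return ProcessContinuation.resume();
}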


>
>
> *2) How you want to compute the watermark estimate (if at all)*
> ** the choice here depends on how the elements timestamps progress, are
> they in exactly sorted order, almost sorted order, completely unsorted,
> ...?*
>
> Our elements are "almost sorted order" because of which we want to hold
> off processing message_01 with timestamp 11:00:10 AM until we process
> message-02 with timestamp 11:00:08 AM. How do we enable this ordering while
> processing the messages?
>
> Based on your suggestion, I tried WallTime Estimator and it worked for one
> of our many scenarios. I am planning to test it with a bunch of other
> window types and use that till we get a solid hold on doing it in the above
> mentioned way that can handle the unsorted messages.
>

If you're extracting the timestamps out of your data, it would likely be
best to use the monotonically increasing watermark estimator, or to write one
that computes the estimate using some statistical method appropriate to your
source. If you think you have written one that is generally useful, feel free
to contribute it to Beam.
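
For example, a custom timestamp-observing estimator could look roughly like
this (a sketch only; this class does not exist in Beam, and the fixed
allowed-skew heuristic is just one possible policy):

import org.apache.beam.sdk.transforms.splittabledofn.TimestampObservingWatermarkEstimator;
import org.joda.time.Duration;
import org.joda.time.Instant;

/** Reports the largest observed timestamp minus a fixed allowed skew. */
public class SkewToleratingWatermarkEstimator
    implements TimestampObservingWatermarkEstimator<Instant> {

  private final Duration allowedSkew;
  private Instant maxObserved;

  public SkewToleratingWatermarkEstimator(Instant state, Duration allowedSkew) {
    this.maxObserved = state;
    this.allowedSkew = allowedSkew;
  }

  @Override
  public void observeTimestamp(Instant timestamp) {
    // Called by the SDK with the timestamp of every element that is output.
    if (timestamp.isAfter(maxObserved)) {
      maxObserved = timestamp;
    }
  }

  @Override
  public Instant currentWatermark() {
    // Hold the estimate back by the allowed skew to tolerate almost-sorted data.
    return maxObserved.minus(allowedSkew);
  }

  @Override
  public Instant getState() {
    // Persisted at checkpoints and handed back via @WatermarkEstimatorState.
    return maxObserved;
  }
}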

You'll want to look into @RequiresTimeSortedInput[1]. This allows you to
produce the messages in any order and requires the runner to make sure they
are sorted before passing to a downstream stateful DoFn.

1:
https://lists.apache.org/thread.html/9cdac2a363e18be58fa1f14c838c61e8406ae3407e4e2d05e423234c%40%3Cdev.beam.apache.org%3E


>
> Regards,
> Praveen
>
> On Fri, Sep 18, 2020 at 10:06 AM Luke Cwik <lc...@google.com> wrote:
>
>> To answer your specific question, you should create and return the
>> WallTime estimator. You shouldn't need to interact with it from within
>> your @ProcessElement call since your elements are using the current time
>> for their timestamp.
>>
>> On Fri, Sep 18, 2020 at 10:04 AM Luke Cwik <lc...@google.com> wrote:
>>
>>> Kafka is a complex example because it is adapting code from before there
>>> was an SDF implementation (namely the TimestampPolicy and the
>>> TimestampFn/TimestampFnS/WatermarkFn/WatermarkFn2 functions).
>>>
>>> There are three types of watermark estimators that are in the Beam Java
>>> SDK today:
>>> Manual: Can be invoked from within your @ProcessElement method within
>>> your SDF allowing you precise control over what the watermark is.
>>> WallTime: Doesn't need to be interacted with, will report the current
>>> time as the watermark time. Once it is instantiated and returned via the
>>> @NewWatermarkEstimator method you don't need to do anything with it. This
>>> is functionally equivalent to calling setWatermark(Instant.now()) right
>>> before returning from the @ProcessElement method in the SplittableDoFn on a
>>> Manual watermark.
>>> TimestampObserving: Is invoked using the output timestamp for every
>>> element that is output. This is functionally equivalent to calling
>>> setWatermark after each output within your @ProcessElement method in the
>>> SplittableDoFn. The MonotonicallyIncreasing implementation for
>>> the TimestampObserving estimator ensures that the largest timestamp seen so
>>> far will be reported for the watermark.
>>>
>>> The default is to not set any watermark estimate.
>>>
>>> For all watermark estimators you're allowed to set the watermark
>>> estimate to anything as the runner will recompute the output watermark as:
>>> new output watermark = max(previous output watermark, min(upstream
>>> watermark, watermark estimates))
>>> This effectively means that the watermark will never go backwards from
>>> the runners point of view but that does mean that setting the watermark
>>> estimate below the previous output watermark (which isn't observable) will
>>> not do anything beyond holding the watermark at the previous output
>>> watermark.
>>>
>>> Depending on the windowing strategy and allowed lateness, any records
>>> that are output with a timestamp that is too early can be considered
>>> droppably late, otherwise they will be late/ontime/early.
>>>
>>> So as an author for an SDF transform, you need to figure out:
>>> 1) What timestamp your going to output your records at
>>> * use upstream input elements timestamp: guidance use the default
>>> implementation and to get the upstream watermark by default
>>> * use data from within the record being output or external system state
>>> via an API call: use a watermark estimator
>>> 2) How you want to compute the watermark estimate (if at all)
>>> * the choice here depends on how the elements timestamps progress, are
>>> they in exactly sorted order, almost sorted order, completely unsorted, ...?
>>>
>>> For both of these it is upto you to choose how much flexibility in these
>>> decisions you want to give to your users and that should guide what you
>>> expose within the API (like how KafkaIO exposes a TimestampPolicy) or how
>>> many other sources don't expose anything.
>>>
>>>
>>> On Thu, Sep 17, 2020 at 8:43 AM Praveen K Viswanathan <
>>> harish.praveen@gmail.com> wrote:
>>>
>>>> Hi Luke,
>>>>
>>>> I am also looking at the `WatermarkEstimators.manual` option, in
>>>> parallel. Now we are getting data past our Fixed Window but the aggregation
>>>> is not as expected.  The doc says setWatermark will "set timestamp
>>>> before or at the timestamps of all future elements produced by the
>>>> associated DoFn". If I output with a timestamp as below then could you
>>>> please clarify on how we should set the watermark for this manual
>>>> watermark estimator?
>>>>
>>>> receiver.outputWithTimestamp(ossRecord, Instant.now());
>>>>
>>>> Thanks,
>>>> Praveen
>>>>
>>>> On Mon, Sep 14, 2020 at 9:10 AM Luke Cwik <lc...@google.com> wrote:
>>>>
>>>>> Is the watermark advancing[1, 2] for the SDF such that the windows can
>>>>> close allowing for the Count transform to produce output?
>>>>>
>>>>> 1: https://www.youtube.com/watch?v=TWxSLmkWPm4
>>>>> 2: https://beam.apache.org/documentation/programming-guide/#windowing
>>>>>
>>>>> On Thu, Sep 10, 2020 at 12:39 PM Gaurav Nakum <ga...@oracle.com>
>>>>> wrote:
>>>>>
>>>>>> Hi everyone!
>>>>>>
>>>>>> We are developing a new IO connector using the SDF API, and testing
>>>>>> it with the following simple counting pipeline:
>>>>>>
>>>>>>
>>>>>>
>>>>>> p.apply(MyIO.read()
>>>>>>
>>>>>>         .withStream(inputStream)
>>>>>>
>>>>>>         .withStreamPartitions(Arrays.asList(0))
>>>>>>
>>>>>>         .withConsumerConfig(config)
>>>>>>
>>>>>>     ) // gets a PCollection<KV<String, String>>
>>>>>>
>>>>>>
>>>>>>
>>>>>>
>>>>>>
>>>>>> .apply(Values.<String>*create*()) // PCollection<String>
>>>>>>
>>>>>>
>>>>>>
>>>>>> .apply(Window.<String>into(FixedWindows.of(Duration.standardSeconds(10)))
>>>>>>
>>>>>>     .withAllowedLateness(Duration.standardDays(1))
>>>>>>
>>>>>>     .accumulatingFiredPanes())
>>>>>>
>>>>>>
>>>>>>
>>>>>> .apply(Count.<String>perElement())
>>>>>>
>>>>>>
>>>>>>
>>>>>>
>>>>>>
>>>>>> // write PCollection<KV<String, Long>> to stream
>>>>>>
>>>>>> .apply(MyIO.write()
>>>>>>
>>>>>>         .withStream(outputStream)
>>>>>>
>>>>>>         .withConsumerConfig(config));
>>>>>>
>>>>>>
>>>>>>
>>>>>>
>>>>>>
>>>>>> Without the window transform, we can read from the stream and write
>>>>>> to it, however, I don’t see output after the Window transform. Could you
>>>>>> please help pin down the issue?
>>>>>>
>>>>>> Thank you,
>>>>>>
>>>>>> Gaurav
>>>>>>
>>>>>
>>>>
>>>> --
>>>> Thanks,
>>>> Praveen K Viswanathan
>>>>
>>>
>
> --
> Thanks,
> Praveen K Viswanathan
>

Re: Output from Window not getting materialized

Posted by Praveen K Viswanathan <ha...@gmail.com>.
Hi Luke, thanks for the detailed explanation. This gives more insight for
new people like me who are trying to grok the whole concept.

*1) What timestamp you're going to output your records at*

* use the upstream input element's timestamp: guidance is to use the default
implementation, which gets the upstream watermark by default
* use data from within the record being output or external system state
via an API call: use a watermark estimator

Regarding the above section: I do not think our source has a built-in
watermark concept that we could derive and use in the SDF, so we will have
to go with the second option. If we could extract a timestamp from the
source message, would we have to call setWatermark with that extracted
timestamp before each output in @ProcessElement? And can we use the Manual
watermark estimator itself for this approach?

*2) How you want to compute the watermark estimate (if at all)*

* the choice here depends on how the elements' timestamps progress: are
they in exactly sorted order, almost sorted order, completely unsorted,
...?

Our elements are in "almost sorted" order, which is why we want to hold off
processing message_01 with timestamp 11:00:10 AM until we have processed
message_02 with timestamp 11:00:08 AM. How do we enable this ordering while
processing the messages?

Based on your suggestion, I tried the WallTime estimator and it worked for
one of our many scenarios. I am planning to test it with a bunch of other
window types and use it until we get a solid hold on the above-mentioned
approach that can handle the unsorted messages.

Regards,
Praveen

On Fri, Sep 18, 2020 at 10:06 AM Luke Cwik <lc...@google.com> wrote:

> To answer your specific question, you should create and return the
> WallTime estimator. You shouldn't need to interact with it from within
> your @ProcessElement call since your elements are using the current time
> for their timestamp.
>
> On Fri, Sep 18, 2020 at 10:04 AM Luke Cwik <lc...@google.com> wrote:
>
>> Kafka is a complex example because it is adapting code from before there
>> was an SDF implementation (namely the TimestampPolicy and the
>> TimestampFn/TimestampFnS/WatermarkFn/WatermarkFn2 functions).
>>
>> There are three types of watermark estimators that are in the Beam Java
>> SDK today:
>> Manual: Can be invoked from within your @ProcessElement method within
>> your SDF allowing you precise control over what the watermark is.
>> WallTime: Doesn't need to be interacted with, will report the current
>> time as the watermark time. Once it is instantiated and returned via the
>> @NewWatermarkEstimator method you don't need to do anything with it. This
>> is functionally equivalent to calling setWatermark(Instant.now()) right
>> before returning from the @ProcessElement method in the SplittableDoFn on a
>> Manual watermark.
>> TimestampObserving: Is invoked using the output timestamp for every
>> element that is output. This is functionally equivalent to calling
>> setWatermark after each output within your @ProcessElement method in the
>> SplittableDoFn. The MonotonicallyIncreasing implementation for
>> the TimestampObserving estimator ensures that the largest timestamp seen so
>> far will be reported for the watermark.
>>
>> The default is to not set any watermark estimate.
>>
>> For all watermark estimators you're allowed to set the watermark estimate
>> to anything as the runner will recompute the output watermark as:
>> new output watermark = max(previous output watermark, min(upstream
>> watermark, watermark estimates))
>> This effectively means that the watermark will never go backwards from
>> the runners point of view but that does mean that setting the watermark
>> estimate below the previous output watermark (which isn't observable) will
>> not do anything beyond holding the watermark at the previous output
>> watermark.
>>
>> Depending on the windowing strategy and allowed lateness, any records
>> that are output with a timestamp that is too early can be considered
>> droppably late, otherwise they will be late/ontime/early.
>>
>> So as an author for an SDF transform, you need to figure out:
>> 1) What timestamp your going to output your records at
>> * use upstream input elements timestamp: guidance use the default
>> implementation and to get the upstream watermark by default
>> * use data from within the record being output or external system state
>> via an API call: use a watermark estimator
>> 2) How you want to compute the watermark estimate (if at all)
>> * the choice here depends on how the elements timestamps progress, are
>> they in exactly sorted order, almost sorted order, completely unsorted, ...?
>>
>> For both of these it is upto you to choose how much flexibility in these
>> decisions you want to give to your users and that should guide what you
>> expose within the API (like how KafkaIO exposes a TimestampPolicy) or how
>> many other sources don't expose anything.
>>
>>
>> On Thu, Sep 17, 2020 at 8:43 AM Praveen K Viswanathan <
>> harish.praveen@gmail.com> wrote:
>>
>>> Hi Luke,
>>>
>>> I am also looking at the `WatermarkEstimators.manual` option, in
>>> parallel. Now we are getting data past our Fixed Window but the aggregation
>>> is not as expected.  The doc says setWatermark will "set timestamp
>>> before or at the timestamps of all future elements produced by the
>>> associated DoFn". If I output with a timestamp as below then could you
>>> please clarify on how we should set the watermark for this manual
>>> watermark estimator?
>>>
>>> receiver.outputWithTimestamp(ossRecord, Instant.now());
>>>
>>> Thanks,
>>> Praveen
>>>
>>> On Mon, Sep 14, 2020 at 9:10 AM Luke Cwik <lc...@google.com> wrote:
>>>
>>>> Is the watermark advancing[1, 2] for the SDF such that the windows can
>>>> close allowing for the Count transform to produce output?
>>>>
>>>> 1: https://www.youtube.com/watch?v=TWxSLmkWPm4
>>>> 2: https://beam.apache.org/documentation/programming-guide/#windowing
>>>>
>>>> On Thu, Sep 10, 2020 at 12:39 PM Gaurav Nakum <ga...@oracle.com>
>>>> wrote:
>>>>
>>>>> Hi everyone!
>>>>>
>>>>> We are developing a new IO connector using the SDF API, and testing it
>>>>> with the following simple counting pipeline:
>>>>>
>>>>>
>>>>>
>>>>> p.apply(MyIO.read()
>>>>>
>>>>>         .withStream(inputStream)
>>>>>
>>>>>         .withStreamPartitions(Arrays.asList(0))
>>>>>
>>>>>         .withConsumerConfig(config)
>>>>>
>>>>>     ) // gets a PCollection<KV<String, String>>
>>>>>
>>>>>
>>>>>
>>>>>
>>>>>
>>>>> .apply(Values.<String>*create*()) // PCollection<String>
>>>>>
>>>>>
>>>>>
>>>>> .apply(Window.<String>into(FixedWindows.of(Duration.standardSeconds(10)))
>>>>>
>>>>>     .withAllowedLateness(Duration.standardDays(1))
>>>>>
>>>>>     .accumulatingFiredPanes())
>>>>>
>>>>>
>>>>>
>>>>> .apply(Count.<String>perElement())
>>>>>
>>>>>
>>>>>
>>>>>
>>>>>
>>>>> // write PCollection<KV<String, Long>> to stream
>>>>>
>>>>> .apply(MyIO.write()
>>>>>
>>>>>         .withStream(outputStream)
>>>>>
>>>>>         .withConsumerConfig(config));
>>>>>
>>>>>
>>>>>
>>>>>
>>>>>
>>>>> Without the window transform, we can read from the stream and write to
>>>>> it, however, I don’t see output after the Window transform. Could you
>>>>> please help pin down the issue?
>>>>>
>>>>> Thank you,
>>>>>
>>>>> Gaurav
>>>>>
>>>>
>>>
>>> --
>>> Thanks,
>>> Praveen K Viswanathan
>>>
>>

-- 
Thanks,
Praveen K Viswanathan

Re: Output from Window not getting materialized

Posted by Luke Cwik <lc...@google.com>.
To answer your specific question, you should create and return the WallTime
estimator. You shouldn't need to interact with it from within
your @ProcessElement call since your elements are using the current time
for their timestamp.
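
Concretely, that is roughly (only the watermark-related methods of your SDF
are shown):

@GetInitialWatermarkEstimatorState
public Instant getInitialWatermarkEstimatorState() {
  return BoundedWindow.TIMESTAMP_MIN_VALUE;
}

@NewWatermarkEstimator
public WatermarkEstimators.WallTime newWatermarkEstimator(
    @WatermarkEstimatorState Instant state) {
  // Reports max(state, current wall time); nothing to call from @ProcessElement.
  return new WatermarkEstimators.WallTime(state);
}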

On Fri, Sep 18, 2020 at 10:04 AM Luke Cwik <lc...@google.com> wrote:

> Kafka is a complex example because it is adapting code from before there
> was an SDF implementation (namely the TimestampPolicy and the
> TimestampFn/TimestampFnS/WatermarkFn/WatermarkFn2 functions).
>
> There are three types of watermark estimators that are in the Beam Java
> SDK today:
> Manual: Can be invoked from within your @ProcessElement method within your
> SDF allowing you precise control over what the watermark is.
> WallTime: Doesn't need to be interacted with, will report the current time
> as the watermark time. Once it is instantiated and returned via the
> @NewWatermarkEstimator method you don't need to do anything with it. This
> is functionally equivalent to calling setWatermark(Instant.now()) right
> before returning from the @ProcessElement method in the SplittableDoFn on a
> Manual watermark.
> TimestampObserving: Is invoked using the output timestamp for every
> element that is output. This is functionally equivalent to calling
> setWatermark after each output within your @ProcessElement method in the
> SplittableDoFn. The MonotonicallyIncreasing implementation for
> the TimestampObserving estimator ensures that the largest timestamp seen so
> far will be reported for the watermark.
>
> The default is to not set any watermark estimate.
>
> For all watermark estimators you're allowed to set the watermark estimate
> to anything as the runner will recompute the output watermark as:
> new output watermark = max(previous output watermark, min(upstream
> watermark, watermark estimates))
> This effectively means that the watermark will never go backwards from the
> runners point of view but that does mean that setting the watermark
> estimate below the previous output watermark (which isn't observable) will
> not do anything beyond holding the watermark at the previous output
> watermark.
>
> Depending on the windowing strategy and allowed lateness, any records that
> are output with a timestamp that is too early can be considered droppably
> late, otherwise they will be late/ontime/early.
>
> So as an author for an SDF transform, you need to figure out:
> 1) What timestamp your going to output your records at
> * use upstream input elements timestamp: guidance use the default
> implementation and to get the upstream watermark by default
> * use data from within the record being output or external system state
> via an API call: use a watermark estimator
> 2) How you want to compute the watermark estimate (if at all)
> * the choice here depends on how the elements timestamps progress, are
> they in exactly sorted order, almost sorted order, completely unsorted, ...?
>
> For both of these it is upto you to choose how much flexibility in these
> decisions you want to give to your users and that should guide what you
> expose within the API (like how KafkaIO exposes a TimestampPolicy) or how
> many other sources don't expose anything.
>
>
> On Thu, Sep 17, 2020 at 8:43 AM Praveen K Viswanathan <
> harish.praveen@gmail.com> wrote:
>
>> Hi Luke,
>>
>> I am also looking at the `WatermarkEstimators.manual` option, in
>> parallel. Now we are getting data past our Fixed Window but the aggregation
>> is not as expected.  The doc says setWatermark will "set timestamp
>> before or at the timestamps of all future elements produced by the
>> associated DoFn". If I output with a timestamp as below then could you
>> please clarify on how we should set the watermark for this manual
>> watermark estimator?
>>
>> receiver.outputWithTimestamp(ossRecord, Instant.now());
>>
>> Thanks,
>> Praveen
>>
>> On Mon, Sep 14, 2020 at 9:10 AM Luke Cwik <lc...@google.com> wrote:
>>
>>> Is the watermark advancing[1, 2] for the SDF such that the windows can
>>> close allowing for the Count transform to produce output?
>>>
>>> 1: https://www.youtube.com/watch?v=TWxSLmkWPm4
>>> 2: https://beam.apache.org/documentation/programming-guide/#windowing
>>>
>>> On Thu, Sep 10, 2020 at 12:39 PM Gaurav Nakum <ga...@oracle.com>
>>> wrote:
>>>
>>>> Hi everyone!
>>>>
>>>> We are developing a new IO connector using the SDF API, and testing it
>>>> with the following simple counting pipeline:
>>>>
>>>>
>>>>
>>>> p.apply(MyIO.read()
>>>>
>>>>         .withStream(inputStream)
>>>>
>>>>         .withStreamPartitions(Arrays.asList(0))
>>>>
>>>>         .withConsumerConfig(config)
>>>>
>>>>     ) // gets a PCollection<KV<String, String>>
>>>>
>>>>
>>>>
>>>>
>>>>
>>>> .apply(Values.<String>*create*()) // PCollection<String>
>>>>
>>>>
>>>>
>>>> .apply(Window.<String>into(FixedWindows.of(Duration.standardSeconds(10)))
>>>>
>>>>     .withAllowedLateness(Duration.standardDays(1))
>>>>
>>>>     .accumulatingFiredPanes())
>>>>
>>>>
>>>>
>>>> .apply(Count.<String>perElement())
>>>>
>>>>
>>>>
>>>>
>>>>
>>>> // write PCollection<KV<String, Long>> to stream
>>>>
>>>> .apply(MyIO.write()
>>>>
>>>>         .withStream(outputStream)
>>>>
>>>>         .withConsumerConfig(config));
>>>>
>>>>
>>>>
>>>>
>>>>
>>>> Without the window transform, we can read from the stream and write to
>>>> it, however, I don’t see output after the Window transform. Could you
>>>> please help pin down the issue?
>>>>
>>>> Thank you,
>>>>
>>>> Gaurav
>>>>
>>>
>>
>> --
>> Thanks,
>> Praveen K Viswanathan
>>
>

Re: Output from Window not getting materialized

Posted by Luke Cwik <lc...@google.com>.
Kafka is a complex example because it is adapting code from before there
was an SDF implementation (namely the TimestampPolicy and the
TimestampFn/TimestampFnS/WatermarkFn/WatermarkFn2 functions).

There are three types of watermark estimators that are in the Beam Java SDK
today:
Manual: Can be invoked from within your @ProcessElement method within your
SDF allowing you precise control over what the watermark is.
WallTime: Doesn't need to be interacted with; it will report the current time
as the watermark. Once it is instantiated and returned via the
@NewWatermarkEstimator method you don't need to do anything with it. This
is functionally equivalent to calling setWatermark(Instant.now()) right
before returning from the @ProcessElement method in the SplittableDoFn with a
Manual watermark estimator.
TimestampObserving: Is invoked using the output timestamp for every element
that is output. This is functionally equivalent to calling setWatermark
after each output within your @ProcessElement method in the SplittableDoFn.
The MonotonicallyIncreasing implementation for the TimestampObserving
estimator ensures that the largest timestamp seen so far will be reported
for the watermark.
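
For example, the Manual flavor is wired up roughly like this (a sketch; the
element loop is elided):

@GetInitialWatermarkEstimatorState
public Instant getInitialWatermarkEstimatorState() {
  return BoundedWindow.TIMESTAMP_MIN_VALUE;
}

@NewWatermarkEstimator
public WatermarkEstimators.Manual newWatermarkEstimator(
    @WatermarkEstimatorState Instant state) {
  return new WatermarkEstimators.Manual(state);
}

@ProcessElement
public ProcessContinuation processElement(
    RestrictionTracker<OffsetRange, Long> tracker,
    OutputReceiver<String> receiver,
    ManualWatermarkEstimator<Instant> watermarkEstimator) {
  // ... claim positions and output elements ...
  // Advance the estimate explicitly from within the process call.
  watermarkEstimator.setWatermark(Instant.now());
  return ProcessContinuation.resume();
}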

The default is to not set any watermark estimate.

For all watermark estimators you're allowed to set the watermark estimate
to anything, because the runner will recompute the output watermark as:
new output watermark = max(previous output watermark, min(upstream
watermark, watermark estimates))
This effectively means that the watermark will never go backwards from the
runner's point of view, but it does mean that setting the watermark
estimate below the previous output watermark (which isn't observable) will
not do anything beyond holding the watermark at the previous output
watermark.

Depending on the windowing strategy and allowed lateness, any records that
are output with a timestamp that is too early can be considered droppably
late; otherwise they will be late/on-time/early.

So as an author of an SDF transform, you need to figure out:
1) What timestamp you're going to output your records at
* use the upstream input element's timestamp: guidance is to use the default
implementation, which gets the upstream watermark by default
* use data from within the record being output, or external system state via
an API call: use a watermark estimator
2) How you want to compute the watermark estimate (if at all)
* the choice here depends on how the elements' timestamps progress: are they
in exactly sorted order, almost sorted order, completely unsorted, ...?

For both of these it is up to you to choose how much flexibility in these
decisions you want to give to your users, and that should guide what you
expose within the API (like how KafkaIO exposes a TimestampPolicy), or
whether, like many other sources, you expose nothing at all.
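
For example (purely illustrative, none of these names exist), you could let
users plug in a timestamp function on the read transform and feed it into
outputWithTimestamp:

// Hypothetical builder hook on the read transform:
public Read withTimestampFn(SerializableFunction<MyRecord, Instant> timestampFn) {
  return toBuilder().setTimestampFn(timestampFn).build();
}

// Inside the SDF's @ProcessElement, the user-supplied function picks the event time:
Instant eventTime = spec.getTimestampFn().apply(record);
receiver.outputWithTimestamp(KV.of(record.getKey(), record.getValue()), eventTime);

// And a pipeline author would configure it like:
p.apply(MyIO.read()
    .withStream(inputStream)
    .withTimestampFn(record -> new Instant(record.getEventMillis())));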


On Thu, Sep 17, 2020 at 8:43 AM Praveen K Viswanathan <
harish.praveen@gmail.com> wrote:

> Hi Luke,
>
> I am also looking at the `WatermarkEstimators.manual` option, in parallel.
> Now we are getting data past our Fixed Window but the aggregation is not as
> expected.  The doc says setWatermark will "set timestamp before or at the
> timestamps of all future elements produced by the associated DoFn". If I
> output with a timestamp as below then could you please clarify on how we
> should set the watermark for this manual watermark estimator?
>
> receiver.outputWithTimestamp(ossRecord, Instant.now());
>
> Thanks,
> Praveen
>
> On Mon, Sep 14, 2020 at 9:10 AM Luke Cwik <lc...@google.com> wrote:
>
>> Is the watermark advancing[1, 2] for the SDF such that the windows can
>> close allowing for the Count transform to produce output?
>>
>> 1: https://www.youtube.com/watch?v=TWxSLmkWPm4
>> 2: https://beam.apache.org/documentation/programming-guide/#windowing
>>
>> On Thu, Sep 10, 2020 at 12:39 PM Gaurav Nakum <ga...@oracle.com>
>> wrote:
>>
>>> Hi everyone!
>>>
>>> We are developing a new IO connector using the SDF API, and testing it
>>> with the following simple counting pipeline:
>>>
>>>
>>>
>>> p.apply(MyIO.read()
>>>
>>>         .withStream(inputStream)
>>>
>>>         .withStreamPartitions(Arrays.asList(0))
>>>
>>>         .withConsumerConfig(config)
>>>
>>>     ) // gets a PCollection<KV<String, String>>
>>>
>>>
>>>
>>>
>>>
>>> .apply(Values.<String>*create*()) // PCollection<String>
>>>
>>>
>>>
>>> .apply(Window.<String>into(FixedWindows.of(Duration.standardSeconds(10)))
>>>
>>>     .withAllowedLateness(Duration.standardDays(1))
>>>
>>>     .accumulatingFiredPanes())
>>>
>>>
>>>
>>> .apply(Count.<String>perElement())
>>>
>>>
>>>
>>>
>>>
>>> // write PCollection<KV<String, Long>> to stream
>>>
>>> .apply(MyIO.write()
>>>
>>>         .withStream(outputStream)
>>>
>>>         .withConsumerConfig(config));
>>>
>>>
>>>
>>>
>>>
>>> Without the window transform, we can read from the stream and write to
>>> it, however, I don’t see output after the Window transform. Could you
>>> please help pin down the issue?
>>>
>>> Thank you,
>>>
>>> Gaurav
>>>
>>
>
> --
> Thanks,
> Praveen K Viswanathan
>

Re: Output from Window not getting materialized

Posted by Praveen K Viswanathan <ha...@gmail.com>.
Hi Luke,

I am also looking at the `WatermarkEstimators.manual` option in parallel.
Now we are getting data past our Fixed Window, but the aggregation is not as
expected. The doc says setWatermark will "set a timestamp before or at the
timestamps of all future elements produced by the associated DoFn". If I
output with a timestamp as below, could you please clarify how we
should set the watermark for this manual watermark estimator?

receiver.outputWithTimestamp(ossRecord, Instant.now());

Thanks,
Praveen

On Mon, Sep 14, 2020 at 9:10 AM Luke Cwik <lc...@google.com> wrote:

> Is the watermark advancing[1, 2] for the SDF such that the windows can
> close allowing for the Count transform to produce output?
>
> 1: https://www.youtube.com/watch?v=TWxSLmkWPm4
> 2: https://beam.apache.org/documentation/programming-guide/#windowing
>
> On Thu, Sep 10, 2020 at 12:39 PM Gaurav Nakum <ga...@oracle.com>
> wrote:
>
>> Hi everyone!
>>
>> We are developing a new IO connector using the SDF API, and testing it
>> with the following simple counting pipeline:
>>
>>
>>
>> p.apply(MyIO.read()
>>
>>         .withStream(inputStream)
>>
>>         .withStreamPartitions(Arrays.asList(0))
>>
>>         .withConsumerConfig(config)
>>
>>     ) // gets a PCollection<KV<String, String>>
>>
>>
>>
>>
>>
>> .apply(Values.<String>*create*()) // PCollection<String>
>>
>>
>>
>> .apply(Window.<String>into(FixedWindows.of(Duration.standardSeconds(10)))
>>
>>     .withAllowedLateness(Duration.standardDays(1))
>>
>>     .accumulatingFiredPanes())
>>
>>
>>
>> .apply(Count.<String>perElement())
>>
>>
>>
>>
>>
>> // write PCollection<KV<String, Long>> to stream
>>
>> .apply(MyIO.write()
>>
>>         .withStream(outputStream)
>>
>>         .withConsumerConfig(config));
>>
>>
>>
>>
>>
>> Without the window transform, we can read from the stream and write to
>> it, however, I don’t see output after the Window transform. Could you
>> please help pin down the issue?
>>
>> Thank you,
>>
>> Gaurav
>>
>

-- 
Thanks,
Praveen K Viswanathan

Re: Output from Window not getting materialized

Posted by Luke Cwik <lc...@google.com>.
Is the watermark advancing [1, 2] for the SDF such that the windows can
close, allowing the Count transform to produce output?

1: https://www.youtube.com/watch?v=TWxSLmkWPm4
2: https://beam.apache.org/documentation/programming-guide/#windowing

On Thu, Sep 10, 2020 at 12:39 PM Gaurav Nakum <ga...@oracle.com>
wrote:

> Hi everyone!
>
> We are developing a new IO connector using the SDF API, and testing it
> with the following simple counting pipeline:
>
>
>
> p.apply(MyIO.read()
>
>         .withStream(inputStream)
>
>         .withStreamPartitions(Arrays.asList(0))
>
>         .withConsumerConfig(config)
>
>     ) // gets a PCollection<KV<String, String>>
>
>
>
>
>
> .apply(Values.<String>*create*()) // PCollection<String>
>
>
>
> .apply(Window.<String>into(FixedWindows.of(Duration.standardSeconds(10)))
>
>     .withAllowedLateness(Duration.standardDays(1))
>
>     .accumulatingFiredPanes())
>
>
>
> .apply(Count.<String>perElement())
>
>
>
>
>
> // write PCollection<KV<String, Long>> to stream
>
> .apply(MyIO.write()
>
>         .withStream(outputStream)
>
>         .withConsumerConfig(config));
>
>
>
>
>
> Without the window transform, we can read from the stream and write to it,
> however, I don’t see output after the Window transform. Could you please
> help pin down the issue?
>
> Thank you,
>
> Gaurav
>