Posted to user@beam.apache.org by Pavel Solomin <p....@gmail.com> on 2021/10/05 15:20:39 UTC

File sink with windows and side inputs

Hello!

I am developing a pipeline that uses FileIO.<>writeDynamic() to write Avro
files. Each destination has its own schema, and I am trying to create a
PCollectionView of schemas to be used in the Writer.

The pipeline works fine if I don't apply a window, but with an unbounded
source I have to apply windowing before writing and before building the
PCollectionView.

The documentation at
https://beam.apache.org/documentation/programming-guide/#Side-inputs-and-windowing
says:

"If the main input and side inputs have identical windows, the projection
provides the exact corresponding window"

But it doesn't seem to work exactly like this. With a windowed collection, as
soon as I apply

   .apply(Window.into(FixedWindows.of(Duration.standardSeconds(5))))

I start getting an empty PCollectionView when the Writer tries to process its
elements. I am almost sure I am not setting up the window or the view
correctly for unbounded collections. Can anyone help me?
I've attached a small Java project that demonstrates the issue (the writer
without a window works, but the writer with a window throws an NPE because the
PCollectionView is empty).
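
To make the quoted rule concrete, here is a minimal plain-Java model of a window-scoped side-input lookup. It is only a sketch and does not use the Beam API: the class name WindowedSideInputSketch and all of its methods are hypothetical, timestamps are assumed to be non-negative milliseconds, and the map of schemas stands in for the PCollectionView. It shows one way a lookup can come back empty: the main-input element lands in a fixed window for which no side-input data was recorded. Whether this is what happens in the attached project is not established in this thread.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.Optional;

/** Plain-Java model of a window-scoped side-input lookup; NOT Beam API. */
final class WindowedSideInputSketch {
    private final long windowSizeMillis;

    // Side-input contents, keyed by the start of the fixed window they fell into.
    private final Map<Long, Map<String, String>> schemasByWindow = new HashMap<>();

    WindowedSideInputSketch(long windowSizeMillis) {
        this.windowSizeMillis = windowSizeMillis;
    }

    /** Start of the fixed window containing a non-negative timestamp. */
    long windowStart(long timestampMillis) {
        return timestampMillis - (timestampMillis % windowSizeMillis);
    }

    /** Records a schema as side-input data in the window of its timestamp. */
    void addSchema(long timestampMillis, String destination, String schema) {
        schemasByWindow
                .computeIfAbsent(windowStart(timestampMillis), w -> new HashMap<>())
                .put(destination, schema);
    }

    /** Looks up a schema only within the window of the main-input element. */
    Optional<String> lookup(long elementTimestampMillis, String destination) {
        Map<String, String> view =
                schemasByWindow.get(windowStart(elementTimestampMillis));
        if (view == null) {
            // No side-input data was recorded for this window.
            return Optional.empty();
        }
        return Optional.ofNullable(view.get(destination));
    }
}
```

With 5-second windows, a schema recorded at 1000 ms is visible to an element at 4999 ms (same window) but not to an element at 5000 ms, which belongs to the next window.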

Thanks!

Best Regards,
Pavel Solomin

Tel: +351 962 950 692 | Skype: pavel_solomin | Linkedin
<https://www.linkedin.com/in/pavelsolomin>

Re: File sink with windows and side inputs

Posted by Pavel Solomin <p....@gmail.com>.
I have tried multiple accumulation modes so far.

1. When I have this PCollection

PCollection<Integer> windowed = initial.apply("0",
        Window.<Integer>into(FixedWindows.of(Duration.standardSeconds(30)))
                .triggering(Repeatedly.forever(
                        AfterProcessingTime.pastFirstElementInPane()
                                .plusDelayOf(Duration.standardSeconds(30))))
                .withAllowedLateness(Duration.standardMinutes(1))
                .accumulatingFiredPanes());

and use it for both the main input and the side input (view), the job fails
with both accumulatingFiredPanes() and discardingFiredPanes().

2. When I set different windows for the main input and the side-input view

PCollection<Integer> windowedForMainInput = initial.apply("0",
        Window.into(FixedWindows.of(Duration.standardSeconds(30))));

PCollection<Integer> windowedForView = initial.apply("1",
        Window.<Integer>into(new GlobalWindows())
                .triggering(Repeatedly.forever(
                        AfterProcessingTime.pastFirstElementInPane()
                                .plusDelayOf(Duration.standardSeconds(20))))
                .withAllowedLateness(Duration.standardMinutes(1))
                .discardingFiredPanes());


the test in the example passes, but the streaming job still fails when it
attempts to write some of the files. Some files do get written, though, so it
looks like the view is not empty but is missing some schemas.

I don't remember whether I tried accumulatingFiredPanes() with the streaming
job.
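
The difference between the two accumulation modes can be illustrated with a small plain-Java model. This is a sketch under stated assumptions, not Beam code: the class PaneModeSketch and its method firedPanes are hypothetical, and each "batch" stands for the (destination, schema) pairs that arrive between two consecutive trigger firings. In discarding mode each fired pane holds only the elements received since the previous firing, while in accumulating mode it holds everything seen so far. If a side input reflects only the most recently fired pane, discarding mode would be one possible explanation for a view that is present but missing schemas; the thread does not confirm this.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

/** Plain-Java model of accumulating vs. discarding panes; NOT Beam API. */
final class PaneModeSketch {

    /**
     * Returns the map emitted at each trigger firing.
     *
     * @param batches elements arriving between consecutive firings
     *                (destination to schema)
     * @param accumulating true models accumulatingFiredPanes(),
     *                     false models discardingFiredPanes()
     */
    static List<Map<String, String>> firedPanes(
            List<Map<String, String>> batches, boolean accumulating) {
        List<Map<String, String>> panes = new ArrayList<>();
        Map<String, String> state = new HashMap<>();
        for (Map<String, String> batch : batches) {
            state.putAll(batch);
            // Emit a snapshot of the current state as the fired pane.
            panes.add(new HashMap<>(state));
            if (!accumulating) {
                // Discarding mode forgets what was already emitted.
                state.clear();
            }
        }
        return panes;
    }
}
```

For two batches containing schema "a" and then schema "b", the second pane holds only "b" in discarding mode and both "a" and "b" in accumulating mode.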

Best Regards,
Pavel Solomin

On Wed, 6 Oct 2021 at 21:56, Reuven Lax <re...@google.com> wrote:

> What accumulation mode are you setting on the trigger?

Re: File sink with windows and side inputs

Posted by Reuven Lax <re...@google.com>.
What accumulation mode are you setting on the trigger?

On Wed, Oct 6, 2021 at 2:04 AM Pavel Solomin <p....@gmail.com> wrote:

> Hello!
>
> After some experimentation I was able to make it half-broken instead of
> completely broken:
>
> - I am using Combine.globally(new Combiner()).asSingletonView() to reduce
> PCollection into a Map
> - I made the main input to have smaller windows, and the PCollectionView
> window to be GlobalWindows with triggers

Re: File sink with windows and side inputs

Posted by Pavel Solomin <p....@gmail.com>.
Hello!

After some experimentation I was able to make it half-broken instead of
completely broken:

- I am using Combine.globally(new Combiner()).asSingletonView() to reduce the
PCollection into a Map.
- I gave the main input smaller windows, and made the PCollectionView
window GlobalWindows with triggers.
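
The Combiner class itself is not shown in this thread. As a rough illustration of what reducing a collection into a single Map involves, the following plain-Java sketch mirrors the four steps of Beam's CombineFn (createAccumulator, addInput, mergeAccumulators, extractOutput) without depending on Beam. The class name SchemaMapCombinerSketch, the String types for destination and schema, and the last-write-wins handling of duplicate destinations are all assumptions made for the example.

```java
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;

/** Plain-Java model of folding (destination, schema) pairs into one Map. */
final class SchemaMapCombinerSketch {

    /** Starts an empty partial result. */
    static Map<String, String> createAccumulator() {
        return new HashMap<>();
    }

    /** Adds one element to a partial result (last write wins). */
    static Map<String, String> addInput(
            Map<String, String> accumulator, String destination, String schema) {
        accumulator.put(destination, schema);
        return accumulator;
    }

    /** Merges partial results that were built independently. */
    static Map<String, String> mergeAccumulators(
            Iterable<Map<String, String>> accumulators) {
        Map<String, String> merged = createAccumulator();
        for (Map<String, String> accumulator : accumulators) {
            merged.putAll(accumulator);
        }
        return merged;
    }

    /** Produces the final read-only Map from the merged partial result. */
    static Map<String, String> extractOutput(Map<String, String> accumulator) {
        return Collections.unmodifiableMap(new HashMap<>(accumulator));
    }
}
```

The output contains only the elements that were fed into the accumulators being merged, so a Map built from a single pane can never hold more schemas than that pane delivered.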

This way I am able to write *some* of the data from my unbounded source, but
the PCollectionView seems to be missing schemas, and my app eventually fails
anyway with an NPE. It looks like the Map is being destroyed and re-created
every time the window triggers.

In general, is there any example of reducing an unbounded PCollection into a
PCollectionView<Map> *without* windows, such that a single instance of the
Map is used for the entire lifetime of the app and is not destroyed and
re-created on each window trigger?

Thanks in advance.

Best Regards,
Pavel Solomin
