Posted to user@beam.apache.org by Jeff Klukas <jk...@mozilla.com> on 2019/01/09 16:25:18 UTC

Possible to use GlobalWindows for writing unbounded input to files?

I'm building a pipeline that streams from Pubsub and writes to files. I'm
using FileIO's dynamic destinations to place elements into different
directories according to date and I really don't care about ordering of
elements beyond the date buckets.

So, I think GlobalWindows is appropriate in this case, even though the
input is unbounded. Is it possible to use GlobalWindows but set a trigger
based on number of elements and/or processing time so that beam actually
writes out files periodically?

I tried the following:

Window.into(new GlobalWindows())
  .triggering(Repeatedly.forever(AfterFirst.of(
    AfterPane.elementCountAtLeast(10000),
    AfterProcessingTime.pastFirstElementInPane()
        .plusDelayOf(Duration.standardMinutes(10)))))
  .discardingFiredPanes()

But it raises an exception about incompatible triggers:

Inputs to Flatten had incompatible triggers:
Repeatedly.forever(AfterFirst.of(AfterPane.elementCountAtLeast(10000),
AfterProcessingTime.pastFirstElementInPane().plusDelayOf(1 minute))),
Repeatedly.forever(AfterFirst.of(AfterPane.elementCountAtLeast(1),
AfterSynchronizedProcessingTime.pastFirstElementInPane()))

I believe that what's happening is that FileIO with explicit numShards
(required in the case of unbounded input) is forcing a GroupByKey, which
activates continuation triggers that are incompatible with my stated
triggers. It's the internals of WriteFiles that are trying to flatten the
incompatible PCollections together.
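
For anyone reading this in the archive: a minimal end-to-end sketch of the
setup above with an explicit shard count (the workaround that emerges later
in the thread). The input collection, output path, and shard count are
illustrative assumptions, and plain write() stands in for the writeDynamic()
used in the real pipeline:

import org.apache.beam.sdk.io.FileIO;
import org.apache.beam.sdk.io.TextIO;
import org.apache.beam.sdk.transforms.windowing.AfterFirst;
import org.apache.beam.sdk.transforms.windowing.AfterPane;
import org.apache.beam.sdk.transforms.windowing.AfterProcessingTime;
import org.apache.beam.sdk.transforms.windowing.GlobalWindows;
import org.apache.beam.sdk.transforms.windowing.Repeatedly;
import org.apache.beam.sdk.transforms.windowing.Window;
import org.apache.beam.sdk.values.PCollection;
import org.joda.time.Duration;

// `messages` is the unbounded PCollection<String> read from Pubsub (assumed).
PCollection<String> windowed = messages.apply(
    Window.<String>into(new GlobalWindows())
        .triggering(Repeatedly.forever(AfterFirst.of(
            AfterPane.elementCountAtLeast(10000),
            AfterProcessingTime.pastFirstElementInPane()
                .plusDelayOf(Duration.standardMinutes(10)))))
        .withAllowedLateness(Duration.ZERO) // required once a trigger is set
        .discardingFiredPanes());

windowed.apply(FileIO.<String>write()
    .via(TextIO.sink())
    .to("gs://my-bucket/output/") // illustrative path
    .withNumShards(10));          // explicit sharding avoids the Flatten issue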

Re: Possible to use GlobalWindows for writing unbounded input to files?

Posted by Chamikara Jayalath <ch...@google.com>.
Actually, this is a documented known issue.

https://github.com/apache/beam/blob/master/sdks/java/core/src/main/java/org/apache/beam/sdk/io/FileIO.java#L152

On Fri, Jan 11, 2019 at 9:23 AM Jeff Klukas <jk...@mozilla.com> wrote:

> Indeed, I was wrong about the ValueProvider distinction. I updated that in
> the JIRA.
>
> It's when numShards is 0 (so runner-provided sharding) vs. an explicit
> number. Things work fine for explicit sharding. It's the runner-provided
> sharding mode that encounters the Flatten of PCollections with conflicting
> triggers.
>
> On Fri, Jan 11, 2019 at 12:18 PM Reuven Lax <re...@google.com> wrote:
>
>> FileIO requires an explicit numShards in unbounded mode for a number of
>> reasons - one being that a trigger has to happen on a GroupByKey, and we
>> need something to group on.
>>
>> It is extremely surprising that behavior would change between using a
>> ValueProvider or not. The exact same codepath should be triggered
>> regardless of whether a ValueProvider is used.
>>
>> Reuven
>>
>> On Wed, Jan 9, 2019 at 11:00 PM Kenneth Knowles <ke...@apache.org> wrote:
>>
>>> Definitely sounds like a bug but also I want to caution you (or anyone
>>> reading this archived) that there are known problems with continuation
>>> triggers. A spec on continuation triggers that we missed was that they
>>> really must be "compatible" (this is an arbitrary concept, having only to
>>> do with Flattening two PCollections together) with their original trigger.
>>> Without this, we also know that you can have three PCollections with
>>> identical triggering and you can CoGroupByKey them together but you cannot
>>> do this three-way join as a sequence of binary joins.
>>>
>>> Kenn
>>>
>>> On Wed, Jan 9, 2019 at 10:44 AM Jeff Klukas <jk...@mozilla.com> wrote:
>>>
>>>> Thanks for the response, Chamikara. I filed
>>>> https://jira.apache.org/jira/browse/BEAM-6399 and I expect I can work
>>>> around the problem in my case by not using a ValueProvider for numShards.
>>>>
>>>> On Wed, Jan 9, 2019 at 1:22 PM Chamikara Jayalath <ch...@google.com>
>>>> wrote:
>>>>
>>>>> I'm not too familiar with the exact underlying issue here, but writing
>>>>> unbounded input to files when using GlobalWindows for unsharded output is a
>>>>> valid use case, so this sounds like a bug. Feel free to create a JIRA.
>>>>>
>>>>> - Cham
>>>>>
>>>>> On Wed, Jan 9, 2019 at 10:00 AM Jeff Klukas <jk...@mozilla.com>
>>>>> wrote:
>>>>>
>>>>>> I've read more deeply into the WriteFiles code and I'm understanding
>>>>>> now that the exception is due to WriteFiles' attempt to handle unsharded
>>>>>> input. In that case, it creates a sharded and unsharded collection; the
>>>>>> first goes through one GroupByKey while the other goes through 2. These two
>>>>>> collections are then flattened together and they have incompatible triggers
>>>>>> due to the double-grouped collection using a continuation trigger.
>>>>>>
>>>>>> I was calling FileIO.withNumShards(ValueProvider<Integer>), but if I
>>>>>> switch to hard coding an integer rather than passing a ValueProvider,
>>>>>> WriteFiles uses a different code path that doesn't flatten collections and
>>>>>> no exception is thrown.
>>>>>>
>>>>>> So, this might really be considered a bug of WriteFiles (and thus
>>>>>> FileIO). But I'd love to hear other interpretations.
>>>>>>
>>>>>> On Wed, Jan 9, 2019 at 11:25 AM Jeff Klukas <jk...@mozilla.com>
>>>>>> wrote:
>>>>>>
>>>>>>> I'm building a pipeline that streams from Pubsub and writes to
>>>>>>> files. I'm using FileIO's dynamic destinations to place elements into
>>>>>>> different directories according to date and I really don't care about
>>>>>>> ordering of elements beyond the date buckets.
>>>>>>>
>>>>>>> So, I think GlobalWindows is appropriate in this case, even though
>>>>>>> the input is unbounded. Is it possible to use GlobalWindows but set a
>>>>>>> trigger based on number of elements and/or processing time so that beam
>>>>>>> actually writes out files periodically?
>>>>>>>
>>>>>>> I tried the following:
>>>>>>>
>>>>>>> Window.into(new GlobalWindows())
>>>>>>>   .triggering(Repeatedly.forever(AfterFirst.of(
>>>>>>>     AfterPane.elementCountAtLeast(10000),
>>>>>>>     AfterProcessingTime.pastFirstElementInPane()
>>>>>>>         .plusDelayOf(Duration.standardMinutes(10)))))
>>>>>>>   .discardingFiredPanes()
>>>>>>>
>>>>>>> But it raises an exception about incompatible triggers:
>>>>>>>
>>>>>>> Inputs to Flatten had incompatible triggers:
>>>>>>> Repeatedly.forever(AfterFirst.of(AfterPane.elementCountAtLeast(10000),
>>>>>>> AfterProcessingTime.pastFirstElementInPane().plusDelayOf(1 minute))),
>>>>>>> Repeatedly.forever(AfterFirst.of(AfterPane.elementCountAtLeast(1),
>>>>>>> AfterSynchronizedProcessingTime.pastFirstElementInPane()))
>>>>>>>
>>>>>>> I believe that what's happening is that FileIO with explicit
>>>>>>> numShards (required in the case of unbounded input) is forcing a
>>>>>>> GroupByKey, which activates continuation triggers that are incompatible
>>>>>>> with my stated triggers. It's the internals of WriteFiles that are trying
>>>>>>> to flatten the incompatible PCollections together.
>>>>>>>
>>>>>>>
>>>>>>>

Re: Possible to use GlobalWindows for writing unbounded input to files?

Posted by Reuven Lax <re...@google.com>.
Keep in mind that I'm not sure any of the bounded runners support
triggering like that in any reasonable way. They mostly rely on the fact
that triggers are documented to be non-deterministic, and then ignore them.

However, this is still a bug, because a graph that works in unbounded
mode should still at least work in bounded mode.

On Fri, Jan 11, 2019 at 9:47 AM Jeff Klukas <jk...@mozilla.com> wrote:

> It is indeed well documented that numShards is required for unbounded
> input. And I do believe that a helpful error is thrown in the case of
> unbounded input and runner-determined sharding.
>
> I do believe there's still a bug here; it's just wandered quite a bit from
> the original title of the thread. The title should now be "Exception when
> using custom triggering and runner-determined file sharding".
>
> I was seeing the IllegalStateException in a unit test when I tried to
> compile my pipeline with the custom triggering. That unit test exercised
> *bounded* file input and numShards=0.
>
> In bounded mode, it would still be useful to be able to limit file sizes
> via GlobalWindows with triggering on AfterPane.elementCountAtLeast. But
> elementCountAtLeast will emit a continuation trigger that trips the Flatten
> problem for runner-determined sharding.
>
>
> On Fri, Jan 11, 2019 at 12:32 PM Reuven Lax <re...@google.com> wrote:
>
>> Ah,
>>
>> numShards = 0 is explicitly not supported in unbounded mode today, for
>> the reason mentioned above. If FileIO doesn't reject the pipeline in that
>> case, we should fix that.
>>
>> Reuven
>>
>> On Fri, Jan 11, 2019 at 9:23 AM Jeff Klukas <jk...@mozilla.com> wrote:
>>
>>> Indeed, I was wrong about the ValueProvider distinction. I updated that
>>> in the JIRA.
>>>
>>> It's when numShards is 0 (so runner-provided sharding) vs. an explicit
>>> number. Things work fine for explicit sharding. It's the runner-provided
>>> sharding mode that encounters the Flatten of PCollections with conflicting
>>> triggers.
>>>
>>> On Fri, Jan 11, 2019 at 12:18 PM Reuven Lax <re...@google.com> wrote:
>>>
>>>> FileIO requires an explicit numShards in unbounded mode for a number of
>>>> reasons - one being that a trigger has to happen on a GroupByKey, and we
>>>> need something to group on.
>>>>
>>>> It is extremely surprising that behavior would change between using a
>>>> ValueProvider or not. The exact same codepath should be triggered
>>>> regardless of whether a ValueProvider is used.
>>>>
>>>> Reuven
>>>>
>>>> On Wed, Jan 9, 2019 at 11:00 PM Kenneth Knowles <ke...@apache.org>
>>>> wrote:
>>>>
>>>>> Definitely sounds like a bug but also I want to caution you (or anyone
>>>>> reading this archived) that there are known problems with continuation
>>>>> triggers. A spec on continuation triggers that we missed was that they
>>>>> really must be "compatible" (this is an arbitrary concept, having only to
>>>>> do with Flattening two PCollections together) with their original trigger.
>>>>> Without this, we also know that you can have three PCollections with
>>>>> identical triggering and you can CoGroupByKey them together but you cannot
>>>>> do this three-way join as a sequence of binary joins.
>>>>>
>>>>> Kenn
>>>>>
>>>>> On Wed, Jan 9, 2019 at 10:44 AM Jeff Klukas <jk...@mozilla.com>
>>>>> wrote:
>>>>>
>>>>>> Thanks for the response, Chamikara. I filed
>>>>>> https://jira.apache.org/jira/browse/BEAM-6399 and I expect I can
>>>>>> work around the problem in my case by not using a ValueProvider for
>>>>>> numShards.
>>>>>>
>>>>>> On Wed, Jan 9, 2019 at 1:22 PM Chamikara Jayalath <
>>>>>> chamikara@google.com> wrote:
>>>>>>
>>>>>>> I'm not too familiar with the exact underlying issue here, but
>>>>>>> writing unbounded input to files when using GlobalWindows for unsharded
>>>>>>> output is a valid use case, so this sounds like a bug. Feel free to create a JIRA.
>>>>>>>
>>>>>>> - Cham
>>>>>>>
>>>>>>> On Wed, Jan 9, 2019 at 10:00 AM Jeff Klukas <jk...@mozilla.com>
>>>>>>> wrote:
>>>>>>>
>>>>>>>> I've read more deeply into the WriteFiles code and I'm
>>>>>>>> understanding now that the exception is due to WriteFiles' attempt to
>>>>>>>> handle unsharded input. In that case, it creates a sharded and unsharded
>>>>>>>> collection; the first goes through one GroupByKey while the other goes
>>>>>>>> through 2. These two collections are then flattened together and they have
>>>>>>>> incompatible triggers due to the double-grouped collection using a
>>>>>>>> continuation trigger.
>>>>>>>>
>>>>>>>> I was calling FileIO.withNumShards(ValueProvider<Integer>), but if
>>>>>>>> I switch to hard coding an integer rather than passing a ValueProvider,
>>>>>>>> WriteFiles uses a different code path that doesn't flatten collections and
>>>>>>>> no exception is thrown.
>>>>>>>>
>>>>>>>> So, this might really be considered a bug of WriteFiles (and thus
>>>>>>>> FileIO). But I'd love to hear other interpretations.
>>>>>>>>
>>>>>>>> On Wed, Jan 9, 2019 at 11:25 AM Jeff Klukas <jk...@mozilla.com>
>>>>>>>> wrote:
>>>>>>>>
>>>>>>>>> I'm building a pipeline that streams from Pubsub and writes to
>>>>>>>>> files. I'm using FileIO's dynamic destinations to place elements into
>>>>>>>>> different directories according to date and I really don't care about
>>>>>>>>> ordering of elements beyond the date buckets.
>>>>>>>>>
>>>>>>>>> So, I think GlobalWindows is appropriate in this case, even though
>>>>>>>>> the input is unbounded. Is it possible to use GlobalWindows but set a
>>>>>>>>> trigger based on number of elements and/or processing time so that beam
>>>>>>>>> actually writes out files periodically?
>>>>>>>>>
>>>>>>>>> I tried the following:
>>>>>>>>>
>>>>>>>>> Window.into(new GlobalWindows())
>>>>>>>>>   .triggering(Repeatedly.forever(AfterFirst.of(
>>>>>>>>>     AfterPane.elementCountAtLeast(10000),
>>>>>>>>>     AfterProcessingTime.pastFirstElementInPane()
>>>>>>>>>         .plusDelayOf(Duration.standardMinutes(10)))))
>>>>>>>>>   .discardingFiredPanes()
>>>>>>>>>
>>>>>>>>> But it raises an exception about incompatible triggers:
>>>>>>>>>
>>>>>>>>> Inputs to Flatten had incompatible triggers:
>>>>>>>>> Repeatedly.forever(AfterFirst.of(AfterPane.elementCountAtLeast(10000),
>>>>>>>>> AfterProcessingTime.pastFirstElementInPane().plusDelayOf(1 minute))),
>>>>>>>>> Repeatedly.forever(AfterFirst.of(AfterPane.elementCountAtLeast(1),
>>>>>>>>> AfterSynchronizedProcessingTime.pastFirstElementInPane()))
>>>>>>>>>
>>>>>>>>> I believe that what's happening is that FileIO with explicit
>>>>>>>>> numShards (required in the case of unbounded input) is forcing a
>>>>>>>>> GroupByKey, which activates continuation triggers that are incompatible
>>>>>>>>> with my stated triggers. It's the internals of WriteFiles that are trying
>>>>>>>>> to flatten the incompatible PCollections together.
>>>>>>>>>
>>>>>>>>>
>>>>>>>>>

Re: Possible to use GlobalWindows for writing unbounded input to files?

Posted by Jeff Klukas <jk...@mozilla.com>.
It is indeed well documented that numShards is required for unbounded
input. And I do believe that a helpful error is thrown in the case of
unbounded input and runner-determined sharding.

I do believe there's still a bug here; the discussion has just wandered
quite a bit from the original title of the thread. The title should now be
"Exception when using custom triggering and runner-determined file
sharding".

I was seeing the IllegalStateException in a unit test when I tried to
compile my pipeline with the custom triggering. That unit test exercised
*bounded* file input and numShards=0.

In bounded mode, it would still be useful to be able to limit file sizes
via GlobalWindows with triggering on AfterPane.elementCountAtLeast. But
elementCountAtLeast will emit a continuation trigger that trips the Flatten
problem for runner-determined sharding.
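
A rough sketch of the unit-test shape described above, assuming a Pipeline
named `pipeline`; the paths are illustrative, and the only difference
between the failing and passing cases is the shard count:

// Bounded input with custom triggering in the global window.
PCollection<String> input = pipeline
    .apply(TextIO.read().from("/tmp/input*.txt"))
    .apply(Window.<String>into(new GlobalWindows())
        .triggering(Repeatedly.forever(AfterPane.elementCountAtLeast(10000)))
        .withAllowedLateness(Duration.ZERO)
        .discardingFiredPanes());

// numShards = 0 (runner-determined sharding): pipeline construction fails
// with "Inputs to Flatten had incompatible triggers".
input.apply(FileIO.<String>write().via(TextIO.sink())
    .to("/tmp/out/").withNumShards(0));

// An explicit shard count takes the code path without the Flatten and works.
input.apply(FileIO.<String>write().via(TextIO.sink())
    .to("/tmp/out/").withNumShards(10));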


On Fri, Jan 11, 2019 at 12:32 PM Reuven Lax <re...@google.com> wrote:

> Ah,
>
> numShards = 0 is explicitly not supported in unbounded mode today, for the
> reason mentioned above. If FileIO doesn't reject the pipeline in that case,
> we should fix that.
>
> Reuven
>
> On Fri, Jan 11, 2019 at 9:23 AM Jeff Klukas <jk...@mozilla.com> wrote:
>
>> Indeed, I was wrong about the ValueProvider distinction. I updated that
>> in the JIRA.
>>
>> It's when numShards is 0 (so runner-provided sharding) vs. an explicit
>> number. Things work fine for explicit sharding. It's the runner-provided
>> sharding mode that encounters the Flatten of PCollections with conflicting
>> triggers.
>>
>> On Fri, Jan 11, 2019 at 12:18 PM Reuven Lax <re...@google.com> wrote:
>>
>>> FileIO requires an explicit numShards in unbounded mode for a number of
>>> reasons - one being that a trigger has to happen on a GroupByKey, and we
>>> need something to group on.
>>>
>>> It is extremely surprising that behavior would change between using a
>>> ValueProvider or not. The exact same codepath should be triggered
>>> regardless of whether a ValueProvider is used.
>>>
>>> Reuven
>>>
>>> On Wed, Jan 9, 2019 at 11:00 PM Kenneth Knowles <ke...@apache.org> wrote:
>>>
>>>> Definitely sounds like a bug but also I want to caution you (or anyone
>>>> reading this archived) that there are known problems with continuation
>>>> triggers. A spec on continuation triggers that we missed was that they
>>>> really must be "compatible" (this is an arbitrary concept, having only to
>>>> do with Flattening two PCollections together) with their original trigger.
>>>> Without this, we also know that you can have three PCollections with
>>>> identical triggering and you can CoGroupByKey them together but you cannot
>>>> do this three-way join as a sequence of binary joins.
>>>>
>>>> Kenn
>>>>
>>>> On Wed, Jan 9, 2019 at 10:44 AM Jeff Klukas <jk...@mozilla.com>
>>>> wrote:
>>>>
>>>>> Thanks for the response, Chamikara. I filed
>>>>> https://jira.apache.org/jira/browse/BEAM-6399 and I expect I can work
>>>>> around the problem in my case by not using a ValueProvider for numShards.
>>>>>
>>>>> On Wed, Jan 9, 2019 at 1:22 PM Chamikara Jayalath <
>>>>> chamikara@google.com> wrote:
>>>>>
>>>>>> I'm not too familiar with the exact underlying issue here, but writing
>>>>>> unbounded input to files when using GlobalWindows for unsharded output is a
>>>>>> valid use case, so this sounds like a bug. Feel free to create a JIRA.
>>>>>>
>>>>>> - Cham
>>>>>>
>>>>>> On Wed, Jan 9, 2019 at 10:00 AM Jeff Klukas <jk...@mozilla.com>
>>>>>> wrote:
>>>>>>
>>>>>>> I've read more deeply into the WriteFiles code and I'm understanding
>>>>>>> now that the exception is due to WriteFiles' attempt to handle unsharded
>>>>>>> input. In that case, it creates a sharded and unsharded collection; the
>>>>>>> first goes through one GroupByKey while the other goes through 2. These two
>>>>>>> collections are then flattened together and they have incompatible triggers
>>>>>>> due to the double-grouped collection using a continuation trigger.
>>>>>>>
>>>>>>> I was calling FileIO.withNumShards(ValueProvider<Integer>), but if I
>>>>>>> switch to hard coding an integer rather than passing a ValueProvider,
>>>>>>> WriteFiles uses a different code path that doesn't flatten collections and
>>>>>>> no exception is thrown.
>>>>>>>
>>>>>>> So, this might really be considered a bug of WriteFiles (and thus
>>>>>>> FileIO). But I'd love to hear other interpretations.
>>>>>>>
>>>>>>> On Wed, Jan 9, 2019 at 11:25 AM Jeff Klukas <jk...@mozilla.com>
>>>>>>> wrote:
>>>>>>>
>>>>>>>> I'm building a pipeline that streams from Pubsub and writes to
>>>>>>>> files. I'm using FileIO's dynamic destinations to place elements into
>>>>>>>> different directories according to date and I really don't care about
>>>>>>>> ordering of elements beyond the date buckets.
>>>>>>>>
>>>>>>>> So, I think GlobalWindows is appropriate in this case, even though
>>>>>>>> the input is unbounded. Is it possible to use GlobalWindows but set a
>>>>>>>> trigger based on number of elements and/or processing time so that beam
>>>>>>>> actually writes out files periodically?
>>>>>>>>
>>>>>>>> I tried the following:
>>>>>>>>
>>>>>>>> Window.into(new GlobalWindows())
>>>>>>>>   .triggering(Repeatedly.forever(AfterFirst.of(
>>>>>>>>     AfterPane.elementCountAtLeast(10000),
>>>>>>>>
>>>>>>>> AfterProcessingTime.pastFirstElementInPane().plusDelayOf(Duration.standardMinutes(10)))))
>>>>>>>>   .discardingFiredPanes()
>>>>>>>>
>>>>>>>> But it raises an exception about incompatible triggers:
>>>>>>>>
>>>>>>>> Inputs to Flatten had incompatible triggers:
>>>>>>>> Repeatedly.forever(AfterFirst.of(AfterPane.elementCountAtLeast(10000),
>>>>>>>> AfterProcessingTime.pastFirstElementInPane().plusDelayOf(1 minute))),
>>>>>>>> Repeatedly.forever(AfterFirst.of(AfterPane.elementCountAtLeast(1),
>>>>>>>> AfterSynchronizedProcessingTime.pastFirstElementInPane()))
>>>>>>>>
>>>>>>>> I believe that what's happening is that FileIO with explicit
>>>>>>>> numShards (required in the case of unbounded input) is forcing a
>>>>>>>> GroupByKey, which activates continuation triggers that are incompatible
>>>>>>>> with my stated triggers. It's the internals of WriteFiles that are trying
>>>>>>>> to flatten the incompatible PCollections together.
>>>>>>>>
>>>>>>>>
>>>>>>>>

Re: Possible to use GlobalWindows for writing unbounded input to files?

Posted by Reuven Lax <re...@google.com>.
Ah,

numShards = 0 is explicitly not supported in unbounded mode today, for the
reason mentioned above. If FileIO doesn't reject the pipeline in that case,
we should fix that.
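
A sketch of the kind of up-front check being suggested; this is hypothetical
code in the spirit of the proposal, not the actual WriteFiles implementation:

// Hypothetical validation at pipeline-construction time (names illustrative).
if (input.isBounded() == PCollection.IsBounded.UNBOUNDED && numShards == 0) {
  throw new IllegalArgumentException(
      "When writing an unbounded PCollection, an explicit number of shards"
          + " must be set via withNumShards() or withSharding()");
}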

Reuven

On Fri, Jan 11, 2019 at 9:23 AM Jeff Klukas <jk...@mozilla.com> wrote:

> Indeed, I was wrong about the ValueProvider distinction. I updated that in
> the JIRA.
>
> It's when numShards is 0 (so runner-provided sharding) vs. an explicit
> number. Things work fine for explicit sharding. It's the runner-provided
> sharding mode that encounters the Flatten of PCollections with conflicting
> triggers.
>
> On Fri, Jan 11, 2019 at 12:18 PM Reuven Lax <re...@google.com> wrote:
>
>> FileIO requires an explicit numShards in unbounded mode for a number of
>> reasons - one being that a trigger has to happen on a GroupByKey, and we
>> need something to group on.
>>
>> It is extremely surprising that behavior would change between using a
>> ValueProvider or not. The exact same codepath should be triggered
>> regardless of whether a ValueProvider is used.
>>
>> Reuven
>>
>> On Wed, Jan 9, 2019 at 11:00 PM Kenneth Knowles <ke...@apache.org> wrote:
>>
>>> Definitely sounds like a bug but also I want to caution you (or anyone
>>> reading this archived) that there are known problems with continuation
>>> triggers. A spec on continuation triggers that we missed was that they
>>> really must be "compatible" (this is an arbitrary concept, having only to
>>> do with Flattening two PCollections together) with their original trigger.
>>> Without this, we also know that you can have three PCollections with
>>> identical triggering and you can CoGroupByKey them together but you cannot
>>> do this three-way join as a sequence of binary joins.
>>>
>>> Kenn
>>>
>>> On Wed, Jan 9, 2019 at 10:44 AM Jeff Klukas <jk...@mozilla.com> wrote:
>>>
>>>> Thanks for the response, Chamikara. I filed
>>>> https://jira.apache.org/jira/browse/BEAM-6399 and I expect I can work
>>>> around the problem in my case by not using a ValueProvider for numShards.
>>>>
>>>> On Wed, Jan 9, 2019 at 1:22 PM Chamikara Jayalath <ch...@google.com>
>>>> wrote:
>>>>
>>>>> I'm not too familiar with the exact underlying issue here, but writing
>>>>> unbounded input to files when using GlobalWindows for unsharded output is a
>>>>> valid use case, so this sounds like a bug. Feel free to create a JIRA.
>>>>>
>>>>> - Cham
>>>>>
>>>>> On Wed, Jan 9, 2019 at 10:00 AM Jeff Klukas <jk...@mozilla.com>
>>>>> wrote:
>>>>>
>>>>>> I've read more deeply into the WriteFiles code and I'm understanding
>>>>>> now that the exception is due to WriteFiles' attempt to handle unsharded
>>>>>> input. In that case, it creates a sharded and unsharded collection; the
>>>>>> first goes through one GroupByKey while the other goes through 2. These two
>>>>>> collections are then flattened together and they have incompatible triggers
>>>>>> due to the double-grouped collection using a continuation trigger.
>>>>>>
>>>>>> I was calling FileIO.withNumShards(ValueProvider<Integer>), but if I
>>>>>> switch to hard coding an integer rather than passing a ValueProvider,
>>>>>> WriteFiles uses a different code path that doesn't flatten collections and
>>>>>> no exception is thrown.
>>>>>>
>>>>>> So, this might really be considered a bug of WriteFiles (and thus
>>>>>> FileIO). But I'd love to hear other interpretations.
>>>>>>
>>>>>> On Wed, Jan 9, 2019 at 11:25 AM Jeff Klukas <jk...@mozilla.com>
>>>>>> wrote:
>>>>>>
>>>>>>> I'm building a pipeline that streams from Pubsub and writes to
>>>>>>> files. I'm using FileIO's dynamic destinations to place elements into
>>>>>>> different directories according to date and I really don't care about
>>>>>>> ordering of elements beyond the date buckets.
>>>>>>>
>>>>>>> So, I think GlobalWindows is appropriate in this case, even though
>>>>>>> the input is unbounded. Is it possible to use GlobalWindows but set a
>>>>>>> trigger based on number of elements and/or processing time so that beam
>>>>>>> actually writes out files periodically?
>>>>>>>
>>>>>>> I tried the following:
>>>>>>>
>>>>>>> Window.into(new GlobalWindows())
>>>>>>>   .triggering(Repeatedly.forever(AfterFirst.of(
>>>>>>>     AfterPane.elementCountAtLeast(10000),
>>>>>>>     AfterProcessingTime.pastFirstElementInPane()
>>>>>>>         .plusDelayOf(Duration.standardMinutes(10)))))
>>>>>>>   .discardingFiredPanes()
>>>>>>>
>>>>>>> But it raises an exception about incompatible triggers:
>>>>>>>
>>>>>>> Inputs to Flatten had incompatible triggers:
>>>>>>> Repeatedly.forever(AfterFirst.of(AfterPane.elementCountAtLeast(10000),
>>>>>>> AfterProcessingTime.pastFirstElementInPane().plusDelayOf(1 minute))),
>>>>>>> Repeatedly.forever(AfterFirst.of(AfterPane.elementCountAtLeast(1),
>>>>>>> AfterSynchronizedProcessingTime.pastFirstElementInPane()))
>>>>>>>
>>>>>>> I believe that what's happening is that FileIO with explicit
>>>>>>> numShards (required in the case of unbounded input) is forcing a
>>>>>>> GroupByKey, which activates continuation triggers that are incompatible
>>>>>>> with my stated triggers. It's the internals of WriteFiles that are trying
>>>>>>> to flatten the incompatible PCollections together.
>>>>>>>
>>>>>>>
>>>>>>>

Re: Possible to use GlobalWindows for writing unbounded input to files?

Posted by Jeff Klukas <jk...@mozilla.com>.
Indeed, I was wrong about the ValueProvider distinction. I updated that in
the JIRA.

It's when numShards is 0 (so runner-provided sharding) vs. an explicit
number. Things work fine for explicit sharding. It's the runner-provided
sharding mode that encounters the Flatten of PCollections with conflicting
triggers.
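
In FileIO terms, assuming a FileIO.Write named `write`, the distinction
looks like this; the ValueProvider wrapping itself is irrelevant:

import org.apache.beam.sdk.options.ValueProvider.StaticValueProvider;

// What matters is whether the shard count is 0 (runner-determined sharding,
// which introduces the Flatten of differently-triggered PCollections) or an
// explicit positive count.
write.withNumShards(0);                          // runner-determined: fails
write.withNumShards(10);                         // explicit literal: works
write.withNumShards(StaticValueProvider.of(10)); // explicit via provider: works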

On Fri, Jan 11, 2019 at 12:18 PM Reuven Lax <re...@google.com> wrote:

> FileIO requires an explicit numShards in unbounded mode for a number of
> reasons - one being that a trigger has to happen on a GroupByKey, and we
> need something to group on.
>
> It is extremely surprising that behavior would change between using a
> ValueProvider or not. The exact same codepath should be triggered
> regardless of whether a ValueProvider is used.
>
> Reuven
>
> On Wed, Jan 9, 2019 at 11:00 PM Kenneth Knowles <ke...@apache.org> wrote:
>
>> Definitely sounds like a bug but also I want to caution you (or anyone
>> reading this archived) that there are known problems with continuation
>> triggers. A spec on continuation triggers that we missed was that they
>> really must be "compatible" (this is an arbitrary concept, having only to
>> do with Flattening two PCollections together) with their original trigger.
>> Without this, we also know that you can have three PCollections with
>> identical triggering and you can CoGroupByKey them together but you cannot
>> do this three-way join as a sequence of binary joins.
>>
>> Kenn
>>
>> On Wed, Jan 9, 2019 at 10:44 AM Jeff Klukas <jk...@mozilla.com> wrote:
>>
>>> Thanks for the response, Chamikara. I filed
>>> https://jira.apache.org/jira/browse/BEAM-6399 and I expect I can work
>>> around the problem in my case by not using a ValueProvider for numShards.
>>>
>>> On Wed, Jan 9, 2019 at 1:22 PM Chamikara Jayalath <ch...@google.com>
>>> wrote:
>>>
>>>> I'm not too familiar with the exact underlying issue here, but writing
>>>> unbounded input to files when using GlobalWindows for unsharded output is a
>>>> valid use case, so this sounds like a bug. Feel free to create a JIRA.
>>>>
>>>> - Cham
>>>>
>>>> On Wed, Jan 9, 2019 at 10:00 AM Jeff Klukas <jk...@mozilla.com>
>>>> wrote:
>>>>
>>>>> I've read more deeply into the WriteFiles code and I'm understanding
>>>>> now that the exception is due to WriteFiles' attempt to handle unsharded
>>>>> input. In that case, it creates a sharded and unsharded collection; the
>>>>> first goes through one GroupByKey while the other goes through 2. These two
>>>>> collections are then flattened together and they have incompatible triggers
>>>>> due to the double-grouped collection using a continuation trigger.
>>>>>
>>>>> I was calling FileIO.withNumShards(ValueProvider<Integer>), but if I
>>>>> switch to hard coding an integer rather than passing a ValueProvider,
>>>>> WriteFiles uses a different code path that doesn't flatten collections and
>>>>> no exception is thrown.
>>>>>
>>>>> So, this might really be considered a bug of WriteFiles (and thus
>>>>> FileIO). But I'd love to hear other interpretations.
>>>>>
>>>>> On Wed, Jan 9, 2019 at 11:25 AM Jeff Klukas <jk...@mozilla.com>
>>>>> wrote:
>>>>>
>>>>>> I'm building a pipeline that streams from Pubsub and writes to files.
>>>>>> I'm using FileIO's dynamic destinations to place elements into different
>>>>>> directories according to date and I really don't care about ordering of
>>>>>> elements beyond the date buckets.
>>>>>>
>>>>>> So, I think GlobalWindows is appropriate in this case, even though
>>>>>> the input is unbounded. Is it possible to use GlobalWindows but set a
>>>>>> trigger based on number of elements and/or processing time so that beam
>>>>>> actually writes out files periodically?
>>>>>>
>>>>>> I tried the following:
>>>>>>
>>>>>> Window.into(new GlobalWindows())
>>>>>>   .triggering(Repeatedly.forever(AfterFirst.of(
>>>>>>     AfterPane.elementCountAtLeast(10000),
>>>>>>     AfterProcessingTime.pastFirstElementInPane()
>>>>>>         .plusDelayOf(Duration.standardMinutes(10)))))
>>>>>>   .discardingFiredPanes()
>>>>>>
>>>>>> But it raises an exception about incompatible triggers:
>>>>>>
>>>>>> Inputs to Flatten had incompatible triggers:
>>>>>> Repeatedly.forever(AfterFirst.of(AfterPane.elementCountAtLeast(10000),
>>>>>> AfterProcessingTime.pastFirstElementInPane().plusDelayOf(1 minute))),
>>>>>> Repeatedly.forever(AfterFirst.of(AfterPane.elementCountAtLeast(1),
>>>>>> AfterSynchronizedProcessingTime.pastFirstElementInPane()))
>>>>>>
>>>>>> I believe that what's happening is that FileIO with explicit
>>>>>> numShards (required in the case of unbounded input) is forcing a
>>>>>> GroupByKey, which activates continuation triggers that are incompatible
>>>>>> with my stated triggers. It's the internals of WriteFiles that are trying
>>>>>> to flatten the incompatible PCollections together.
>>>>>>
>>>>>>
>>>>>>

Re: Possible to use GlobalWindows for writing unbounded input to files?

Posted by Reuven Lax <re...@google.com>.
FileIO requires an explicit numShards in unbounded mode for a number of
reasons - one being that a trigger has to happen on a GroupByKey, and we
need something to group on.
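
To illustrate, a conceptual sketch of the grouping involved; this is not the
actual WriteFiles code, and the names, random key assignment, and `windowed`
input collection are illustrative:

import java.util.concurrent.ThreadLocalRandom;
import org.apache.beam.sdk.transforms.DoFn;
import org.apache.beam.sdk.transforms.GroupByKey;
import org.apache.beam.sdk.transforms.ParDo;
import org.apache.beam.sdk.values.KV;
import org.apache.beam.sdk.values.PCollection;

// Assign each element one of numShards keys, then group: each trigger firing
// on the GroupByKey emits one iterable per key, i.e. one shard file. Without
// a concrete numShards there is no key space to group on.
final int numShards = 10; // illustrative
PCollection<KV<Integer, String>> keyed = windowed.apply(
    ParDo.of(new DoFn<String, KV<Integer, String>>() {
      @ProcessElement
      public void assignShard(ProcessContext c) {
        c.output(KV.of(ThreadLocalRandom.current().nextInt(numShards), c.element()));
      }
    }));
PCollection<KV<Integer, Iterable<String>>> shards =
    keyed.apply(GroupByKey.create());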

It is extremely surprising that behavior would change between using a
ValueProvider or not. The exact same codepath should be triggered
regardless of whether a ValueProvider is used.

Reuven

On Wed, Jan 9, 2019 at 11:00 PM Kenneth Knowles <ke...@apache.org> wrote:

> Definitely sounds like a bug but also I want to caution you (or anyone
> reading this archived) that there are known problems with continuation
> triggers. A spec on continuation triggers that we missed was that they
> really must be "compatible" (this is an arbitrary concept, having only to
> do with Flattening two PCollections together) with their original trigger.
> Without this, we also know that you can have three PCollections with
> identical triggering and you can CoGroupByKey them together but you cannot
> do this three-way join as a sequence of binary joins.
>
> Kenn
>
> On Wed, Jan 9, 2019 at 10:44 AM Jeff Klukas <jk...@mozilla.com> wrote:
>
>> Thanks for the response, Chamikara. I filed
>> https://jira.apache.org/jira/browse/BEAM-6399 and I expect I can work
>> around the problem in my case by not using a ValueProvider for numShards.
>>
>> On Wed, Jan 9, 2019 at 1:22 PM Chamikara Jayalath <ch...@google.com>
>> wrote:
>>
>>> I'm not too familiar with the exact underlying issue here, but writing
>>> unbounded input to files when using GlobalWindows for unsharded output is a
>>> valid use case, so this sounds like a bug. Feel free to create a JIRA.
>>>
>>> - Cham
>>>
>>> On Wed, Jan 9, 2019 at 10:00 AM Jeff Klukas <jk...@mozilla.com> wrote:
>>>
>>>> I've read more deeply into the WriteFiles code and I'm understanding
>>>> now that the exception is due to WriteFiles' attempt to handle unsharded
>>>> input. In that case, it creates a sharded and unsharded collection; the
>>>> first goes through one GroupByKey while the other goes through 2. These two
>>>> collections are then flattened together and they have incompatible triggers
>>>> due to the double-grouped collection using a continuation trigger.
>>>>
>>>> I was calling FileIO.withNumShards(ValueProvider<Integer>), but if I
>>>> switch to hard coding an integer rather than passing a ValueProvider,
>>>> WriteFiles uses a different code path that doesn't flatten collections and
>>>> no exception is thrown.
>>>>
>>>> So, this might really be considered a bug of WriteFiles (and thus
>>>> FileIO). But I'd love to hear other interpretations.
>>>>
>>>> On Wed, Jan 9, 2019 at 11:25 AM Jeff Klukas <jk...@mozilla.com>
>>>> wrote:
>>>>
>>>>> I'm building a pipeline that streams from Pubsub and writes to files.
>>>>> I'm using FileIO's dynamic destinations to place elements into different
>>>>> directories according to date and I really don't care about ordering of
>>>>> elements beyond the date buckets.
>>>>>
>>>>> So, I think GlobalWindows is appropriate in this case, even though the
>>>>> input is unbounded. Is it possible to use GlobalWindows but set a trigger
>>>>> based on number of elements and/or processing time so that beam actually
>>>>> writes out files periodically?
>>>>>
>>>>> I tried the following:
>>>>>
>>>>> Window.into(new GlobalWindows())
>>>>>   .triggering(Repeatedly.forever(AfterFirst.of(
>>>>>     AfterPane.elementCountAtLeast(10000),
>>>>>     AfterProcessingTime.pastFirstElementInPane()
>>>>>         .plusDelayOf(Duration.standardMinutes(10)))))
>>>>>   .discardingFiredPanes()
>>>>>
>>>>> But it raises an exception about incompatible triggers:
>>>>>
>>>>> Inputs to Flatten had incompatible triggers:
>>>>> Repeatedly.forever(AfterFirst.of(AfterPane.elementCountAtLeast(10000),
>>>>> AfterProcessingTime.pastFirstElementInPane().plusDelayOf(1 minute))),
>>>>> Repeatedly.forever(AfterFirst.of(AfterPane.elementCountAtLeast(1),
>>>>> AfterSynchronizedProcessingTime.pastFirstElementInPane()))
>>>>>
>>>>> I believe that what's happening is that FileIO with explicit numShards
>>>>> (required in the case of unbounded input) is forcing a GroupByKey, which
>>>>> activates continuation triggers that are incompatible with my stated
>>>>> triggers. It's the internals of WriteFiles that are trying to flatten the
>>>>> incompatible PCollections together.
>>>>>
>>>>>
>>>>>

Re: Possible to use GlobalWindows for writing unbounded input to files?

Posted by Kenneth Knowles <ke...@apache.org>.
Definitely sounds like a bug, but I also want to caution you (or anyone
reading this in the archive) that there are known problems with continuation
triggers. A requirement on continuation triggers that we missed in the spec
is that they really must be "compatible" (an arbitrary concept that only
comes into play when Flattening two PCollections together) with their
original trigger. Without this, we also know that you can have three
PCollections with identical triggering that you can CoGroupByKey together,
but you cannot do the same three-way join as a sequence of binary joins.
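
The continuation trigger in question can be inspected directly; a small
sketch using Trigger#getContinuationTrigger from the Java SDK:

import org.apache.beam.sdk.transforms.windowing.AfterFirst;
import org.apache.beam.sdk.transforms.windowing.AfterPane;
import org.apache.beam.sdk.transforms.windowing.AfterProcessingTime;
import org.apache.beam.sdk.transforms.windowing.Repeatedly;
import org.apache.beam.sdk.transforms.windowing.Trigger;
import org.joda.time.Duration;

Trigger original = Repeatedly.forever(AfterFirst.of(
    AfterPane.elementCountAtLeast(10000),
    AfterProcessingTime.pastFirstElementInPane()
        .plusDelayOf(Duration.standardMinutes(10))));

// Downstream of a GroupByKey, aggregations see the continuation trigger, in
// which elementCountAtLeast(10000) becomes elementCountAtLeast(1) -- exactly
// the mismatch in the Flatten error earlier in this thread.
System.out.println(original.getContinuationTrigger());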

Kenn

On Wed, Jan 9, 2019 at 10:44 AM Jeff Klukas <jk...@mozilla.com> wrote:

> Thanks for the response, Chamikara. I filed
> https://jira.apache.org/jira/browse/BEAM-6399 and I expect I can work
> around the problem in my case by not using a ValueProvider for numShards.
>
> On Wed, Jan 9, 2019 at 1:22 PM Chamikara Jayalath <ch...@google.com>
> wrote:
>
>> I'm not too familiar with the exact underlying issue here, but writing
>> unbounded input to files when using GlobalWindows for unsharded output is a
>> valid use case, so this sounds like a bug. Feel free to create a JIRA.
>>
>> - Cham
>>
>> On Wed, Jan 9, 2019 at 10:00 AM Jeff Klukas <jk...@mozilla.com> wrote:
>>
>>> I've read more deeply into the WriteFiles code and I'm understanding now
>>> that the exception is due to WriteFiles' attempt to handle unsharded input.
>>> In that case, it creates a sharded and unsharded collection; the first goes
>>> through one GroupByKey while the other goes through 2. These two
>>> collections are then flattened together and they have incompatible triggers
>>> due to the double-grouped collection using a continuation trigger.
>>>
>>> I was calling FileIO.withNumShards(ValueProvider<Integer>), but if I
>>> switch to hard coding an integer rather than passing a ValueProvider,
>>> WriteFiles uses a different code path that doesn't flatten collections and
>>> no exception is thrown.
>>>
>>> So, this might really be considered a bug of WriteFiles (and thus
>>> FileIO). But I'd love to hear other interpretations.
>>>
>>> On Wed, Jan 9, 2019 at 11:25 AM Jeff Klukas <jk...@mozilla.com> wrote:
>>>
>>>> I'm building a pipeline that streams from Pubsub and writes to files.
>>>> I'm using FileIO's dynamic destinations to place elements into different
>>>> directories according to date and I really don't care about ordering of
>>>> elements beyond the date buckets.
>>>>
>>>> So, I think GlobalWindows is appropriate in this case, even though the
>>>> input is unbounded. Is it possible to use GlobalWindows but set a trigger
>>>> based on number of elements and/or processing time so that beam actually
>>>> writes out files periodically?
>>>>
>>>> I tried the following:
>>>>
>>>> Window.into(new GlobalWindows())
>>>>   .triggering(Repeatedly.forever(AfterFirst.of(
>>>>     AfterPane.elementCountAtLeast(10000),
>>>>     AfterProcessingTime.pastFirstElementInPane()
>>>>         .plusDelayOf(Duration.standardMinutes(10)))))
>>>>   .discardingFiredPanes()
>>>>
>>>> But it raises an exception about incompatible triggers:
>>>>
>>>> Inputs to Flatten had incompatible triggers:
>>>> Repeatedly.forever(AfterFirst.of(AfterPane.elementCountAtLeast(10000),
>>>> AfterProcessingTime.pastFirstElementInPane().plusDelayOf(1 minute))),
>>>> Repeatedly.forever(AfterFirst.of(AfterPane.elementCountAtLeast(1),
>>>> AfterSynchronizedProcessingTime.pastFirstElementInPane()))
>>>>
>>>> I believe that what's happening is that FileIO with explicit numShards
>>>> (required in the case of unbounded input) is forcing a GroupByKey, which
>>>> activates continuation triggers that are incompatible with my stated
>>>> triggers. It's the internals of WriteFiles that are trying to flatten the
>>>> incompatible PCollections together.
>>>>
>>>>
>>>>

Re: Possible to use GlobalWindows for writing unbounded input to files?

Posted by Jeff Klukas <jk...@mozilla.com>.
Thanks for the response, Chamikara. I filed
https://jira.apache.org/jira/browse/BEAM-6399, and I expect I can work
around the problem in my case by not using a ValueProvider for numShards.
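
For anyone hitting the same thing, the workaround looks roughly like this.
It's only a sketch: lines (a PCollection<String>), the TextIO sink, and the
destination path are placeholder assumptions; the point is just the literal
shard count:

import org.apache.beam.sdk.io.FileIO;
import org.apache.beam.sdk.io.TextIO;

// Passing a literal int instead of a ValueProvider<Integer> steers
// WriteFiles onto the code path that doesn't flatten differently-triggered
// collections.
lines.apply(FileIO.<String>write()
    .via(TextIO.sink())
    .to("gs://my-bucket/output/")  // placeholder destination
    .withNumShards(10));           // hard-coded, not a ValueProvider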

Re: Possible to use GlobalWindows for writing unbounded input to files?

Posted by Chamikara Jayalath <ch...@google.com>.
I'm not too familiar with the exact underlying issue here, but writing
unbounded input to files under GlobalWindows with unsharded output is a
valid use case, so this sounds like a bug. Feel free to create a JIRA.

- Cham

Re: Possible to use GlobalWindows for writing unbounded input to files?

Posted by Jeff Klukas <jk...@mozilla.com>.
I've read more deeply into the WriteFiles code, and I now understand that
the exception is due to WriteFiles' attempt to handle unsharded input. In
that case, it creates both a sharded and an unsharded collection; the
former goes through one GroupByKey while the latter goes through two. The
two collections are then flattened together, and they have incompatible
triggers because the double-grouped collection carries a continuation
trigger.
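
Here's a stripped-down repro of that flatten, independent of FileIO. This
is a sketch, not WriteFiles' actual code; the transform chain and the input
PCollection<String> named input are my own. It fails the same way at
pipeline construction time:

import org.apache.beam.sdk.transforms.Flatten;
import org.apache.beam.sdk.transforms.GroupByKey;
import org.apache.beam.sdk.transforms.Values;
import org.apache.beam.sdk.transforms.WithKeys;
import org.apache.beam.sdk.transforms.windowing.AfterPane;
import org.apache.beam.sdk.transforms.windowing.GlobalWindows;
import org.apache.beam.sdk.transforms.windowing.Repeatedly;
import org.apache.beam.sdk.transforms.windowing.Window;
import org.apache.beam.sdk.values.PCollection;
import org.apache.beam.sdk.values.PCollectionList;
import org.apache.beam.sdk.values.TypeDescriptors;
import org.joda.time.Duration;

PCollection<String> windowed = input.apply(
    Window.<String>into(new GlobalWindows())
        .triggering(Repeatedly.forever(AfterPane.elementCountAtLeast(10000)))
        .withAllowedLateness(Duration.ZERO)
        .discardingFiredPanes());

// One round trip through GroupByKey replaces the trigger with its
// continuation (elementCountAtLeast(10000) becomes elementCountAtLeast(1)).
PCollection<String> regrouped = windowed
    .apply(WithKeys.<String, String>of("constant-key")
        .withKeyType(TypeDescriptors.strings()))
    .apply(GroupByKey.create())
    .apply(Values.create())
    .apply(Flatten.iterables());

// This is essentially the flatten WriteFiles performs internally; it
// throws "Inputs to Flatten had incompatible triggers".
PCollectionList.of(windowed).and(regrouped)
    .apply(Flatten.pCollections());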

I was calling FileIO.withNumShards(ValueProvider<Integer>), but if I switch
to hard-coding an integer rather than passing a ValueProvider, WriteFiles
uses a different code path that doesn't flatten collections, and no
exception is thrown.

So this might really be considered a bug in WriteFiles (and thus FileIO),
but I'd love to hear other interpretations.
