Posted to issues@beam.apache.org by "Chamikara Jayalath (JIRA)" <ji...@apache.org> on 2019/01/09 18:46:00 UTC
[jira] [Assigned] (BEAM-6399) FileIO errors on unbounded input with nondefault trigger
[ https://issues.apache.org/jira/browse/BEAM-6399?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]
Chamikara Jayalath reassigned BEAM-6399:
----------------------------------------
Assignee: (was: Eugene Kirpichov)
> FileIO errors on unbounded input with nondefault trigger
> --------------------------------------------------------
>
> Key: BEAM-6399
> URL: https://issues.apache.org/jira/browse/BEAM-6399
> Project: Beam
> Issue Type: Improvement
> Components: io-java-files
> Reporter: Jeff Klukas
> Priority: Major
>
> In a pipeline with unbounded input, if a user defines a custom trigger and calls FileIO.withNumShards(ValueProvider<Integer>), they may see an IllegalArgumentException at runtime due to incompatible triggers.
>
> For example, consider this compound trigger:
>
> {{Window.into(new GlobalWindows())}}
> {{    .triggering(Repeatedly.forever(AfterFirst.of(}}
> {{        AfterPane.elementCountAtLeast(10000),}}
> {{        AfterProcessingTime.pastFirstElementInPane()}}
> {{            .plusDelayOf(Duration.standardMinutes(10)))))}}
> {{    .discardingFiredPanes()}}
>
> Using that windowing with a numShards ValueProvider yields:
>
> {{Inputs to Flatten had incompatible triggers:}}
> {{Repeatedly.forever(AfterFirst.of(AfterPane.elementCountAtLeast(10000), AfterProcessingTime.pastFirstElementInPane().plusDelayOf(1 minute))),}}
> {{Repeatedly.forever(AfterFirst.of(AfterPane.elementCountAtLeast(1), AfterSynchronizedProcessingTime.pastFirstElementInPane()))}}
>
> When numShards is given as a ValueProvider, WriteFiles creates both a sharded and an unsharded collection; the first goes through one GroupByKey while the second goes through two. The two collections are then flattened together, and their triggers are incompatible because the double-grouped collection uses a continuation trigger.
>
> If the user instead specifies a positive non-ValueProvider numShards, then a different code path is followed that avoids this incompatibility.
>
> It looks like WriteFiles may need to be implemented differently to avoid combining collections with potentially incompatible triggers.
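
The mismatch described above can be illustrated with a small toy model. This is not Beam code: the class TriggerContinuationSketch, its Trigger type, and the compatible() check are hypothetical names invented for illustration. The continuation rules are inferred only from the two triggers printed in the error message (element count drops to 1, processing-time delay becomes a synchronized processing-time trigger). The string-equality check is a simplification and may not match how Flatten actually compares triggers.

```java
import java.util.Arrays;
import java.util.List;

/** Toy model (NOT Beam classes): triggers as a small tree with a continuation() rule. */
public class TriggerContinuationSketch {

  enum Kind { REPEATEDLY, AFTER_FIRST, ELEMENT_COUNT, PROCESSING_TIME, SYNCHRONIZED_PROCESSING_TIME }

  static final class Trigger {
    final Kind kind;
    final String arg;         // element count or delay; empty when unused
    final List<Trigger> subs; // sub-triggers for the composite kinds

    Trigger(Kind kind, String arg, Trigger... subs) {
      this.kind = kind;
      this.arg = arg;
      this.subs = Arrays.asList(subs);
    }

    /** Assumed continuation rules, inferred from the error message in the issue. */
    Trigger continuation() {
      switch (kind) {
        case ELEMENT_COUNT:
          return new Trigger(Kind.ELEMENT_COUNT, "1");
        case PROCESSING_TIME:
        case SYNCHRONIZED_PROCESSING_TIME:
          return new Trigger(Kind.SYNCHRONIZED_PROCESSING_TIME, "");
        default: // composites: apply the rule to every sub-trigger
          Trigger[] mapped = new Trigger[subs.size()];
          for (int i = 0; i < mapped.length; i++) {
            mapped[i] = subs.get(i).continuation();
          }
          return new Trigger(kind, arg, mapped);
      }
    }

    @Override
    public String toString() {
      switch (kind) {
        case ELEMENT_COUNT:
          return "AfterPane.elementCountAtLeast(" + arg + ")";
        case PROCESSING_TIME:
          return "AfterProcessingTime.pastFirstElementInPane().plusDelayOf(" + arg + ")";
        case SYNCHRONIZED_PROCESSING_TIME:
          return "AfterSynchronizedProcessingTime.pastFirstElementInPane()";
        case REPEATEDLY:
          return "Repeatedly.forever(" + subs.get(0) + ")";
        default: // AFTER_FIRST
          StringBuilder sb = new StringBuilder("AfterFirst.of(");
          for (int i = 0; i < subs.size(); i++) {
            if (i > 0) {
              sb.append(", ");
            }
            sb.append(subs.get(i));
          }
          return sb.append(")").toString();
      }
    }
  }

  /** Simplified stand-in for a Flatten-style check: triggers must render identically. */
  static boolean compatible(Trigger a, Trigger b) {
    return a.toString().equals(b.toString());
  }

  /** The compound trigger from the issue description, in toy form. */
  static Trigger userTrigger() {
    return new Trigger(Kind.REPEATEDLY, "",
        new Trigger(Kind.AFTER_FIRST, "",
            new Trigger(Kind.ELEMENT_COUNT, "10000"),
            new Trigger(Kind.PROCESSING_TIME, "10 minutes")));
  }

  public static void main(String[] args) {
    Trigger original = userTrigger();
    Trigger continued = original.continuation();
    System.out.println(original);
    System.out.println(continued);
    System.out.println("compatible: " + compatible(original, continued));
  }
}
```

In this model, one input keeps the user's trigger and the other carries its continuation, so the two no longer match. That is the shape of the failure reported in the error message.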
--
This message was sent by Atlassian JIRA
(v7.6.3#76005)