Posted to issues@beam.apache.org by "Kenneth Knowles (JIRA)" <ji...@apache.org> on 2019/01/17 17:33:00 UTC

[jira] [Assigned] (BEAM-6407) regression: FileIO.writeDynamic() with side inputs fails in DirectRunner

     [ https://issues.apache.org/jira/browse/BEAM-6407?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Kenneth Knowles reassigned BEAM-6407:
-------------------------------------

    Assignee: Niel Markwick  (was: Kenneth Knowles)

> regression: FileIO.writeDynamic() with side inputs fails in DirectRunner
> ------------------------------------------------------------------------
>
>                 Key: BEAM-6407
>                 URL: https://issues.apache.org/jira/browse/BEAM-6407
>             Project: Beam
>          Issue Type: Bug
>          Components: runner-direct
>    Affects Versions: 2.9.0
>            Reporter: Niel Markwick
>            Assignee: Niel Markwick
>            Priority: Blocker
>              Labels: regression
>             Fix For: 2.10.0
>
>         Attachments: beam-filewriter-demo.tgz
>
>          Time Spent: 2h 10m
>  Remaining Estimate: 0h
>
> When FileIO.writeDynamic() is used with automatic (runner-determined) sharding and a Contextful.Fn that uses side inputs for file naming, the DirectRunner (and TestPipeline) fail with:
> {{java.lang.IllegalStateException: All PCollectionViews that are consumed must be written by some WriteView PTransform: Missing [<unnamed> [RunnerPCollectionView]]}}
>  
> Example code:  
> {code:java}
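> import org.apache.beam.sdk.coders.StringUtf8Coder;
> import org.apache.beam.sdk.io.FileIO;
> import org.apache.beam.sdk.io.TextIO;
> import org.apache.beam.sdk.transforms.Contextful;
> import org.apache.beam.sdk.transforms.Create;
> import org.apache.beam.sdk.transforms.Requirements;
> import org.apache.beam.sdk.transforms.SerializableFunctions;
> import org.apache.beam.sdk.transforms.View;
> import org.apache.beam.sdk.values.PCollectionView;
>
> PCollectionView<String> outputFileName =
>     pipeline
>         .apply("outputDir", Create.of("/tmp/testout"))
>         .apply(View.asSingleton());
> // File naming reads the side input: "/tmp/testout" + shard index.
> Contextful.Fn<String, FileIO.Write.FileNaming> manifestNaming =
>     (element, c) ->
>         (window, pane, numShards, shardIndex, compression) ->
>             c.sideInput(outputFileName) + shardIndex;
> pipeline
>     .apply("input", Create.of("a", "b", "c")) // hypothetical input: FileIO.Write consumes a PCollection
>     .apply(FileIO.<String, String>writeDynamic()
>         .by(SerializableFunctions.constant(""))
>         .withDestinationCoder(StringUtf8Coder.of())
>         .via(TextIO.sink())
>         .withTempDirectory("/tmp")
>         // No withNumShards(...): sharding is runner-determined.
>         .withNaming(Contextful.of(
>             manifestNaming,
>             Requirements.requiresSideInputs(outputFileName))));
> pipeline.run().waitUntilFinish();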
> {code}
>  
> This does not occur with the DataflowRunner.
> It does not occur if the Contextful.Fn is not given side inputs.
> It does not occur if withNumShards(1) is set.
> It did not occur in 2.8.0, but does in 2.9.0 and 2.10.0-SNAPSHOT (as of today).
>  
> The cause appears to be that the DirectRunner uses TransformOverrides to rewrite FileIO sinks to use runner-determined sharding
> (see [DirectRunner.java line 226|https://github.com/apache/beam/blob/master/runners/direct-java/src/main/java/org/apache/beam/runners/direct/DirectRunner.java#L226]),
> but I do not know why this started occurring in 2.9.0.
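The error message implies a validation step that compares the side-input views consumed by the (possibly rewritten) pipeline graph against the views produced by WriteView transforms. The plain-Java sketch below is a hypothetical, simplified model of that kind of check, not the DirectRunner's actual code. The `View` class, `checkViewsWritten`, and the assumption that views are matched by object identity (so a view recreated during a transform override counts as "missing") are all illustrative guesses, not confirmed behavior.

```java
import java.util.ArrayList;
import java.util.LinkedHashSet;
import java.util.List;
import java.util.Set;

/** Hypothetical, simplified model of a "consumed views must be written" check (not Beam code). */
public class ViewCheckSketch {

  /** Hypothetical stand-in for a PCollectionView; no equals/hashCode override, so identity equality. */
  public static final class View {
    final String name;

    public View(String name) {
      this.name = name;
    }

    @Override
    public String toString() {
      return name;
    }
  }

  /** Throws if any consumed view is not among the views produced by WriteView-style steps. */
  public static void checkViewsWritten(List<View> consumed, Set<View> written) {
    List<View> missing = new ArrayList<>();
    for (View v : consumed) {
      if (!written.contains(v)) {
        missing.add(v);
      }
    }
    if (!missing.isEmpty()) {
      throw new IllegalStateException(
          "All PCollectionViews that are consumed must be written by some WriteView PTransform: Missing "
              + missing);
    }
  }

  public static void main(String[] args) {
    View original = new View("outputDir");
    Set<View> written = new LinkedHashSet<>(List.of(original));

    // Consuming the same view object that was written passes the check.
    checkViewsWritten(List.of(original), written);

    // Assumption: a graph rewrite substitutes a new view object for the same data.
    // Under identity matching it is not found among the written views.
    View recreated = new View("<unnamed> [RunnerPCollectionView]");
    try {
      checkViewsWritten(List.of(recreated), written);
    } catch (IllegalStateException e) {
      System.out.println(e.getMessage());
    }
  }
}
```

In this model, running `main` prints the same message text as in the report, illustrating how an equivalent but recreated view could be flagged even though the original view was written.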



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)