Posted to issues@beam.apache.org by "Kenneth Knowles (Jira)" <ji...@apache.org> on 2020/10/01 18:03:00 UTC

[jira] [Updated] (BEAM-10998) Write just one file per window with WriteToFiles transform

     [ https://issues.apache.org/jira/browse/BEAM-10998?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Kenneth Knowles updated BEAM-10998:
-----------------------------------
    Fix Version/s:     (was: 2.24.0)

> Write just one file per window with WriteToFiles transform
> ----------------------------------------------------------
>
>                 Key: BEAM-10998
>                 URL: https://issues.apache.org/jira/browse/BEAM-10998
>             Project: Beam
>          Issue Type: Bug
>          Components: io-py-files, sdk-py-core
>    Affects Versions: 2.24.0
>            Reporter: Andrey
>            Priority: P1
>
> In this case, all messages from the Pub/Sub topic need to be accumulated into one text file per window; however, WriteToFiles produces many files instead of one.
> {code:python}
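> import json
>
> import apache_beam as beam
> from apache_beam.io import fileio
> from apache_beam.transforms import trigger
> from apache_beam.transforms.trigger import AccumulationMode
> from apache_beam.transforms.window import FixedWindows
>
> # p, known_args and parse_json come from the reporter's pipeline setup (not shown).
> input_data = (p
>               | 'ReadData' >> beam.io.ReadFromPubSub(topic=known_args.input_topic).with_output_types(bytes)
>               | 'Decode' >> beam.Map(lambda x: x.decode('utf-8'))
>               | 'Parse' >> beam.Map(parse_json)
>               # 60-second fixed windows, fired once when the watermark passes the end of the window
>               | 'Data w' >> beam.WindowInto(
>                   FixedWindows(60),
>                   trigger=trigger.AfterWatermark(),
>                   accumulation_mode=AccumulationMode.DISCARDING)
>               | 'Group elements into windows' >> beam.Reshuffle())
>
> event_data = (input_data
>               | 'Filter events' >> beam.Filter(lambda x: x['t'] == 'event')
>               | 'Encode' >> beam.Map(json.dumps)
>               # Expected: one file per window; observed: many files per window
>               | 'Write to files' >> fileio.WriteToFiles(
>                   path='gs://some/gcs/bucket/',
>                   file_naming=fileio.default_file_naming(prefix='events', suffix='.txt'),
>                   shards=1))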
> {code}
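The behavior the report expects can be stated concretely. With FixedWindows(60), every element falls into exactly one 60-second window, so with shards=1 the output should contain one file per window. The following is a minimal plain-Python sketch of that expectation, not Beam code. The function names and the "events-<start>-<end>.txt" file-name pattern are hypothetical and are not the names that fileio.default_file_naming actually produces.

```python
from collections import defaultdict

WINDOW_SIZE = 60  # seconds, matching FixedWindows(60) in the report


def window_start(timestamp: float, size: int = WINDOW_SIZE) -> int:
    """Start of the fixed window (offset 0) that contains `timestamp`."""
    return int(timestamp // size) * size


def group_into_window_files(records, size: int = WINDOW_SIZE):
    """Map each window to ONE hypothetical file name and the lines it should hold.

    `records` is an iterable of (event_timestamp_seconds, line) pairs.
    The file-name pattern is illustrative only, loosely based on the
    'events' prefix and '.txt' suffix used in the report.
    """
    files = defaultdict(list)
    for ts, line in records:
        start = window_start(ts, size)
        name = f"events-{start}-{start + size}.txt"
        files[name].append(line)
    return dict(files)


if __name__ == "__main__":
    sample = [(5, "a"), (59.9, "b"), (60, "c"), (125, "d")]
    for name, lines in group_into_window_files(sample).items():
        print(name, lines)
```

In this sample, the four elements fall into three windows, and each window maps to exactly one file. That is the per-window result the reporter expected from shards=1.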



--
This message was sent by Atlassian Jira
(v8.3.4#803005)