Posted to commits@beam.apache.org by "Guillermo Rodríguez Cano (JIRA)" <ji...@apache.org> on 2017/06/22 23:18:00 UTC

[jira] [Comment Edited] (BEAM-2490) ReadFromText function is not taking all data with glob operator (*)

    [ https://issues.apache.org/jira/browse/BEAM-2490?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16060177#comment-16060177 ] 

Guillermo Rodríguez Cano edited comment on BEAM-2490 at 6/22/17 11:17 PM:
--------------------------------------------------------------------------

I am having a similar issue, if not the same one, although I am using the DirectRunner instead (in a previous trial I believe I was also using the DataflowRunner, and even gzip files).

In the following pipeline (I am trying to work with sessions):

{code:python}
    with beam.Pipeline(options=pipeline_options) as p:
        raw_events = p | 'Read input' >> ReadFromText(known_args.input)

        sessions = (raw_events
            | 'Extract event and timestamp' >> beam.ParDo(ExtractEventAndTimestampDoFn())
            | 'Compute sessions window' >> WindowInto(Sessions(gap_size=known_args.session_idle_gap))
            | 'Group by key' >> beam.GroupByKey()
            )

        output = sessions | 'Format output' >> beam.ParDo(FormatSessionOutputDoFn())
        output | 'Write results' >> WriteToText(known_args.output)
{code}

where the input is a set of uncompressed JSON files in the same directory, I get the same output whether I use the glob operator (*) to match all the files or I pass just the first file in that list.

When running the pipeline like this (with files of about 200 MB each):
{{python sessions_manager.py --input ./input/test/* --output ./output/ --runner DirectRunner}}

The following shows up as the running process (the shell has already expanded the glob):
{{python sessions_manager.py --input ./input/test/xaa.json ./input/test/xab.json ./input/test/xac.json ./input/test/xad.json ./input/test/xae.json ./input/test/xaf.json ./input/test/xag --output ./output/ --runner DirectRunner}}

And if I run just this:
{{python sessions_manager.py --input ./input/test/xaa.json --output ./output/ --runner DirectRunner}}
The output is exactly the same as with the glob operator (and quite different from what I get if I merge all the files into one and run the pipeline on the merged file).
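For what it's worth, the process listing above suggests the shell expands the glob before Python ever sees it. A minimal sketch (file names hypothetical) of how a single-value argparse option then silently drops every expanded path but the first:

```python
import argparse

# a single-value --input option, as a typical pipeline script defines it
parser = argparse.ArgumentParser()
parser.add_argument('--input')

# after the shell expands ./input/test/*, argv effectively contains every file
argv = ['--input', 'xaa.json', 'xab.json', 'xac.json']
args, leftover = parser.parse_known_args(argv)

print(args.input)   # only the first file reaches the pipeline
print(leftover)     # the remaining files are silently ignored
```

Quoting the pattern ({{--input './input/test/*'}}) would hand the literal glob to ReadFromText, which does its own file matching.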





> ReadFromText function is not taking all data with glob operator (*) 
> --------------------------------------------------------------------
>
>                 Key: BEAM-2490
>                 URL: https://issues.apache.org/jira/browse/BEAM-2490
>             Project: Beam
>          Issue Type: Bug
>          Components: sdk-py
>    Affects Versions: 2.0.0
>         Environment: Usage with Google Cloud Platform: Dataflow runner
>            Reporter: Olivier NGUYEN QUOC
>            Assignee: Chamikara Jayalath
>             Fix For: 2.1.0
>
>
> I run a very simple pipeline:
> * Read my files from Google Cloud Storage
> * Split with '\n' char
> * Write in on a Google Cloud Storage
> I have 8 files that match with the pattern:
> * my_files_2016090116_20160902_060051_xxxxxxxxxx.csv.gz (229.25 MB)
> * my_files_2016090117_20160902_060051_xxxxxxxxxx.csv.gz (184.1 MB)
> * my_files_2016090118_20160902_060051_xxxxxxxxxx.csv.gz (171.73 MB)
> * my_files_2016090119_20160902_060051_xxxxxxxxxx.csv.gz (151.34 MB)
> * my_files_2016090120_20160902_060051_xxxxxxxxxx.csv.gz (129.69 MB)
> * my_files_2016090121_20160902_060051_xxxxxxxxxx.csv.gz (151.7 MB)
> * my_files_2016090122_20160902_060051_xxxxxxxxxx.csv.gz (346.46 MB)
> * my_files_2016090122_20160902_060051_xxxxxxxxxx.csv.gz (222.57 MB)
> This code should take them all:
> {code:python}
> beam.io.ReadFromText(
>       "gs://XXXX_folder1/my_files_20160901*.csv.gz",
>       skip_header_lines=1,
>       compression_type=beam.io.filesystem.CompressionTypes.GZIP
>       )
> {code}
> It runs well, but there is only a 288.62 MB file in the output of this pipeline (instead of a roughly 1.5 GB file).
> The whole pipeline code:
> {code:python}
> data = (p | 'ReadMyFiles' >> beam.io.ReadFromText(
>           "gs://XXXX_folder1/my_files_20160901*.csv.gz",
>           skip_header_lines=1,
>           compression_type=beam.io.filesystem.CompressionTypes.GZIP
>           )
>                        | 'SplitLines' >> beam.FlatMap(lambda x: x.split('\n'))
>                     )
> output = (
>           data| "Write" >> beam.io.WriteToText('gs://XXX_folder2/test.csv', num_shards=1)
>             )
> {code}
> Dataflow indicates that the estimated size of the output after the ReadFromText step is only 602.29 MB, which corresponds neither to any single input file's size nor to the total size of the files matching the pattern.

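One pitfall that can produce exactly this kind of partial output with .csv.gz inputs: files built by concatenating gzip streams (e.g. {{cat part1.gz part2.gz > all.gz}}) contain multiple gzip members, and a decompressor that stops at the first end-of-stream marker silently drops everything after the first member. A small self-contained sketch of the difference (not taken from the Beam code, just an illustration of the failure mode):

```python
import gzip
import zlib

# two gzip "members" concatenated into one payload,
# as produced by e.g. `cat part1.gz part2.gz > all.gz`
payload = gzip.compress(b'first\n') + gzip.compress(b'second\n')

# a decompressor that stops at the first end-of-stream sees only member one;
# wbits = 16 + MAX_WBITS selects gzip-format decoding in zlib
d = zlib.decompressobj(16 + zlib.MAX_WBITS)
print(d.decompress(payload))   # b'first\n' -- the second member is lost

# gzip.decompress iterates over all members and recovers everything
print(gzip.decompress(payload))  # b'first\nsecond\n'
```

If the .csv.gz inputs here were produced by concatenation, a reader with the first behaviour would explain output that is much smaller than the sum of the inputs.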


--
This message was sent by Atlassian JIRA
(v6.4.14#64029)