Posted to user@beam.apache.org by Oliver Laslett <ol...@cytora.com> on 2019/08/14 14:18:43 UTC

Python FileBasedSource supporting gzip

What is the correct way to implement a custom non-splittable file parser in
Python?

My desired end-state is: 1) use Read to pass a file pattern (with
wildcards) pointing to several XML files on remote storage (S3 or GCS);
2) parse each file as a single element (XML cannot be processed line by
line), resulting in a PCollection; 3) combine all PCollections together.

I've subclassed FileBasedSource, which seems to give me everything out of
the box. However, I have a problem with compressed (gzip) files.
The self.open_file(fname) method returns a file object. For non-compressed
files I can call self.open_file(fname).read(). But for compressed files I
get a missing-argument error and must provide the number of bytes to read:
self.open_file(fname).read(num_bytes).

Is it possible to implement a FileBasedSource that works generically for
compressed and non-compressed non-splittable files?

Re: Python FileBasedSource supporting gzip

Posted by Chamikara Jayalath <ch...@google.com>.
On Sun, Aug 18, 2019 at 10:45 AM Oliver Laslett <ol...@cytora.com> wrote:

> Hi Cham,
>
> That's really helpful thank you. I think fileio.MatchAll is basically what
> I needed FileBasedSource for.
>
> What did you mean by "use Beam's filesystems abstraction in your ParDo"?
>

If you want to connect to all filesystems supported by Beam (currently
GCS, HDFS, and local) in an abstract way, you can use the filesystems API:
https://github.com/apache/beam/blob/master/sdks/python/apache_beam/io/filesystems.py

Thanks,
Cham


Re: Python FileBasedSource supporting gzip

Posted by Oliver Laslett <ol...@cytora.com>.
Hi Cham,

That's really helpful, thank you. I think fileio.MatchAll is basically what
I needed FileBasedSource for.

What did you mean by "use Beam's filesystems abstraction in your ParDo"?

Cheers
Oliver




Re: Python FileBasedSource supporting gzip

Posted by Chamikara Jayalath <ch...@google.com>.
It should be possible, though I'm not sure what your issue was.
self.open_file() should return a file-like object (a CompressedFile
object if you specified a compression type). In your read_records()
implementation, you are expected to read bytes from this file (not all
bytes have to be read in a single call) and produce an iterator over the
records.
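
To illustrate the point about not reading everything in one call, here is a
minimal sketch of a helper that drains a file-like object by calling
read(num_bytes) repeatedly. The helper name read_fully and the 1 MiB chunk
size are hypothetical, and the sketch assumes read(num_bytes) returns an
empty bytes object at end of file, as ordinary Python file objects do. It
uses only the standard library, with io.BytesIO and gzip.GzipFile standing in
for whatever open_file() returns.

```python
import gzip
import io


def read_fully(file_obj, chunk_size=1024 * 1024):
    """Read a file-like object to the end by calling read(n) repeatedly.

    Assumption: read(n) returns an empty bytes object at end of file.
    """
    chunks = []
    while True:
        chunk = file_obj.read(chunk_size)
        if not chunk:
            break
        chunks.append(chunk)
    return b"".join(chunks)


# Stand-ins for an uncompressed and a gzip-compressed file object.
payload = b"<root><item>1</item></root>"
plain = io.BytesIO(payload)
compressed = gzip.GzipFile(fileobj=io.BytesIO(gzip.compress(payload)))

# The same loop works for both, because it always passes a byte count.
assert read_fully(plain, chunk_size=8) == payload
assert read_fully(compressed, chunk_size=8) == payload
```

Inside a read_records() implementation, the same loop could presumably be
applied to the object returned by self.open_file(), yielding the joined bytes
(or the parsed XML) as a single record.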

If your files are non-splittable though, FileBasedSource does not add much
value. I suggest also looking into the fileio.MatchAll transform and
implementing your source as a composite that uses fileio.MatchAll followed
by a ParDo that produces records. You can use Beam's filesystems
abstraction in your ParDo to get easy access to all filesystems supported
by Beam.

Thanks,
Cham