Posted to user@beam.apache.org by Luke Cwik <lc...@google.com> on 2021/06/14 23:02:23 UTC

Re: How avoid blocking when decompressing large GZIP files.

Try adding a Reshuffle transform to the pipeline after the ParDo that assigns
the sequence number. This will cause the data to be materialized, and the
subsequent steps can then run in parallel.
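
A minimal sketch of that placement, assuming the Beam Java SDK. The class
name, AssignSequenceFn and TransformFn are hypothetical stand-ins and are not
the code from this thread; in particular the in-memory counter only exists so
the sketch compiles and is not a reliable way to number rows on a distributed
runner.

```java
import org.apache.beam.sdk.transforms.DoFn;
import org.apache.beam.sdk.transforms.ParDo;
import org.apache.beam.sdk.transforms.Reshuffle;
import org.apache.beam.sdk.values.KV;
import org.apache.beam.sdk.values.PCollection;

public class ReshuffleAfterSequence {

  /** Hypothetical placeholder for the ParDo that assigns the sequence number. */
  static class AssignSequenceFn extends DoFn<String, KV<Long, String>> {
    // Assumption for illustration only: a plain per-instance counter.
    private long next = 1;

    @ProcessElement
    public void processElement(
        @Element String line, OutputReceiver<KV<Long, String>> out) {
      out.output(KV.of(next++, line));
    }
  }

  /** Hypothetical placeholder for the expensive per-row transformation. */
  static class TransformFn extends DoFn<KV<Long, String>, String> {
    @ProcessElement
    public void processElement(
        @Element KV<Long, String> row, OutputReceiver<String> out) {
      out.output(row.getKey() + "," + row.getValue().toUpperCase());
    }
  }

  /** Numbers the rows, then reshuffles before the downstream work. */
  static PCollection<String> build(PCollection<String> lines) {
    return lines
        .apply("AssignSequence", ParDo.of(new AssignSequenceFn()))
        // The Reshuffle sits directly after the numbering ParDo, so the
        // numbered rows are materialized before the next step consumes them.
        .apply("Reshuffle", Reshuffle.<KV<Long, String>>viaRandomKey())
        .apply("Transform", ParDo.of(new TransformFn()));
  }
}
```

The input PCollection could come from something like
TextIO.read().from(...).withCompression(Compression.GZIP); the only point of
the sketch is where the Reshuffle goes relative to the two ParDos.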

Be aware that, depending on which IO transform you are using, your pipeline
might break if splitting support is ever added for compressed files and for
CSV files, since each of the "multiple" file segments would then start from 1
and count up.
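
To illustrate the concern in plain Java (no Beam involved; the class and
method names are made up for this example): if each segment of a file is
numbered by an independent reader that starts at 1, the sequence numbers stop
being unique as soon as the file is read as more than one segment.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

public class SegmentNumbering {

  /** Numbers rows from 1 within each segment, as an independent reader would. */
  static List<Long> numberPerSegment(List<List<String>> segments) {
    List<Long> sequence = new ArrayList<>();
    for (List<String> segment : segments) {
      long next = 1; // every segment starts counting again
      for (int i = 0; i < segment.size(); i++) {
        sequence.add(next++);
      }
    }
    return sequence;
  }

  public static void main(String[] args) {
    List<List<String>> oneSegment =
        Arrays.asList(Arrays.asList("a", "b", "c", "d"));
    List<List<String>> twoSegments =
        Arrays.asList(Arrays.asList("a", "b"), Arrays.asList("c", "d"));

    System.out.println(numberPerSegment(oneSegment));  // [1, 2, 3, 4]
    System.out.println(numberPerSegment(twoSegments)); // [1, 2, 1, 2]
  }
}
```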

There is some advanced support for having line numbers along with the data
within ContextualTextIO which might be of interest to you as a replacement
for your implementation.


On Fri, Apr 23, 2021 at 5:10 AM Evan Galpin <ev...@gmail.com> wrote:

> Hmm in my somewhat limited experience, I was not able to combine state and
> Splittable DoFn. Definitely could be user error on my part though.
>
> RE sequence numbers, could it work to embed those numbers in the CSV
> itself?
>
> Thanks,
> Evan
>
> On Fri, Apr 23, 2021 at 07:55 Simon Gauld <si...@gmail.com> wrote:
>
>> Thank you, I will have a look. However, I have some concerns:
>>
>> - the gzip itself is not splittable as such
>> - I need to apply a sequence number 1..n so I believe the read *must* be
>> sequential
>>
>> However, what I am looking to achieve is handing off the newly decorated
>> row as soon as the sequence is applied to it. The issue is that the
>> entire step of applying the sequence number appears to be blocking. Also of
>> note, I am using a @DoFn.StateId.
>>
>> I'll look at SplittableDoFns, thanks.
>>
>>
>> On Fri, Apr 23, 2021 at 12:50 PM Evan Galpin <ev...@gmail.com>
>> wrote:
>>
>>> I could be wrong but I believe that if your large file is being read by
>>> a DoFn, it’s likely that the file is being processed atomically inside that
>>> DoFn, which cannot be parallelized further by the runner.
>>>
>>> One purpose-built way around that constraint is by using Splittable
>>> DoFn[1][2] which could be used to allow each split to read a portion of the
>>> file. I don’t know, however, how this might (or might not) work with
>>> compression.
>>>
>>> [1]
>>> https://beam.apache.org/blog/splittable-do-fn-is-available/
>>> [2]
>>> https://beam.apache.org/documentation/programming-guide/#splittable-dofns
>>>
>>> Thanks,
>>> Evan
>>>
>>> On Fri, Apr 23, 2021 at 07:34 Simon Gauld <si...@gmail.com> wrote:
>>>
>>>> Hello,
>>>>
>>>> I am trying to apply a transformation to each row in a reasonably large
>>>> (1b row) gzip compressed CSV.
>>>>
>>>> The first operation is to assign a sequence number, in this case 1, 2, 3, ...
>>>>
>>>> The second operation is the actual transformation.
>>>>
>>>> I would like to apply the sequence number *as* each row is read from
>>>> the compressed source and then hand off the 'real' transformation work in
>>>> parallel, using DataFlow to autoscale the workers for the transformation.
>>>>
>>>> I don't seem to be able to scale *until* all rows have been read; this
>>>> appears to be blocking the pipeline until decompression of the entire file
>>>> is completed. At this point DataFlow autoscaling works as expected: it
>>>> scales upwards and throughput is then high. The issue is that the
>>>> decompression appears to block.
>>>>
>>>> My question: in Beam, is it possible to stream records from a
>>>> compressed source without blocking the pipeline?
>>>>
>>>> thank you
>>>>
>>>> .s
>>>>
>>>>