Posted to dev@beam.apache.org by Allie Chen <yi...@google.com> on 2019/05/10 20:03:09 UTC

Problem with gzip

Hi,


I am trying to load a gzip file into BigQuery using Dataflow. Since the
compressed file is not splittable, one worker is allocated to read the
file. The same worker also runs all the other transforms, since Dataflow
fuses all the transforms together. There is a large amount of data in the
file, and I expected to see more workers spin up after the read. I tried
to use the Reshuffle transform
<https://github.com/apache/beam/blob/release-2.3.0/sdks/python/apache_beam/transforms/util.py#L516>
to prevent the fusion, but it does not scale, since it won't proceed until
all the data has arrived at that point.

Are there any other ways to let more workers work on the transforms that
follow the read?

Thanks,

Allie
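
Because a single gzip stream can only be decompressed from the start, one
way to get more read parallelism, assuming the input can be pre-processed
once, is to split it into many smaller gzip files on line boundaries. The
sketch below uses only the Python standard library; the function name
shard_gzip and its parameters are hypothetical and not part of Beam.

```python
import gzip


def shard_gzip(src_path, dest_prefix, lines_per_shard):
    """Split one gzip text file into smaller gzip files on line boundaries.

    Returns the list of shard paths that were written.
    """
    shard_paths = []
    out = None
    try:
        with gzip.open(src_path, "rb") as src:
            for index, line in enumerate(src):
                if index % lines_per_shard == 0:
                    # Start a new shard every `lines_per_shard` lines.
                    if out is not None:
                        out.close()
                    path = "%s-%05d.gz" % (dest_prefix, len(shard_paths))
                    shard_paths.append(path)
                    out = gzip.open(path, "wb")
                out.write(line)
    finally:
        if out is not None:
            out.close()
    return shard_paths
```

The split itself still decompresses the input sequentially, so it only pays
off as a one-time step before a pipeline that can then read each shard on a
separate worker.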

Re: Problem with gzip

Posted by Robert Bradshaw <ro...@google.com>.
For hadoop, one doesn't have the option of a reshard without introducing a
whole new Map+Shuffle+Reduce. In terms of cost, let's call these "reread"
vs. "reshuffle" for an input of size S.

Reread: Reads the first 1/N of the data set N times, the second 1/N N-1
times, ... for a total of (N+1)/2 * S bytes read.
Reshuffle: Reads the entire data set twice, and writes it once for a total
of 2*S bytes read and S bytes written. (One also pays for the intermediate
storage that is saved above.)
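
The two totals above can be checked with a few lines of Python. This is only
a sketch of the arithmetic in this message; the function names are made up
for illustration, and the model assumes N equal-sized splits where split i
has to read everything from the start of the file up to its own end.

```python
from fractions import Fraction


def reread_bytes(total_size, num_splits):
    """Bytes read when split i (1-based) reads splits 1..i from the file start.

    The first 1/N of the data is read by all N splits, the second by N-1,
    and so on, which sums to (N+1)/2 * S.
    """
    per_split = Fraction(total_size, num_splits)
    return sum(per_split * i for i in range(1, num_splits + 1))


def reshuffle_bytes(total_size):
    """(bytes_read, bytes_written): read the input, write it, read it back."""
    return 2 * total_size, total_size
```

Under this model the bytes read are equal at N = 3 (both 2*S); for larger N
rereading reads more than reshuffling, but it never writes the S bytes of
intermediate data.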

On Wed, May 15, 2019 at 7:35 PM Chamikara Jayalath <ch...@google.com>
wrote:

> Probably include "--experiment=use_fastavro" (along with Robert's
> suggestion) which will make Dataflow use fastavro library for intermediate
> files (materialized in a fusion break).
>
> Also, have you considered storing your input directly as Avro files with
> block compression ? Avro files should perform much better since we can
> split at block boundaries.
>
> Regarding splittablegzip, usually we have avoided supporting splitting by
> reading and discarding data till split points since this does not save read
> time (and later split points have to waste more read data) but it's
> interesting indeed if users find this useful in real world scenarios.
>
> Thanks,
> Cham
>
>
> On Wed, May 15, 2019 at 8:49 AM Lukasz Cwik <lc...@google.com> wrote:
>
>> Niels, it is interesting to see that, in the Hadoop community, reading
>> a gzip file through from the beginning to the split point is worth it for
>> some real user scenarios.
>>
>>
>> *From: *Robert Bradshaw <ro...@google.com>
>> *Date: *Wed, May 15, 2019 at 5:52 AM
>> *To: * <us...@beam.apache.org>
>> *Cc: *dev
>>
>> Interesting thread. Thanks for digging that up.
>>>
>>> I would try the shuffle_mode=service experiment (forgot that wasn't
>>> yet the default). If that doesn't do the trick, though avro as a
>>> materialization format does not provide perfect parallelism, it should
>>> be significantly better than what you have now (large gzip files) and
>>> may be good enough.
>>>
>>> On Wed, May 15, 2019 at 2:34 PM Michael Luckey <ad...@gmail.com>
>>> wrote:
>>> >
>>> > @Robert
>>> >
>>> > Does your suggestion imply, that the points made by Eugene on
>>> BEAM-2803 do not apply (anymore) and the combined reshuffle could just be
>>> omitted?
>>> >
>>> > On Wed, May 15, 2019 at 1:00 PM Robert Bradshaw <ro...@google.com>
>>> wrote:
>>> >>
>>> >> Unfortunately the "write" portion of the reshuffle cannot be
>>> >> parallelized more than the source that it's reading from. In my
>>> >> experience, generally the read is the bottleneck in this case, but
>>> >> it's possible (e.g. if the input compresses extremely well) that it is
>>> >> the write that is slow (which you seem to indicate based on your
>>> >> observation of the UI, right?).
>>> >>
>>> >> It could be that materializing to temporary files is cheaper than
>>> >> materializing randomly to shuffle (especially on pre-portable Python).
>>> >> In that case you could force a fusion break with a side input instead.
>>> >> E.g.
>>> >>
>>> >> class FusionBreak(beam.PTransform):
>>> >>     def expand(self, pcoll):
>>> >>         # Create an empty PCollection that depends on pcoll.
>>> >>         empty = pcoll | beam.FlatMap(lambda x: ())
>>> >>         # Use this empty PCollection as a side input, which will force a fusion break.
>>> >>         return pcoll | beam.Map(lambda x, unused: x, beam.pvalue.AsIterable(empty))
>>> >>
>>> >> which could be used in place of Reshard like
>>> >>
>>> >>     p | beam.ReadFromGzipedFiles(...) | FusionBreak() | DoWork() ...
>>> >>
>>> >> You'll probably want to be sure to pass the use_fastavro experiment
>>> as well.
>>> >>
>>> >> On Wed, May 15, 2019 at 6:53 AM Niels Basjes <ni...@basj.es> wrote:
>>> >> >
>>> >> > Hi
>>> >> >
>>> >> > This project is a completely different solution towards this
>>> problem, but in the hadoop mapreduce context.
>>> >> >
>>> >> > https://github.com/nielsbasjes/splittablegzip
>>> >> >
>>> >> >
>>> >> > I have used this a lot in the past.
>>> >> > Perhaps porting this project to beam is an option?
>>> >> >
>>> >> > Niels Basjes
>>> >> >
>>> >> >
>>> >> >
>>> >> > On Tue, May 14, 2019, 20:45 Lukasz Cwik <lc...@google.com> wrote:
>>> >> >>
>>> >> >> Sorry I couldn't be more helpful.
>>> >> >>
>>> >> >> From: Allie Chen <yi...@google.com>
>>> >> >> Date: Tue, May 14, 2019 at 10:09 AM
>>> >> >> To: <de...@beam.apache.org>
>>> >> >> Cc: user
>>> >> >>
>>> >> >>> Thanks, Lukasz. Unfortunately, decompressing the files is not an
>>> option for us.
>>> >> >>>
>>> >> >>>
>>> >> >>> I am trying to speed up the Reshuffle step, since it waits for all
>>> the data. Here are two ways I have tried:
>>> >> >>>
>>> >> >>> 1.  add timestamps to the PCollection's elements after reading
>>> (since it is a bounded source), then apply windowing before Reshuffle, but it
>>> still waits for all the data.
>>> >> >>>
>>> >> >>>
>>> >> >>> 2.  run the pipeline with --streaming flag, but it leads to an
>>> error: Workflow failed. Causes: Expected custom source to have non-zero
>>> number of splits. Also, I found in
>>> https://beam.apache.org/documentation/sdks/python-streaming/#dataflowrunner-specific-features
>>> :
>>> >> >>>
>>> >> >>> DataflowRunner does not currently support the following Cloud
>>> Dataflow specific features with Python streaming execution.
>>> >> >>>
>>> >> >>> Streaming autoscaling
>>> >> >>>
>>> >> >>> I doubt whether this approach can solve my issue.
>>> >> >>>
>>> >> >>>
>>> >> >>> Thanks so much!
>>> >> >>>
>>> >> >>> Allie
>>> >> >>>
>>> >> >>>
>>> >> >>> From: Lukasz Cwik <lc...@google.com>
>>> >> >>> Date: Tue, May 14, 2019 at 11:16 AM
>>> >> >>> To: dev
>>> >> >>> Cc: user
>>> >> >>>
>>> >> >>>> Do you need to perform any joins across the files (e.g.
>>> Combine.perKey/GroupByKey/...)?
>>> >> >>>> If not, you could structure your pipeline
>>> >> >>>> ReadFromFileA --> Reshuffle(optional) --> CopyOfPipelineA
>>> >> >>>> ReadFromFileB --> Reshuffle(optional) --> CopyOfPipelineB
>>> >> >>>> ReadFromFileC --> Reshuffle(optional) --> CopyOfPipelineC
>>> >> >>>> and then run it as a batch pipeline.
>>> >> >>>>
>>> >> >>>> You can set --streaming=true on the pipeline and then it will
>>> run in streaming mode, but streaming prioritizes low latency and
>>> correctness on Google Cloud Dataflow, so it will cost more to run your
>>> pipeline than in batch mode. It may make more sense to store the data
>>> uncompressed, as that may be less expensive than paying the additional compute
>>> cost for streaming.
>>> >> >>>>
>>> >> >>>> From: Allie Chen <yi...@google.com>
>>> >> >>>> Date: Tue, May 14, 2019 at 7:38 AM
>>> >> >>>> To: <de...@beam.apache.org>
>>> >> >>>> Cc: user
>>> >> >>>>
>>> >> >>>>> Is it possible to use windowing or somehow pretend it is
>>> streaming so Reshuffle or GroupByKey won't wait until all data has been
>>> read?
>>> >> >>>>>
>>> >> >>>>> Thanks!
>>> >> >>>>> Allie
>>> >> >>>>>
>>> >> >>>>> From: Lukasz Cwik <lc...@google.com>
>>> >> >>>>> Date: Fri, May 10, 2019 at 5:36 PM
>>> >> >>>>> To: dev
>>> >> >>>>> Cc: user
>>> >> >>>>>
>>> >> >>>>>> There is no such flag to turn off fusion.
>>> >> >>>>>>
>>> >> >>>>>> Writing 100s of GiBs of uncompressed data to reshuffle will
>>> take time when it is limited to a small number of workers.
>>> >> >>>>>>
>>> >> >>>>>> If you can split up your input into a lot of smaller files
>>> that are compressed then you shouldn't need to use the reshuffle but still
>>> could if you found it helped.
>>> >> >>>>>>
>>> >> >>>>>> On Fri, May 10, 2019 at 2:24 PM Allie Chen <
>>> yifangchen@google.com> wrote:
>>> >> >>>>>>>
>>> >> >>>>>>> Re Lukasz: Thanks! I am not able to control the compression
>>> format, but I will see whether splitting the gzip files will work. Is there
>>> a simple flag in Dataflow that could turn off the fusion?
>>> >> >>>>>>>
>>> >> >>>>>>> Re Reuven: No, I checked the run time in the Dataflow UI; the
>>> GroupByKey and FlatMap in Reshuffle are very slow when the data is large.
>>> Reshuffle itself is not parallel either.
>>> >> >>>>>>>
>>> >> >>>>>>> Thanks all,
>>> >> >>>>>>>
>>> >> >>>>>>> Allie
>>> >> >>>>>>>
>>> >> >>>>>>> From: Reuven Lax <re...@google.com>
>>> >> >>>>>>> Date: Fri, May 10, 2019 at 5:02 PM
>>> >> >>>>>>> To: dev
>>> >> >>>>>>> Cc: user
>>> >> >>>>>>>
>>> >> >>>>>>>> It's unlikely that Reshuffle itself takes hours. It's more
>>> likely that simply reading and decompressing all that data was very slow
>>> when there was no parallelism.
>>> >> >>>>>>>>
>>> >> >>>>>>>> From: Allie Chen <yi...@google.com>
>>> >> >>>>>>>> Date: Fri, May 10, 2019 at 1:17 PM
>>> >> >>>>>>>> To: <de...@beam.apache.org>
>>> >> >>>>>>>> Cc: <us...@beam.apache.org>
>>> >> >>>>>>>>
>>> >> >>>>>>>>> Yes, I do see that the data after the reshuffle is processed in
>>> parallel. But the Reshuffle transform itself takes hours or even days to run,
>>> according to one test I did (24 gzip files, 17 million lines in total).
>>> >> >>>>>>>>>
>>> >> >>>>>>>>> Our users' files are mostly in gzip format, since
>>> uncompressed files would be too costly to store (they could be hundreds of
>>> GB).
>>> >> >>>>>>>>>
>>> >> >>>>>>>>> Thanks,
>>> >> >>>>>>>>>
>>> >> >>>>>>>>> Allie
>>> >> >>>>>>>>>
>>> >> >>>>>>>>>
>>> >> >>>>>>>>> From: Lukasz Cwik <lc...@google.com>
>>> >> >>>>>>>>> Date: Fri, May 10, 2019 at 4:07 PM
>>> >> >>>>>>>>> To: dev, <us...@beam.apache.org>
>>> >> >>>>>>>>>
>>> >> >>>>>>>>>> +user@beam.apache.org
>>> >> >>>>>>>>>>
>>> >> >>>>>>>>>> Reshuffle on Google Cloud Dataflow for a bounded pipeline
>>> waits till all the data has been read before the next transforms can run.
>>> After the reshuffle, the data should have been processed in parallel across
>>> the workers. Did you see this?
>>> >> >>>>>>>>>>
>>> >> >>>>>>>>>> Are you able to change the input of your pipeline to use
>>> an uncompressed file or many compressed files?
>>> >> >>>>>>>>>>
>>> >> >>>>>>>>>> On Fri, May 10, 2019 at 1:03 PM Allie Chen <
>>> yifangchen@google.com> wrote:
>>> >> >>>>>>>>>>>
>>> >> >>>>>>>>>>> Hi,
>>> >> >>>>>>>>>>>
>>> >> >>>>>>>>>>>
>>> >> >>>>>>>>>>> I am trying to load a gzip file into BigQuery using
>>> Dataflow. Since the compressed file is not splittable, one worker is
>>> allocated to read the file. The same worker also runs all the other
>>> transforms, since Dataflow fuses all the transforms together. There is a large
>>> amount of data in the file, and I expected to see more workers spin up
>>> after the read. I tried to use the Reshuffle transform to prevent the
>>> fusion, but it does not scale, since it won't proceed until all the data
>>> has arrived at that point.
>>> >> >>>>>>>>>>>
>>> >> >>>>>>>>>>>
>>> >> >>>>>>>>>>> Are there any other ways to let more workers work on the
>>> transforms that follow the read?
>>> >> >>>>>>>>>>>
>>> >> >>>>>>>>>>>
>>> >> >>>>>>>>>>> Thanks,
>>> >> >>>>>>>>>>>
>>> >> >>>>>>>>>>>
>>> >> >>>>>>>>>>> Allie
>>> >> >>>>>>>>>>>
>>> >> >>>>>>>>>>>
>>>
>>

Re: Problem with gzip

Posted by Allie Chen <yi...@google.com>.
Thanks Robert. Yes, reading is the bottleneck, and we cannot do much better
for gzip files; that's why we would like to at least parallelize the other
transforms with the reading.

I tried the side input you suggested earlier to break the fusion, and
it does a much better job than Reshuffle! Here are the running times from
one test, if anyone is interested:

without any fusion break: 6 hours
with Reshuffle: never finished. Cancelled after running for 6 hours, with
about half of the elements processed at the Reshuffle step.
with side input (not using --experiment=use_fastavro yet, I will try it
later): 2 hours

Thanks all for your help!
Allie


*From: *Robert Bradshaw <ro...@google.com>
*Date: *Wed, May 15, 2019 at 3:34 PM
*To: *dev
*Cc: *user

On Wed, May 15, 2019 at 8:43 PM Allie Chen <yi...@google.com> wrote:
>
>> Thanks all for your reply. I will try each of them and see how it goes.
>>
>> The experiment I am working on now is similar to
>> https://stackoverflow.com/questions/48886943/early-results-from-groupbykey-transform,
>> which tries to get early results from GroupByKey with windowing. I have
>> some code like:
>>
>> Reading | beam.WindowInto(beam.window.GlobalWindows(),
>>                           trigger=trigger.Repeatedly(trigger.AfterCount(1)),
>>                           accumulation_mode=beam.transforms.trigger.AccumulationMode.DISCARDING)
>> | MapWithAKey
>> | GroupByKey
>> | RemoveKey
>> | OtherTransforms
>>
>>
>> I don't see the window and trigger working; GroupByKey still waits for
>> all elements. I also tried adding a timestamp to each element and using a
>> fixed-size window. It seems to have no impact.
>>
>>
>> Does anyone know how to get early results from GroupByKey for a bounded
>> source?
>>
>
> Note that this is essentially how Reshuffle() is implemented. However,
> batch never gives early results from a GroupByKey; each stage is executed
> sequentially.
>
> Is the goal here to be able to parallelize the Read with other operations?
> If the Read (and limited-parallelism write) is still the bottleneck, that
> might not help much.
>
>

Re: Problem with gzip

Posted by Robert Bradshaw <ro...@google.com>.
On Wed, May 15, 2019 at 8:43 PM Allie Chen <yi...@google.com> wrote:

> Thanks all for your reply. I will try each of them and see how it goes.
>
> The experiment I am working on now is similar to
> https://stackoverflow.com/questions/48886943/early-results-from-groupbykey-transform,
> which tries to get early results from GroupByKey with windowing. I have
> some code like:
>
> Reading | beam.WindowInto(beam.window.GlobalWindows(),
>                           trigger=trigger.Repeatedly(trigger.AfterCount(1)),
>                           accumulation_mode=beam.transforms.trigger.AccumulationMode.DISCARDING)
> | MapWithAKey
> | GroupByKey
> | RemoveKey
> | OtherTransforms
>
>
> I don't see the window and trigger working; GroupByKey still waits for all
> elements. I also tried adding a timestamp to each element and using a
> fixed-size window. It seems to have no impact.
>
>
> Does anyone know how to get early results from GroupByKey for a bounded
> source?
>

Note that this is essentially how Reshuffle() is implemented. However,
batch never gives early results from a GroupByKey; each stage is executed
sequentially.

Is the goal here to be able to parallelize the Read with other operations?
If the Read (and limited-parallelism write) is still the bottleneck, that
might not help much.
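
The stage barrier described above can be illustrated with a toy model in
plain Python. This is only a sketch, not Beam or Dataflow code: the function
batch_group_by_key and its log argument are hypothetical names introduced
here to show why a batch grouping step emits nothing until its whole input
has been consumed.

```python
from collections import defaultdict


def batch_group_by_key(pairs, log):
    """Toy batch grouping: consume all (key, value) pairs, then emit groups."""
    groups = defaultdict(list)
    for key, value in pairs:
        log.append(("read", key, value))
        groups[key].append(value)
    # Only once the input is exhausted is any group known to be complete,
    # so nothing downstream can start before this point.
    for key, values in groups.items():
        log.append(("emit", key))
        yield key, values


def run_demo():
    """Group three pairs and return the result with the order of events."""
    log = []
    pairs = iter([("a", 1), ("b", 2), ("a", 3)])
    result = dict(batch_group_by_key(pairs, log))
    return result, log
```

In the log returned by run_demo(), every "read" event comes before the first
"emit" event, which matches the observation that triggers do not produce
early results in this batch setting.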

Re: Problem with gzip

Posted by Allie Chen <yi...@google.com>.
Thanks all for your reply. I will try each of them and see how it goes.

The experiment I am working on now is similar to
https://stackoverflow.com/questions/48886943/early-results-from-groupbykey-transform,
which tries to get early results from GroupByKey with windowing. I have
some code like:

Reading | beam.WindowInto(beam.window.GlobalWindows(),
                          trigger=trigger.Repeatedly(trigger.AfterCount(1)),
                          accumulation_mode=beam.transforms.trigger.AccumulationMode.DISCARDING)
| MapWithAKey
| GroupByKey
| RemoveKey
| OtherTransforms


I don't see the window and trigger working; GroupByKey still waits for all
elements. I also tried adding a timestamp to each element and using a
fixed-size window. It seems to have no impact.


Does anyone know how to get early results from GroupByKey for a bounded
source?


Thanks again.

Allie



*From: *Chamikara Jayalath <ch...@google.com>
*Date: *Wed, May 15, 2019 at 1:35 PM
*To: * <us...@beam.apache.org>
*Cc: *dev

Probably include "--experiment=use_fastavro" (along with Robert's
> suggestion) which will make Dataflow use fastavro library for intermediate
> files (materialized in a fusion break).
>
> Also, have you considered storing your input directly as Avro files with
> block compression ? Avro files should perform much better since we can
> split at block boundaries.
>
> Regarding splittablegzip, usually we have avoided supporting splitting by
> reading and discarding data till split points since this does not save read
> time (and later split points have to waste more read data) but it's
> interesting indeed if users find this useful in real world scenarios.
>
> Thanks,
> Cham
>
>
> On Wed, May 15, 2019 at 8:49 AM Lukasz Cwik <lc...@google.com> wrote:
>
>> Niels, it is interesting to see that, in the Hadoop community, reading
>> a gzip file through from the beginning to the split point is worth it for
>> some real user scenarios.
>>
>>
>> *From: *Robert Bradshaw <ro...@google.com>
>> *Date: *Wed, May 15, 2019 at 5:52 AM
>> *To: * <us...@beam.apache.org>
>> *Cc: *dev
>>
>> Interesting thread. Thanks for digging that up.
>>>
>>> I would try the shuffle_mode=service experiment (forgot that wasn't
>>> yet the default). If that doesn't do the trick, though avro as a
>>> materialization format does not provide perfect parallelism, it should
>>> be significantly better than what you have now (large gzip files) and
>>> may be good enough.
>>>
>>> On Wed, May 15, 2019 at 2:34 PM Michael Luckey <ad...@gmail.com>
>>> wrote:
>>> >
>>> > @Robert
>>> >
>>> > Does your suggestion imply, that the points made by Eugene on
>>> BEAM-2803 do not apply (anymore) and the combined reshuffle could just be
>>> omitted?
>>> >
>>> > On Wed, May 15, 2019 at 1:00 PM Robert Bradshaw <ro...@google.com>
>>> wrote:
>>> >>
>>> >> Unfortunately the "write" portion of the reshuffle cannot be
>>> >> parallelized more than the source that it's reading from. In my
>>> >> experience, generally the read is the bottleneck in this case, but
>>> >> it's possible (e.g. if the input compresses extremely well) that it is
>>> >> the write that is slow (which you seem to indicate based on your
>>> >> observation of the UI, right?).
>>> >>
>>> >> It could be that materializing to temporary files is cheaper than
>>> >> materializing randomly to shuffle (especially on pre-portable Python).
>>> >> In that case you could force a fusion break with a side input instead.
>>> >> E.g.
>>> >>
>>> >> class FusionBreak(beam.PTransform):
>>> >>     def expand(self, pcoll):
>>> >>         # Create an empty PCollection that depends on pcoll.
>>> >>         empty = pcoll | beam.FlatMap(lambda x: ())
>>> >>         # Use this empty PCollection as a side input, which will force a fusion break.
>>> >>         return pcoll | beam.Map(lambda x, unused: x, beam.pvalue.AsIterable(empty))
>>> >>
>>> >> which could be used in place of Reshard like
>>> >>
>>> >>     p | beam.ReadFromGzipedFiles(...) | FusionBreak() | DoWork() ...
>>> >>
>>> >> You'll probably want to be sure to pass the use_fastavro experiment
>>> as well.
>>> >>
>>> >> On Wed, May 15, 2019 at 6:53 AM Niels Basjes <ni...@basj.es> wrote:
>>> >> >
>>> >> > Hi
>>> >> >
>>> >> > This project is a completely different solution towards this
>>> problem, but in the hadoop mapreduce context.
>>> >> >
>>> >> > https://github.com/nielsbasjes/splittablegzip
>>> >> >
>>> >> >
>>> >> > I have used this a lot in the past.
>>> >> > Perhaps porting this project to beam is an option?
>>> >> >
>>> >> > Niels Basjes
>>> >> >
>>> >> >
>>> >> >
>>> >> > On Tue, May 14, 2019, 20:45 Lukasz Cwik <lc...@google.com> wrote:
>>> >> >>
>>> >> >> Sorry I couldn't be more helpful.
>>> >> >>
>>> >> >> From: Allie Chen <yi...@google.com>
>>> >> >> Date: Tue, May 14, 2019 at 10:09 AM
>>> >> >> To: <de...@beam.apache.org>
>>> >> >> Cc: user
>>> >> >>
>>> >> >>> Thank Lukasz. Unfortunately, decompressing the files is not an
>>> option for us.
>>> >> >>>
>>> >> >>>
>>> >> >>> I am trying to speed up Reshuffle step, since it waits for all
>>> data. Here are two ways I have tried:
>>> >> >>>
>>> >> >>> 1.  add timestamps to the PCollection's elements after reading
>>> (since it is bounded source), then apply windowing before Reshuffle, but it
>>> still waits all data.
>>> >> >>>
>>> >> >>>
>>> >> >>> 2.  run the pipeline with --streaming flag, but it leads to an
>>> error: Workflow failed. Causes: Expected custom source to have non-zero
>>> number of splits. Also, I found in
>>> https://beam.apache.org/documentation/sdks/python-streaming/#dataflowrunner-specific-features
>>> :
>>> >> >>>
>>> >> >>> DataflowRunner does not currently support the following Cloud
>>> Dataflow specific features with Python streaming execution.
>>> >> >>>
>>> >> >>> Streaming autoscaling
>>> >> >>>
>>> >> >>> I doubt whether this approach can solve my issue.
>>> >> >>>
>>> >> >>>
>>> >> >>> Thanks so much!
>>> >> >>>
>>> >> >>> Allie
>>> >> >>>
>>> >> >>>
>>> >> >>> From: Lukasz Cwik <lc...@google.com>
>>> >> >>> Date: Tue, May 14, 2019 at 11:16 AM
>>> >> >>> To: dev
>>> >> >>> Cc: user
>>> >> >>>
>>> >> >>>> Do you need to perform any joins across the files (e.g.
>>> Combine.perKey/GroupByKey/...)?
>>> >> >>>> If not, you could structure your pipeline
>>> >> >>>> ReadFromFileA --> Reshuffle(optional) --> CopyOfPipelineA
>>> >> >>>> ReadFromFileB --> Reshuffle(optional) --> CopyOfPipelineB
>>> >> >>>> ReadFromFileC --> Reshuffle(optional) --> CopyOfPipelineC
>>> >> >>>> and then run it as a batch pipeline.
>>> >> >>>>
>>> >> >>>> You can set --streaming=true on the pipeline and then it will
>>> run in a streaming mode but streaming prioritizes low latency and
>>> correctness on Google Cloud Dataflow so it will cost more to run your
>>> pipeline than in batch mode. It may make more sense to store the data
>>> uncompressed as it may be less expensive than paying the additional compute
>>> cost for streaming.
>>> >> >>>>
>>> >> >>>> From: Allie Chen <yi...@google.com>
>>> >> >>>> Date: Tue, May 14, 2019 at 7:38 AM
>>> >> >>>> To: <de...@beam.apache.org>
>>> >> >>>> Cc: user
>>> >> >>>>
>>> >> >>>>> Is it possible to use windowing or somehow pretend it is
>>> streaming so Reshuffle or GroupByKey won't wait until all data has been
>>> read?
>>> >> >>>>>
>>> >> >>>>> Thanks!
>>> >> >>>>> Allie
>>> >> >>>>>
>>> >> >>>>> From: Lukasz Cwik <lc...@google.com>
>>> >> >>>>> Date: Fri, May 10, 2019 at 5:36 PM
>>> >> >>>>> To: dev
>>> >> >>>>> Cc: user
>>> >> >>>>>
>>> >> >>>>>> There is no such flag to turn off fusion.
>>> >> >>>>>>
>>> >> >>>>>> Writing 100s of GiBs of uncompressed data to reshuffle will
>>> take time when it is limited to a small number of workers.
>>> >> >>>>>>
>>> >> >>>>>> If you can split up your input into a lot of smaller files
>>> that are compressed then you shouldn't need to use the reshuffle but still
>>> could if you found it helped.
>>> >> >>>>>>
>>> >> >>>>>> On Fri, May 10, 2019 at 2:24 PM Allie Chen <
>>> yifangchen@google.com> wrote:
>>> >> >>>>>>>
>>> >> >>>>>>> Re Lukasz: Thanks! I am not able to control the compression
>>> format but I will see whether the splitting gzip files will work. Is there
>>> a simple flag in Dataflow that could turn off the fusion?
>>> >> >>>>>>>
>>> >> >>>>>>> Re Reuven: No, I checked the run time on Dataflow UI, the
>>> GroupByKey and FlatMap in Reshuffle are very slow when the data is large.
>>> Reshuffle itself is not parallel either.
>>> >> >>>>>>>
>>> >> >>>>>>> Thanks all,
>>> >> >>>>>>>
>>> >> >>>>>>> Allie
>>> >> >>>>>>>
>>> >> >>>>>>> From: Reuven Lax <re...@google.com>
>>> >> >>>>>>> Date: Fri, May 10, 2019 at 5:02 PM
>>> >> >>>>>>> To: dev
>>> >> >>>>>>> Cc: user
>>> >> >>>>>>>
>>> >> >>>>>>>> It's unlikely that Reshuffle itself takes hours. It's more
>>> likely that simply reading and decompressing all that data was very slow
>>> when there was no parallelism.
>>> >> >>>>>>>>
>>> >> >>>>>>>> From: Allie Chen <yi...@google.com>
>>> >> >>>>>>>> Date: Fri, May 10, 2019 at 1:17 PM
>>> >> >>>>>>>> To: <de...@beam.apache.org>
>>> >> >>>>>>>> Cc: <us...@beam.apache.org>
>>> >> >>>>>>>>
>>> >> >>>>>>>>> Yes, I do see the data after reshuffle are processed in
>>> parallel. But Reshuffle transform itself takes hours or even days to run,
>>> according to one test (24 gzip files, 17 million lines in total) I did.
>>> >> >>>>>>>>>
>>> >> >>>>>>>>> The file format for our users are mostly gzip format, since
>>> uncompressed files would be too costly to store (It could be in hundreds of
>>> GB).
>>> >> >>>>>>>>>
>>> >> >>>>>>>>> Thanks,
>>> >> >>>>>>>>>
>>> >> >>>>>>>>> Allie
>>> >> >>>>>>>>>
>>> >> >>>>>>>>>
>>> >> >>>>>>>>> From: Lukasz Cwik <lc...@google.com>
>>> >> >>>>>>>>> Date: Fri, May 10, 2019 at 4:07 PM
>>> >> >>>>>>>>> To: dev, <us...@beam.apache.org>
>>> >> >>>>>>>>>
>>> >> >>>>>>>>>> +user@beam.apache.org
>>> >> >>>>>>>>>>
>>> >> >>>>>>>>>> Reshuffle on Google Cloud Dataflow for a bounded pipeline
>>> waits till all the data has been read before the next transforms can run.
>>> After the reshuffle, the data should have been processed in parallel across
>>> the workers. Did you see this?
>>> >> >>>>>>>>>>
>>> >> >>>>>>>>>> Are you able to change the input of your pipeline to use
>>> an uncompressed file or many compressed files?
>>> >> >>>>>>>>>>
>>> >> >>>>>>>>>> On Fri, May 10, 2019 at 1:03 PM Allie Chen <
>>> yifangchen@google.com> wrote:
>>> >> >>>>>>>>>>>
>>> >> >>>>>>>>>>> Hi,
>>> >> >>>>>>>>>>>
>>> >> >>>>>>>>>>>
>>> >> >>>>>>>>>>> I am trying to load a gzip file to BigQuery using
>>> Dataflow. Since the compressed file is not splittable, one worker is
>>> allocated to read the file. The same worker will do all the other
>>> transforms since Dataflow fused all transforms together.  There is a large
>>> amount of data in the file, and I expect to see more workers spinning up
>>> after reading transforms. I tried to use Reshuffle Transform to prevent the
>>> fusion, but it is not scalable since it won’t proceed until all data
>>> has arrived at this point.
>>> >> >>>>>>>>>>>
>>> >> >>>>>>>>>>>
>>> >> >>>>>>>>>>> Is there any other ways to allow more workers working on
>>> all the other transforms after reading?
>>> >> >>>>>>>>>>>
>>> >> >>>>>>>>>>>
>>> >> >>>>>>>>>>> Thanks,
>>> >> >>>>>>>>>>>
>>> >> >>>>>>>>>>>
>>> >> >>>>>>>>>>> Allie
>>> >> >>>>>>>>>>>
>>> >> >>>>>>>>>>>
>>>
>>

Re: Problem with gzip

Posted by Chamikara Jayalath <ch...@google.com>.
Probably include "--experiment=use_fastavro" (along with Robert's
suggestion) which will make Dataflow use fastavro library for intermediate
files (materialized in a fusion break).

Also, have you considered storing your input directly as Avro files with
block compression? Avro files should perform much better since we can
split at block boundaries.

Regarding splittablegzip, we have usually avoided supporting splitting by
reading and discarding data up to each split point, since this does not save
read time (and later split points waste more of the data they read), but it
is interesting indeed if users find this useful in real-world scenarios.
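
A rough way to see the cost of read-and-discard splitting is to try it on an
in-memory gzip stream. The sketch below is only an illustration using Python's
standard gzip module; it is not how splittablegzip or Beam is implemented. The
helper names (make_gzip, read_split, total_decompressed) and the use of
uncompressed byte offsets as split points are assumptions made for the example.

```python
import gzip
import io


def make_gzip(num_lines):
    """Build a small single-member gzip payload in memory (toy input)."""
    raw = b"".join(b"line %06d\n" % i for i in range(num_lines))
    return raw, gzip.compress(raw)


def read_split(compressed, start, end):
    """Return uncompressed bytes [start, end) and the bytes decompressed to get them.

    A plain gzip stream has no index, so the reader must decompress from
    byte 0 and throw away everything before `start`.
    """
    with gzip.GzipFile(fileobj=io.BytesIO(compressed)) as f:
        f.read(start)                 # decompressed, then discarded
        wanted = f.read(end - start)  # the part this split actually keeps
    return wanted, start + len(wanted)


def total_decompressed(compressed, size, num_splits):
    """Read every split independently; return the rejoined data and total work."""
    parts, total = [], 0
    for i in range(num_splits):
        start = i * size // num_splits
        end = (i + 1) * size // num_splits
        chunk, decompressed = read_split(compressed, start, end)
        parts.append(chunk)
        total += decompressed
    return b"".join(parts), total


raw, compressed = make_gzip(1000)   # 1000 lines of 12 bytes each
rejoined, total = total_decompressed(compressed, len(raw), 4)
print(len(raw), total)
```

With N splits over S uncompressed bytes, split k decompresses k/N of the
stream, so the total is (N+1)/2 * S, which is the "reread" estimate given
elsewhere in this thread. No split gets its data sooner than a sequential
read of its prefix would; the gain, if any, comes from running the work
that follows each split in parallel.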

Thanks,
Cham


On Wed, May 15, 2019 at 8:49 AM Lukasz Cwik <lc...@google.com> wrote:

> Niels, it is interesting to see that, in the Hadoop community, reading a
> gzip file from the beginning through to the split point is worth it for
> some real user scenarios.
>
>
> From: Robert Bradshaw <ro...@google.com>
> Date: Wed, May 15, 2019 at 5:52 AM
> To: <us...@beam.apache.org>
> Cc: dev
>
> Interesting thread. Thanks for digging that up.
>>
>> I would try the shuffle_mode=service experiment (forgot that wasn't
>> yet the default). If that doesn't do the trick, though avro as a
>> materialization format does not provide perfect parallelism, it should
>> be significantly better than what you have now (large gzip files) and
>> may be good enough.
>>
>> On Wed, May 15, 2019 at 2:34 PM Michael Luckey <ad...@gmail.com>
>> wrote:
>> >
>> > @Robert
>> >
>> > Does your suggestion imply, that the points made by Eugene on BEAM-2803
>> do not apply (anymore) and the combined reshuffle could just be omitted?
>> >
>> > On Wed, May 15, 2019 at 1:00 PM Robert Bradshaw <ro...@google.com>
>> wrote:
>> >>
>> >> Unfortunately the "write" portion of the reshuffle cannot be
>> >> parallelized more than the source that it's reading from. In my
>> >> experience, generally the read is the bottleneck in this case, but
>> >> it's possible (e.g. if the input compresses extremely well) that it is
>> >> the write that is slow (which you seem to indicate based on your
>> >> observation of the UI, right?).
>> >>
>> >> It could be that materializing to temporary files is cheaper than
>> >> materializing randomly to shuffle (especially on pre-portable Python).
>> >> In that case you could force a fusion break with a side input instead.
>> >> E.g.
>> >>
>> >> import apache_beam as beam
>> >>
>> >> class FusionBreak(beam.PTransform):
>> >>     def expand(self, pcoll):
>> >>         # Create an empty PCollection that depends on pcoll.
>> >>         empty = pcoll | beam.FlatMap(lambda x: ())
>> >>         # Use this empty PCollection as a side input, which will
>> >>         # force a fusion break.
>> >>         return pcoll | beam.Map(
>> >>             lambda x, unused: x, beam.pvalue.AsIterable(empty))
>> >>
>> >> which could be used in place of Reshard like
>> >>
>> >>     p | beam.ReadFromGzipedFiles(...) | FusionBreak() | DoWork() ...
>> >>
>> >> You'll probably want to be sure to pass the use_fastavro experiment as
>> well.
>> >>
>> >> On Wed, May 15, 2019 at 6:53 AM Niels Basjes <ni...@basj.es> wrote:
>> >> >
>> >> > Hi
>> >> >
>> >> > This project is a completely different solution towards this
>> problem, but in the hadoop mapreduce context.
>> >> >
>> >> > https://github.com/nielsbasjes/splittablegzip
>> >> >
>> >> >
>> >> > I have used this a lot in the past.
>> >> > Perhaps porting this project to beam is an option?
>> >> >
>> >> > Niels Basjes
>> >> >
>> >> >
>> >> >
>> >> > On Tue, May 14, 2019, 20:45 Lukasz Cwik <lc...@google.com> wrote:
>> >> >>
>> >> >> Sorry I couldn't be more helpful.
>> >> >>
>> >> >> From: Allie Chen <yi...@google.com>
>> >> >> Date: Tue, May 14, 2019 at 10:09 AM
>> >> >> To: <de...@beam.apache.org>
>> >> >> Cc: user
>> >> >>
>> >> >>> Thank Lukasz. Unfortunately, decompressing the files is not an
>> option for us.
>> >> >>>
>> >> >>>
>> >> >>> I am trying to speed up Reshuffle step, since it waits for all
>> data. Here are two ways I have tried:
>> >> >>>
>> >> >>> 1.  add timestamps to the PCollection's elements after reading
>> (since it is bounded source), then apply windowing before Reshuffle, but it
>> still waits all data.
>> >> >>>
>> >> >>>
>> >> >>> 2.  run the pipeline with --streaming flag, but it leads to an
>> error: Workflow failed. Causes: Expected custom source to have non-zero
>> number of splits. Also, I found in
>> https://beam.apache.org/documentation/sdks/python-streaming/#dataflowrunner-specific-features
>> :
>> >> >>>
>> >> >>> DataflowRunner does not currently support the following Cloud
>> Dataflow specific features with Python streaming execution.
>> >> >>>
>> >> >>> Streaming autoscaling
>> >> >>>
>> >> >>> I doubt whether this approach can solve my issue.
>> >> >>>
>> >> >>>
>> >> >>> Thanks so much!
>> >> >>>
>> >> >>> Allie
>> >> >>>
>> >> >>>
>> >> >>> From: Lukasz Cwik <lc...@google.com>
>> >> >>> Date: Tue, May 14, 2019 at 11:16 AM
>> >> >>> To: dev
>> >> >>> Cc: user
>> >> >>>
>> >> >>>> Do you need to perform any joins across the files (e.g.
>> Combine.perKey/GroupByKey/...)?
>> >> >>>> If not, you could structure your pipeline
>> >> >>>> ReadFromFileA --> Reshuffle(optional) --> CopyOfPipelineA
>> >> >>>> ReadFromFileB --> Reshuffle(optional) --> CopyOfPipelineB
>> >> >>>> ReadFromFileC --> Reshuffle(optional) --> CopyOfPipelineC
>> >> >>>> and then run it as a batch pipeline.
>> >> >>>>
>> >> >>>> You can set --streaming=true on the pipeline and then it will run
>> in a streaming mode but streaming prioritizes low latency and correctness
>> on Google Cloud Dataflow so it will cost more to run your pipeline than in
>> batch mode. It may make more sense to store the data uncompressed as it may
>> be less expensive than paying the additional compute cost for streaming.
>> >> >>>>
>> >> >>>> From: Allie Chen <yi...@google.com>
>> >> >>>> Date: Tue, May 14, 2019 at 7:38 AM
>> >> >>>> To: <de...@beam.apache.org>
>> >> >>>> Cc: user
>> >> >>>>
>> >> >>>>> Is it possible to use windowing or somehow pretend it is
>> streaming so Reshuffle or GroupByKey won't wait until all data has been
>> read?
>> >> >>>>>
>> >> >>>>> Thanks!
>> >> >>>>> Allie
>> >> >>>>>
>> >> >>>>> From: Lukasz Cwik <lc...@google.com>
>> >> >>>>> Date: Fri, May 10, 2019 at 5:36 PM
>> >> >>>>> To: dev
>> >> >>>>> Cc: user
>> >> >>>>>
>> >> >>>>>> There is no such flag to turn off fusion.
>> >> >>>>>>
>> >> >>>>>> Writing 100s of GiBs of uncompressed data to reshuffle will
>> take time when it is limited to a small number of workers.
>> >> >>>>>>
>> >> >>>>>> If you can split up your input into a lot of smaller files that
>> are compressed then you shouldn't need to use the reshuffle but still could
>> if you found it helped.
>> >> >>>>>>
>> >> >>>>>> On Fri, May 10, 2019 at 2:24 PM Allie Chen <
>> yifangchen@google.com> wrote:
>> >> >>>>>>>
>> >> >>>>>>> Re Lukasz: Thanks! I am not able to control the compression
>> format but I will see whether the splitting gzip files will work. Is there
>> a simple flag in Dataflow that could turn off the fusion?
>> >> >>>>>>>
>> >> >>>>>>> Re Reuven: No, I checked the run time on Dataflow UI, the
>> GroupByKey and FlatMap in Reshuffle are very slow when the data is large.
>> Reshuffle itself is not parallel either.
>> >> >>>>>>>
>> >> >>>>>>> Thanks all,
>> >> >>>>>>>
>> >> >>>>>>> Allie
>> >> >>>>>>>
>> >> >>>>>>> From: Reuven Lax <re...@google.com>
>> >> >>>>>>> Date: Fri, May 10, 2019 at 5:02 PM
>> >> >>>>>>> To: dev
>> >> >>>>>>> Cc: user
>> >> >>>>>>>
>> >> >>>>>>>> It's unlikely that Reshuffle itself takes hours. It's more
>> likely that simply reading and decompressing all that data was very slow
>> when there was no parallelism.
>> >> >>>>>>>>
>> >> >>>>>>>> From: Allie Chen <yi...@google.com>
>> >> >>>>>>>> Date: Fri, May 10, 2019 at 1:17 PM
>> >> >>>>>>>> To: <de...@beam.apache.org>
>> >> >>>>>>>> Cc: <us...@beam.apache.org>
>> >> >>>>>>>>
>> >> >>>>>>>>> Yes, I do see the data after reshuffle are processed in
>> parallel. But Reshuffle transform itself takes hours or even days to run,
>> according to one test (24 gzip files, 17 million lines in total) I did.
>> >> >>>>>>>>>
>> >> >>>>>>>>> The file format for our users are mostly gzip format, since
>> uncompressed files would be too costly to store (It could be in hundreds of
>> GB).
>> >> >>>>>>>>>
>> >> >>>>>>>>> Thanks,
>> >> >>>>>>>>>
>> >> >>>>>>>>> Allie
>> >> >>>>>>>>>
>> >> >>>>>>>>>
>> >> >>>>>>>>> From: Lukasz Cwik <lc...@google.com>
>> >> >>>>>>>>> Date: Fri, May 10, 2019 at 4:07 PM
>> >> >>>>>>>>> To: dev, <us...@beam.apache.org>
>> >> >>>>>>>>>
>> >> >>>>>>>>>> +user@beam.apache.org
>> >> >>>>>>>>>>
>> >> >>>>>>>>>> Reshuffle on Google Cloud Dataflow for a bounded pipeline
>> waits till all the data has been read before the next transforms can run.
>> After the reshuffle, the data should have been processed in parallel across
>> the workers. Did you see this?
>> >> >>>>>>>>>>
>> >> >>>>>>>>>> Are you able to change the input of your pipeline to use an
>> uncompressed file or many compressed files?
>> >> >>>>>>>>>>
>> >> >>>>>>>>>> On Fri, May 10, 2019 at 1:03 PM Allie Chen <
>> yifangchen@google.com> wrote:
>> >> >>>>>>>>>>>
>> >> >>>>>>>>>>> Hi,
>> >> >>>>>>>>>>>
>> >> >>>>>>>>>>>
>> >> >>>>>>>>>>> I am trying to load a gzip file to BigQuery using Dataflow.
>> Since the compressed file is not splittable, one worker is allocated to
>> read the file. The same worker will do all the other transforms since
>> Dataflow fused all transforms together.  There is a large amount of data
>> in the file, and I expect to see more workers spinning up after reading
>> transforms. I tried to use Reshuffle Transform to prevent the fusion, but
>> it is not scalable since it won’t proceed until all data has arrived at this
>> point.
>> >> >>>>>>>>>>>
>> >> >>>>>>>>>>>
>> >> >>>>>>>>>>> Is there any other ways to allow more workers working on
>> all the other transforms after reading?
>> >> >>>>>>>>>>>
>> >> >>>>>>>>>>>
>> >> >>>>>>>>>>> Thanks,
>> >> >>>>>>>>>>>
>> >> >>>>>>>>>>>
>> >> >>>>>>>>>>> Allie
>> >> >>>>>>>>>>>
>> >> >>>>>>>>>>>
>>
>

Re: Problem with gzip

Posted by Chamikara Jayalath <ch...@google.com>.
On Wed, May 15, 2019 at 3:01 PM Niels Basjes <ni...@basj.es> wrote:

> Whether this trick is effective doesn't depend on Hadoop. It all has to do
> with the amount of work your application does in the first processing step
> and how much data is sent to the next steps.
> In some cases this is not effective, in many it is.
>

This should not be an issue for Dataflow (and probably other Beam runners
as well) since fusion can be broken by running data through shuffle or
producing a side input as mentioned in the previous steps which will allow
steps that follow the read to parallelize. But there's an added cost to
materialize data.
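
The trade-off can be made concrete with a deliberately crude model. It is a
sketch under stated assumptions, not a description of how Dataflow schedules
work: it assumes batch semantics where the downstream steps start only after
the read and materialization finish, and it ignores autoscaling and startup
costs. The function names and all the numbers are hypothetical.

```python
def fused_seconds(read_s, work_s):
    """One worker reads and processes everything (read and work are fused)."""
    return read_s + work_s


def broken_seconds(read_s, materialize_s, work_s, workers):
    """One worker reads and materializes; the work is then spread over `workers`."""
    return read_s + materialize_s + work_s / workers


def break_pays_off(read_s, materialize_s, work_s, workers):
    """True when the fusion break is faster end to end in this toy model."""
    return (broken_seconds(read_s, materialize_s, work_s, workers)
            < fused_seconds(read_s, work_s))


# Hypothetical numbers, chosen only to show both outcomes.
heavy = break_pays_off(read_s=600, materialize_s=300, work_s=3600, workers=10)
light = break_pays_off(read_s=600, materialize_s=300, work_s=60, workers=10)
print(heavy, light)
```

In this model the break wins whenever the materialization cost is smaller
than work_s * (1 - 1/workers), which is the point Niels makes above: it
depends on how much work follows the read and how much data has to be
handed over.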


>
> Niels
>
> On Wed, May 15, 2019, 17:49 Lukasz Cwik <lc...@google.com> wrote:
>
>> Niels, it is interesting to see that, in the Hadoop community, reading a
>> gzip file from the beginning through to the split point is worth it for
>> some real user scenarios.
>>
>>
>> From: Robert Bradshaw <ro...@google.com>
>> Date: Wed, May 15, 2019 at 5:52 AM
>> To: <us...@beam.apache.org>
>> Cc: dev
>>
>> Interesting thread. Thanks for digging that up.
>>>
>>> I would try the shuffle_mode=service experiment (forgot that wasn't
>>> yet the default). If that doesn't do the trick, though avro as a
>>> materialization format does not provide perfect parallelism, it should
>>> be significantly better than what you have now (large gzip files) and
>>> may be good enough.
>>>
>>> On Wed, May 15, 2019 at 2:34 PM Michael Luckey <ad...@gmail.com>
>>> wrote:
>>> >
>>> > @Robert
>>> >
>>> > Does your suggestion imply, that the points made by Eugene on
>>> BEAM-2803 do not apply (anymore) and the combined reshuffle could just be
>>> omitted?
>>> >
>>> > On Wed, May 15, 2019 at 1:00 PM Robert Bradshaw <ro...@google.com>
>>> wrote:
>>> >>
>>> >> Unfortunately the "write" portion of the reshuffle cannot be
>>> >> parallelized more than the source that it's reading from. In my
>>> >> experience, generally the read is the bottleneck in this case, but
>>> >> it's possible (e.g. if the input compresses extremely well) that it is
>>> >> the write that is slow (which you seem to indicate based on your
>>> >> observation of the UI, right?).
>>> >>
>>> >> It could be that materializing to temporary files is cheaper than
>>> >> materializing randomly to shuffle (especially on pre-portable Python).
>>> >> In that case you could force a fusion break with a side input instead.
>>> >> E.g.
>>> >>
>>> >> import apache_beam as beam
>>> >>
>>> >> class FusionBreak(beam.PTransform):
>>> >>     def expand(self, pcoll):
>>> >>         # Create an empty PCollection that depends on pcoll.
>>> >>         empty = pcoll | beam.FlatMap(lambda x: ())
>>> >>         # Use this empty PCollection as a side input, which will
>>> >>         # force a fusion break.
>>> >>         return pcoll | beam.Map(
>>> >>             lambda x, unused: x, beam.pvalue.AsIterable(empty))
>>> >>
>>> >> which could be used in place of Reshard like
>>> >>
>>> >>     p | beam.ReadFromGzipedFiles(...) | FusionBreak() | DoWork() ...
>>> >>
>>> >> You'll probably want to be sure to pass the use_fastavro experiment
>>> as well.
>>> >>
>>> >> On Wed, May 15, 2019 at 6:53 AM Niels Basjes <ni...@basj.es> wrote:
>>> >> >
>>> >> > Hi
>>> >> >
>>> >> > This project is a completely different solution towards this
>>> problem, but in the hadoop mapreduce context.
>>> >> >
>>> >> > https://github.com/nielsbasjes/splittablegzip
>>> >> >
>>> >> >
>>> >> > I have used this a lot in the past.
>>> >> > Perhaps porting this project to beam is an option?
>>> >> >
>>> >> > Niels Basjes
>>> >> >
>>> >> >
>>> >> >
>>> >> > On Tue, May 14, 2019, 20:45 Lukasz Cwik <lc...@google.com> wrote:
>>> >> >>
>>> >> >> Sorry I couldn't be more helpful.
>>> >> >>
>>> >> >> From: Allie Chen <yi...@google.com>
>>> >> >> Date: Tue, May 14, 2019 at 10:09 AM
>>> >> >> To: <de...@beam.apache.org>
>>> >> >> Cc: user
>>> >> >>
>>> >> >>> Thanks, Lukasz. Unfortunately, decompressing the files is not an
>>> >> >>> option for us.
>>> >> >>>
>>> >> >>>
>>> >> >>> I am trying to speed up the Reshuffle step, since it waits for all
>>> >> >>> data. Here are two ways I have tried:
>>> >> >>>
>>> >> >>> 1.  Add timestamps to the PCollection's elements after reading
>>> >> >>> (since it is a bounded source), then apply windowing before
>>> >> >>> Reshuffle, but it still waits for all data.
>>> >> >>>
>>> >> >>>
>>> >> >>> 2.  Run the pipeline with the --streaming flag, but it leads to an
>>> >> >>> error: "Workflow failed. Causes: Expected custom source to have
>>> >> >>> non-zero number of splits." Also, I found in
>>> >> >>> https://beam.apache.org/documentation/sdks/python-streaming/#dataflowrunner-specific-features:
>>> >> >>>
>>> >> >>> DataflowRunner does not currently support the following Cloud
>>> Dataflow specific features with Python streaming execution.
>>> >> >>>
>>> >> >>> Streaming autoscaling
>>> >> >>>
>>> >> >>> I doubt whether this approach can solve my issue.
>>> >> >>>
>>> >> >>>
>>> >> >>> Thanks so much!
>>> >> >>>
>>> >> >>> Allie
>>> >> >>>
>>> >> >>>
>>> >> >>> From: Lukasz Cwik <lc...@google.com>
>>> >> >>> Date: Tue, May 14, 2019 at 11:16 AM
>>> >> >>> To: dev
>>> >> >>> Cc: user
>>> >> >>>
>>> >> >>>> Do you need to perform any joins across the files (e.g.
>>> Combine.perKey/GroupByKey/...)?
>>> >> >>>> If not, you could structure your pipeline
>>> >> >>>> ReadFromFileA --> Reshuffle(optional) --> CopyOfPipelineA
>>> >> >>>> ReadFromFileB --> Reshuffle(optional) --> CopyOfPipelineB
>>> >> >>>> ReadFromFileC --> Reshuffle(optional) --> CopyOfPipelineC
>>> >> >>>> and then run it as a batch pipeline.
>>> >> >>>>
>>> >> >>>> You can set --streaming=true on the pipeline and it will then run
>>> >> >>>> in streaming mode, but streaming prioritizes low latency and
>>> >> >>>> correctness on Google Cloud Dataflow, so it will cost more to run
>>> >> >>>> your pipeline than in batch mode. It may make more sense to store
>>> >> >>>> the data uncompressed, as that may be less expensive than paying
>>> >> >>>> the additional compute cost for streaming.
>>> >> >>>>
>>> >> >>>> From: Allie Chen <yi...@google.com>
>>> >> >>>> Date: Tue, May 14, 2019 at 7:38 AM
>>> >> >>>> To: <de...@beam.apache.org>
>>> >> >>>> Cc: user
>>> >> >>>>
>>> >> >>>>> Is it possible to use windowing or somehow pretend it is
>>> streaming so Reshuffle or GroupByKey won't wait until all data has been
>>> read?
>>> >> >>>>>
>>> >> >>>>> Thanks!
>>> >> >>>>> Allie
>>> >> >>>>>
>>> >> >>>>> From: Lukasz Cwik <lc...@google.com>
>>> >> >>>>> Date: Fri, May 10, 2019 at 5:36 PM
>>> >> >>>>> To: dev
>>> >> >>>>> Cc: user
>>> >> >>>>>
>>> >> >>>>>> There is no such flag to turn off fusion.
>>> >> >>>>>>
>>> >> >>>>>> Writing 100s of GiBs of uncompressed data to reshuffle will
>>> take time when it is limited to a small number of workers.
>>> >> >>>>>>
>>> >> >>>>>> If you can split up your input into a lot of smaller files
>>> that are compressed then you shouldn't need to use the reshuffle but still
>>> could if you found it helped.
>>> >> >>>>>>
>>> >> >>>>>> On Fri, May 10, 2019 at 2:24 PM Allie Chen <
>>> yifangchen@google.com> wrote:
>>> >> >>>>>>>
>>> >> >>>>>>> Re Lukasz: Thanks! I am not able to control the compression
>>> format but I will see whether the splitting gzip files will work. Is there
>>> a simple flag in Dataflow that could turn off the fusion?
>>> >> >>>>>>>
>>> >> >>>>>>> Re Reuven: No, I checked the run time on Dataflow UI, the
>>> GroupByKey and FlatMap in Reshuffle are very slow when the data is large.
>>> Reshuffle itself is not parallel either.
>>> >> >>>>>>>
>>> >> >>>>>>> Thanks all,
>>> >> >>>>>>>
>>> >> >>>>>>> Allie
>>> >> >>>>>>>
>>> >> >>>>>>> From: Reuven Lax <re...@google.com>
>>> >> >>>>>>> Date: Fri, May 10, 2019 at 5:02 PM
>>> >> >>>>>>> To: dev
>>> >> >>>>>>> Cc: user
>>> >> >>>>>>>
>>> >> >>>>>>>> It's unlikely that Reshuffle itself takes hours. It's more
>>> likely that simply reading and decompressing all that data was very slow
>>> when there was no parallelism.
>>> >> >>>>>>>>
>>> >> >>>>>>>> From: Allie Chen <yi...@google.com>
>>> >> >>>>>>>> Date: Fri, May 10, 2019 at 1:17 PM
>>> >> >>>>>>>> To: <de...@beam.apache.org>
>>> >> >>>>>>>> Cc: <us...@beam.apache.org>
>>> >> >>>>>>>>
>>> >> >>>>>>>>> Yes, I do see the data after reshuffle are processed in
>>> parallel. But Reshuffle transform itself takes hours or even days to run,
>>> according to one test (24 gzip files, 17 million lines in total) I did.
>>> >> >>>>>>>>>
>>> >> >>>>>>>>> The file format for our users are mostly gzip format, since
>>> uncompressed files would be too costly to store (It could be in hundreds of
>>> GB).
>>> >> >>>>>>>>>
>>> >> >>>>>>>>> Thanks,
>>> >> >>>>>>>>>
>>> >> >>>>>>>>> Allie
>>> >> >>>>>>>>>
>>> >> >>>>>>>>>
>>> >> >>>>>>>>> From: Lukasz Cwik <lc...@google.com>
>>> >> >>>>>>>>> Date: Fri, May 10, 2019 at 4:07 PM
>>> >> >>>>>>>>> To: dev, <us...@beam.apache.org>
>>> >> >>>>>>>>>
>>> >> >>>>>>>>>> +user@beam.apache.org
>>> >> >>>>>>>>>>
>>> >> >>>>>>>>>> Reshuffle on Google Cloud Dataflow for a bounded pipeline
>>> waits till all the data has been read before the next transforms can run.
>>> After the reshuffle, the data should have been processed in parallel across
>>> the workers. Did you see this?
>>> >> >>>>>>>>>>
>>> >> >>>>>>>>>> Are you able to change the input of your pipeline to use
>>> an uncompressed file or many compressed files?
>>> >> >>>>>>>>>>
>>> >> >>>>>>>>>> On Fri, May 10, 2019 at 1:03 PM Allie Chen <
>>> yifangchen@google.com> wrote:
>>> >> >>>>>>>>>>>
>>> >> >>>>>>>>>>> Hi,
>>> >> >>>>>>>>>>>
>>> >> >>>>>>>>>>>
>>> >> >>>>>>>>>>> I am trying to load a gzip file into BigQuery using
>>> >> >>>>>>>>>>> Dataflow. Since the compressed file is not splittable, one
>>> >> >>>>>>>>>>> worker is allocated to read the file. The same worker will
>>> >> >>>>>>>>>>> do all the other transforms, since Dataflow fused all
>>> >> >>>>>>>>>>> transforms together. There is a large amount of data in the
>>> >> >>>>>>>>>>> file, and I expect to see more workers spinning up after
>>> >> >>>>>>>>>>> the read transform. I tried to use the Reshuffle transform
>>> >> >>>>>>>>>>> to prevent the fusion, but it is not scalable since it
>>> >> >>>>>>>>>>> won’t proceed until all data has arrived at this point.
>>> >> >>>>>>>>>>>
>>> >> >>>>>>>>>>>
>>> >> >>>>>>>>>>> Is there any other way to allow more workers to work on all
>>> >> >>>>>>>>>>> the other transforms after reading?
>>> >> >>>>>>>>>>>
>>> >> >>>>>>>>>>>
>>> >> >>>>>>>>>>> Thanks,
>>> >> >>>>>>>>>>>
>>> >> >>>>>>>>>>>
>>> >> >>>>>>>>>>> Allie
>>> >> >>>>>>>>>>>
>>> >> >>>>>>>>>>>
>>>
>>

Re: Problem with gzip

Posted by Chamikara Jayalath <ch...@google.com>.
On Wed, May 15, 2019 at 3:01 PM Niels Basjes <ni...@basj.es> wrote:

> Whether this trick is effective doesn't depend on Hadoop. It all has to do
> with the amount of work your application does in the first processing step
> and how much data is sent to the next steps.
> In some cases this is not effective, in many it is.
>

This should not be an issue for Dataflow (and probably other Beam runners
as well), since fusion can be broken by running data through a shuffle or
by producing a side input, as mentioned earlier in this thread. Either
approach allows the steps that follow the read to parallelize, but there is
an added cost to materialize the data.
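
To put rough numbers on that materialization cost, here is a minimal sketch
in Python. It uses the simplified byte counts Robert Bradshaw gives elsewhere
in this thread ("reread" reads (N+1)/2 * S bytes for N splits of an input of
size S; "reshuffle" reads 2*S bytes and writes S). The function names and the
100 GB input are made up for illustration, and the model ignores compression
ratio, CPU cost and shuffle pricing, so treat it as a back-of-the-envelope
aid only.

```python
def reread_bytes(size, n_splits):
    """Bytes read if split k must decompress from byte 0 to the end of split k.

    Split k (1-based) reads k/n_splits of the input, so the total is
    size * (1 + 2 + ... + n_splits) / n_splits = (n_splits + 1) / 2 * size.
    """
    return size * sum(range(1, n_splits + 1)) / n_splits


def reshuffle_bytes(size):
    """Bytes moved by a read -> materialize -> read-back fusion break."""
    return {"read": 2 * size, "written": size}


if __name__ == "__main__":
    size_gb = 100  # hypothetical input size, not a figure from this thread
    for n in (1, 2, 5, 10, 24):
        print(n, reread_bytes(size_gb, n), reshuffle_bytes(size_gb))
```

Under this model the reread approach reads no more than a reshuffle (2*S)
only for N <= 3, and equals its combined read-plus-write volume (3*S) at
N = 5. Beyond that, the trade-off depends on how expensive the first
processing step is.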


>
> Niels
>
> On Wed, May 15, 2019, 17:49 Lukasz Cwik <lc...@google.com> wrote:
>
>> Niels, it is interesting to see that in the Hadoop community reading a
>> gzip file through from the beginning to the split point is worth it for
>> some real user scenarios.
>>
>>
>>
>> *From: *Robert Bradshaw <ro...@google.com>
>> *Date: *Wed, May 15, 2019 at 5:52 AM
>> *To: * <us...@beam.apache.org>
>> *Cc: *dev
>>
>> Interesting thread. Thanks for digging that up.
>>>
>>> I would try the shuffle_mode=service experiment (forgot that wasn't
>>> yet the default). If that doesn't do the trick, though avro as a
>>> materialization format does not provide perfect parallelism, it should
>>> be significantly better than what you have now (large gzip files) and
>>> may be good enough.
>>>
>>> On Wed, May 15, 2019 at 2:34 PM Michael Luckey <ad...@gmail.com>
>>> wrote:
>>> >
>>> > @Robert
>>> >
>>> > Does your suggestion imply, that the points made by Eugene on
>>> BEAM-2803 do not apply (anymore) and the combined reshuffle could just be
>>> omitted?
>>> >
>>> > On Wed, May 15, 2019 at 1:00 PM Robert Bradshaw <ro...@google.com>
>>> wrote:
>>> >>
>>> >> Unfortunately the "write" portion of the reshuffle cannot be
>>> >> parallelized more than the source that it's reading from. In my
>>> >> experience, generally the read is the bottleneck in this case, but
>>> >> it's possible (e.g. if the input compresses extremely well) that it is
>>> >> the write that is slow (which you seem to indicate based on your
>>> >> observation of the UI, right?).
>>> >>
>>> >> It could be that materializing to temporary files is cheaper than
>>> >> materializing randomly to shuffle (especially on pre-portable Python).
>>> >> In that case you could force a fusion break with a side input instead.
>>> >> E.g.
>>> >>
>>> >> class FusionBreak(beam.PTransform):
>>> >>     def expand(self, pcoll):
>>> >>         # Create an empty PCollection that depends on pcoll.
>>> >>         empty = pcoll | beam.FlatMap(lambda x: ())
>>> >>         # Use this empty PCollection as a side input, which will
>>> >>         # force a fusion break.
>>> >>         return pcoll | beam.Map(
>>> >>             lambda x, unused: x, beam.pvalue.AsIterable(empty))
>>> >>
>>> >> which could be used in place of Reshard like
>>> >>
>>> >>     p | beam.ReadFromGzipedFiles(...) | FusionBreak() | DoWork() ...
>>> >>
>>> >> You'll probably want to be sure to pass the use_fastavro experiment
>>> as well.
>>> >>
>>> >> On Wed, May 15, 2019 at 6:53 AM Niels Basjes <ni...@basj.es> wrote:
>>> >> >
>>> >> > Hi
>>> >> >
>>> >> > This project is a completely different solution towards this
>>> problem, but in the hadoop mapreduce context.
>>> >> >
>>> >> > https://github.com/nielsbasjes/splittablegzip
>>> >> >
>>> >> >
>>> >> > I have used this a lot in the past.
>>> >> > Perhaps porting this project to beam is an option?
>>> >> >
>>> >> > Niels Basjes
>>> >> >
>>> >> >
>>> >> >
>>> >> > On Tue, May 14, 2019, 20:45 Lukasz Cwik <lc...@google.com> wrote:
>>> >> >>
>>> >> >> Sorry I couldn't be more helpful.
>>> >> >>
>>> >> >> From: Allie Chen <yi...@google.com>
>>> >> >> Date: Tue, May 14, 2019 at 10:09 AM
>>> >> >> To: <de...@beam.apache.org>
>>> >> >> Cc: user
>>> >> >>
>>> >> >>> Thanks, Lukasz. Unfortunately, decompressing the files is not an
>>> >> >>> option for us.
>>> >> >>>
>>> >> >>>
>>> >> >>> I am trying to speed up the Reshuffle step, since it waits for all
>>> >> >>> data. Here are two ways I have tried:
>>> >> >>>
>>> >> >>> 1.  Add timestamps to the PCollection's elements after reading
>>> >> >>> (since it is a bounded source), then apply windowing before
>>> >> >>> Reshuffle, but it still waits for all data.
>>> >> >>>
>>> >> >>>
>>> >> >>> 2.  Run the pipeline with the --streaming flag, but it leads to an
>>> >> >>> error: "Workflow failed. Causes: Expected custom source to have
>>> >> >>> non-zero number of splits." Also, I found in
>>> >> >>> https://beam.apache.org/documentation/sdks/python-streaming/#dataflowrunner-specific-features:
>>> >> >>>
>>> >> >>> DataflowRunner does not currently support the following Cloud
>>> Dataflow specific features with Python streaming execution.
>>> >> >>>
>>> >> >>> Streaming autoscaling
>>> >> >>>
>>> >> >>> I doubt whether this approach can solve my issue.
>>> >> >>>
>>> >> >>>
>>> >> >>> Thanks so much!
>>> >> >>>
>>> >> >>> Allie
>>> >> >>>
>>> >> >>>
>>> >> >>> From: Lukasz Cwik <lc...@google.com>
>>> >> >>> Date: Tue, May 14, 2019 at 11:16 AM
>>> >> >>> To: dev
>>> >> >>> Cc: user
>>> >> >>>
>>> >> >>>> Do you need to perform any joins across the files (e.g.
>>> Combine.perKey/GroupByKey/...)?
>>> >> >>>> If not, you could structure your pipeline
>>> >> >>>> ReadFromFileA --> Reshuffle(optional) --> CopyOfPipelineA
>>> >> >>>> ReadFromFileB --> Reshuffle(optional) --> CopyOfPipelineB
>>> >> >>>> ReadFromFileC --> Reshuffle(optional) --> CopyOfPipelineC
>>> >> >>>> and then run it as a batch pipeline.
>>> >> >>>>
>>> >> >>>> You can set --streaming=true on the pipeline and it will then run
>>> >> >>>> in streaming mode, but streaming prioritizes low latency and
>>> >> >>>> correctness on Google Cloud Dataflow, so it will cost more to run
>>> >> >>>> your pipeline than in batch mode. It may make more sense to store
>>> >> >>>> the data uncompressed, as that may be less expensive than paying
>>> >> >>>> the additional compute cost for streaming.
>>> >> >>>>
>>> >> >>>> From: Allie Chen <yi...@google.com>
>>> >> >>>> Date: Tue, May 14, 2019 at 7:38 AM
>>> >> >>>> To: <de...@beam.apache.org>
>>> >> >>>> Cc: user
>>> >> >>>>
>>> >> >>>>> Is it possible to use windowing or somehow pretend it is
>>> streaming so Reshuffle or GroupByKey won't wait until all data has been
>>> read?
>>> >> >>>>>
>>> >> >>>>> Thanks!
>>> >> >>>>> Allie
>>> >> >>>>>
>>> >> >>>>> From: Lukasz Cwik <lc...@google.com>
>>> >> >>>>> Date: Fri, May 10, 2019 at 5:36 PM
>>> >> >>>>> To: dev
>>> >> >>>>> Cc: user
>>> >> >>>>>
>>> >> >>>>>> There is no such flag to turn off fusion.
>>> >> >>>>>>
>>> >> >>>>>> Writing 100s of GiBs of uncompressed data to reshuffle will
>>> take time when it is limited to a small number of workers.
>>> >> >>>>>>
>>> >> >>>>>> If you can split up your input into a lot of smaller files
>>> that are compressed then you shouldn't need to use the reshuffle but still
>>> could if you found it helped.
>>> >> >>>>>>
>>> >> >>>>>> On Fri, May 10, 2019 at 2:24 PM Allie Chen <
>>> yifangchen@google.com> wrote:
>>> >> >>>>>>>
>>> >> >>>>>>> Re Lukasz: Thanks! I am not able to control the compression
>>> format but I will see whether the splitting gzip files will work. Is there
>>> a simple flag in Dataflow that could turn off the fusion?
>>> >> >>>>>>>
>>> >> >>>>>>> Re Reuven: No, I checked the run time on Dataflow UI, the
>>> GroupByKey and FlatMap in Reshuffle are very slow when the data is large.
>>> Reshuffle itself is not parallel either.
>>> >> >>>>>>>
>>> >> >>>>>>> Thanks all,
>>> >> >>>>>>>
>>> >> >>>>>>> Allie
>>> >> >>>>>>>
>>> >> >>>>>>> From: Reuven Lax <re...@google.com>
>>> >> >>>>>>> Date: Fri, May 10, 2019 at 5:02 PM
>>> >> >>>>>>> To: dev
>>> >> >>>>>>> Cc: user
>>> >> >>>>>>>
>>> >> >>>>>>>> It's unlikely that Reshuffle itself takes hours. It's more
>>> likely that simply reading and decompressing all that data was very slow
>>> when there was no parallelism.
>>> >> >>>>>>>>
>>> >> >>>>>>>> From: Allie Chen <yi...@google.com>
>>> >> >>>>>>>> Date: Fri, May 10, 2019 at 1:17 PM
>>> >> >>>>>>>> To: <de...@beam.apache.org>
>>> >> >>>>>>>> Cc: <us...@beam.apache.org>
>>> >> >>>>>>>>
>>> >> >>>>>>>>> Yes, I do see the data after reshuffle are processed in
>>> parallel. But Reshuffle transform itself takes hours or even days to run,
>>> according to one test (24 gzip files, 17 million lines in total) I did.
>>> >> >>>>>>>>>
>>> >> >>>>>>>>> The file format for our users are mostly gzip format, since
>>> uncompressed files would be too costly to store (It could be in hundreds of
>>> GB).
>>> >> >>>>>>>>>
>>> >> >>>>>>>>> Thanks,
>>> >> >>>>>>>>>
>>> >> >>>>>>>>> Allie
>>> >> >>>>>>>>>
>>> >> >>>>>>>>>
>>> >> >>>>>>>>> From: Lukasz Cwik <lc...@google.com>
>>> >> >>>>>>>>> Date: Fri, May 10, 2019 at 4:07 PM
>>> >> >>>>>>>>> To: dev, <us...@beam.apache.org>
>>> >> >>>>>>>>>
>>> >> >>>>>>>>>> +user@beam.apache.org
>>> >> >>>>>>>>>>
>>> >> >>>>>>>>>> Reshuffle on Google Cloud Dataflow for a bounded pipeline
>>> waits till all the data has been read before the next transforms can run.
>>> After the reshuffle, the data should have been processed in parallel across
>>> the workers. Did you see this?
>>> >> >>>>>>>>>>
>>> >> >>>>>>>>>> Are you able to change the input of your pipeline to use
>>> an uncompressed file or many compressed files?
>>> >> >>>>>>>>>>
>>> >> >>>>>>>>>> On Fri, May 10, 2019 at 1:03 PM Allie Chen <
>>> yifangchen@google.com> wrote:
>>> >> >>>>>>>>>>>
>>> >> >>>>>>>>>>> Hi,
>>> >> >>>>>>>>>>>
>>> >> >>>>>>>>>>>
>>> >> >>>>>>>>>>> I am trying to load a gzip file into BigQuery using
>>> >> >>>>>>>>>>> Dataflow. Since the compressed file is not splittable, one
>>> >> >>>>>>>>>>> worker is allocated to read the file. The same worker will
>>> >> >>>>>>>>>>> do all the other transforms, since Dataflow fused all
>>> >> >>>>>>>>>>> transforms together. There is a large amount of data in the
>>> >> >>>>>>>>>>> file, and I expect to see more workers spinning up after
>>> >> >>>>>>>>>>> the read transform. I tried to use the Reshuffle transform
>>> >> >>>>>>>>>>> to prevent the fusion, but it is not scalable since it
>>> >> >>>>>>>>>>> won’t proceed until all data has arrived at this point.
>>> >> >>>>>>>>>>>
>>> >> >>>>>>>>>>>
>>> >> >>>>>>>>>>> Is there any other way to allow more workers to work on all
>>> >> >>>>>>>>>>> the other transforms after reading?
>>> >> >>>>>>>>>>>
>>> >> >>>>>>>>>>>
>>> >> >>>>>>>>>>> Thanks,
>>> >> >>>>>>>>>>>
>>> >> >>>>>>>>>>>
>>> >> >>>>>>>>>>> Allie
>>> >> >>>>>>>>>>>
>>> >> >>>>>>>>>>>
>>>
>>

Re: Problem with gzip

Posted by Niels Basjes <ni...@basj.es>.
Whether this trick is effective doesn't depend on Hadoop. It all has to do
with the amount of work your application does in the first processing step
and how much data is sent to the next steps.
In some cases this is not effective, in many it is.

Niels

On Wed, May 15, 2019, 17:49 Lukasz Cwik <lc...@google.com> wrote:

> Niels, it is interesting to see that in the Hadoop community reading a
> gzip file through from the beginning to the split point is worth it for
> some real user scenarios.
>
>
>
> *From: *Robert Bradshaw <ro...@google.com>
> *Date: *Wed, May 15, 2019 at 5:52 AM
> *To: * <us...@beam.apache.org>
> *Cc: *dev
>
> Interesting thread. Thanks for digging that up.
>>
>> I would try the shuffle_mode=service experiment (forgot that wasn't
>> yet the default). If that doesn't do the trick, though avro as a
>> materialization format does not provide perfect parallelism, it should
>> be significantly better than what you have now (large gzip files) and
>> may be good enough.
>>
>> On Wed, May 15, 2019 at 2:34 PM Michael Luckey <ad...@gmail.com>
>> wrote:
>> >
>> > @Robert
>> >
>> > Does your suggestion imply, that the points made by Eugene on BEAM-2803
>> do not apply (anymore) and the combined reshuffle could just be omitted?
>> >
>> > On Wed, May 15, 2019 at 1:00 PM Robert Bradshaw <ro...@google.com>
>> wrote:
>> >>
>> >> Unfortunately the "write" portion of the reshuffle cannot be
>> >> parallelized more than the source that it's reading from. In my
>> >> experience, generally the read is the bottleneck in this case, but
>> >> it's possible (e.g. if the input compresses extremely well) that it is
>> >> the write that is slow (which you seem to indicate based on your
>> >> observation of the UI, right?).
>> >>
>> >> It could be that materializing to temporary files is cheaper than
>> >> materializing randomly to shuffle (especially on pre-portable Python).
>> >> In that case you could force a fusion break with a side input instead.
>> >> E.g.
>> >>
>> >> class FusionBreak(beam.PTransform):
>> >>     def expand(self, pcoll):
>> >>         # Create an empty PCollection that depends on pcoll.
>> >>         empty = pcoll | beam.FlatMap(lambda x: ())
>> >>         # Use this empty PCollection as a side input, which will
>> >>         # force a fusion break.
>> >>         return pcoll | beam.Map(
>> >>             lambda x, unused: x, beam.pvalue.AsIterable(empty))
>> >>
>> >> which could be used in place of Reshard like
>> >>
>> >>     p | beam.ReadFromGzipedFiles(...) | FusionBreak() | DoWork() ...
>> >>
>> >> You'll probably want to be sure to pass the use_fastavro experiment as
>> well.
>> >>
>> >> On Wed, May 15, 2019 at 6:53 AM Niels Basjes <ni...@basj.es> wrote:
>> >> >
>> >> > Hi
>> >> >
>> >> > This project is a completely different solution towards this
>> problem, but in the hadoop mapreduce context.
>> >> >
>> >> > https://github.com/nielsbasjes/splittablegzip
>> >> >
>> >> >
>> >> > I have used this a lot in the past.
>> >> > Perhaps porting this project to beam is an option?
>> >> >
>> >> > Niels Basjes
>> >> >
>> >> >
>> >> >
>> >> > On Tue, May 14, 2019, 20:45 Lukasz Cwik <lc...@google.com> wrote:
>> >> >>
>> >> >> Sorry I couldn't be more helpful.
>> >> >>
>> >> >> From: Allie Chen <yi...@google.com>
>> >> >> Date: Tue, May 14, 2019 at 10:09 AM
>> >> >> To: <de...@beam.apache.org>
>> >> >> Cc: user
>> >> >>
>> >> >>> Thanks, Lukasz. Unfortunately, decompressing the files is not an
>> >> >>> option for us.
>> >> >>>
>> >> >>>
>> >> >>> I am trying to speed up the Reshuffle step, since it waits for all
>> >> >>> data. Here are two ways I have tried:
>> >> >>>
>> >> >>> 1.  Add timestamps to the PCollection's elements after reading
>> >> >>> (since it is a bounded source), then apply windowing before
>> >> >>> Reshuffle, but it still waits for all data.
>> >> >>>
>> >> >>>
>> >> >>> 2.  Run the pipeline with the --streaming flag, but it leads to an
>> >> >>> error: "Workflow failed. Causes: Expected custom source to have
>> >> >>> non-zero number of splits." Also, I found in
>> >> >>> https://beam.apache.org/documentation/sdks/python-streaming/#dataflowrunner-specific-features:
>> >> >>>
>> >> >>> DataflowRunner does not currently support the following Cloud
>> Dataflow specific features with Python streaming execution.
>> >> >>>
>> >> >>> Streaming autoscaling
>> >> >>>
>> >> >>> I doubt whether this approach can solve my issue.
>> >> >>>
>> >> >>>
>> >> >>> Thanks so much!
>> >> >>>
>> >> >>> Allie
>> >> >>>
>> >> >>>
>> >> >>> From: Lukasz Cwik <lc...@google.com>
>> >> >>> Date: Tue, May 14, 2019 at 11:16 AM
>> >> >>> To: dev
>> >> >>> Cc: user
>> >> >>>
>> >> >>>> Do you need to perform any joins across the files (e.g.
>> Combine.perKey/GroupByKey/...)?
>> >> >>>> If not, you could structure your pipeline
>> >> >>>> ReadFromFileA --> Reshuffle(optional) --> CopyOfPipelineA
>> >> >>>> ReadFromFileB --> Reshuffle(optional) --> CopyOfPipelineB
>> >> >>>> ReadFromFileC --> Reshuffle(optional) --> CopyOfPipelineC
>> >> >>>> and then run it as a batch pipeline.
>> >> >>>>
>> >> >>>> You can set --streaming=true on the pipeline and it will then run
>> >> >>>> in streaming mode, but streaming prioritizes low latency and
>> >> >>>> correctness on Google Cloud Dataflow, so it will cost more to run
>> >> >>>> your pipeline than in batch mode. It may make more sense to store
>> >> >>>> the data uncompressed, as that may be less expensive than paying
>> >> >>>> the additional compute cost for streaming.
>> >> >>>>
>> >> >>>> From: Allie Chen <yi...@google.com>
>> >> >>>> Date: Tue, May 14, 2019 at 7:38 AM
>> >> >>>> To: <de...@beam.apache.org>
>> >> >>>> Cc: user
>> >> >>>>
>> >> >>>>> Is it possible to use windowing or somehow pretend it is
>> streaming so Reshuffle or GroupByKey won't wait until all data has been
>> read?
>> >> >>>>>
>> >> >>>>> Thanks!
>> >> >>>>> Allie
>> >> >>>>>
>> >> >>>>> From: Lukasz Cwik <lc...@google.com>
>> >> >>>>> Date: Fri, May 10, 2019 at 5:36 PM
>> >> >>>>> To: dev
>> >> >>>>> Cc: user
>> >> >>>>>
>> >> >>>>>> There is no such flag to turn off fusion.
>> >> >>>>>>
>> >> >>>>>> Writing 100s of GiBs of uncompressed data to reshuffle will
>> take time when it is limited to a small number of workers.
>> >> >>>>>>
>> >> >>>>>> If you can split up your input into a lot of smaller files that
>> are compressed then you shouldn't need to use the reshuffle but still could
>> if you found it helped.
>> >> >>>>>>
>> >> >>>>>> On Fri, May 10, 2019 at 2:24 PM Allie Chen <
>> yifangchen@google.com> wrote:
>> >> >>>>>>>
>> >> >>>>>>> Re Lukasz: Thanks! I am not able to control the compression
>> format but I will see whether the splitting gzip files will work. Is there
>> a simple flag in Dataflow that could turn off the fusion?
>> >> >>>>>>>
>> >> >>>>>>> Re Reuven: No, I checked the run time on Dataflow UI, the
>> GroupByKey and FlatMap in Reshuffle are very slow when the data is large.
>> Reshuffle itself is not parallel either.
>> >> >>>>>>>
>> >> >>>>>>> Thanks all,
>> >> >>>>>>>
>> >> >>>>>>> Allie
>> >> >>>>>>>
>> >> >>>>>>> From: Reuven Lax <re...@google.com>
>> >> >>>>>>> Date: Fri, May 10, 2019 at 5:02 PM
>> >> >>>>>>> To: dev
>> >> >>>>>>> Cc: user
>> >> >>>>>>>
>> >> >>>>>>>> It's unlikely that Reshuffle itself takes hours. It's more
>> likely that simply reading and decompressing all that data was very slow
>> when there was no parallelism.
>> >> >>>>>>>>
>> >> >>>>>>>> From: Allie Chen <yi...@google.com>
>> >> >>>>>>>> Date: Fri, May 10, 2019 at 1:17 PM
>> >> >>>>>>>> To: <de...@beam.apache.org>
>> >> >>>>>>>> Cc: <us...@beam.apache.org>
>> >> >>>>>>>>
>> >> >>>>>>>>> Yes, I do see the data after reshuffle are processed in
>> parallel. But Reshuffle transform itself takes hours or even days to run,
>> according to one test (24 gzip files, 17 million lines in total) I did.
>> >> >>>>>>>>>
>> >> >>>>>>>>> The file format for our users are mostly gzip format, since
>> uncompressed files would be too costly to store (It could be in hundreds of
>> GB).
>> >> >>>>>>>>>
>> >> >>>>>>>>> Thanks,
>> >> >>>>>>>>>
>> >> >>>>>>>>> Allie
>> >> >>>>>>>>>
>> >> >>>>>>>>>
>> >> >>>>>>>>> From: Lukasz Cwik <lc...@google.com>
>> >> >>>>>>>>> Date: Fri, May 10, 2019 at 4:07 PM
>> >> >>>>>>>>> To: dev, <us...@beam.apache.org>
>> >> >>>>>>>>>
>> >> >>>>>>>>>> +user@beam.apache.org
>> >> >>>>>>>>>>
>> >> >>>>>>>>>> Reshuffle on Google Cloud Dataflow for a bounded pipeline
>> waits till all the data has been read before the next transforms can run.
>> After the reshuffle, the data should have been processed in parallel across
>> the workers. Did you see this?
>> >> >>>>>>>>>>
>> >> >>>>>>>>>> Are you able to change the input of your pipeline to use an
>> uncompressed file or many compressed files?
>> >> >>>>>>>>>>
>> >> >>>>>>>>>> On Fri, May 10, 2019 at 1:03 PM Allie Chen <
>> yifangchen@google.com> wrote:
>> >> >>>>>>>>>>>
>> >> >>>>>>>>>>> Hi,
>> >> >>>>>>>>>>>
>> >> >>>>>>>>>>>
>> >> >>>>>>>>>>> I am trying to load a gzip file into BigQuery using
>> >> >>>>>>>>>>> Dataflow. Since the compressed file is not splittable, one
>> >> >>>>>>>>>>> worker is allocated to read the file. The same worker will
>> >> >>>>>>>>>>> do all the other transforms, since Dataflow fused all
>> >> >>>>>>>>>>> transforms together. There is a large amount of data in the
>> >> >>>>>>>>>>> file, and I expect to see more workers spinning up after
>> >> >>>>>>>>>>> the read transform. I tried to use the Reshuffle transform
>> >> >>>>>>>>>>> to prevent the fusion, but it is not scalable since it
>> >> >>>>>>>>>>> won’t proceed until all data has arrived at this point.
>> >> >>>>>>>>>>>
>> >> >>>>>>>>>>>
>> >> >>>>>>>>>>> Is there any other way to allow more workers to work on all
>> >> >>>>>>>>>>> the other transforms after reading?
>> >> >>>>>>>>>>>
>> >> >>>>>>>>>>>
>> >> >>>>>>>>>>> Thanks,
>> >> >>>>>>>>>>>
>> >> >>>>>>>>>>>
>> >> >>>>>>>>>>> Allie
>> >> >>>>>>>>>>>
>> >> >>>>>>>>>>>
>>
>

Re: Problem with gzip

Posted by Niels Basjes <ni...@basj.es>.
Whether this trick is effective doesn't depend on Hadoop. It all has to do
with the amount of work your application does in the first processing step
and how much data is sent to the next steps.
In some cases this is not effective, in many it is.

Niels

On Wed, May 15, 2019, 17:49 Lukasz Cwik <lc...@google.com> wrote:

> Niels, it is interesting to see that in the Hadoop community reading a
> gzip file through from the beginning to the split point is worth it for
> some real user scenarios.
>
>
>
> *From: *Robert Bradshaw <ro...@google.com>
> *Date: *Wed, May 15, 2019 at 5:52 AM
> *To: * <us...@beam.apache.org>
> *Cc: *dev
>
> Interesting thread. Thanks for digging that up.
>>
>> I would try the shuffle_mode=service experiment (forgot that wasn't
>> yet the default). If that doesn't do the trick, though avro as a
>> materialization format does not provide perfect parallelism, it should
>> be significantly better than what you have now (large gzip files) and
>> may be good enough.
>>
>> On Wed, May 15, 2019 at 2:34 PM Michael Luckey <ad...@gmail.com>
>> wrote:
>> >
>> > @Robert
>> >
>> > Does your suggestion imply, that the points made by Eugene on BEAM-2803
>> do not apply (anymore) and the combined reshuffle could just be omitted?
>> >
>> > On Wed, May 15, 2019 at 1:00 PM Robert Bradshaw <ro...@google.com>
>> wrote:
>> >>
>> >> Unfortunately the "write" portion of the reshuffle cannot be
>> >> parallelized more than the source that it's reading from. In my
>> >> experience, generally the read is the bottleneck in this case, but
>> >> it's possible (e.g. if the input compresses extremely well) that it is
>> >> the write that is slow (which you seem to indicate based on your
>> >> observation of the UI, right?).
>> >>
>> >> It could be that materializing to temporary files is cheaper than
>> >> materializing randomly to shuffle (especially on pre-portable Python).
>> >> In that case you could force a fusion break with a side input instead.
>> >> E.g.
>> >>
>> >> class FusionBreak(beam.PTransform):
>> >>     def expand(self, pcoll):
>> >>         # Create an empty PCollection that depends on pcoll.
>> >>         empty = pcoll | beam.FlatMap(lambda x: ())
>> >>         # Use this empty PCollection as a side input, which will
>> >>         # force a fusion break.
>> >>         return pcoll | beam.Map(
>> >>             lambda x, unused: x, beam.pvalue.AsIterable(empty))
>> >>
>> >> which could be used in place of Reshard like
>> >>
>> >>     p | beam.ReadFromGzipedFiles(...) | FusionBreak() | DoWork() ...
>> >>
>> >> You'll probably want to be sure to pass the use_fastavro experiment as
>> well.
>> >>
>> >> On Wed, May 15, 2019 at 6:53 AM Niels Basjes <ni...@basj.es> wrote:
>> >> >
>> >> > Hi
>> >> >
>> >> > This project is a completely different solution towards this
>> problem, but in the hadoop mapreduce context.
>> >> >
>> >> > https://github.com/nielsbasjes/splittablegzip
>> >> >
>> >> >
>> >> > I have used this a lot in the past.
>> >> > Perhaps porting this project to beam is an option?
>> >> >
>> >> > Niels Basjes
>> >> >
>> >> >
>> >> >
>> >> > On Tue, May 14, 2019, 20:45 Lukasz Cwik <lc...@google.com> wrote:
>> >> >>
>> >> >> Sorry I couldn't be more helpful.
>> >> >>
>> >> >> From: Allie Chen <yi...@google.com>
>> >> >> Date: Tue, May 14, 2019 at 10:09 AM
>> >> >> To: <de...@beam.apache.org>
>> >> >> Cc: user
>> >> >>
>> >> >>> Thanks, Lukasz. Unfortunately, decompressing the files is not an
>> >> >>> option for us.
>> >> >>>
>> >> >>>
>> >> >>> I am trying to speed up the Reshuffle step, since it waits for all
>> >> >>> data. Here are two ways I have tried:
>> >> >>>
>> >> >>> 1.  Add timestamps to the PCollection's elements after reading
>> >> >>> (since it is a bounded source), then apply windowing before
>> >> >>> Reshuffle, but it still waits for all data.
>> >> >>>
>> >> >>>
>> >> >>> 2.  Run the pipeline with the --streaming flag, but it leads to an
>> >> >>> error: "Workflow failed. Causes: Expected custom source to have
>> >> >>> non-zero number of splits." Also, I found in
>> >> >>> https://beam.apache.org/documentation/sdks/python-streaming/#dataflowrunner-specific-features:
>> >> >>>
>> >> >>> DataflowRunner does not currently support the following Cloud
>> Dataflow specific features with Python streaming execution.
>> >> >>>
>> >> >>> Streaming autoscaling
>> >> >>>
>> >> >>> I doubt whether this approach can solve my issue.
>> >> >>>
>> >> >>>
>> >> >>> Thanks so much!
>> >> >>>
>> >> >>> Allie
>> >> >>>
>> >> >>>
>> >> >>> From: Lukasz Cwik <lc...@google.com>
>> >> >>> Date: Tue, May 14, 2019 at 11:16 AM
>> >> >>> To: dev
>> >> >>> Cc: user
>> >> >>>
>> >> >>>> Do you need to perform any joins across the files (e.g.
>> Combine.perKey/GroupByKey/...)?
>> >> >>>> If not, you could structure your pipeline
>> >> >>>> ReadFromFileA --> Reshuffle(optional) --> CopyOfPipelineA
>> >> >>>> ReadFromFileB --> Reshuffle(optional) --> CopyOfPipelineB
>> >> >>>> ReadFromFileC --> Reshuffle(optional) --> CopyOfPipelineC
>> >> >>>> and then run it as a batch pipeline.
>> >> >>>>
>> >> >>>> You can set --streaming=true on the pipeline and it will then run
>> >> >>>> in streaming mode, but streaming prioritizes low latency and
>> >> >>>> correctness on Google Cloud Dataflow, so it will cost more to run
>> >> >>>> your pipeline than in batch mode. It may make more sense to store
>> >> >>>> the data uncompressed, as that may be less expensive than paying
>> >> >>>> the additional compute cost for streaming.
>> >> >>>>
>> >> >>>> From: Allie Chen <yi...@google.com>
>> >> >>>> Date: Tue, May 14, 2019 at 7:38 AM
>> >> >>>> To: <de...@beam.apache.org>
>> >> >>>> Cc: user
>> >> >>>>
>> >> >>>>> Is it possible to use windowing or somehow pretend it is
>> streaming so Reshuffle or GroupByKey won't wait until all data has been
>> read?
>> >> >>>>>
>> >> >>>>> Thanks!
>> >> >>>>> Allie
>> >> >>>>>
>> >> >>>>> From: Lukasz Cwik <lc...@google.com>
>> >> >>>>> Date: Fri, May 10, 2019 at 5:36 PM
>> >> >>>>> To: dev
>> >> >>>>> Cc: user
>> >> >>>>>
>> >> >>>>>> There is no such flag to turn off fusion.
>> >> >>>>>>
>> >> >>>>>> Writing 100s of GiBs of uncompressed data to reshuffle will
>> take time when it is limited to a small number of workers.
>> >> >>>>>>
>> >> >>>>>> If you can split up your input into a lot of smaller files that
>> are compressed then you shouldn't need to use the reshuffle but still could
>> if you found it helped.
>> >> >>>>>>
>> >> >>>>>> On Fri, May 10, 2019 at 2:24 PM Allie Chen <
>> yifangchen@google.com> wrote:
>> >> >>>>>>>
>> >> >>>>>>> Re Lukasz: Thanks! I am not able to control the compression
>> format but I will see whether the splitting gzip files will work. Is there
>> a simple flag in Dataflow that could turn off the fusion?
>> >> >>>>>>>
>> >> >>>>>>> Re Reuven: No, I checked the run time on Dataflow UI, the
>> GroupByKey and FlatMap in Reshuffle are very slow when the data is large.
>> Reshuffle itself is not parallel either.
>> >> >>>>>>>
>> >> >>>>>>> Thanks all,
>> >> >>>>>>>
>> >> >>>>>>> Allie
>> >> >>>>>>>
>> >> >>>>>>> From: Reuven Lax <re...@google.com>
>> >> >>>>>>> Date: Fri, May 10, 2019 at 5:02 PM
>> >> >>>>>>> To: dev
>> >> >>>>>>> Cc: user
>> >> >>>>>>>
>> >> >>>>>>>> It's unlikely that Reshuffle itself takes hours. It's more
>> likely that simply reading and decompressing all that data was very slow
>> when there was no parallelism.
>> >> >>>>>>>>
>> >> >>>>>>>> From: Allie Chen <yi...@google.com>
>> >> >>>>>>>> Date: Fri, May 10, 2019 at 1:17 PM
>> >> >>>>>>>> To: <de...@beam.apache.org>
>> >> >>>>>>>> Cc: <us...@beam.apache.org>
>> >> >>>>>>>>
>> >> >>>>>>>>> Yes, I do see the data after reshuffle are processed in
>> parallel. But Reshuffle transform itself takes hours or even days to run,
>> according to one test (24 gzip files, 17 million lines in total) I did.
>> >> >>>>>>>>>
>> >> >>>>>>>>> The file format for our users are mostly gzip format, since
>> uncompressed files would be too costly to store (It could be in hundreds of
>> GB).
>> >> >>>>>>>>>
>> >> >>>>>>>>> Thanks,
>> >> >>>>>>>>>
>> >> >>>>>>>>> Allie
>> >> >>>>>>>>>
>> >> >>>>>>>>>
>> >> >>>>>>>>> From: Lukasz Cwik <lc...@google.com>
>> >> >>>>>>>>> Date: Fri, May 10, 2019 at 4:07 PM
>> >> >>>>>>>>> To: dev, <us...@beam.apache.org>
>> >> >>>>>>>>>
>> >> >>>>>>>>>> +user@beam.apache.org
>> >> >>>>>>>>>>
>> >> >>>>>>>>>> Reshuffle on Google Cloud Dataflow for a bounded pipeline
>> waits till all the data has been read before the next transforms can run.
>> After the reshuffle, the data should have been processed in parallel across
>> the workers. Did you see this?
>> >> >>>>>>>>>>
>> >> >>>>>>>>>> Are you able to change the input of your pipeline to use an
>> uncompressed file or many compressed files?
>> >> >>>>>>>>>>
>> >> >>>>>>>>>> On Fri, May 10, 2019 at 1:03 PM Allie Chen <
>> yifangchen@google.com> wrote:
>> >> >>>>>>>>>>>
>> >> >>>>>>>>>>> Hi,
>> >> >>>>>>>>>>>
>> >> >>>>>>>>>>>
>> >> >>>>>>>>>>> I am trying to load a gzip file to BigQuery using Dataflow.
>> Since the compressed file is not splittable, one worker is allocated to
>> read the file. The same worker will do all the other transforms since
>> Dataflow fused all transforms together.  There is a large amount of data
>> in the file, and I expect to see more workers spinning up after reading
>> transforms. I tried to use Reshuffle Transform to prevent the fusion, but
>> it is not scalable since it won’t proceed until all data has arrived at this
>> point.
>> >> >>>>>>>>>>>
>> >> >>>>>>>>>>>
>> >> >>>>>>>>>>> Are there any other ways to allow more workers to work on
>> all the other transforms after reading?
>> >> >>>>>>>>>>>
>> >> >>>>>>>>>>>
>> >> >>>>>>>>>>> Thanks,
>> >> >>>>>>>>>>>
>> >> >>>>>>>>>>>
>> >> >>>>>>>>>>> Allie
>> >> >>>>>>>>>>>
>> >> >>>>>>>>>>>
>>
>

Re: Problem with gzip

Posted by Lukasz Cwik <lc...@google.com>.
Niels, it is interesting to see that, in the Hadoop community, reading a
gzip file from the beginning through to the split point is worth it for
some real user scenarios.
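
For readers unfamiliar with the approach, here is a minimal sketch of the "read from the beginning to the split point" idea, using only the Python standard library. It is not how splittablegzip is implemented: the function names are hypothetical, splits are expressed as uncompressed byte offsets, and the total uncompressed size is assumed to be known in advance. It only illustrates why each split pays to decompress everything that precedes it.

```python
import gzip
import io


def read_split(gz_bytes, start, end):
    """Return the lines whose first uncompressed byte lies in [start, end).

    gzip offers no random access, so every split decompresses from byte 0
    and discards the lines that belong to earlier splits.
    """
    lines = []
    offset = 0  # uncompressed offset of the current line's first byte
    with gzip.open(io.BytesIO(gz_bytes), "rb") as stream:
        for line in stream:
            if offset >= end:
                break  # later splits own the rest of the file
            if offset >= start:
                lines.append(line)
            offset += len(line)
    return lines


def read_all_splits(gz_bytes, total_size, num_splits):
    """Cut [0, total_size) into num_splits ranges and read each one."""
    bounds = [total_size * i // num_splits for i in range(num_splits + 1)]
    return [
        read_split(gz_bytes, bounds[i], bounds[i + 1])
        for i in range(num_splits)
    ]


# Ten small records, compressed into a single gzip stream.
raw = b"".join(b"record %d\n" % i for i in range(10))
compressed = gzip.compress(raw)
splits = read_all_splits(compressed, len(raw), 3)
```

Each line is assigned to exactly one split (the one containing its first byte), so concatenating the splits reproduces the original data. The later a split starts, the more decompression work it throws away.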

[image: image.png]


*From: *Robert Bradshaw <ro...@google.com>
*Date: *Wed, May 15, 2019 at 5:52 AM
*To: * <us...@beam.apache.org>
*Cc: *dev

> Interesting thread. Thanks for digging that up.
>
> I would try the shuffle_mode=service experiment (forgot that wasn't
> yet the default). If that doesn't do the trick, though avro as a
> materialization format does not provide perfect parallelism, it should
> be significantly better than what you have now (large gzip files) and
> may be good enough.
>
> On Wed, May 15, 2019 at 2:34 PM Michael Luckey <ad...@gmail.com>
> wrote:
> >
> > @Robert
> >
> > Does your suggestion imply, that the points made by Eugene on BEAM-2803
> do not apply (anymore) and the combined reshuffle could just be omitted?
> >
> > On Wed, May 15, 2019 at 1:00 PM Robert Bradshaw <ro...@google.com>
> wrote:
> >>
> >> Unfortunately the "write" portion of the reshuffle cannot be
> >> parallelized more than the source that it's reading from. In my
> >> experience, generally the read is the bottleneck in this case, but
> >> it's possible (e.g. if the input compresses extremely well) that it is
> >> the write that is slow (which you seem to indicate based on your
> >> observation of the UI, right?).
> >>
> >> It could be that materializing to temporary files is cheaper than
> >> materializing randomly to shuffle (especially on pre-portable Python).
> >> In that case you could force a fusion break with a side input instead.
> >> E.g.
> >>
> >> import apache_beam as beam
> >>
> >> class FusionBreak(beam.PTransform):
> >>     def expand(self, pcoll):
> >>         # Create an empty PCollection that depends on pcoll.
> >>         empty = pcoll | beam.FlatMap(lambda x: ())
> >>         # Use this empty PCollection as a side input, which will force
> >>         # a fusion break.
> >>         return pcoll | beam.Map(
> >>             lambda x, unused: x, beam.pvalue.AsIterable(empty))
> >>
> >> which could be used in place of Reshard like
> >>
> >>     p | beam.ReadFromGzipedFiles(...) | FusionBreak() | DoWork() ...
> >>
> >> You'll probably want to be sure to pass the use_fastavro experiment as
> well.
> >>
> >> On Wed, May 15, 2019 at 6:53 AM Niels Basjes <ni...@basj.es> wrote:
> >> >
> >> > Hi
> >> >
> >> > This project is a completely different solution towards this problem,
> but in the hadoop mapreduce context.
> >> >
> >> > https://github.com/nielsbasjes/splittablegzip
> >> >
> >> >
> >> > I have used this a lot in the past.
> >> > Perhaps porting this project to beam is an option?
> >> >
> >> > Niels Basjes
> >> >
> >> >
> >> >
> >> > On Tue, May 14, 2019, 20:45 Lukasz Cwik <lc...@google.com> wrote:
> >> >>
> >> >> Sorry I couldn't be more helpful.
> >> >>
> >> >> From: Allie Chen <yi...@google.com>
> >> >> Date: Tue, May 14, 2019 at 10:09 AM
> >> >> To: <de...@beam.apache.org>
> >> >> Cc: user
> >> >>
> >> >>> Thanks Lukasz. Unfortunately, decompressing the files is not an
> option for us.
> >> >>>
> >> >>>
> >> >>> I am trying to speed up the Reshuffle step, since it waits for all
> data. Here are two ways I have tried:
> >> >>>
> >> >>> 1.  add timestamps to the PCollection's elements after reading
> (since it is a bounded source), then apply windowing before Reshuffle, but it
> still waits for all data.
> >> >>>
> >> >>>
> >> >>> 2.  run the pipeline with --streaming flag, but it leads to an
> error: Workflow failed. Causes: Expected custom source to have non-zero
> number of splits. Also, I found in
> https://beam.apache.org/documentation/sdks/python-streaming/#dataflowrunner-specific-features
> :
> >> >>>
> >> >>> DataflowRunner does not currently support the following Cloud
> Dataflow specific features with Python streaming execution.
> >> >>>
> >> >>> Streaming autoscaling
> >> >>>
> >> >>> I doubt whether this approach can solve my issue.
> >> >>>
> >> >>>
> >> >>> Thanks so much!
> >> >>>
> >> >>> Allie
> >> >>>
> >> >>>
> >> >>> From: Lukasz Cwik <lc...@google.com>
> >> >>> Date: Tue, May 14, 2019 at 11:16 AM
> >> >>> To: dev
> >> >>> Cc: user
> >> >>>
> >> >>>> Do you need to perform any joins across the files (e.g.
> Combine.perKey/GroupByKey/...)?
> >> >>>> If not, you could structure your pipeline
> >> >>>> ReadFromFileA --> Reshuffle(optional) --> CopyOfPipelineA
> >> >>>> ReadFromFileB --> Reshuffle(optional) --> CopyOfPipelineB
> >> >>>> ReadFromFileC --> Reshuffle(optional) --> CopyOfPipelineC
> >> >>>> and then run it as a batch pipeline.
> >> >>>>
> >> >>>> You can set --streaming=true on the pipeline and then it will run
> in a streaming mode but streaming prioritizes low latency and correctness
> on Google Cloud Dataflow so it will cost more to run your pipeline than in
> batch mode. It may make more sense to store the data uncompressed as it may
> be less expensive than paying the additional compute cost for streaming.
> >> >>>>
> >> >>>> From: Allie Chen <yi...@google.com>
> >> >>>> Date: Tue, May 14, 2019 at 7:38 AM
> >> >>>> To: <de...@beam.apache.org>
> >> >>>> Cc: user
> >> >>>>
> >> >>>>> Is it possible to use windowing or somehow pretend it is
> streaming so Reshuffle or GroupByKey won't wait until all data has been
> read?
> >> >>>>>
> >> >>>>> Thanks!
> >> >>>>> Allie
> >> >>>>>
> >> >>>>> From: Lukasz Cwik <lc...@google.com>
> >> >>>>> Date: Fri, May 10, 2019 at 5:36 PM
> >> >>>>> To: dev
> >> >>>>> Cc: user
> >> >>>>>
> >> >>>>>> There is no such flag to turn off fusion.
> >> >>>>>>
> >> >>>>>> Writing 100s of GiBs of uncompressed data to reshuffle will take
> time when it is limited to a small number of workers.
> >> >>>>>>
> >> >>>>>> If you can split up your input into a lot of smaller files that
> are compressed then you shouldn't need to use the reshuffle but still could
> if you found it helped.
> >> >>>>>>
> >> >>>>>> On Fri, May 10, 2019 at 2:24 PM Allie Chen <
> yifangchen@google.com> wrote:
> >> >>>>>>>
> >> >>>>>>> Re Lukasz: Thanks! I am not able to control the compression
> format but I will see whether the splitting gzip files will work. Is there
> a simple flag in Dataflow that could turn off the fusion?
> >> >>>>>>>
> >> >>>>>>> Re Reuven: No, I checked the run time on Dataflow UI, the
> GroupByKey and FlatMap in Reshuffle are very slow when the data is large.
> Reshuffle itself is not parallel either.
> >> >>>>>>>
> >> >>>>>>> Thanks all,
> >> >>>>>>>
> >> >>>>>>> Allie
> >> >>>>>>>
> >> >>>>>>> From: Reuven Lax <re...@google.com>
> >> >>>>>>> Date: Fri, May 10, 2019 at 5:02 PM
> >> >>>>>>> To: dev
> >> >>>>>>> Cc: user
> >> >>>>>>>
> >> >>>>>>>> It's unlikely that Reshuffle itself takes hours. It's more
> likely that simply reading and decompressing all that data was very slow
> when there was no parallelism.
> >> >>>>>>>>
> >> >>>>>>>> From: Allie Chen <yi...@google.com>
> >> >>>>>>>> Date: Fri, May 10, 2019 at 1:17 PM
> >> >>>>>>>> To: <de...@beam.apache.org>
> >> >>>>>>>> Cc: <us...@beam.apache.org>
> >> >>>>>>>>
> >> >>>>>>>>> Yes, I do see the data after reshuffle are processed in
> parallel. But Reshuffle transform itself takes hours or even days to run,
> according to one test (24 gzip files, 17 million lines in total) I did.
> >> >>>>>>>>>
> >> >>>>>>>>> The file format for our users are mostly gzip format, since
> uncompressed files would be too costly to store (It could be in hundreds of
> GB).
> >> >>>>>>>>>
> >> >>>>>>>>> Thanks,
> >> >>>>>>>>>
> >> >>>>>>>>> Allie
> >> >>>>>>>>>
> >> >>>>>>>>>
> >> >>>>>>>>> From: Lukasz Cwik <lc...@google.com>
> >> >>>>>>>>> Date: Fri, May 10, 2019 at 4:07 PM
> >> >>>>>>>>> To: dev, <us...@beam.apache.org>
> >> >>>>>>>>>
> >> >>>>>>>>>> +user@beam.apache.org
> >> >>>>>>>>>>
> >> >>>>>>>>>> Reshuffle on Google Cloud Dataflow for a bounded pipeline
> waits till all the data has been read before the next transforms can run.
> After the reshuffle, the data should have been processed in parallel across
> the workers. Did you see this?
> >> >>>>>>>>>>
> >> >>>>>>>>>> Are you able to change the input of your pipeline to use an
> uncompressed file or many compressed files?
> >> >>>>>>>>>>
> >> >>>>>>>>>> On Fri, May 10, 2019 at 1:03 PM Allie Chen <
> yifangchen@google.com> wrote:
> >> >>>>>>>>>>>
> >> >>>>>>>>>>> Hi,
> >> >>>>>>>>>>>
> >> >>>>>>>>>>>
> >> >>>>>>>>>>> I am trying to load a gzip file to BigQuery using Dataflow.
> Since the compressed file is not splittable, one worker is allocated to
> read the file. The same worker will do all the other transforms since
> Dataflow fused all transforms together.  There is a large amount of data
> in the file, and I expect to see more workers spinning up after reading
> transforms. I tried to use Reshuffle Transform to prevent the fusion, but
> it is not scalable since it won’t proceed until all data has arrived at this
> point.
> >> >>>>>>>>>>>
> >> >>>>>>>>>>>
> >> >>>>>>>>>>> Are there any other ways to allow more workers to work on
> all the other transforms after reading?
> >> >>>>>>>>>>>
> >> >>>>>>>>>>>
> >> >>>>>>>>>>> Thanks,
> >> >>>>>>>>>>>
> >> >>>>>>>>>>>
> >> >>>>>>>>>>> Allie
> >> >>>>>>>>>>>
> >> >>>>>>>>>>>
>

Re: Problem with gzip

Posted by Robert Bradshaw <ro...@google.com>.
Interesting thread. Thanks for digging that up.

I would try the shuffle_mode=service experiment (forgot that wasn't
yet the default). If that doesn't do the trick, though avro as a
materialization format does not provide perfect parallelism, it should
be significantly better than what you have now (large gzip files) and
may be good enough.

On Wed, May 15, 2019 at 2:34 PM Michael Luckey <ad...@gmail.com> wrote:
>
> @Robert
>
> Does your suggestion imply, that the points made by Eugene on BEAM-2803 do not apply (anymore) and the combined reshuffle could just be omitted?
>
> On Wed, May 15, 2019 at 1:00 PM Robert Bradshaw <ro...@google.com> wrote:
>>
>> Unfortunately the "write" portion of the reshuffle cannot be
>> parallelized more than the source that it's reading from. In my
>> experience, generally the read is the bottleneck in this case, but
>> it's possible (e.g. if the input compresses extremely well) that it is
>> the write that is slow (which you seem to indicate based on your
>> observation of the UI, right?).
>>
>> It could be that materializing to temporary files is cheaper than
>> materializing randomly to shuffle (especially on pre-portable Python).
>> In that case you could force a fusion break with a side input instead.
>> E.g.
>>
>> import apache_beam as beam
>>
>> class FusionBreak(beam.PTransform):
>>     def expand(self, pcoll):
>>         # Create an empty PCollection that depends on pcoll.
>>         empty = pcoll | beam.FlatMap(lambda x: ())
>>         # Use this empty PCollection as a side input, which will force
>>         # a fusion break.
>>         return pcoll | beam.Map(
>>             lambda x, unused: x, beam.pvalue.AsIterable(empty))
>>
>> which could be used in place of Reshard like
>>
>>     p | beam.ReadFromGzipedFiles(...) | FusionBreak() | DoWork() ...
>>
>> You'll probably want to be sure to pass the use_fastavro experiment as well.
>>
>> On Wed, May 15, 2019 at 6:53 AM Niels Basjes <ni...@basj.es> wrote:
>> >
>> > Hi
>> >
>> > This project is a completely different solution towards this problem, but in the hadoop mapreduce context.
>> >
>> > https://github.com/nielsbasjes/splittablegzip
>> >
>> >
>> > I have used this a lot in the past.
>> > Perhaps porting this project to beam is an option?
>> >
>> > Niels Basjes
>> >
>> >
>> >
>> > On Tue, May 14, 2019, 20:45 Lukasz Cwik <lc...@google.com> wrote:
>> >>
>> >> Sorry I couldn't be more helpful.
>> >>
>> >> From: Allie Chen <yi...@google.com>
>> >> Date: Tue, May 14, 2019 at 10:09 AM
>> >> To: <de...@beam.apache.org>
>> >> Cc: user
>> >>
>> >>> Thanks Lukasz. Unfortunately, decompressing the files is not an option for us.
>> >>>
>> >>>
>> >>> I am trying to speed up the Reshuffle step, since it waits for all data. Here are two ways I have tried:
>> >>>
>> >>> 1.  add timestamps to the PCollection's elements after reading (since it is a bounded source), then apply windowing before Reshuffle, but it still waits for all data.
>> >>>
>> >>>
>> >>> 2.  run the pipeline with --streaming flag, but it leads to an error: Workflow failed. Causes: Expected custom source to have non-zero number of splits. Also, I found in https://beam.apache.org/documentation/sdks/python-streaming/#dataflowrunner-specific-features:
>> >>>
>> >>> DataflowRunner does not currently support the following Cloud Dataflow specific features with Python streaming execution.
>> >>>
>> >>> Streaming autoscaling
>> >>>
>> >>> I doubt whether this approach can solve my issue.
>> >>>
>> >>>
>> >>> Thanks so much!
>> >>>
>> >>> Allie
>> >>>
>> >>>
>> >>> From: Lukasz Cwik <lc...@google.com>
>> >>> Date: Tue, May 14, 2019 at 11:16 AM
>> >>> To: dev
>> >>> Cc: user
>> >>>
>> >>>> Do you need to perform any joins across the files (e.g. Combine.perKey/GroupByKey/...)?
>> >>>> If not, you could structure your pipeline
>> >>>> ReadFromFileA --> Reshuffle(optional) --> CopyOfPipelineA
>> >>>> ReadFromFileB --> Reshuffle(optional) --> CopyOfPipelineB
>> >>>> ReadFromFileC --> Reshuffle(optional) --> CopyOfPipelineC
>> >>>> and then run it as a batch pipeline.
>> >>>>
>> >>>> You can set --streaming=true on the pipeline and then it will run in a streaming mode but streaming prioritizes low latency and correctness on Google Cloud Dataflow so it will cost more to run your pipeline than in batch mode. It may make more sense to store the data uncompressed as it may be less expensive than paying the additional compute cost for streaming.
>> >>>>
>> >>>> From: Allie Chen <yi...@google.com>
>> >>>> Date: Tue, May 14, 2019 at 7:38 AM
>> >>>> To: <de...@beam.apache.org>
>> >>>> Cc: user
>> >>>>
>> >>>>> Is it possible to use windowing or somehow pretend it is streaming so Reshuffle or GroupByKey won't wait until all data has been read?
>> >>>>>
>> >>>>> Thanks!
>> >>>>> Allie
>> >>>>>
>> >>>>> From: Lukasz Cwik <lc...@google.com>
>> >>>>> Date: Fri, May 10, 2019 at 5:36 PM
>> >>>>> To: dev
>> >>>>> Cc: user
>> >>>>>
>> >>>>>> There is no such flag to turn off fusion.
>> >>>>>>
>> >>>>>> Writing 100s of GiBs of uncompressed data to reshuffle will take time when it is limited to a small number of workers.
>> >>>>>>
>> >>>>>> If you can split up your input into a lot of smaller files that are compressed then you shouldn't need to use the reshuffle but still could if you found it helped.
>> >>>>>>
>> >>>>>> On Fri, May 10, 2019 at 2:24 PM Allie Chen <yi...@google.com> wrote:
>> >>>>>>>
>> >>>>>>> Re Lukasz: Thanks! I am not able to control the compression format but I will see whether the splitting gzip files will work. Is there a simple flag in Dataflow that could turn off the fusion?
>> >>>>>>>
>> >>>>>>> Re Reuven: No, I checked the run time on Dataflow UI, the GroupByKey and FlatMap in Reshuffle are very slow when the data is large. Reshuffle itself is not parallel either.
>> >>>>>>>
>> >>>>>>> Thanks all,
>> >>>>>>>
>> >>>>>>> Allie
>> >>>>>>>
>> >>>>>>> From: Reuven Lax <re...@google.com>
>> >>>>>>> Date: Fri, May 10, 2019 at 5:02 PM
>> >>>>>>> To: dev
>> >>>>>>> Cc: user
>> >>>>>>>
>> >>>>>>>> It's unlikely that Reshuffle itself takes hours. It's more likely that simply reading and decompressing all that data was very slow when there was no parallelism.
>> >>>>>>>>
>> >>>>>>>> From: Allie Chen <yi...@google.com>
>> >>>>>>>> Date: Fri, May 10, 2019 at 1:17 PM
>> >>>>>>>> To: <de...@beam.apache.org>
>> >>>>>>>> Cc: <us...@beam.apache.org>
>> >>>>>>>>
>> >>>>>>>>> Yes, I do see the data after reshuffle are processed in parallel. But Reshuffle transform itself takes hours or even days to run, according to one test (24 gzip files, 17 million lines in total) I did.
>> >>>>>>>>>
>> >>>>>>>>> The file format for our users are mostly gzip format, since uncompressed files would be too costly to store (It could be in hundreds of GB).
>> >>>>>>>>>
>> >>>>>>>>> Thanks,
>> >>>>>>>>>
>> >>>>>>>>> Allie
>> >>>>>>>>>
>> >>>>>>>>>
>> >>>>>>>>> From: Lukasz Cwik <lc...@google.com>
>> >>>>>>>>> Date: Fri, May 10, 2019 at 4:07 PM
>> >>>>>>>>> To: dev, <us...@beam.apache.org>
>> >>>>>>>>>
>> >>>>>>>>>> +user@beam.apache.org
>> >>>>>>>>>>
>> >>>>>>>>>> Reshuffle on Google Cloud Dataflow for a bounded pipeline waits till all the data has been read before the next transforms can run. After the reshuffle, the data should have been processed in parallel across the workers. Did you see this?
>> >>>>>>>>>>
>> >>>>>>>>>> Are you able to change the input of your pipeline to use an uncompressed file or many compressed files?
>> >>>>>>>>>>
>> >>>>>>>>>> On Fri, May 10, 2019 at 1:03 PM Allie Chen <yi...@google.com> wrote:
>> >>>>>>>>>>>
>> >>>>>>>>>>> Hi,
>> >>>>>>>>>>>
>> >>>>>>>>>>>
>> >>>>>>>>>>> I am trying to load a gzip file to BigQuery using Dataflow. Since the compressed file is not splittable, one worker is allocated to read the file. The same worker will do all the other transforms since Dataflow fused all transforms together.  There is a large amount of data in the file, and I expect to see more workers spinning up after reading transforms. I tried to use Reshuffle Transform to prevent the fusion, but it is not scalable since it won’t proceed until all data has arrived at this point.
>> >>>>>>>>>>>
>> >>>>>>>>>>>
>> >>>>>>>>>>> Are there any other ways to allow more workers to work on all the other transforms after reading?
>> >>>>>>>>>>>
>> >>>>>>>>>>>
>> >>>>>>>>>>> Thanks,
>> >>>>>>>>>>>
>> >>>>>>>>>>>
>> >>>>>>>>>>> Allie
>> >>>>>>>>>>>
>> >>>>>>>>>>>

Re: Problem with gzip

Posted by Robert Bradshaw <ro...@google.com>.
Interesting thread. Thanks for digging that up.

I would try the shuffle_mode=service experiment (forgot that wasn't
yet the default). If that doesn't do the trick, though avro as a
materialization format does not provide perfect parallelism, it should
be significantly better than what you have now (large gzip files) and
may be good enough.

On Wed, May 15, 2019 at 2:34 PM Michael Luckey <ad...@gmail.com> wrote:
>
> @Robert
>
> Does your suggestion imply, that the points made by Eugene on BEAM-2803 do not apply (anymore) and the combined reshuffle could just be omitted?
>
> On Wed, May 15, 2019 at 1:00 PM Robert Bradshaw <ro...@google.com> wrote:
>>
>> Unfortunately the "write" portion of the reshuffle cannot be
>> parallelized more than the source that it's reading from. In my
>> experience, generally the read is the bottleneck in this case, but
>> it's possible (e.g. if the input compresses extremely well) that it is
>> the write that is slow (which you seem to indicate based on your
>> observation of the UI, right?).
>>
>> It could be that materializing to temporary files is cheaper than
>> materializing randomly to shuffle (especially on pre-portable Python).
>> In that case you could force a fusion break with a side input instead.
>> E.g.
>>
>> class FusionBreak(beam.PTransform):
>>     def expand(self, pcoll):
>>         # Create an empty PCollection that depends on pcoll.
>>         empty = pcoll | beam.FlatMap(lambda x: ())
>>         # Use this empty PCollection as a side input, which will
>>         # force a fusion break.
>>         return pcoll | beam.Map(
>>             lambda x, unused: x, beam.pvalue.AsIterable(empty))
>>
>> which could be used in place of Reshard like
>>
>>     p | beam.ReadFromGzipedFiles(...) | FusionBreak() | DoWork() ...
>>
>> You'll probably want to be sure to pass the use_fastavro experiment as well.
>>
>> On Wed, May 15, 2019 at 6:53 AM Niels Basjes <ni...@basj.es> wrote:
>> >
>> > Hi
>> >
>> > This project is a completely different solution towards this problem, but in the hadoop mapreduce context.
>> >
>> > https://github.com/nielsbasjes/splittablegzip
>> >
>> >
>> > I have used this a lot in the past.
>> > Perhaps porting this project to beam is an option?
>> >
>> > Niels Basjes
>> >
>> >
>> >
>> > On Tue, May 14, 2019, 20:45 Lukasz Cwik <lc...@google.com> wrote:
>> >>
>> >> Sorry I couldn't be more helpful.
>> >>
>> >> From: Allie Chen <yi...@google.com>
>> >> Date: Tue, May 14, 2019 at 10:09 AM
>> >> To: <de...@beam.apache.org>
>> >> Cc: user
>> >>
>> >>> Thanks, Lukasz. Unfortunately, decompressing the files is not an option for us.
>> >>>
>> >>>
>> >>> I am trying to speed up the Reshuffle step, since it waits for all the data. Here are two ways I have tried:
>> >>>
>> >>> 1.  Add timestamps to the PCollection's elements after reading (since it is a bounded source), then apply windowing before Reshuffle. It still waits for all the data.
>> >>>
>> >>>
>> >>> 2.  Run the pipeline with the --streaming flag. This leads to an error: Workflow failed. Causes: Expected custom source to have non-zero number of splits. Also, I found in https://beam.apache.org/documentation/sdks/python-streaming/#dataflowrunner-specific-features:
>> >>>
>> >>> DataflowRunner does not currently support the following Cloud Dataflow specific features with Python streaming execution.
>> >>>
>> >>> Streaming autoscaling
>> >>>
>> >>> I doubt whether this approach can solve my issue.
>> >>>
>> >>>
>> >>> Thanks so much!
>> >>>
>> >>> Allie
>> >>>
>> >>>
>> >>> From: Lukasz Cwik <lc...@google.com>
>> >>> Date: Tue, May 14, 2019 at 11:16 AM
>> >>> To: dev
>> >>> Cc: user
>> >>>
>> >>>> Do you need to perform any joins across the files (e.g. Combine.perKey/GroupByKey/...)?
>> >>>> If not, you could structure your pipeline
>> >>>> ReadFromFileA --> Reshuffle(optional) --> CopyOfPipelineA
>> >>>> ReadFromFileB --> Reshuffle(optional) --> CopyOfPipelineB
>> >>>> ReadFromFileC --> Reshuffle(optional) --> CopyOfPipelineC
>> >>>> and then run it as a batch pipeline.
>> >>>>
>> >>>> You can set --streaming=true on the pipeline and it will then run in streaming mode, but streaming prioritizes low latency and correctness on Google Cloud Dataflow, so it will cost more to run your pipeline than in batch mode. It may make more sense to store the data uncompressed, as that may be less expensive than paying the additional compute cost for streaming.
>> >>>>
>> >>>> From: Allie Chen <yi...@google.com>
>> >>>> Date: Tue, May 14, 2019 at 7:38 AM
>> >>>> To: <de...@beam.apache.org>
>> >>>> Cc: user
>> >>>>
>> >>>>> Is it possible to use windowing or somehow pretend it is streaming so Reshuffle or GroupByKey won't wait until all data has been read?
>> >>>>>
>> >>>>> Thanks!
>> >>>>> Allie
>> >>>>>
>> >>>>> From: Lukasz Cwik <lc...@google.com>
>> >>>>> Date: Fri, May 10, 2019 at 5:36 PM
>> >>>>> To: dev
>> >>>>> Cc: user
>> >>>>>
>> >>>>>> There is no such flag to turn off fusion.
>> >>>>>>
>> >>>>>> Writing 100s of GiBs of uncompressed data to reshuffle will take time when it is limited to a small number of workers.
>> >>>>>>
>> >>>>>> If you can split up your input into a lot of smaller files that are compressed then you shouldn't need to use the reshuffle but still could if you found it helped.
>> >>>>>>
>> >>>>>> On Fri, May 10, 2019 at 2:24 PM Allie Chen <yi...@google.com> wrote:
>> >>>>>>>
>> >>>>>>> Re Lukasz: Thanks! I am not able to control the compression format, but I will see whether splitting the gzip files works. Is there a simple flag in Dataflow that could turn off fusion?
>> >>>>>>>
>> >>>>>>> Re Reuven: No, I checked the run time on Dataflow UI, the GroupByKey and FlatMap in Reshuffle are very slow when the data is large. Reshuffle itself is not parallel either.
>> >>>>>>>
>> >>>>>>> Thanks all,
>> >>>>>>>
>> >>>>>>> Allie
>> >>>>>>>
>> >>>>>>> From: Reuven Lax <re...@google.com>
>> >>>>>>> Date: Fri, May 10, 2019 at 5:02 PM
>> >>>>>>> To: dev
>> >>>>>>> Cc: user
>> >>>>>>>
>> >>>>>>>> It's unlikely that Reshuffle itself takes hours. It's more likely that simply reading and decompressing all that data was very slow when there was no parallelism.
>> >>>>>>>>
>> >>>>>>>> From: Allie Chen <yi...@google.com>
>> >>>>>>>> Date: Fri, May 10, 2019 at 1:17 PM
>> >>>>>>>> To: <de...@beam.apache.org>
>> >>>>>>>> Cc: <us...@beam.apache.org>
>> >>>>>>>>
>> >>>>>>>>> Yes, I do see that the data after the reshuffle is processed in parallel. But the Reshuffle transform itself takes hours or even days to run, according to one test I did (24 gzip files, 17 million lines in total).
>> >>>>>>>>>
>> >>>>>>>>> Our users' files are mostly in gzip format, since uncompressed files would be too costly to store (they could be hundreds of GB).
>> >>>>>>>>>
>> >>>>>>>>> Thanks,
>> >>>>>>>>>
>> >>>>>>>>> Allie
>> >>>>>>>>>
>> >>>>>>>>>
>> >>>>>>>>> From: Lukasz Cwik <lc...@google.com>
>> >>>>>>>>> Date: Fri, May 10, 2019 at 4:07 PM
>> >>>>>>>>> To: dev, <us...@beam.apache.org>
>> >>>>>>>>>
>> >>>>>>>>>> +user@beam.apache.org
>> >>>>>>>>>>
>> >>>>>>>>>> Reshuffle on Google Cloud Dataflow for a bounded pipeline waits till all the data has been read before the next transforms can run. After the reshuffle, the data should have been processed in parallel across the workers. Did you see this?
>> >>>>>>>>>>
>> >>>>>>>>>> Are you able to change the input of your pipeline to use an uncompressed file or many compressed files?
>> >>>>>>>>>>
>> >>>>>>>>>> On Fri, May 10, 2019 at 1:03 PM Allie Chen <yi...@google.com> wrote:
>> >>>>>>>>>>>
>> >>>>>>>>>>> Hi,
>> >>>>>>>>>>>
>> >>>>>>>>>>>
>> >>>>>>>>>>> I am trying to load a gzip file into BigQuery using Dataflow. Since the compressed file is not splittable, one worker is allocated to read the file. The same worker will do all the other transforms since Dataflow fused all transforms together.  There is a large amount of data in the file, and I expect to see more workers spinning up after the read. I tried to use the Reshuffle transform to prevent the fusion, but it is not scalable since it won’t proceed until all data has arrived at this point.
>> >>>>>>>>>>>
>> >>>>>>>>>>>
>> >>>>>>>>>>> Are there any other ways to allow more workers to work on the other transforms after reading?
>> >>>>>>>>>>>
>> >>>>>>>>>>>
>> >>>>>>>>>>> Thanks,
>> >>>>>>>>>>>
>> >>>>>>>>>>>
>> >>>>>>>>>>> Allie
>> >>>>>>>>>>>
>> >>>>>>>>>>>

Re: Problem with gzip

Posted by Michael Luckey <ad...@gmail.com>.
@Robert

Does your suggestion imply that the points made by Eugene on BEAM-2803 do
not apply (anymore) and the combined reshuffle could just be omitted?

On Wed, May 15, 2019 at 1:00 PM Robert Bradshaw <ro...@google.com> wrote:

> Unfortunately the "write" portion of the reshuffle cannot be
> parallelized more than the source that it's reading from. In my
> experience, generally the read is the bottleneck in this case, but
> it's possible (e.g. if the input compresses extremely well) that it is
> the write that is slow (which you seem to indicate based on your
> observation of the UI, right?).
>
> It could be that materializing to temporary files is cheaper than
> materializing randomly to shuffle (especially on pre-portable Python).
> In that case you could force a fusion break with a side input instead.
> E.g.
>
> class FusionBreak(beam.PTransform):
>     def expand(self, pcoll):
>         # Create an empty PCollection that depends on pcoll.
>         empty = pcoll | beam.FlatMap(lambda x: ())
>         # Use this empty PCollection as a side input, which will
>         # force a fusion break.
>         return pcoll | beam.Map(
>             lambda x, unused: x, beam.pvalue.AsIterable(empty))
>
> which could be used in place of Reshard like
>
>     p | beam.ReadFromGzipedFiles(...) | FusionBreak() | DoWork() ...
>
> You'll probably want to be sure to pass the use_fastavro experiment as
> well.
>
> On Wed, May 15, 2019 at 6:53 AM Niels Basjes <ni...@basj.es> wrote:
> >
> > Hi
> >
> > This project is a completely different solution towards this problem,
> but in the hadoop mapreduce context.
> >
> > https://github.com/nielsbasjes/splittablegzip
> >
> >
> > I have used this a lot in the past.
> > Perhaps porting this project to beam is an option?
> >
> > Niels Basjes
> >
> >
> >
> > On Tue, May 14, 2019, 20:45 Lukasz Cwik <lc...@google.com> wrote:
> >>
> >> Sorry I couldn't be more helpful.
> >>
> >> From: Allie Chen <yi...@google.com>
> >> Date: Tue, May 14, 2019 at 10:09 AM
> >> To: <de...@beam.apache.org>
> >> Cc: user
> >>
> >>> Thanks, Lukasz. Unfortunately, decompressing the files is not an option
> for us.
> >>>
> >>>
> >>> I am trying to speed up the Reshuffle step, since it waits for all the
> data. Here are two ways I have tried:
> >>>
> >>> 1.  Add timestamps to the PCollection's elements after reading (since
> it is a bounded source), then apply windowing before Reshuffle. It still
> waits for all the data.
> >>>
> >>>
> >>> 2.  Run the pipeline with the --streaming flag. This leads to an error:
> Workflow failed. Causes: Expected custom source to have non-zero number of
> splits. Also, I found in
> https://beam.apache.org/documentation/sdks/python-streaming/#dataflowrunner-specific-features
> :
> >>>
> >>> DataflowRunner does not currently support the following Cloud Dataflow
> specific features with Python streaming execution.
> >>>
> >>> Streaming autoscaling
> >>>
> >>> I doubt whether this approach can solve my issue.
> >>>
> >>>
> >>> Thanks so much!
> >>>
> >>> Allie
> >>>
> >>>
> >>> From: Lukasz Cwik <lc...@google.com>
> >>> Date: Tue, May 14, 2019 at 11:16 AM
> >>> To: dev
> >>> Cc: user
> >>>
> >>>> Do you need to perform any joins across the files (e.g.
> Combine.perKey/GroupByKey/...)?
> >>>> If not, you could structure your pipeline
> >>>> ReadFromFileA --> Reshuffle(optional) --> CopyOfPipelineA
> >>>> ReadFromFileB --> Reshuffle(optional) --> CopyOfPipelineB
> >>>> ReadFromFileC --> Reshuffle(optional) --> CopyOfPipelineC
> >>>> and then run it as a batch pipeline.
> >>>>
> >>>> You can set --streaming=true on the pipeline and it will then run in
> streaming mode, but streaming prioritizes low latency and correctness on
> Google Cloud Dataflow, so it will cost more to run your pipeline than in
> batch mode. It may make more sense to store the data uncompressed, as that
> may be less expensive than paying the additional compute cost for streaming.
> >>>>
> >>>> From: Allie Chen <yi...@google.com>
> >>>> Date: Tue, May 14, 2019 at 7:38 AM
> >>>> To: <de...@beam.apache.org>
> >>>> Cc: user
> >>>>
> >>>>> Is it possible to use windowing or somehow pretend it is streaming
> so Reshuffle or GroupByKey won't wait until all data has been read?
> >>>>>
> >>>>> Thanks!
> >>>>> Allie
> >>>>>
> >>>>> From: Lukasz Cwik <lc...@google.com>
> >>>>> Date: Fri, May 10, 2019 at 5:36 PM
> >>>>> To: dev
> >>>>> Cc: user
> >>>>>
> >>>>>> There is no such flag to turn off fusion.
> >>>>>>
> >>>>>> Writing 100s of GiBs of uncompressed data to reshuffle will take
> time when it is limited to a small number of workers.
> >>>>>>
> >>>>>> If you can split up your input into a lot of smaller files that are
> compressed then you shouldn't need to use the reshuffle but still could if
> you found it helped.
> >>>>>>
> >>>>>> On Fri, May 10, 2019 at 2:24 PM Allie Chen <yi...@google.com>
> wrote:
> >>>>>>>
> >>>>>>> Re Lukasz: Thanks! I am not able to control the compression format,
> but I will see whether splitting the gzip files works. Is there a
> simple flag in Dataflow that could turn off fusion?
> >>>>>>>
> >>>>>>> Re Reuven: No, I checked the run time on Dataflow UI, the
> GroupByKey and FlatMap in Reshuffle are very slow when the data is large.
> Reshuffle itself is not parallel either.
> >>>>>>>
> >>>>>>> Thanks all,
> >>>>>>>
> >>>>>>> Allie
> >>>>>>>
> >>>>>>> From: Reuven Lax <re...@google.com>
> >>>>>>> Date: Fri, May 10, 2019 at 5:02 PM
> >>>>>>> To: dev
> >>>>>>> Cc: user
> >>>>>>>
> >>>>>>>> It's unlikely that Reshuffle itself takes hours. It's more likely
> that simply reading and decompressing all that data was very slow when
> there was no parallelism.
> >>>>>>>>
> >>>>>>>> From: Allie Chen <yi...@google.com>
> >>>>>>>> Date: Fri, May 10, 2019 at 1:17 PM
> >>>>>>>> To: <de...@beam.apache.org>
> >>>>>>>> Cc: <us...@beam.apache.org>
> >>>>>>>>
> >>>>>>>>> Yes, I do see that the data after the reshuffle is processed in
> parallel. But the Reshuffle transform itself takes hours or even days to
> run, according to one test I did (24 gzip files, 17 million lines in total).
> >>>>>>>>>
> >>>>>>>>> Our users' files are mostly in gzip format, since uncompressed
> files would be too costly to store (they could be hundreds of GB).
> >>>>>>>>>
> >>>>>>>>> Thanks,
> >>>>>>>>>
> >>>>>>>>> Allie
> >>>>>>>>>
> >>>>>>>>>
> >>>>>>>>> From: Lukasz Cwik <lc...@google.com>
> >>>>>>>>> Date: Fri, May 10, 2019 at 4:07 PM
> >>>>>>>>> To: dev, <us...@beam.apache.org>
> >>>>>>>>>
> >>>>>>>>>> +user@beam.apache.org
> >>>>>>>>>>
> >>>>>>>>>> Reshuffle on Google Cloud Dataflow for a bounded pipeline waits
> till all the data has been read before the next transforms can run. After
> the reshuffle, the data should have been processed in parallel across the
> workers. Did you see this?
> >>>>>>>>>>
> >>>>>>>>>> Are you able to change the input of your pipeline to use an
> uncompressed file or many compressed files?
> >>>>>>>>>>
> >>>>>>>>>> On Fri, May 10, 2019 at 1:03 PM Allie Chen <
> yifangchen@google.com> wrote:
> >>>>>>>>>>>
> >>>>>>>>>>> Hi,
> >>>>>>>>>>>
> >>>>>>>>>>>
> >>>>>>>>>>> I am trying to load a gzip file into BigQuery using Dataflow.
> Since the compressed file is not splittable, one worker is allocated to
> read the file. The same worker will do all the other transforms since
> Dataflow fused all transforms together.  There is a large amount of data
> in the file, and I expect to see more workers spinning up after the read.
> I tried to use the Reshuffle transform to prevent the fusion, but it is
> not scalable since it won’t proceed until all data has arrived at this
> point.
> >>>>>>>>>>>
> >>>>>>>>>>>
> >>>>>>>>>>> Are there any other ways to allow more workers to work on the
> other transforms after reading?
> >>>>>>>>>>>
> >>>>>>>>>>>
> >>>>>>>>>>> Thanks,
> >>>>>>>>>>>
> >>>>>>>>>>>
> >>>>>>>>>>> Allie
> >>>>>>>>>>>
> >>>>>>>>>>>
>

Re: Problem with gzip

Posted by Robert Bradshaw <ro...@google.com>.
Unfortunately the "write" portion of the reshuffle cannot be
parallelized more than the source that it's reading from. In my
experience, generally the read is the bottleneck in this case, but
it's possible (e.g. if the input compresses extremely well) that it is
the write that is slow (which you seem to indicate based on your
observation of the UI, right?).

It could be that materializing to temporary files is cheaper than
materializing randomly to shuffle (especially on pre-portable Python).
In that case you could force a fusion break with a side input instead.
E.g.

import apache_beam as beam

class FusionBreak(beam.PTransform):
    def expand(self, pcoll):
        # Create an empty PCollection that depends on pcoll.
        empty = pcoll | beam.FlatMap(lambda x: ())
        # Use this empty PCollection as a side input, which will
        # force a fusion break.
        return pcoll | beam.Map(
            lambda x, unused: x, beam.pvalue.AsIterable(empty))

which could be used in place of Reshard like

    p | beam.ReadFromGzipedFiles(...) | FusionBreak() | DoWork() ...

You'll probably want to be sure to pass the use_fastavro experiment as well.

On Wed, May 15, 2019 at 6:53 AM Niels Basjes <ni...@basj.es> wrote:
>
> Hi
>
> This project is a completely different solution towards this problem, but in the hadoop mapreduce context.
>
> https://github.com/nielsbasjes/splittablegzip
>
>
> I have used this a lot in the past.
> Perhaps porting this project to beam is an option?
>
> Niels Basjes
>
>
>
> On Tue, May 14, 2019, 20:45 Lukasz Cwik <lc...@google.com> wrote:
>>
>> Sorry I couldn't be more helpful.
>>
>> From: Allie Chen <yi...@google.com>
>> Date: Tue, May 14, 2019 at 10:09 AM
>> To: <de...@beam.apache.org>
>> Cc: user
>>
>>> Thanks, Lukasz. Unfortunately, decompressing the files is not an option for us.
>>>
>>>
>>> I am trying to speed up the Reshuffle step, since it waits for all the data. Here are two ways I have tried:
>>>
>>> 1.  Add timestamps to the PCollection's elements after reading (since it is a bounded source), then apply windowing before Reshuffle. It still waits for all the data.
>>>
>>>
>>> 2.  Run the pipeline with the --streaming flag. This leads to an error: Workflow failed. Causes: Expected custom source to have non-zero number of splits. Also, I found in https://beam.apache.org/documentation/sdks/python-streaming/#dataflowrunner-specific-features:
>>>
>>> DataflowRunner does not currently support the following Cloud Dataflow specific features with Python streaming execution.
>>>
>>> Streaming autoscaling
>>>
>>> I doubt whether this approach can solve my issue.
>>>
>>>
>>> Thanks so much!
>>>
>>> Allie
>>>
>>>
>>> From: Lukasz Cwik <lc...@google.com>
>>> Date: Tue, May 14, 2019 at 11:16 AM
>>> To: dev
>>> Cc: user
>>>
>>>> Do you need to perform any joins across the files (e.g. Combine.perKey/GroupByKey/...)?
>>>> If not, you could structure your pipeline
>>>> ReadFromFileA --> Reshuffle(optional) --> CopyOfPipelineA
>>>> ReadFromFileB --> Reshuffle(optional) --> CopyOfPipelineB
>>>> ReadFromFileC --> Reshuffle(optional) --> CopyOfPipelineC
>>>> and then run it as a batch pipeline.
>>>>
>>>> You can set --streaming=true on the pipeline and it will then run in streaming mode, but streaming prioritizes low latency and correctness on Google Cloud Dataflow, so it will cost more to run your pipeline than in batch mode. It may make more sense to store the data uncompressed, as that may be less expensive than paying the additional compute cost for streaming.
>>>>
>>>> From: Allie Chen <yi...@google.com>
>>>> Date: Tue, May 14, 2019 at 7:38 AM
>>>> To: <de...@beam.apache.org>
>>>> Cc: user
>>>>
>>>>> Is it possible to use windowing or somehow pretend it is streaming so Reshuffle or GroupByKey won't wait until all data has been read?
>>>>>
>>>>> Thanks!
>>>>> Allie
>>>>>
>>>>> From: Lukasz Cwik <lc...@google.com>
>>>>> Date: Fri, May 10, 2019 at 5:36 PM
>>>>> To: dev
>>>>> Cc: user
>>>>>
>>>>>> There is no such flag to turn off fusion.
>>>>>>
>>>>>> Writing 100s of GiBs of uncompressed data to reshuffle will take time when it is limited to a small number of workers.
>>>>>>
>>>>>> If you can split up your input into a lot of smaller files that are compressed then you shouldn't need to use the reshuffle but still could if you found it helped.
>>>>>>
>>>>>> On Fri, May 10, 2019 at 2:24 PM Allie Chen <yi...@google.com> wrote:
>>>>>>>
>>>>>>> Re Lukasz: Thanks! I am not able to control the compression format, but I will see whether splitting the gzip files works. Is there a simple flag in Dataflow that could turn off fusion?
>>>>>>>
>>>>>>> Re Reuven: No, I checked the run time on Dataflow UI, the GroupByKey and FlatMap in Reshuffle are very slow when the data is large. Reshuffle itself is not parallel either.
>>>>>>>
>>>>>>> Thanks all,
>>>>>>>
>>>>>>> Allie
>>>>>>>
>>>>>>> From: Reuven Lax <re...@google.com>
>>>>>>> Date: Fri, May 10, 2019 at 5:02 PM
>>>>>>> To: dev
>>>>>>> Cc: user
>>>>>>>
>>>>>>>> It's unlikely that Reshuffle itself takes hours. It's more likely that simply reading and decompressing all that data was very slow when there was no parallelism.
>>>>>>>>
>>>>>>>> From: Allie Chen <yi...@google.com>
>>>>>>>> Date: Fri, May 10, 2019 at 1:17 PM
>>>>>>>> To: <de...@beam.apache.org>
>>>>>>>> Cc: <us...@beam.apache.org>
>>>>>>>>
>>>>>>>>> Yes, I do see that the data after the reshuffle is processed in parallel. But the Reshuffle transform itself takes hours or even days to run, according to one test I did (24 gzip files, 17 million lines in total).
>>>>>>>>>
>>>>>>>>> Our users' files are mostly in gzip format, since uncompressed files would be too costly to store (they could be hundreds of GB).
>>>>>>>>>
>>>>>>>>> Thanks,
>>>>>>>>>
>>>>>>>>> Allie
>>>>>>>>>
>>>>>>>>>
>>>>>>>>> From: Lukasz Cwik <lc...@google.com>
>>>>>>>>> Date: Fri, May 10, 2019 at 4:07 PM
>>>>>>>>> To: dev, <us...@beam.apache.org>
>>>>>>>>>
>>>>>>>>>> +user@beam.apache.org
>>>>>>>>>>
>>>>>>>>>> Reshuffle on Google Cloud Dataflow for a bounded pipeline waits till all the data has been read before the next transforms can run. After the reshuffle, the data should have been processed in parallel across the workers. Did you see this?
>>>>>>>>>>
>>>>>>>>>> Are you able to change the input of your pipeline to use an uncompressed file or many compressed files?
>>>>>>>>>>
>>>>>>>>>> On Fri, May 10, 2019 at 1:03 PM Allie Chen <yi...@google.com> wrote:
>>>>>>>>>>>
>>>>>>>>>>> Hi,
>>>>>>>>>>>
>>>>>>>>>>>
>>>>>>>>>>> I am trying to load a gzip file into BigQuery using Dataflow. Since the compressed file is not splittable, one worker is allocated to read the file. The same worker will do all the other transforms since Dataflow fused all transforms together.  There is a large amount of data in the file, and I expect to see more workers spinning up after the read. I tried to use the Reshuffle transform to prevent the fusion, but it is not scalable since it won’t proceed until all data has arrived at this point.
>>>>>>>>>>>
>>>>>>>>>>>
>>>>>>>>>>> Are there any other ways to allow more workers to work on the other transforms after reading?
>>>>>>>>>>>
>>>>>>>>>>>
>>>>>>>>>>> Thanks,
>>>>>>>>>>>
>>>>>>>>>>>
>>>>>>>>>>> Allie
>>>>>>>>>>>
>>>>>>>>>>>

Re: Problem with gzip

Posted by Robert Bradshaw <ro...@google.com>.
Unfortunately the "write" portion of the reshuffle cannot be
parallelized more than the source that it's reading from. In my
experience, generally the read is the bottleneck in this case, but
it's possible (e.g. if the input compresses extremely well) that it is
the write that is slow (which you seem to indicate based on your
observation of the UI, right?).

It could be that materializing to temporary files is cheaper than
materializing randomly to shuffle (especially on pre-portable Python).
In that case you could force a fusion break with a side input instead.
E.g.

import apache_beam as beam

class FusionBreak(beam.PTransform):
    def expand(self, pcoll):
        # Create an empty PCollection that depends on pcoll.
        empty = pcoll | beam.FlatMap(lambda x: ())
        # Use this empty PCollection as a side input, which will
        # force a fusion break.
        return pcoll | beam.Map(
            lambda x, unused: x, beam.pvalue.AsIterable(empty))

which could be used in place of Reshard like

    p | beam.ReadFromGzipedFiles(...) | FusionBreak() | DoWork() ...

You'll probably want to be sure to pass the use_fastavro experiment as well.

On Wed, May 15, 2019 at 6:53 AM Niels Basjes <ni...@basj.es> wrote:
>
> Hi
>
> This project is a completely different solution towards this problem, but in the hadoop mapreduce context.
>
> https://github.com/nielsbasjes/splittablegzip
>
>
> I have used this a lot in the past.
> Perhaps porting this project to beam is an option?
>
> Niels Basjes
>
>
>
> On Tue, May 14, 2019, 20:45 Lukasz Cwik <lc...@google.com> wrote:
>>
>> Sorry I couldn't be more helpful.
>>
>> From: Allie Chen <yi...@google.com>
>> Date: Tue, May 14, 2019 at 10:09 AM
>> To: <de...@beam.apache.org>
>> Cc: user
>>
>>> Thanks, Lukasz. Unfortunately, decompressing the files is not an option for us.
>>>
>>>
>>> I am trying to speed up the Reshuffle step, since it waits for all data. Here are two ways I have tried:
>>>
>>> 1.  add timestamps to the PCollection's elements after reading (since it is a bounded source), then apply windowing before Reshuffle, but it still waits for all data.
>>>
>>>
>>> 2.  run the pipeline with --streaming flag, but it leads to an error: Workflow failed. Causes: Expected custom source to have non-zero number of splits. Also, I found in https://beam.apache.org/documentation/sdks/python-streaming/#dataflowrunner-specific-features:
>>>
>>> DataflowRunner does not currently support the following Cloud Dataflow specific features with Python streaming execution.
>>>
>>> Streaming autoscaling
>>>
>>> I doubt whether this approach can solve my issue.
>>>
>>>
>>> Thanks so much!
>>>
>>> Allie
>>>
>>>
>>> From: Lukasz Cwik <lc...@google.com>
>>> Date: Tue, May 14, 2019 at 11:16 AM
>>> To: dev
>>> Cc: user
>>>
>>>> Do you need to perform any joins across the files (e.g. Combine.perKey/GroupByKey/...)?
>>>> If not, you could structure your pipeline
>>>> ReadFromFileA --> Reshuffle(optional) --> CopyOfPipelineA
>>>> ReadFromFileB --> Reshuffle(optional) --> CopyOfPipelineB
>>>> ReadFromFileC --> Reshuffle(optional) --> CopyOfPipelineC
>>>> and then run it as a batch pipeline.
>>>>
>>>> You can set --streaming=true on the pipeline and then it will run in a streaming mode but streaming prioritizes low latency and correctness on Google Cloud Dataflow so it will cost more to run your pipeline than in batch mode. It may make more sense to store the data uncompressed as it may be less expensive than paying the additional compute cost for streaming.
>>>>
>>>> From: Allie Chen <yi...@google.com>
>>>> Date: Tue, May 14, 2019 at 7:38 AM
>>>> To: <de...@beam.apache.org>
>>>> Cc: user
>>>>
>>>>> Is it possible to use windowing or somehow pretend it is streaming so Reshuffle or GroupByKey won't wait until all data has been read?
>>>>>
>>>>> Thanks!
>>>>> Allie
>>>>>
>>>>> From: Lukasz Cwik <lc...@google.com>
>>>>> Date: Fri, May 10, 2019 at 5:36 PM
>>>>> To: dev
>>>>> Cc: user
>>>>>
>>>>>> There is no such flag to turn off fusion.
>>>>>>
>>>>>> Writing 100s of GiBs of uncompressed data to reshuffle will take time when it is limited to a small number of workers.
>>>>>>
>>>>>> If you can split up your input into a lot of smaller files that are compressed then you shouldn't need to use the reshuffle but still could if you found it helped.
>>>>>>
>>>>>> On Fri, May 10, 2019 at 2:24 PM Allie Chen <yi...@google.com> wrote:
>>>>>>>
>>>>>>> Re Lukasz: Thanks! I am not able to control the compression format but I will see whether splitting the gzip files will work. Is there a simple flag in Dataflow that could turn off the fusion?
>>>>>>>
>>>>>>> Re Reuven: No, I checked the run time on Dataflow UI, the GroupByKey and FlatMap in Reshuffle are very slow when the data is large. Reshuffle itself is not parallel either.
>>>>>>>
>>>>>>> Thanks all,
>>>>>>>
>>>>>>> Allie
>>>>>>>
>>>>>>> From: Reuven Lax <re...@google.com>
>>>>>>> Date: Fri, May 10, 2019 at 5:02 PM
>>>>>>> To: dev
>>>>>>> Cc: user
>>>>>>>
>>>>>>>> It's unlikely that Reshuffle itself takes hours. It's more likely that simply reading and decompressing all that data was very slow when there was no parallelism.
>>>>>>>>
>>>>>>>> From: Allie Chen <yi...@google.com>
>>>>>>>> Date: Fri, May 10, 2019 at 1:17 PM
>>>>>>>> To: <de...@beam.apache.org>
>>>>>>>> Cc: <us...@beam.apache.org>
>>>>>>>>
>>>>>>>>> Yes, I do see the data after reshuffle are processed in parallel. But Reshuffle transform itself takes hours or even days to run, according to one test (24 gzip files, 17 million lines in total) I did.
>>>>>>>>>
>>>>>>>>> Our users' files are mostly in gzip format, since uncompressed files would be too costly to store (they could be hundreds of GB).
>>>>>>>>>
>>>>>>>>> Thanks,
>>>>>>>>>
>>>>>>>>> Allie
>>>>>>>>>
>>>>>>>>>
>>>>>>>>> From: Lukasz Cwik <lc...@google.com>
>>>>>>>>> Date: Fri, May 10, 2019 at 4:07 PM
>>>>>>>>> To: dev, <us...@beam.apache.org>
>>>>>>>>>
>>>>>>>>>> +user@beam.apache.org
>>>>>>>>>>
>>>>>>>>>> Reshuffle on Google Cloud Dataflow for a bounded pipeline waits till all the data has been read before the next transforms can run. After the reshuffle, the data should have been processed in parallel across the workers. Did you see this?
>>>>>>>>>>
>>>>>>>>>> Are you able to change the input of your pipeline to use an uncompressed file or many compressed files?
>>>>>>>>>>
>>>>>>>>>> On Fri, May 10, 2019 at 1:03 PM Allie Chen <yi...@google.com> wrote:
>>>>>>>>>>>
>>>>>>>>>>> Hi,
>>>>>>>>>>>
>>>>>>>>>>>
>>>>>>>>>>> I am trying to load a gzip file to BigQuery using Dataflow. Since the compressed file is not splittable, one worker is allocated to read the file. The same worker will do all the other transforms since Dataflow fused all transforms together. There is a large amount of data in the file, and I expect to see more workers spinning up after the read transform. I tried to use the Reshuffle transform to prevent the fusion, but it is not scalable since it won’t proceed until all data has arrived at this point.
>>>>>>>>>>>
>>>>>>>>>>>
>>>>>>>>>>> Are there any other ways to allow more workers to work on all the other transforms after reading?
>>>>>>>>>>>
>>>>>>>>>>>
>>>>>>>>>>> Thanks,
>>>>>>>>>>>
>>>>>>>>>>>
>>>>>>>>>>> Allie
>>>>>>>>>>>
>>>>>>>>>>>

Re: Problem with gzip

Posted by Niels Basjes <ni...@basj.es>.
Hi

This project is a completely different solution towards this problem, but
in the hadoop mapreduce context.

https://github.com/nielsbasjes/splittablegzip


I have used this a lot in the past.
Perhaps porting this project to beam is an option?
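
For readers unfamiliar with the idea, here is a minimal sketch of the "reread" approach as I understand it: every split decompresses the file from the beginning and throws away everything before its own range. The helper name read_split and the use of line ranges are assumptions made for illustration only; the splittablegzip project itself works on Hadoop input splits of the compressed file, not on line numbers.

```python
import gzip


def read_split(path, split_index, num_splits, total_lines):
    """Yield the lines belonging to one split of a gzip text file.

    Hypothetical helper: the split is a contiguous range of line numbers,
    and total_lines is assumed to be known up front.
    """
    start = split_index * total_lines // num_splits
    end = (split_index + 1) * total_lines // num_splits
    # gzip has no random access, so every split starts at byte 0.
    with gzip.open(path, "rt", encoding="utf-8") as f:
        for line_number, line in enumerate(f):
            if line_number >= end:
                break  # Nothing past this split is needed.
            if line_number >= start:
                yield line.rstrip("\n")
            # Lines before `start` are decompressed and discarded.
```

Each split pays the decompression cost of everything in front of it, so this trades extra CPU for wall-clock time: the later splits can run on other workers while the first one is still processing its range.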

Niels Basjes



On Tue, May 14, 2019, 20:45 Lukasz Cwik <lc...@google.com> wrote:

> Sorry I couldn't be more helpful.
>
> *From: *Allie Chen <yi...@google.com>
> *Date: *Tue, May 14, 2019 at 10:09 AM
> *To: * <de...@beam.apache.org>
> *Cc: *user
>
> Thanks, Lukasz. Unfortunately, decompressing the files is not an option for
>> us.
>>
>>
>> I am trying to speed up the Reshuffle step, since it waits for all data. Here
>> are two ways I have tried:
>>
>> 1.  add timestamps to the PCollection's elements after reading (since it
>> is a bounded source), then apply windowing before Reshuffle, but it still
>> waits for all data.
>>
>>
>> 2.  run the pipeline with --streaming flag, but it leads to an error:
>> Workflow failed. Causes: Expected custom source to have non-zero number of
>> splits. Also, I found in
>> https://beam.apache.org/documentation/sdks/python-streaming/#dataflowrunner-specific-features
>> :
>>
>> *DataflowRunner does not currently support the following Cloud Dataflow
>> specific features with Python streaming execution.*
>>
>>    - *Streaming autoscaling*
>>
>> I doubt whether this approach can solve my issue.
>>
>>
>> Thanks so much!
>>
>> Allie
>>
>> *From: *Lukasz Cwik <lc...@google.com>
>> *Date: *Tue, May 14, 2019 at 11:16 AM
>> *To: *dev
>> *Cc: *user
>>
>> Do you need to perform any joins across the files (e.g.
>>> Combine.perKey/GroupByKey/...)?
>>> If not, you could structure your pipeline
>>> ReadFromFileA --> Reshuffle(optional) --> CopyOfPipelineA
>>> ReadFromFileB --> Reshuffle(optional) --> CopyOfPipelineB
>>> ReadFromFileC --> Reshuffle(optional) --> CopyOfPipelineC
>>> and then run it as a batch pipeline.
>>>
>>> You can set --streaming=true on the pipeline and then it will run in a
>>> streaming mode but streaming prioritizes low latency and correctness on
>>> Google Cloud Dataflow so it will cost more to run your pipeline than in
>>> batch mode. It may make more sense to store the data uncompressed as it may
>>> be less expensive than paying the additional compute cost for streaming.
>>>
>>> *From: *Allie Chen <yi...@google.com>
>>> *Date: *Tue, May 14, 2019 at 7:38 AM
>>> *To: * <de...@beam.apache.org>
>>> *Cc: *user
>>>
>>> Is it possible to use windowing or somehow pretend it is streaming so
>>>> Reshuffle or GroupByKey won't wait until all data has been read?
>>>>
>>>> Thanks!
>>>> Allie
>>>>
>>>> *From: *Lukasz Cwik <lc...@google.com>
>>>> *Date: *Fri, May 10, 2019 at 5:36 PM
>>>> *To: *dev
>>>> *Cc: *user
>>>>
>>>> There is no such flag to turn off fusion.
>>>>>
>>>>> Writing 100s of GiBs of uncompressed data to reshuffle will take time
>>>>> when it is limited to a small number of workers.
>>>>>
>>>>> If you can split up your input into a lot of smaller files that are
>>>>> compressed then you shouldn't need to use the reshuffle but still could if
>>>>> you found it helped.
>>>>>
>>>>> On Fri, May 10, 2019 at 2:24 PM Allie Chen <yi...@google.com>
>>>>> wrote:
>>>>>
>>>>>> Re Lukasz: Thanks! I am not able to control the compression format
>>>>>> but I will see whether splitting the gzip files will work. Is there a
>>>>>> simple flag in Dataflow that could turn off the fusion?
>>>>>>
>>>>>> Re Reuven: No, I checked the run time on Dataflow UI, the GroupByKey
>>>>>> and FlatMap in Reshuffle are very slow when the data is large. Reshuffle
>>>>>> itself is not parallel either.
>>>>>>
>>>>>> Thanks all,
>>>>>>
>>>>>> Allie
>>>>>>
>>>>>> *From: *Reuven Lax <re...@google.com>
>>>>>> *Date: *Fri, May 10, 2019 at 5:02 PM
>>>>>> *To: *dev
>>>>>> *Cc: *user
>>>>>>
>>>>>> It's unlikely that Reshuffle itself takes hours. It's more likely
>>>>>>> that simply reading and decompressing all that data was very slow when
>>>>>>> there was no parallelism.
>>>>>>>
>>>>>>> *From: *Allie Chen <yi...@google.com>
>>>>>>> *Date: *Fri, May 10, 2019 at 1:17 PM
>>>>>>> *To: * <de...@beam.apache.org>
>>>>>>> *Cc: * <us...@beam.apache.org>
>>>>>>>
>>>>>>> Yes, I do see the data after reshuffle are processed in parallel.
>>>>>>>> But Reshuffle transform itself takes hours or even days to run, according
>>>>>>>> to one test (24 gzip files, 17 million lines in total) I did.
>>>>>>>>
>>>>>>>> Our users' files are mostly in gzip format, since
>>>>>>>> uncompressed files would be too costly to store (they could be hundreds of
>>>>>>>> GB).
>>>>>>>>
>>>>>>>> Thanks,
>>>>>>>>
>>>>>>>> Allie
>>>>>>>>
>>>>>>>>
>>>>>>>> *From: *Lukasz Cwik <lc...@google.com>
>>>>>>>> *Date: *Fri, May 10, 2019 at 4:07 PM
>>>>>>>> *To: *dev, <us...@beam.apache.org>
>>>>>>>>
>>>>>>>> +user@beam.apache.org <us...@beam.apache.org>
>>>>>>>>>
>>>>>>>>> Reshuffle on Google Cloud Dataflow for a bounded pipeline waits
>>>>>>>>> till all the data has been read before the next transforms can run. After
>>>>>>>>> the reshuffle, the data should have been processed in parallel across the
>>>>>>>>> workers. Did you see this?
>>>>>>>>>
>>>>>>>>> Are you able to change the input of your pipeline to use an
>>>>>>>>> uncompressed file or many compressed files?
>>>>>>>>>
>>>>>>>>> On Fri, May 10, 2019 at 1:03 PM Allie Chen <yi...@google.com>
>>>>>>>>> wrote:
>>>>>>>>>
>>>>>>>>>> Hi,
>>>>>>>>>>
>>>>>>>>>>
>>>>>>>>>> I am trying to load a gzip file to BigQuery using Dataflow. Since
>>>>>>>>>> the compressed file is not splittable, one worker is allocated to read the
>>>>>>>>>> file. The same worker will do all the other transforms since Dataflow fused
>>>>>>>>>> all transforms together. There is a large amount of data in the file, and
>>>>>>>>>> I expect to see more workers spinning up after the read transform. I tried
>>>>>>>>>> to use the Reshuffle transform
>>>>>>>>>> <https://github.com/apache/beam/blob/release-2.3.0/sdks/python/apache_beam/transforms/util.py#L516>
>>>>>>>>>> to prevent the fusion, but it is not scalable since it won’t proceed until
>>>>>>>>>> all data has arrived at this point.
>>>>>>>>>>
>>>>>>>>>> Are there any other ways to allow more workers to work on all the
>>>>>>>>>> other transforms after reading?
>>>>>>>>>>
>>>>>>>>>> Thanks,
>>>>>>>>>>
>>>>>>>>>> Allie
>>>>>>>>>>
>>>>>>>>>>

Re: Problem with gzip

Posted by Lukasz Cwik <lc...@google.com>.
Sorry I couldn't be more helpful.

*From: *Allie Chen <yi...@google.com>
*Date: *Tue, May 14, 2019 at 10:09 AM
*To: * <de...@beam.apache.org>
*Cc: *user

Thanks, Lukasz. Unfortunately, decompressing the files is not an option for
> us.
>
>
> I am trying to speed up the Reshuffle step, since it waits for all data. Here
> are two ways I have tried:
>
> 1.  add timestamps to the PCollection's elements after reading (since it
> is a bounded source), then apply windowing before Reshuffle, but it still
> waits for all data.
>
>
> 2.  run the pipeline with --streaming flag, but it leads to an error:
> Workflow failed. Causes: Expected custom source to have non-zero number of
> splits. Also, I found in
> https://beam.apache.org/documentation/sdks/python-streaming/#dataflowrunner-specific-features
> :
>
> *DataflowRunner does not currently support the following Cloud Dataflow
> specific features with Python streaming execution.*
>
>    - *Streaming autoscaling*
>
> I doubt whether this approach can solve my issue.
>
>
> Thanks so much!
>
> Allie
>
> *From: *Lukasz Cwik <lc...@google.com>
> *Date: *Tue, May 14, 2019 at 11:16 AM
> *To: *dev
> *Cc: *user
>
> Do you need to perform any joins across the files (e.g.
>> Combine.perKey/GroupByKey/...)?
>> If not, you could structure your pipeline
>> ReadFromFileA --> Reshuffle(optional) --> CopyOfPipelineA
>> ReadFromFileB --> Reshuffle(optional) --> CopyOfPipelineB
>> ReadFromFileC --> Reshuffle(optional) --> CopyOfPipelineC
>> and then run it as a batch pipeline.
>>
>> You can set --streaming=true on the pipeline and then it will run in a
>> streaming mode but streaming prioritizes low latency and correctness on
>> Google Cloud Dataflow so it will cost more to run your pipeline than in
>> batch mode. It may make more sense to store the data uncompressed as it may
>> be less expensive than paying the additional compute cost for streaming.
>>
>> *From: *Allie Chen <yi...@google.com>
>> *Date: *Tue, May 14, 2019 at 7:38 AM
>> *To: * <de...@beam.apache.org>
>> *Cc: *user
>>
>> Is it possible to use windowing or somehow pretend it is streaming so
>>> Reshuffle or GroupByKey won't wait until all data has been read?
>>>
>>> Thanks!
>>> Allie
>>>
>>> *From: *Lukasz Cwik <lc...@google.com>
>>> *Date: *Fri, May 10, 2019 at 5:36 PM
>>> *To: *dev
>>> *Cc: *user
>>>
>>> There is no such flag to turn off fusion.
>>>>
>>>> Writing 100s of GiBs of uncompressed data to reshuffle will take time
>>>> when it is limited to a small number of workers.
>>>>
>>>> If you can split up your input into a lot of smaller files that are
>>>> compressed then you shouldn't need to use the reshuffle but still could if
>>>> you found it helped.
>>>>
>>>> On Fri, May 10, 2019 at 2:24 PM Allie Chen <yi...@google.com>
>>>> wrote:
>>>>
>>>>> Re Lukasz: Thanks! I am not able to control the compression format but
>>>>> I will see whether splitting the gzip files will work. Is there a simple
>>>>> flag in Dataflow that could turn off the fusion?
>>>>>
>>>>> Re Reuven: No, I checked the run time on Dataflow UI, the GroupByKey
>>>>> and FlatMap in Reshuffle are very slow when the data is large. Reshuffle
>>>>> itself is not parallel either.
>>>>>
>>>>> Thanks all,
>>>>>
>>>>> Allie
>>>>>
>>>>> *From: *Reuven Lax <re...@google.com>
>>>>> *Date: *Fri, May 10, 2019 at 5:02 PM
>>>>> *To: *dev
>>>>> *Cc: *user
>>>>>
>>>>> It's unlikely that Reshuffle itself takes hours. It's more likely that
>>>>>> simply reading and decompressing all that data was very slow when there was
>>>>>> no parallelism.
>>>>>>
>>>>>> *From: *Allie Chen <yi...@google.com>
>>>>>> *Date: *Fri, May 10, 2019 at 1:17 PM
>>>>>> *To: * <de...@beam.apache.org>
>>>>>> *Cc: * <us...@beam.apache.org>
>>>>>>
>>>>>> Yes, I do see the data after reshuffle are processed in parallel. But
>>>>>>> Reshuffle transform itself takes hours or even days to run, according to
>>>>>>> one test (24 gzip files, 17 million lines in total) I did.
>>>>>>>
>>>>>>> Our users' files are mostly in gzip format, since
>>>>>>> uncompressed files would be too costly to store (they could be hundreds of
>>>>>>> GB).
>>>>>>>
>>>>>>> Thanks,
>>>>>>>
>>>>>>> Allie
>>>>>>>
>>>>>>>
>>>>>>> *From: *Lukasz Cwik <lc...@google.com>
>>>>>>> *Date: *Fri, May 10, 2019 at 4:07 PM
>>>>>>> *To: *dev, <us...@beam.apache.org>
>>>>>>>
>>>>>>> +user@beam.apache.org <us...@beam.apache.org>
>>>>>>>>
>>>>>>>> Reshuffle on Google Cloud Dataflow for a bounded pipeline waits
>>>>>>>> till all the data has been read before the next transforms can run. After
>>>>>>>> the reshuffle, the data should have been processed in parallel across the
>>>>>>>> workers. Did you see this?
>>>>>>>>
>>>>>>>> Are you able to change the input of your pipeline to use an
>>>>>>>> uncompressed file or many compressed files?
>>>>>>>>
>>>>>>>> On Fri, May 10, 2019 at 1:03 PM Allie Chen <yi...@google.com>
>>>>>>>> wrote:
>>>>>>>>
>>>>>>>>> Hi,
>>>>>>>>>
>>>>>>>>>
>>>>>>>>> I am trying to load a gzip file to BigQuery using Dataflow. Since
>>>>>>>>> the compressed file is not splittable, one worker is allocated to read the
>>>>>>>>> file. The same worker will do all the other transforms since Dataflow fused
>>>>>>>>> all transforms together. There is a large amount of data in the file, and
>>>>>>>>> I expect to see more workers spinning up after the read transform. I tried
>>>>>>>>> to use the Reshuffle transform
>>>>>>>>> <https://github.com/apache/beam/blob/release-2.3.0/sdks/python/apache_beam/transforms/util.py#L516>
>>>>>>>>> to prevent the fusion, but it is not scalable since it won’t proceed until
>>>>>>>>> all data has arrived at this point.
>>>>>>>>>
>>>>>>>>> Are there any other ways to allow more workers to work on all the
>>>>>>>>> other transforms after reading?
>>>>>>>>>
>>>>>>>>> Thanks,
>>>>>>>>>
>>>>>>>>> Allie
>>>>>>>>>
>>>>>>>>>

Re: Problem with gzip

Posted by Allie Chen <yi...@google.com>.
Thanks, Lukasz. Unfortunately, decompressing the files is not an option for
us.


I am trying to speed up the Reshuffle step, since it waits for all data. Here
are two ways I have tried:

1.  add timestamps to the PCollection's elements after reading (since it is
a bounded source), then apply windowing before Reshuffle, but it still waits
for all data.


2.  run the pipeline with --streaming flag, but it leads to an error:
Workflow failed. Causes: Expected custom source to have non-zero number of
splits. Also, I found in
https://beam.apache.org/documentation/sdks/python-streaming/#dataflowrunner-specific-features
:

*DataflowRunner does not currently support the following Cloud Dataflow
specific features with Python streaming execution.*

   - *Streaming autoscaling*

I doubt whether this approach can solve my issue.


Thanks so much!

Allie
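
If the obstacle is storing uncompressed data rather than the gzip format itself, the earlier suggestion of many smaller compressed files can be followed without ever writing an uncompressed copy. The sketch below streams one large gzip file into smaller gzip shards at line boundaries. The function name split_gzip, the shard naming, and the lines_per_shard parameter are assumptions made for illustration; this is a one-off preprocessing step outside the pipeline, not a Beam API.

```python
import gzip
import os


def split_gzip(src_path, out_dir, lines_per_shard):
    """Stream src_path into gzip shards of at most lines_per_shard lines.

    Returns the list of shard paths in order. Only one line is held in
    memory at a time, and nothing uncompressed is written to disk.
    """
    os.makedirs(out_dir, exist_ok=True)
    shard_paths = []
    out = None
    try:
        with gzip.open(src_path, "rb") as src:
            for line_number, line in enumerate(src):
                if line_number % lines_per_shard == 0:
                    # Start a new shard every lines_per_shard lines.
                    if out is not None:
                        out.close()
                    shard_path = os.path.join(
                        out_dir, "shard-%05d.gz" % len(shard_paths))
                    shard_paths.append(shard_path)
                    out = gzip.open(shard_path, "wb")
                out.write(line)
    finally:
        if out is not None:
            out.close()
    return shard_paths
```

The split itself is still a single-threaded pass over the file, but it only has to happen once; afterwards each shard is an independent gzip file that a separate worker can read.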

*From: *Lukasz Cwik <lc...@google.com>
*Date: *Tue, May 14, 2019 at 11:16 AM
*To: *dev
*Cc: *user

Do you need to perform any joins across the files (e.g.
> Combine.perKey/GroupByKey/...)?
> If not, you could structure your pipeline
> ReadFromFileA --> Reshuffle(optional) --> CopyOfPipelineA
> ReadFromFileB --> Reshuffle(optional) --> CopyOfPipelineB
> ReadFromFileC --> Reshuffle(optional) --> CopyOfPipelineC
> and then run it as a batch pipeline.
>
> You can set --streaming=true on the pipeline and then it will run in a
> streaming mode but streaming prioritizes low latency and correctness on
> Google Cloud Dataflow so it will cost more to run your pipeline than in
> batch mode. It may make more sense to store the data uncompressed as it may
> be less expensive than paying the additional compute cost for streaming.
>
> *From: *Allie Chen <yi...@google.com>
> *Date: *Tue, May 14, 2019 at 7:38 AM
> *To: * <de...@beam.apache.org>
> *Cc: *user
>
> Is it possible to use windowing or somehow pretend it is streaming so
>> Reshuffle or GroupByKey won't wait until all data has been read?
>>
>> Thanks!
>> Allie
>>
>> *From: *Lukasz Cwik <lc...@google.com>
>> *Date: *Fri, May 10, 2019 at 5:36 PM
>> *To: *dev
>> *Cc: *user
>>
>> There is no such flag to turn of fusion.
>>>
>>> Writing 100s of GiBs of uncompressed data to reshuffle will take time
>>> when it is limited to a small number of workers.
>>>
>>> If you can split up your input into a lot of smaller files that are
>>> compressed then you shouldn't need to use the reshuffle but still could if
>>> you found it helped.
>>>
>>> On Fri, May 10, 2019 at 2:24 PM Allie Chen <yi...@google.com>
>>> wrote:
>>>
>>>> Re Lukasz: Thanks! I am not able to control the compression format but
>>>> I will see whether the splitting gzip files will work. Is there a simple
>>>> flag in Dataflow that could turn off the fusion?
>>>>
>>>> Re Reuven: No, I checked the run time on Dataflow UI, the GroupByKey
>>>> and FlatMap in Reshuffle are very slow when the data is large. Reshuffle
>>>> itself is not parallel either.
>>>>
>>>> Thanks all,
>>>>
>>>> Allie
>>>>
>>>> *From: *Reuven Lax <re...@google.com>
>>>> *Date: *Fri, May 10, 2019 at 5:02 PM
>>>> *To: *dev
>>>> *Cc: *user
>>>>
>>>> It's unlikely that Reshuffle itself takes hours. It's more likely that
>>>>> simply reading and decompressing all that data was very slow when there was
>>>>> no parallelism.
>>>>>
>>>>> *From: *Allie Chen <yi...@google.com>
>>>>> *Date: *Fri, May 10, 2019 at 1:17 PM
>>>>> *To: * <de...@beam.apache.org>
>>>>> *Cc: * <us...@beam.apache.org>
>>>>>
>>>>> Yes, I do see the data after reshuffle are processed in parallel. But
>>>>>> Reshuffle transform itself takes hours or even days to run, according to
>>>>>> one test (24 gzip files, 17 million lines in total) I did.
>>>>>>
>>>>>> The file format for our users are mostly gzip format, since
>>>>>> uncompressed files would be too costly to store (It could be in hundreds of
>>>>>> GB).
>>>>>>
>>>>>> Thanks,
>>>>>>
>>>>>> Allie
>>>>>>
>>>>>>
>>>>>> *From: *Lukasz Cwik <lc...@google.com>
>>>>>> *Date: *Fri, May 10, 2019 at 4:07 PM
>>>>>> *To: *dev, <us...@beam.apache.org>
>>>>>>
>>>>>> +user@beam.apache.org <us...@beam.apache.org>
>>>>>>>
>>>>>>> Reshuffle on Google Cloud Dataflow for a bounded pipeline waits till
>>>>>>> all the data has been read before the next transforms can run. After the
>>>>>>> reshuffle, the data should have been processed in parallel across the
>>>>>>> workers. Did you see this?
>>>>>>>
>>>>>>> Are you able to change the input of your pipeline to use an
>>>>>>> uncompressed file or many compressed files?
>>>>>>>
>>>>>>> On Fri, May 10, 2019 at 1:03 PM Allie Chen <yi...@google.com>
>>>>>>> wrote:
>>>>>>>
>>>>>>>> Hi,
>>>>>>>>
>>>>>>>>
>>>>>>>> I am trying to load a gzip file to BigQuey using Dataflow. Since
>>>>>>>> the compressed file is not splittable, one worker is allocated to read the
>>>>>>>> file. The same worker will do all the other transforms since Dataflow fused
>>>>>>>> all transforms together.  There are a large amount of data in the file, and
>>>>>>>> I expect to see more workers spinning up after reading transforms. I tried
>>>>>>>> to use Reshuffle Transform
>>>>>>>> <https://github.com/apache/beam/blob/release-2.3.0/sdks/python/apache_beam/transforms/util.py#L516>
>>>>>>>> to prevent the fusion, but it is not scalable since it won’t proceed until
>>>>>>>> all data arrived at this point.
>>>>>>>>
>>>>>>>> Is there any other ways to allow more workers working on all the
>>>>>>>> other transforms after reading?
>>>>>>>>
>>>>>>>> Thanks,
>>>>>>>>
>>>>>>>> Allie
>>>>>>>>
>>>>>>>>

Re: Problem with gzip

Posted by Lukasz Cwik <lc...@google.com>.
Do you need to perform any joins across the files (e.g.
Combine.perKey/GroupByKey/...)?
If not, you could structure your pipeline
ReadFromFileA --> Reshuffle(optional) --> CopyOfPipelineA
ReadFromFileB --> Reshuffle(optional) --> CopyOfPipelineB
ReadFromFileC --> Reshuffle(optional) --> CopyOfPipelineC
and then run it as a batch pipeline.

You can set --streaming=true on the pipeline and then it will run in
streaming mode, but streaming prioritizes low latency and correctness on
Google Cloud Dataflow, so it will cost more to run your pipeline than in
batch mode. It may make more sense to store the data uncompressed, as it may
be less expensive than paying the additional compute cost for streaming.

*From: *Allie Chen <yi...@google.com>
*Date: *Tue, May 14, 2019 at 7:38 AM
*To: * <de...@beam.apache.org>
*Cc: *user

Is it possible to use windowing or somehow pretend it is streaming so
> Reshuffle or GroupByKey won't wait until all data has been read?
>
> Thanks!
> Allie
>
> *From: *Lukasz Cwik <lc...@google.com>
> *Date: *Fri, May 10, 2019 at 5:36 PM
> *To: *dev
> *Cc: *user
>
> There is no such flag to turn of fusion.
>>
>> Writing 100s of GiBs of uncompressed data to reshuffle will take time
>> when it is limited to a small number of workers.
>>
>> If you can split up your input into a lot of smaller files that are
>> compressed then you shouldn't need to use the reshuffle but still could if
>> you found it helped.
>>
>> On Fri, May 10, 2019 at 2:24 PM Allie Chen <yi...@google.com> wrote:
>>
>>> Re Lukasz: Thanks! I am not able to control the compression format but I
>>> will see whether the splitting gzip files will work. Is there a simple flag
>>> in Dataflow that could turn off the fusion?
>>>
>>> Re Reuven: No, I checked the run time on Dataflow UI, the GroupByKey and
>>> FlatMap in Reshuffle are very slow when the data is large. Reshuffle itself
>>> is not parallel either.
>>>
>>> Thanks all,
>>>
>>> Allie
>>>
>>> *From: *Reuven Lax <re...@google.com>
>>> *Date: *Fri, May 10, 2019 at 5:02 PM
>>> *To: *dev
>>> *Cc: *user
>>>
>>> It's unlikely that Reshuffle itself takes hours. It's more likely that
>>>> simply reading and decompressing all that data was very slow when there was
>>>> no parallelism.
>>>>
>>>> *From: *Allie Chen <yi...@google.com>
>>>> *Date: *Fri, May 10, 2019 at 1:17 PM
>>>> *To: * <de...@beam.apache.org>
>>>> *Cc: * <us...@beam.apache.org>
>>>>
>>>> Yes, I do see the data after reshuffle are processed in parallel. But
>>>>> Reshuffle transform itself takes hours or even days to run, according to
>>>>> one test (24 gzip files, 17 million lines in total) I did.
>>>>>
>>>>> The file format for our users are mostly gzip format, since
>>>>> uncompressed files would be too costly to store (It could be in hundreds of
>>>>> GB).
>>>>>
>>>>> Thanks,
>>>>>
>>>>> Allie
>>>>>
>>>>>
>>>>> *From: *Lukasz Cwik <lc...@google.com>
>>>>> *Date: *Fri, May 10, 2019 at 4:07 PM
>>>>> *To: *dev, <us...@beam.apache.org>
>>>>>
>>>>> +user@beam.apache.org <us...@beam.apache.org>
>>>>>>
>>>>>> Reshuffle on Google Cloud Dataflow for a bounded pipeline waits till
>>>>>> all the data has been read before the next transforms can run. After the
>>>>>> reshuffle, the data should have been processed in parallel across the
>>>>>> workers. Did you see this?
>>>>>>
>>>>>> Are you able to change the input of your pipeline to use an
>>>>>> uncompressed file or many compressed files?
>>>>>>
>>>>>> On Fri, May 10, 2019 at 1:03 PM Allie Chen <yi...@google.com>
>>>>>> wrote:
>>>>>>
>>>>>>> Hi,
>>>>>>>
>>>>>>>
>>>>>>> I am trying to load a gzip file to BigQuey using Dataflow. Since the
>>>>>>> compressed file is not splittable, one worker is allocated to read the
>>>>>>> file. The same worker will do all the other transforms since Dataflow fused
>>>>>>> all transforms together.  There are a large amount of data in the file, and
>>>>>>> I expect to see more workers spinning up after reading transforms. I tried
>>>>>>> to use Reshuffle Transform
>>>>>>> <https://github.com/apache/beam/blob/release-2.3.0/sdks/python/apache_beam/transforms/util.py#L516>
>>>>>>> to prevent the fusion, but it is not scalable since it won’t proceed until
>>>>>>> all data arrived at this point.
>>>>>>>
>>>>>>> Is there any other ways to allow more workers working on all the
>>>>>>> other transforms after reading?
>>>>>>>
>>>>>>> Thanks,
>>>>>>>
>>>>>>> Allie
>>>>>>>
>>>>>>>

Re: Problem with gzip

Posted by Allie Chen <yi...@google.com>.
Is it possible to use windowing or somehow pretend it is streaming so
Reshuffle or GroupByKey won't wait until all data has been read?

Thanks!
Allie

*From: *Lukasz Cwik <lc...@google.com>
*Date: *Fri, May 10, 2019 at 5:36 PM
*To: *dev
*Cc: *user

There is no such flag to turn of fusion.
>
> Writing 100s of GiBs of uncompressed data to reshuffle will take time when
> it is limited to a small number of workers.
>
> If you can split up your input into a lot of smaller files that are
> compressed then you shouldn't need to use the reshuffle but still could if
> you found it helped.
>
> On Fri, May 10, 2019 at 2:24 PM Allie Chen <yi...@google.com> wrote:
>
>> Re Lukasz: Thanks! I am not able to control the compression format but I
>> will see whether the splitting gzip files will work. Is there a simple flag
>> in Dataflow that could turn off the fusion?
>>
>> Re Reuven: No, I checked the run time on Dataflow UI, the GroupByKey and
>> FlatMap in Reshuffle are very slow when the data is large. Reshuffle itself
>> is not parallel either.
>>
>> Thanks all,
>>
>> Allie
>>
>> *From: *Reuven Lax <re...@google.com>
>> *Date: *Fri, May 10, 2019 at 5:02 PM
>> *To: *dev
>> *Cc: *user
>>
>> It's unlikely that Reshuffle itself takes hours. It's more likely that
>>> simply reading and decompressing all that data was very slow when there was
>>> no parallelism.
>>>
>>> *From: *Allie Chen <yi...@google.com>
>>> *Date: *Fri, May 10, 2019 at 1:17 PM
>>> *To: * <de...@beam.apache.org>
>>> *Cc: * <us...@beam.apache.org>
>>>
>>> Yes, I do see the data after reshuffle are processed in parallel. But
>>>> Reshuffle transform itself takes hours or even days to run, according to
>>>> one test (24 gzip files, 17 million lines in total) I did.
>>>>
>>>> The file format for our users are mostly gzip format, since
>>>> uncompressed files would be too costly to store (It could be in hundreds of
>>>> GB).
>>>>
>>>> Thanks,
>>>>
>>>> Allie
>>>>
>>>>
>>>> *From: *Lukasz Cwik <lc...@google.com>
>>>> *Date: *Fri, May 10, 2019 at 4:07 PM
>>>> *To: *dev, <us...@beam.apache.org>
>>>>
>>>> +user@beam.apache.org <us...@beam.apache.org>
>>>>>
>>>>> Reshuffle on Google Cloud Dataflow for a bounded pipeline waits till
>>>>> all the data has been read before the next transforms can run. After the
>>>>> reshuffle, the data should have been processed in parallel across the
>>>>> workers. Did you see this?
>>>>>
>>>>> Are you able to change the input of your pipeline to use an
>>>>> uncompressed file or many compressed files?
>>>>>
>>>>> On Fri, May 10, 2019 at 1:03 PM Allie Chen <yi...@google.com>
>>>>> wrote:
>>>>>
>>>>>> Hi,
>>>>>>
>>>>>>
>>>>>> I am trying to load a gzip file to BigQuey using Dataflow. Since the
>>>>>> compressed file is not splittable, one worker is allocated to read the
>>>>>> file. The same worker will do all the other transforms since Dataflow fused
>>>>>> all transforms together.  There are a large amount of data in the file, and
>>>>>> I expect to see more workers spinning up after reading transforms. I tried
>>>>>> to use Reshuffle Transform
>>>>>> <https://github.com/apache/beam/blob/release-2.3.0/sdks/python/apache_beam/transforms/util.py#L516>
>>>>>> to prevent the fusion, but it is not scalable since it won’t proceed until
>>>>>> all data arrived at this point.
>>>>>>
>>>>>> Is there any other ways to allow more workers working on all the
>>>>>> other transforms after reading?
>>>>>>
>>>>>> Thanks,
>>>>>>
>>>>>> Allie
>>>>>>
>>>>>>

Re: Problem with gzip

Posted by Michael Luckey <ad...@gmail.com>.
Maybe the solution implemented in JdbcIO [1], [2] could be helpful in this
case.

[1] https://issues.apache.org/jira/browse/BEAM-2803
[2]
https://github.com/apache/beam/blob/master/sdks/java/io/jdbc/src/main/java/org/apache/beam/sdk/io/jdbc/JdbcIO.java#L1088-L1118

On Fri, May 10, 2019 at 11:36 PM Lukasz Cwik <lc...@google.com> wrote:

> There is no such flag to turn of fusion.
>
> Writing 100s of GiBs of uncompressed data to reshuffle will take time when
> it is limited to a small number of workers.
>
> If you can split up your input into a lot of smaller files that are
> compressed then you shouldn't need to use the reshuffle but still could if
> you found it helped.
>
> On Fri, May 10, 2019 at 2:24 PM Allie Chen <yi...@google.com> wrote:
>
>> Re Lukasz: Thanks! I am not able to control the compression format but I
>> will see whether the splitting gzip files will work. Is there a simple flag
>> in Dataflow that could turn off the fusion?
>>
>> Re Reuven: No, I checked the run time on Dataflow UI, the GroupByKey and
>> FlatMap in Reshuffle are very slow when the data is large. Reshuffle itself
>> is not parallel either.
>>
>> Thanks all,
>>
>> Allie
>>
>> *From: *Reuven Lax <re...@google.com>
>> *Date: *Fri, May 10, 2019 at 5:02 PM
>> *To: *dev
>> *Cc: *user
>>
>> It's unlikely that Reshuffle itself takes hours. It's more likely that
>>> simply reading and decompressing all that data was very slow when there was
>>> no parallelism.
>>>
>>> *From: *Allie Chen <yi...@google.com>
>>> *Date: *Fri, May 10, 2019 at 1:17 PM
>>> *To: * <de...@beam.apache.org>
>>> *Cc: * <us...@beam.apache.org>
>>>
>>> Yes, I do see the data after reshuffle are processed in parallel. But
>>>> Reshuffle transform itself takes hours or even days to run, according to
>>>> one test (24 gzip files, 17 million lines in total) I did.
>>>>
>>>> The file format for our users are mostly gzip format, since
>>>> uncompressed files would be too costly to store (It could be in hundreds of
>>>> GB).
>>>>
>>>> Thanks,
>>>>
>>>> Allie
>>>>
>>>>
>>>> *From: *Lukasz Cwik <lc...@google.com>
>>>> *Date: *Fri, May 10, 2019 at 4:07 PM
>>>> *To: *dev, <us...@beam.apache.org>
>>>>
>>>> +user@beam.apache.org <us...@beam.apache.org>
>>>>>
>>>>> Reshuffle on Google Cloud Dataflow for a bounded pipeline waits till
>>>>> all the data has been read before the next transforms can run. After the
>>>>> reshuffle, the data should have been processed in parallel across the
>>>>> workers. Did you see this?
>>>>>
>>>>> Are you able to change the input of your pipeline to use an
>>>>> uncompressed file or many compressed files?
>>>>>
>>>>> On Fri, May 10, 2019 at 1:03 PM Allie Chen <yi...@google.com>
>>>>> wrote:
>>>>>
>>>>>> Hi,
>>>>>>
>>>>>>
>>>>>> I am trying to load a gzip file to BigQuey using Dataflow. Since the
>>>>>> compressed file is not splittable, one worker is allocated to read the
>>>>>> file. The same worker will do all the other transforms since Dataflow fused
>>>>>> all transforms together.  There are a large amount of data in the file, and
>>>>>> I expect to see more workers spinning up after reading transforms. I tried
>>>>>> to use Reshuffle Transform
>>>>>> <https://github.com/apache/beam/blob/release-2.3.0/sdks/python/apache_beam/transforms/util.py#L516>
>>>>>> to prevent the fusion, but it is not scalable since it won’t proceed until
>>>>>> all data arrived at this point.
>>>>>>
>>>>>> Is there any other ways to allow more workers working on all the
>>>>>> other transforms after reading?
>>>>>>
>>>>>> Thanks,
>>>>>>
>>>>>> Allie
>>>>>>
>>>>>>

Re: Problem with gzip

Posted by Lukasz Cwik <lc...@google.com>.
There is no such flag to turn off fusion.

Writing 100s of GiBs of uncompressed data to reshuffle will take time when
it is limited to a small number of workers.

If you can split up your input into a lot of smaller files that are
compressed then you shouldn't need to use the reshuffle but still could if
you found it helped.
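
As a rough illustration of the splitting step, here is a standard-library
Python sketch that rewrites one large gzip text file as several smaller gzip
files, so that each can be read by a different worker. The function name
split_gzip, the shard naming scheme, and the default shard size are
assumptions made for this example and are not part of Beam or Dataflow.

```python
import gzip
import itertools
from pathlib import Path


def split_gzip(src, out_dir, lines_per_shard=1_000_000):
    """Rewrite one gzip text file as several smaller gzip files.

    Returns the shard paths in the order the lines appeared in `src`.
    Up to `lines_per_shard` lines are held in memory at a time.
    """
    out_dir = Path(out_dir)
    out_dir.mkdir(parents=True, exist_ok=True)
    shards = []
    # newline="" keeps the original line endings untouched.
    with gzip.open(src, "rt", encoding="utf-8", newline="") as reader:
        for index in itertools.count():
            chunk = list(itertools.islice(reader, lines_per_shard))
            if not chunk:
                break  # input exhausted
            shard = out_dir / f"part-{index:05d}.gz"
            with gzip.open(shard, "wt", encoding="utf-8", newline="") as writer:
                writer.writelines(chunk)
            shards.append(shard)
    return shards
```

The split itself is still a single sequential pass over the original file,
so it pays off mainly when the same data is read more than once or when the
downstream processing dominates the run time.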

On Fri, May 10, 2019 at 2:24 PM Allie Chen <yi...@google.com> wrote:

> Re Lukasz: Thanks! I am not able to control the compression format but I
> will see whether the splitting gzip files will work. Is there a simple flag
> in Dataflow that could turn off the fusion?
>
> Re Reuven: No, I checked the run time on Dataflow UI, the GroupByKey and
> FlatMap in Reshuffle are very slow when the data is large. Reshuffle itself
> is not parallel either.
>
> Thanks all,
>
> Allie
>
> *From: *Reuven Lax <re...@google.com>
> *Date: *Fri, May 10, 2019 at 5:02 PM
> *To: *dev
> *Cc: *user
>
> It's unlikely that Reshuffle itself takes hours. It's more likely that
>> simply reading and decompressing all that data was very slow when there was
>> no parallelism.
>>
>> *From: *Allie Chen <yi...@google.com>
>> *Date: *Fri, May 10, 2019 at 1:17 PM
>> *To: * <de...@beam.apache.org>
>> *Cc: * <us...@beam.apache.org>
>>
>> Yes, I do see the data after reshuffle are processed in parallel. But
>>> Reshuffle transform itself takes hours or even days to run, according to
>>> one test (24 gzip files, 17 million lines in total) I did.
>>>
>>> The file format for our users are mostly gzip format, since uncompressed
>>> files would be too costly to store (It could be in hundreds of GB).
>>>
>>> Thanks,
>>>
>>> Allie
>>>
>>>
>>> *From: *Lukasz Cwik <lc...@google.com>
>>> *Date: *Fri, May 10, 2019 at 4:07 PM
>>> *To: *dev, <us...@beam.apache.org>
>>>
>>> +user@beam.apache.org <us...@beam.apache.org>
>>>>
>>>> Reshuffle on Google Cloud Dataflow for a bounded pipeline waits till
>>>> all the data has been read before the next transforms can run. After the
>>>> reshuffle, the data should have been processed in parallel across the
>>>> workers. Did you see this?
>>>>
>>>> Are you able to change the input of your pipeline to use an
>>>> uncompressed file or many compressed files?
>>>>
>>>> On Fri, May 10, 2019 at 1:03 PM Allie Chen <yi...@google.com>
>>>> wrote:
>>>>
>>>>> Hi,
>>>>>
>>>>>
>>>>> I am trying to load a gzip file to BigQuey using Dataflow. Since the
>>>>> compressed file is not splittable, one worker is allocated to read the
>>>>> file. The same worker will do all the other transforms since Dataflow fused
>>>>> all transforms together.  There are a large amount of data in the file, and
>>>>> I expect to see more workers spinning up after reading transforms. I tried
>>>>> to use Reshuffle Transform
>>>>> <https://github.com/apache/beam/blob/release-2.3.0/sdks/python/apache_beam/transforms/util.py#L516>
>>>>> to prevent the fusion, but it is not scalable since it won’t proceed until
>>>>> all data arrived at this point.
>>>>>
>>>>> Is there any other ways to allow more workers working on all the other
>>>>> transforms after reading?
>>>>>
>>>>> Thanks,
>>>>>
>>>>> Allie
>>>>>
>>>>>

Re: Problem with gzip

Posted by Allie Chen <yi...@google.com>.
Re Lukasz: Thanks! I am not able to control the compression format, but I
will see whether splitting the gzip files will work. Is there a simple flag
in Dataflow that could turn off the fusion?

Re Reuven: No. I checked the run time on the Dataflow UI, and the GroupByKey
and FlatMap in Reshuffle are very slow when the data is large. Reshuffle
itself is not parallel either.

Thanks all,

Allie

*From: *Reuven Lax <re...@google.com>
*Date: *Fri, May 10, 2019 at 5:02 PM
*To: *dev
*Cc: *user

It's unlikely that Reshuffle itself takes hours. It's more likely that
> simply reading and decompressing all that data was very slow when there was
> no parallelism.
>
> *From: *Allie Chen <yi...@google.com>
> *Date: *Fri, May 10, 2019 at 1:17 PM
> *To: * <de...@beam.apache.org>
> *Cc: * <us...@beam.apache.org>
>
> Yes, I do see the data after reshuffle are processed in parallel. But
>> Reshuffle transform itself takes hours or even days to run, according to
>> one test (24 gzip files, 17 million lines in total) I did.
>>
>> The file format for our users are mostly gzip format, since uncompressed
>> files would be too costly to store (It could be in hundreds of GB).
>>
>> Thanks,
>>
>> Allie
>>
>>
>> *From: *Lukasz Cwik <lc...@google.com>
>> *Date: *Fri, May 10, 2019 at 4:07 PM
>> *To: *dev, <us...@beam.apache.org>
>>
>> +user@beam.apache.org <us...@beam.apache.org>
>>>
>>> Reshuffle on Google Cloud Dataflow for a bounded pipeline waits till all
>>> the data has been read before the next transforms can run. After the
>>> reshuffle, the data should have been processed in parallel across the
>>> workers. Did you see this?
>>>
>>> Are you able to change the input of your pipeline to use an uncompressed
>>> file or many compressed files?
>>>
>>> On Fri, May 10, 2019 at 1:03 PM Allie Chen <yi...@google.com>
>>> wrote:
>>>
>>>> Hi,
>>>>
>>>>
>>>> I am trying to load a gzip file into BigQuery using Dataflow. Since the
>>>> compressed file is not splittable, one worker is allocated to read the
>>>> file. The same worker will do all the other transforms since Dataflow fused
>>>> all transforms together. There is a large amount of data in the file, and
>>>> I expect to see more workers spinning up after the read transforms. I tried
>>>> to use the Reshuffle transform
>>>> <https://github.com/apache/beam/blob/release-2.3.0/sdks/python/apache_beam/transforms/util.py#L516>
>>>> to prevent the fusion, but it is not scalable since it won’t proceed until
>>>> all the data has arrived at this point.
>>>>
>>>> Are there any other ways to allow more workers to work on all the other
>>>> transforms after reading?
>>>>
>>>> Thanks,
>>>>
>>>> Allie
>>>>
>>>>

Re: Problem with gzip

Posted by Reuven Lax <re...@google.com>.
It's unlikely that Reshuffle itself takes hours. It's more likely that
simply reading and decompressing all that data was very slow when there was
no parallelism.

*From: *Allie Chen <yi...@google.com>
*Date: *Fri, May 10, 2019 at 1:17 PM
*To: * <de...@beam.apache.org>
*Cc: * <us...@beam.apache.org>

> Yes, I do see that the data after the reshuffle is processed in parallel. But
> the Reshuffle transform itself takes hours or even days to run, according to
> one test (24 gzip files, 17 million lines in total) that I did.
>
> Our users' files are mostly in gzip format, since uncompressed
> files would be too costly to store (they could be hundreds of GB).
>
> Thanks,
>
> Allie
>
>
> *From: *Lukasz Cwik <lc...@google.com>
> *Date: *Fri, May 10, 2019 at 4:07 PM
> *To: *dev, <us...@beam.apache.org>
>
>> +user@beam.apache.org <us...@beam.apache.org>
>>
>> Reshuffle on Google Cloud Dataflow for a bounded pipeline waits till all
>> the data has been read before the next transforms can run. After the
>> reshuffle, the data should have been processed in parallel across the
>> workers. Did you see this?
>>
>> Are you able to change the input of your pipeline to use an uncompressed
>> file or many compressed files?
>>
>> On Fri, May 10, 2019 at 1:03 PM Allie Chen <yi...@google.com> wrote:
>>
>>> Hi,
>>>
>>>
>>> I am trying to load a gzip file into BigQuery using Dataflow. Since the
>>> compressed file is not splittable, one worker is allocated to read the
>>> file. The same worker will do all the other transforms since Dataflow fused
>>> all transforms together. There is a large amount of data in the file, and
>>> I expect to see more workers spinning up after the read transforms. I tried
>>> to use the Reshuffle transform
>>> <https://github.com/apache/beam/blob/release-2.3.0/sdks/python/apache_beam/transforms/util.py#L516>
>>> to prevent the fusion, but it is not scalable since it won’t proceed until
>>> all the data has arrived at this point.
>>>
>>> Are there any other ways to allow more workers to work on all the other
>>> transforms after reading?
>>>
>>> Thanks,
>>>
>>> Allie
>>>
>>>

Re: Problem with gzip

Posted by Lukasz Cwik <lc...@google.com>.
The best solution would be to find a compression format that is splittable,
add support for it to Apache Beam, and use it. The issue with
compressed files is that you can't start reading from an arbitrary offset. This
Stack Overflow post [1] has some suggestions on seekable compression
libraries.
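
A minimal sketch of the offset problem, using only Python's standard gzip
and zlib modules. The sample data and the helper name try_decompress_from
are made up for illustration. A gzip stream decodes when read from byte 0,
but the same bytes picked up halfway through are rejected, because the
decoder finds no header there and has none of the earlier stream state:

```python
import gzip
import zlib

# Hypothetical sample data: 10,000 short text records.
raw = b"".join(f"record-{i}\n".encode() for i in range(10_000))
compressed = gzip.compress(raw)


def try_decompress_from(data: bytes, offset: int) -> bool:
    """Return True if a gzip stream can be decoded starting at `offset`."""
    try:
        # MAX_WBITS | 16 tells zlib to expect a gzip header and trailer.
        decoder = zlib.decompressobj(wbits=zlib.MAX_WBITS | 16)
        decoder.decompress(data[offset:])
        return True
    except zlib.error:
        return False


starts_ok = try_decompress_from(compressed, 0)
middle_ok = try_decompress_from(compressed, len(compressed) // 2)
print(starts_ok, middle_ok)
```

This is why a runner cannot hand the second half of a single gzip file to a
second worker: that worker would have nowhere valid to start decoding.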

A much easier solution would be to split your data into hundreds of gzip
files. This would give you most of the compression benefit and would also
give you a lot of parallelization benefit during reading.
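
As a rough sketch of that re-sharding step, assuming line-oriented text
input: the function below streams one large gzip file and rewrites it as
several smaller gzip files, splitting only at line boundaries. The name
shard_gzip, the part-NNNNN.gz naming scheme, and the shard size are
illustrative assumptions, not anything Beam or Dataflow provides.

```python
import gzip
import os
import tempfile
from typing import List


def shard_gzip(src_path: str, out_dir: str, lines_per_shard: int) -> List[str]:
    """Stream src_path and rewrite it as several smaller gzip files."""
    os.makedirs(out_dir, exist_ok=True)
    shard_paths: List[str] = []
    out = None
    with gzip.open(src_path, "rb") as src:
        for i, line in enumerate(src):
            # Start a new shard every `lines_per_shard` lines.
            if i % lines_per_shard == 0:
                if out is not None:
                    out.close()
                path = os.path.join(out_dir, f"part-{len(shard_paths):05d}.gz")
                shard_paths.append(path)
                out = gzip.open(path, "wb")
            out.write(line)
    if out is not None:
        out.close()
    return shard_paths


# Demo on a small temporary file (hypothetical data).
workdir = tempfile.mkdtemp()
src_path = os.path.join(workdir, "input.gz")
with gzip.open(src_path, "wb") as f:
    for i in range(1000):
        f.write(f"row-{i}\n".encode())

shards = shard_gzip(src_path, os.path.join(workdir, "shards"), 100)
print(len(shards))
```

The split itself is still a single sequential pass over the original file,
but it only has to be paid once. Afterwards the pipeline can be pointed at
a file pattern covering all the shards, so each file can be read by a
separate worker.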

1:
https://stackoverflow.com/questions/2046559/any-seekable-compression-library

On Fri, May 10, 2019 at 1:25 PM Allie Chen <yi...@google.com> wrote:

> Yes, that is correct.
>
> *From: *Allie Chen <yi...@google.com>
> *Date: *Fri, May 10, 2019 at 4:21 PM
> *To: * <de...@beam.apache.org>
> *Cc: * <us...@beam.apache.org>
>
>> Yes.
>>
>> *From: *Lukasz Cwik <lc...@google.com>
>> *Date: *Fri, May 10, 2019 at 4:19 PM
>> *To: *dev
>> *Cc: * <us...@beam.apache.org>
>>
>>> When you had X gzip files and were not using Reshuffle, did you see X
>>> workers read and process the files?
>>>
>>> On Fri, May 10, 2019 at 1:17 PM Allie Chen <yi...@google.com>
>>> wrote:
>>>
>>>> Yes, I do see that the data after the reshuffle is processed in parallel. But
>>>> the Reshuffle transform itself takes hours or even days to run, according to
>>>> one test (24 gzip files, 17 million lines in total) that I did.
>>>>
>>>> Our users' files are mostly in gzip format, since uncompressed
>>>> files would be too costly to store (they could be hundreds of GB).
>>>>
>>>> Thanks,
>>>>
>>>> Allie
>>>>
>>>>
>>>> *From: *Lukasz Cwik <lc...@google.com>
>>>> *Date: *Fri, May 10, 2019 at 4:07 PM
>>>> *To: *dev, <us...@beam.apache.org>
>>>>
>>>>> +user@beam.apache.org <us...@beam.apache.org>
>>>>>
>>>>> Reshuffle on Google Cloud Dataflow for a bounded pipeline waits till
>>>>> all the data has been read before the next transforms can run. After the
>>>>> reshuffle, the data should have been processed in parallel across the
>>>>> workers. Did you see this?
>>>>>
>>>>> Are you able to change the input of your pipeline to use an
>>>>> uncompressed file or many compressed files?
>>>>>
>>>>> On Fri, May 10, 2019 at 1:03 PM Allie Chen <yi...@google.com>
>>>>> wrote:
>>>>>
>>>>>> Hi,
>>>>>>
>>>>>>
>>>>>> I am trying to load a gzip file into BigQuery using Dataflow. Since the
>>>>>> compressed file is not splittable, one worker is allocated to read the
>>>>>> file. The same worker will do all the other transforms since Dataflow fused
>>>>>> all transforms together. There is a large amount of data in the file, and
>>>>>> I expect to see more workers spinning up after the read transforms. I tried
>>>>>> to use the Reshuffle transform
>>>>>> <https://github.com/apache/beam/blob/release-2.3.0/sdks/python/apache_beam/transforms/util.py#L516>
>>>>>> to prevent the fusion, but it is not scalable since it won’t proceed until
>>>>>> all the data has arrived at this point.
>>>>>>
>>>>>> Are there any other ways to allow more workers to work on all the
>>>>>> other transforms after reading?
>>>>>>
>>>>>> Thanks,
>>>>>>
>>>>>> Allie
>>>>>>
>>>>>>

Re: Problem with gzip

Posted by Allie Chen <yi...@google.com>.
Yes, that is correct.

*From: *Allie Chen <yi...@google.com>
*Date: *Fri, May 10, 2019 at 4:21 PM
*To: * <de...@beam.apache.org>
*Cc: * <us...@beam.apache.org>

> Yes.
>
> *From: *Lukasz Cwik <lc...@google.com>
> *Date: *Fri, May 10, 2019 at 4:19 PM
> *To: *dev
> *Cc: * <us...@beam.apache.org>
>
>> When you had X gzip files and were not using Reshuffle, did you see X
>> workers read and process the files?
>>
>> On Fri, May 10, 2019 at 1:17 PM Allie Chen <yi...@google.com> wrote:
>>
>>> Yes, I do see that the data after the reshuffle is processed in parallel. But
>>> the Reshuffle transform itself takes hours or even days to run, according to
>>> one test (24 gzip files, 17 million lines in total) that I did.
>>>
>>> Our users' files are mostly in gzip format, since uncompressed
>>> files would be too costly to store (they could be hundreds of GB).
>>>
>>> Thanks,
>>>
>>> Allie
>>>
>>>
>>> *From: *Lukasz Cwik <lc...@google.com>
>>> *Date: *Fri, May 10, 2019 at 4:07 PM
>>> *To: *dev, <us...@beam.apache.org>
>>>
>>>> +user@beam.apache.org <us...@beam.apache.org>
>>>>
>>>> Reshuffle on Google Cloud Dataflow for a bounded pipeline waits till
>>>> all the data has been read before the next transforms can run. After the
>>>> reshuffle, the data should have been processed in parallel across the
>>>> workers. Did you see this?
>>>>
>>>> Are you able to change the input of your pipeline to use an
>>>> uncompressed file or many compressed files?
>>>>
>>>> On Fri, May 10, 2019 at 1:03 PM Allie Chen <yi...@google.com>
>>>> wrote:
>>>>
>>>>> Hi,
>>>>>
>>>>>
>>>>> I am trying to load a gzip file into BigQuery using Dataflow. Since the
>>>>> compressed file is not splittable, one worker is allocated to read the
>>>>> file. The same worker will do all the other transforms since Dataflow fused
>>>>> all transforms together. There is a large amount of data in the file, and
>>>>> I expect to see more workers spinning up after the read transforms. I tried
>>>>> to use the Reshuffle transform
>>>>> <https://github.com/apache/beam/blob/release-2.3.0/sdks/python/apache_beam/transforms/util.py#L516>
>>>>> to prevent the fusion, but it is not scalable since it won’t proceed until
>>>>> all the data has arrived at this point.
>>>>>
>>>>> Are there any other ways to allow more workers to work on all the other
>>>>> transforms after reading?
>>>>>
>>>>> Thanks,
>>>>>
>>>>> Allie
>>>>>
>>>>>

Re: Problem with gzip

Posted by Allie Chen <yi...@google.com>.
Yes.

*From: *Lukasz Cwik <lc...@google.com>
*Date: *Fri, May 10, 2019 at 4:19 PM
*To: *dev
*Cc: * <us...@beam.apache.org>

> When you had X gzip files and were not using Reshuffle, did you see X
> workers read and process the files?
>
> On Fri, May 10, 2019 at 1:17 PM Allie Chen <yi...@google.com> wrote:
>
>> Yes, I do see that the data after the reshuffle is processed in parallel. But
>> the Reshuffle transform itself takes hours or even days to run, according to
>> one test (24 gzip files, 17 million lines in total) that I did.
>>
>> Our users' files are mostly in gzip format, since uncompressed
>> files would be too costly to store (they could be hundreds of GB).
>>
>> Thanks,
>>
>> Allie
>>
>>
>> *From: *Lukasz Cwik <lc...@google.com>
>> *Date: *Fri, May 10, 2019 at 4:07 PM
>> *To: *dev, <us...@beam.apache.org>
>>
>>> +user@beam.apache.org <us...@beam.apache.org>
>>>
>>> Reshuffle on Google Cloud Dataflow for a bounded pipeline waits till all
>>> the data has been read before the next transforms can run. After the
>>> reshuffle, the data should have been processed in parallel across the
>>> workers. Did you see this?
>>>
>>> Are you able to change the input of your pipeline to use an uncompressed
>>> file or many compressed files?
>>>
>>> On Fri, May 10, 2019 at 1:03 PM Allie Chen <yi...@google.com>
>>> wrote:
>>>
>>>> Hi,
>>>>
>>>>
>>>> I am trying to load a gzip file into BigQuery using Dataflow. Since the
>>>> compressed file is not splittable, one worker is allocated to read the
>>>> file. The same worker will do all the other transforms since Dataflow fused
>>>> all transforms together. There is a large amount of data in the file, and
>>>> I expect to see more workers spinning up after the read transforms. I tried
>>>> to use the Reshuffle transform
>>>> <https://github.com/apache/beam/blob/release-2.3.0/sdks/python/apache_beam/transforms/util.py#L516>
>>>> to prevent the fusion, but it is not scalable since it won’t proceed until
>>>> all the data has arrived at this point.
>>>>
>>>> Are there any other ways to allow more workers to work on all the other
>>>> transforms after reading?
>>>>
>>>> Thanks,
>>>>
>>>> Allie
>>>>
>>>>

Re: Problem with gzip

Posted by Lukasz Cwik <lc...@google.com>.
When you had X gzip files and were not using Reshuffle, did you see X
workers read and process the files?

On Fri, May 10, 2019 at 1:17 PM Allie Chen <yi...@google.com> wrote:

> Yes, I do see that the data after the reshuffle is processed in parallel. But
> the Reshuffle transform itself takes hours or even days to run, according to
> one test (24 gzip files, 17 million lines in total) that I did.
>
> Our users' files are mostly in gzip format, since uncompressed
> files would be too costly to store (they could be hundreds of GB).
>
> Thanks,
>
> Allie
>
>
> *From: *Lukasz Cwik <lc...@google.com>
> *Date: *Fri, May 10, 2019 at 4:07 PM
> *To: *dev, <us...@beam.apache.org>
>
>> +user@beam.apache.org <us...@beam.apache.org>
>>
>> Reshuffle on Google Cloud Dataflow for a bounded pipeline waits till all
>> the data has been read before the next transforms can run. After the
>> reshuffle, the data should have been processed in parallel across the
>> workers. Did you see this?
>>
>> Are you able to change the input of your pipeline to use an uncompressed
>> file or many compressed files?
>>
>> On Fri, May 10, 2019 at 1:03 PM Allie Chen <yi...@google.com> wrote:
>>
>>> Hi,
>>>
>>>
>>> I am trying to load a gzip file into BigQuery using Dataflow. Since the
>>> compressed file is not splittable, one worker is allocated to read the
>>> file. The same worker will do all the other transforms since Dataflow fused
>>> all transforms together. There is a large amount of data in the file, and
>>> I expect to see more workers spinning up after the read transforms. I tried
>>> to use the Reshuffle transform
>>> <https://github.com/apache/beam/blob/release-2.3.0/sdks/python/apache_beam/transforms/util.py#L516>
>>> to prevent the fusion, but it is not scalable since it won’t proceed until
>>> all the data has arrived at this point.
>>>
>>> Are there any other ways to allow more workers to work on all the other
>>> transforms after reading?
>>>
>>> Thanks,
>>>
>>> Allie
>>>
>>>

Re: Problem with gzip

Posted by Allie Chen <yi...@google.com>.
Yes, I do see that the data after the reshuffle is processed in parallel. But
the Reshuffle transform itself takes hours or even days to run, according to
one test (24 gzip files, 17 million lines in total) that I did.

Our users' files are mostly in gzip format, since uncompressed
files would be too costly to store (they could be hundreds of GB).

Thanks,

Allie


*From: *Lukasz Cwik <lc...@google.com>
*Date: *Fri, May 10, 2019 at 4:07 PM
*To: *dev, <us...@beam.apache.org>

> +user@beam.apache.org <us...@beam.apache.org>
>
> Reshuffle on Google Cloud Dataflow for a bounded pipeline waits till all
> the data has been read before the next transforms can run. After the
> reshuffle, the data should have been processed in parallel across the
> workers. Did you see this?
>
> Are you able to change the input of your pipeline to use an uncompressed
> file or many compressed files?
>
> On Fri, May 10, 2019 at 1:03 PM Allie Chen <yi...@google.com> wrote:
>
>> Hi,
>>
>>
>> I am trying to load a gzip file into BigQuery using Dataflow. Since the
>> compressed file is not splittable, one worker is allocated to read the
>> file. The same worker will do all the other transforms since Dataflow fused
>> all transforms together. There is a large amount of data in the file, and
>> I expect to see more workers spinning up after the read transforms. I tried
>> to use the Reshuffle transform
>> <https://github.com/apache/beam/blob/release-2.3.0/sdks/python/apache_beam/transforms/util.py#L516>
>> to prevent the fusion, but it is not scalable since it won’t proceed until
>> all the data has arrived at this point.
>>
>> Are there any other ways to allow more workers to work on all the other
>> transforms after reading?
>>
>> Thanks,
>>
>> Allie
>>
>>

Re: Problem with gzip

Posted by Lukasz Cwik <lc...@google.com>.
+user@beam.apache.org <us...@beam.apache.org>

Reshuffle on Google Cloud Dataflow for a bounded pipeline waits till all
the data has been read before the next transforms can run. After the
reshuffle, the data should have been processed in parallel across the
workers. Did you see this?

Are you able to change the input of your pipeline to use an uncompressed
file or many compressed files?

On Fri, May 10, 2019 at 1:03 PM Allie Chen <yi...@google.com> wrote:

> Hi,
>
>
> I am trying to load a gzip file into BigQuery using Dataflow. Since the
> compressed file is not splittable, one worker is allocated to read the
> file. The same worker will do all the other transforms since Dataflow fused
> all transforms together. There is a large amount of data in the file, and
> I expect to see more workers spinning up after the read transforms. I tried
> to use the Reshuffle transform
> <https://github.com/apache/beam/blob/release-2.3.0/sdks/python/apache_beam/transforms/util.py#L516>
> to prevent the fusion, but it is not scalable since it won’t proceed until
> all the data has arrived at this point.
>
> Are there any other ways to allow more workers to work on all the other
> transforms after reading?
>
> Thanks,
>
> Allie
>
>
