Posted to user@beam.apache.org by Shannon Duncan <jo...@liveramp.com> on 2019/09/18 18:06:41 UTC

Prevent Shuffling on Writing Files

We have been using Beam for a bit now. However, we just turned on the
Dataflow shuffle service and were very surprised that the shuffled data
amounts were quadruple what we expected.

Turns out that the file writing TextIO is doing shuffles within itself.

Is there a way to prevent shuffling in the writing phase?

Thanks,
Shannon Duncan

Re: Prevent Shuffling on Writing Files

Posted by Jeff Klukas <jk...@mozilla.com>.
What you propose with a writer per bundle is definitely possible, but I
expect the blocker is that in most cases the runner has control of bundle
sizes and there's nothing exposed to the user to control that. I've wanted
to do something similar, but found average bundle sizes in my case on
Dataflow to be so small that it wasn't feasible to write out a separate
file/object per bundle.
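
The quoted proposal below can be sketched, Beam aside, as a plain-Java
lifecycle. This is an illustration only: `BundleWriter` and its method names
mimic what a Beam DoFn would do with @StartBundle / @ProcessElement /
@FinishBundle, but none of this is Beam API or code from the original job.

```java
import java.io.IOException;
import java.io.Writer;
import java.nio.file.Files;
import java.nio.file.Path;
import java.util.UUID;

// Sketch of "one writer per bundle": open a writer when the bundle starts,
// reuse it for every element, close it when the bundle finishes.
class BundleWriter {
    private Writer writer;
    private Path path;

    void startBundle(Path outputDir) throws IOException {
        // A random token names the file, so no shard number (and no
        // shuffle to assign one) is needed.
        path = outputDir.resolve("part-" + UUID.randomUUID() + ".txt");
        writer = Files.newBufferedWriter(path);
    }

    void processElement(String element) throws IOException {
        writer.write(element);
        writer.write('\n');
    }

    Path finishBundle() throws IOException {
        writer.close();
        return path;
    }
}
```

The catch, as noted above, is that the runner decides bundle sizes: with
many tiny bundles this scheme produces a very large number of small files.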

On Wed, Sep 18, 2019 at 4:57 PM Shannon Duncan <jo...@liveramp.com>
wrote:

> So I followed up on why TextIO shuffles and dug into the code some. It is
> using the shards and getting all the values into a keyed group to write to
> a single file.
>
> However... I wonder if there is a way to just take the records that are
> on a worker and write them out, thus not needing a shard number, closer
> to how Hadoop handles writes.
>
> Maybe just a regular ParDo that creates a writer on bundle setup, and
> whose processElement reuses that writer to write to the same file for all
> elements within a bundle?
>
> I feel like this goes beyond scope of simple user mailing list so I'm
> expanding it to dev as well.
> +dev <de...@beam.apache.org>
>
> Finding a solution that prevents quadrupling shuffle costs when simply
> writing out a file is a necessity for large scale jobs that work with 100+
> TB of data. If anyone has any ideas I'd love to hear them.
>
> Thanks,
> Shannon Duncan
>
> On Wed, Sep 18, 2019 at 1:06 PM Shannon Duncan <jo...@liveramp.com>
> wrote:
>
>> We have been using Beam for a bit now. However we just turned on the
>> dataflow shuffle service and were very surprised that the shuffled data
>> amounts were quadruple the amounts we expected.
>>
>> Turns out that the file writing TextIO is doing shuffles within itself.
>>
>> Is there a way to prevent shuffling in the writing phase?
>>
>> Thanks,
>> Shannon Duncan
>>
>

Re: Prevent Shuffling on Writing Files

Posted by Shannon Duncan <jo...@liveramp.com>.
As a follow-up: the pricing being based on the number of bytes written to
the shuffle plus the bytes read from it is confirmed.

However, we were able to figure out a way to lower shuffle costs and things
are right in the world again.

Thanks, y'all!
Shannon

On Wed, Sep 18, 2019 at 4:52 PM Reuven Lax <re...@google.com> wrote:

> I believe that the Total shuffle data process counter counts the number of
> bytes written to shuffle + the number of bytes read. So if you shuffle 1GB
> of data, you should expect to see 2GB on the counter.
>
> On Wed, Sep 18, 2019 at 2:39 PM Shannon Duncan <jo...@liveramp.com>
> wrote:
>
>> OK, just ran the job on a small input and did not specify numShards, so
>> it's literally just:
>>
>> .apply("WriteLines", TextIO.write().to(options.getOutput()));
>>
>> Output of map for join:
>> [image: image.png]
>>
>> Details of Shuffle:
>> [image: image.png]
>>
>> Reported Bytes Shuffled:
>> [image: image.png]
>>
>>
>> On Wed, Sep 18, 2019 at 4:24 PM Reuven Lax <re...@google.com> wrote:
>>
>>>
>>>
>>> On Wed, Sep 18, 2019 at 2:12 PM Shannon Duncan <
>>> joseph.duncan@liveramp.com> wrote:
>>>
>>>> I will attempt to do without sharding (though I believe we did do a run
>>>> without shards and it incurred the extra shuffle costs).
>>>>
>>>
>>> It shouldn't. There will be a shuffle, but that shuffle should contain a
>>> small amount of data (essentially a list of filenames).
>>>
>>>>
>>>> Pipeline is simple.
>>>>
>>>> The only shuffle that is explicitly defined is the shuffle after
>>>> merging files together into a single PCollection (Flatten Transform).
>>>>
>>>> So it's a Read > Flatten > Shuffle > Map (Format) > Write. We expected
>>>> to pay for shuffles on the middle shuffle but were surprised to see that
>>>> the output data from the Flatten was quadrupled in the reflected shuffled
>>>> GB shown in Dataflow, which led me down this path of finding things.
>>>>
>>>> [image: image.png]
>>>>
>>>> On Wed, Sep 18, 2019 at 4:08 PM Reuven Lax <re...@google.com> wrote:
>>>>
>>>>> In that case you should be able to leave sharding unspecified, and you
>>>>> won't incur the extra shuffle. Specifying explicit sharding is generally
>>>>> necessary only for streaming.
>>>>>
>>>>> On Wed, Sep 18, 2019 at 2:06 PM Shannon Duncan <
>>>>> joseph.duncan@liveramp.com> wrote:
>>>>>
>>>>>> batch on dataflowRunner.
>>>>>>
>>>>>> On Wed, Sep 18, 2019 at 4:05 PM Reuven Lax <re...@google.com> wrote:
>>>>>>
>>>>>>> Are you using streaming or batch? Also which runner are you using?
>>>>>>>
>>>>>>> On Wed, Sep 18, 2019 at 1:57 PM Shannon Duncan <
>>>>>>> joseph.duncan@liveramp.com> wrote:
>>>>>>>
>>>>>>>> So I followed up on why TextIO shuffles and dug into the code some.
>>>>>>>> It is using the shards and getting all the values into a keyed group to
>>>>>>>> write to a single file.
>>>>>>>>
>>>>>>>> However... I wonder if there is a way to just take the records
>>>>>>>> that are on a worker and write them out, thus not needing a shard
>>>>>>>> number, closer to how Hadoop handles writes.
>>>>>>>>
>>>>>>>> Maybe just a regular ParDo that creates a writer on bundle setup,
>>>>>>>> and whose processElement reuses that writer to write to the same
>>>>>>>> file for all elements within a bundle?
>>>>>>>>
>>>>>>>> I feel like this goes beyond scope of simple user mailing list so
>>>>>>>> I'm expanding it to dev as well.
>>>>>>>> +dev <de...@beam.apache.org>
>>>>>>>>
>>>>>>>> Finding a solution that prevents quadrupling shuffle costs when
>>>>>>>> simply writing out a file is a necessity for large scale jobs that work
>>>>>>>> with 100+ TB of data. If anyone has any ideas I'd love to hear them.
>>>>>>>>
>>>>>>>> Thanks,
>>>>>>>> Shannon Duncan
>>>>>>>>
>>>>>>>> On Wed, Sep 18, 2019 at 1:06 PM Shannon Duncan <
>>>>>>>> joseph.duncan@liveramp.com> wrote:
>>>>>>>>
>>>>>>>>> We have been using Beam for a bit now. However we just turned on
>>>>>>>>> the dataflow shuffle service and were very surprised that the shuffled data
>>>>>>>>> amounts were quadruple the amounts we expected.
>>>>>>>>>
>>>>>>>>> Turns out that the file writing TextIO is doing shuffles within
>>>>>>>>> itself.
>>>>>>>>>
>>>>>>>>> Is there a way to prevent shuffling in the writing phase?
>>>>>>>>>
>>>>>>>>> Thanks,
>>>>>>>>> Shannon Duncan
>>>>>>>>>
>>>>>>>>

Re: Prevent Shuffling on Writing Files

Posted by Reuven Lax <re...@google.com>.
I believe that the Total shuffle data process counter counts the number of
bytes written to shuffle + the number of bytes read. So if you shuffle 1GB
of data, you should expect to see 2GB on the counter.
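
A minimal sketch of that accounting, assuming the counter is simply the sum
of bytes written to shuffle and bytes read back (the class and method names
here are illustrative, not a Dataflow API):

```java
// Model of the "Total shuffle data processed" counter as described above:
// every shuffled byte is written once and read once downstream, so the
// counter reports roughly twice the data actually shuffled.
class ShuffleCounter {
    static long totalShuffleBytes(long bytesShuffled) {
        long written = bytesShuffled; // each byte is written once
        long read = bytesShuffled;    // and read once downstream
        return written + read;
    }
}
```

This doubling matches the observation later in the thread of 29.32 GB going
into the shuffle while 58.66 GB is reported.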

On Wed, Sep 18, 2019 at 2:39 PM Shannon Duncan <jo...@liveramp.com>
wrote:

> OK, just ran the job on a small input and did not specify numShards, so
> it's literally just:
>
> .apply("WriteLines", TextIO.write().to(options.getOutput()));
>
> Output of map for join:
> [image: image.png]
>
> Details of Shuffle:
> [image: image.png]
>
> Reported Bytes Shuffled:
> [image: image.png]
>
>
> On Wed, Sep 18, 2019 at 4:24 PM Reuven Lax <re...@google.com> wrote:
>
>>
>>
>> On Wed, Sep 18, 2019 at 2:12 PM Shannon Duncan <
>> joseph.duncan@liveramp.com> wrote:
>>
>>> I will attempt to do without sharding (though I believe we did do a run
>>> without shards and it incurred the extra shuffle costs).
>>>
>>
>> It shouldn't. There will be a shuffle, but that shuffle should contain a
>> small amount of data (essentially a list of filenames).
>>
>>>
>>> Pipeline is simple.
>>>
>>> The only shuffle that is explicitly defined is the shuffle after merging
>>> files together into a single PCollection (Flatten Transform).
>>>
>>> So it's a Read > Flatten > Shuffle > Map (Format) > Write. We expected
>>> to pay for shuffles on the middle shuffle but were surprised to see that
>>> the output data from the Flatten was quadrupled in the reflected shuffled
>>> GB shown in Dataflow, which led me down this path of finding things.
>>>
>>> [image: image.png]
>>>
>>> On Wed, Sep 18, 2019 at 4:08 PM Reuven Lax <re...@google.com> wrote:
>>>
>>>> In that case you should be able to leave sharding unspecified, and you
>>>> won't incur the extra shuffle. Specifying explicit sharding is generally
>>>> necessary only for streaming.
>>>>
>>>> On Wed, Sep 18, 2019 at 2:06 PM Shannon Duncan <
>>>> joseph.duncan@liveramp.com> wrote:
>>>>
>>>>> batch on dataflowRunner.
>>>>>
>>>>> On Wed, Sep 18, 2019 at 4:05 PM Reuven Lax <re...@google.com> wrote:
>>>>>
>>>>>> Are you using streaming or batch? Also which runner are you using?
>>>>>>
>>>>>> On Wed, Sep 18, 2019 at 1:57 PM Shannon Duncan <
>>>>>> joseph.duncan@liveramp.com> wrote:
>>>>>>
>>>>>>> So I followed up on why TextIO shuffles and dug into the code some.
>>>>>>> It is using the shards and getting all the values into a keyed group to
>>>>>>> write to a single file.
>>>>>>>
>>>>>>> However... I wonder if there is a way to just take the records
>>>>>>> that are on a worker and write them out, thus not needing a shard
>>>>>>> number, closer to how Hadoop handles writes.
>>>>>>>
>>>>>>> Maybe just a regular ParDo that creates a writer on bundle setup,
>>>>>>> and whose processElement reuses that writer to write to the same
>>>>>>> file for all elements within a bundle?
>>>>>>>
>>>>>>> I feel like this goes beyond scope of simple user mailing list so
>>>>>>> I'm expanding it to dev as well.
>>>>>>> +dev <de...@beam.apache.org>
>>>>>>>
>>>>>>> Finding a solution that prevents quadrupling shuffle costs when
>>>>>>> simply writing out a file is a necessity for large scale jobs that work
>>>>>>> with 100+ TB of data. If anyone has any ideas I'd love to hear them.
>>>>>>>
>>>>>>> Thanks,
>>>>>>> Shannon Duncan
>>>>>>>
>>>>>>> On Wed, Sep 18, 2019 at 1:06 PM Shannon Duncan <
>>>>>>> joseph.duncan@liveramp.com> wrote:
>>>>>>>
>>>>>>>> We have been using Beam for a bit now. However we just turned on
>>>>>>>> the dataflow shuffle service and were very surprised that the shuffled data
>>>>>>>> amounts were quadruple the amounts we expected.
>>>>>>>>
>>>>>>>> Turns out that the file writing TextIO is doing shuffles within
>>>>>>>> itself.
>>>>>>>>
>>>>>>>> Is there a way to prevent shuffling in the writing phase?
>>>>>>>>
>>>>>>>> Thanks,
>>>>>>>> Shannon Duncan
>>>>>>>>
>>>>>>>

Re: Prevent Shuffling on Writing Files

Posted by Shannon Duncan <jo...@liveramp.com>.
Sorry, I missed a part of the map output for the flatten:

[image: image.png]

However, the shuffle shows only 29.32 GB going into it, while the Total
Shuffled Data output is 58.66 GB.

[image: image.png]

On Wed, Sep 18, 2019 at 4:39 PM Shannon Duncan <jo...@liveramp.com>
wrote:

> OK, just ran the job on a small input and did not specify numShards, so
> it's literally just:
>
> .apply("WriteLines", TextIO.write().to(options.getOutput()));
>
> Output of map for join:
> [image: image.png]
>
> Details of Shuffle:
> [image: image.png]
>
> Reported Bytes Shuffled:
> [image: image.png]
>
>
> On Wed, Sep 18, 2019 at 4:24 PM Reuven Lax <re...@google.com> wrote:
>
>>
>>
>> On Wed, Sep 18, 2019 at 2:12 PM Shannon Duncan <
>> joseph.duncan@liveramp.com> wrote:
>>
>>> I will attempt to do without sharding (though I believe we did do a run
>>> without shards and it incurred the extra shuffle costs).
>>>
>>
>> It shouldn't. There will be a shuffle, but that shuffle should contain a
>> small amount of data (essentially a list of filenames).
>>
>>>
>>> Pipeline is simple.
>>>
>>> The only shuffle that is explicitly defined is the shuffle after merging
>>> files together into a single PCollection (Flatten Transform).
>>>
>>> So it's a Read > Flatten > Shuffle > Map (Format) > Write. We expected
>>> to pay for shuffles on the middle shuffle but were surprised to see that
>>> the output data from the Flatten was quadrupled in the reflected shuffled
>>> GB shown in Dataflow, which led me down this path of finding things.
>>>
>>> [image: image.png]
>>>
>>> On Wed, Sep 18, 2019 at 4:08 PM Reuven Lax <re...@google.com> wrote:
>>>
>>>> In that case you should be able to leave sharding unspecified, and you
>>>> won't incur the extra shuffle. Specifying explicit sharding is generally
>>>> necessary only for streaming.
>>>>
>>>> On Wed, Sep 18, 2019 at 2:06 PM Shannon Duncan <
>>>> joseph.duncan@liveramp.com> wrote:
>>>>
>>>>> batch on dataflowRunner.
>>>>>
>>>>> On Wed, Sep 18, 2019 at 4:05 PM Reuven Lax <re...@google.com> wrote:
>>>>>
>>>>>> Are you using streaming or batch? Also which runner are you using?
>>>>>>
>>>>>> On Wed, Sep 18, 2019 at 1:57 PM Shannon Duncan <
>>>>>> joseph.duncan@liveramp.com> wrote:
>>>>>>
>>>>>>> So I followed up on why TextIO shuffles and dug into the code some.
>>>>>>> It is using the shards and getting all the values into a keyed group to
>>>>>>> write to a single file.
>>>>>>>
>>>>>>> However... I wonder if there is a way to just take the records
>>>>>>> that are on a worker and write them out, thus not needing a shard
>>>>>>> number, closer to how Hadoop handles writes.
>>>>>>>
>>>>>>> Maybe just a regular ParDo that creates a writer on bundle setup,
>>>>>>> and whose processElement reuses that writer to write to the same
>>>>>>> file for all elements within a bundle?
>>>>>>>
>>>>>>> I feel like this goes beyond scope of simple user mailing list so
>>>>>>> I'm expanding it to dev as well.
>>>>>>> +dev <de...@beam.apache.org>
>>>>>>>
>>>>>>> Finding a solution that prevents quadrupling shuffle costs when
>>>>>>> simply writing out a file is a necessity for large scale jobs that work
>>>>>>> with 100+ TB of data. If anyone has any ideas I'd love to hear them.
>>>>>>>
>>>>>>> Thanks,
>>>>>>> Shannon Duncan
>>>>>>>
>>>>>>> On Wed, Sep 18, 2019 at 1:06 PM Shannon Duncan <
>>>>>>> joseph.duncan@liveramp.com> wrote:
>>>>>>>
>>>>>>>> We have been using Beam for a bit now. However we just turned on
>>>>>>>> the dataflow shuffle service and were very surprised that the shuffled data
>>>>>>>> amounts were quadruple the amounts we expected.
>>>>>>>>
>>>>>>>> Turns out that the file writing TextIO is doing shuffles within
>>>>>>>> itself.
>>>>>>>>
>>>>>>>> Is there a way to prevent shuffling in the writing phase?
>>>>>>>>
>>>>>>>> Thanks,
>>>>>>>> Shannon Duncan
>>>>>>>>
>>>>>>>

Re: Prevent Shuffling on Writing Files

Posted by Shannon Duncan <jo...@liveramp.com>.
OK, just ran the job on a small input and did not specify numShards, so
it's literally just:

.apply("WriteLines", TextIO.write().to(options.getOutput()));
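
For contrast, a hedged sketch of the two write configurations discussed in
this thread. The `lines` PCollection and the shard count are illustrative
placeholders, not code from the original job; the calls are Beam's Java SDK
TextIO API as I understand it.

```java
// Runner-determined sharding (what Reuven recommends for batch): the
// write's internal shuffle then carries only a small amount of data,
// essentially a list of temp-file names for the finalize step.
lines.apply("WriteLines", TextIO.write().to(options.getOutput()));

// Explicit sharding: forces records to be grouped by a shard key, which
// shuffles the full data volume.
lines.apply("WriteSharded",
    TextIO.write().to(options.getOutput()).withNumShards(10));
```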

Output of map for join:
[image: image.png]

Details of Shuffle:
[image: image.png]

Reported Bytes Shuffled:
[image: image.png]


On Wed, Sep 18, 2019 at 4:24 PM Reuven Lax <re...@google.com> wrote:

>
>
> On Wed, Sep 18, 2019 at 2:12 PM Shannon Duncan <jo...@liveramp.com>
> wrote:
>
>> I will attempt to do without sharding (though I believe we did do a run
>> without shards and it incurred the extra shuffle costs).
>>
>
> It shouldn't. There will be a shuffle, but that shuffle should contain a
> small amount of data (essentially a list of filenames).
>
>>
>> Pipeline is simple.
>>
>> The only shuffle that is explicitly defined is the shuffle after merging
>> files together into a single PCollection (Flatten Transform).
>>
>> So it's a Read > Flatten > Shuffle > Map (Format) > Write. We expected to
>> pay for shuffles on the middle shuffle but were surprised to see that the
>> output data from the Flatten was quadrupled in the reflected shuffled GB
>> shown in Dataflow, which led me down this path of finding things.
>>
>> [image: image.png]
>>
>> On Wed, Sep 18, 2019 at 4:08 PM Reuven Lax <re...@google.com> wrote:
>>
>>> In that case you should be able to leave sharding unspecified, and you
>>> won't incur the extra shuffle. Specifying explicit sharding is generally
>>> necessary only for streaming.
>>>
>>> On Wed, Sep 18, 2019 at 2:06 PM Shannon Duncan <
>>> joseph.duncan@liveramp.com> wrote:
>>>
>>>> batch on dataflowRunner.
>>>>
>>>> On Wed, Sep 18, 2019 at 4:05 PM Reuven Lax <re...@google.com> wrote:
>>>>
>>>>> Are you using streaming or batch? Also which runner are you using?
>>>>>
>>>>> On Wed, Sep 18, 2019 at 1:57 PM Shannon Duncan <
>>>>> joseph.duncan@liveramp.com> wrote:
>>>>>
>>>>>> So I followed up on why TextIO shuffles and dug into the code some.
>>>>>> It is using the shards and getting all the values into a keyed group to
>>>>>> write to a single file.
>>>>>>
>>>>>> However... I wonder if there is a way to just take the records that
>>>>>> are on a worker and write them out, thus not needing a shard number,
>>>>>> closer to how Hadoop handles writes.
>>>>>>
>>>>>> Maybe just a regular ParDo that creates a writer on bundle setup, and
>>>>>> whose processElement reuses that writer to write to the same file for
>>>>>> all elements within a bundle?
>>>>>>
>>>>>> I feel like this goes beyond scope of simple user mailing list so I'm
>>>>>> expanding it to dev as well.
>>>>>> +dev <de...@beam.apache.org>
>>>>>>
>>>>>> Finding a solution that prevents quadrupling shuffle costs when
>>>>>> simply writing out a file is a necessity for large scale jobs that work
>>>>>> with 100+ TB of data. If anyone has any ideas I'd love to hear them.
>>>>>>
>>>>>> Thanks,
>>>>>> Shannon Duncan
>>>>>>
>>>>>> On Wed, Sep 18, 2019 at 1:06 PM Shannon Duncan <
>>>>>> joseph.duncan@liveramp.com> wrote:
>>>>>>
>>>>>>> We have been using Beam for a bit now. However we just turned on the
>>>>>>> dataflow shuffle service and were very surprised that the shuffled data
>>>>>>> amounts were quadruple the amounts we expected.
>>>>>>>
>>>>>>> Turns out that the file writing TextIO is doing shuffles within
>>>>>>> itself.
>>>>>>>
>>>>>>> Is there a way to prevent shuffling in the writing phase?
>>>>>>>
>>>>>>> Thanks,
>>>>>>> Shannon Duncan
>>>>>>>
>>>>>>

Re: Prevent Shuffling on Writing Files

Posted by Reuven Lax <re...@google.com>.
On Wed, Sep 18, 2019 at 2:12 PM Shannon Duncan <jo...@liveramp.com>
wrote:

> I will attempt to do without sharding (though I believe we did do a run
> without shards and it incurred the extra shuffle costs).
>

It shouldn't. There will be a shuffle, but that shuffle should contain a
small amount of data (essentially a list of filenames).

>
> Pipeline is simple.
>
> The only shuffle that is explicitly defined is the shuffle after merging
> files together into a single PCollection (Flatten Transform).
>
> So it's a Read > Flatten > Shuffle > Map (Format) > Write. We expected to
> pay for shuffles on the middle shuffle but were surprised to see that the
> output data from the Flatten was quadrupled in the reflected shuffled GB
> shown in Dataflow, which led me down this path of finding things.
>
> [image: image.png]
>
> On Wed, Sep 18, 2019 at 4:08 PM Reuven Lax <re...@google.com> wrote:
>
>> In that case you should be able to leave sharding unspecified, and you
>> won't incur the extra shuffle. Specifying explicit sharding is generally
>> necessary only for streaming.
>>
>> On Wed, Sep 18, 2019 at 2:06 PM Shannon Duncan <
>> joseph.duncan@liveramp.com> wrote:
>>
>>> batch on dataflowRunner.
>>>
>>> On Wed, Sep 18, 2019 at 4:05 PM Reuven Lax <re...@google.com> wrote:
>>>
>>>> Are you using streaming or batch? Also which runner are you using?
>>>>
>>>> On Wed, Sep 18, 2019 at 1:57 PM Shannon Duncan <
>>>> joseph.duncan@liveramp.com> wrote:
>>>>
>>>>> So I followed up on why TextIO shuffles and dug into the code some. It
>>>>> is using the shards and getting all the values into a keyed group to write
>>>>> to a single file.
>>>>>
>>>>> However... I wonder if there is a way to just take the records that
>>>>> are on a worker and write them out, thus not needing a shard number,
>>>>> closer to how Hadoop handles writes.
>>>>>
>>>>> Maybe just a regular ParDo that creates a writer on bundle setup, and
>>>>> whose processElement reuses that writer to write to the same file for
>>>>> all elements within a bundle?
>>>>>
>>>>> I feel like this goes beyond scope of simple user mailing list so I'm
>>>>> expanding it to dev as well.
>>>>> +dev <de...@beam.apache.org>
>>>>>
>>>>> Finding a solution that prevents quadrupling shuffle costs when simply
>>>>> writing out a file is a necessity for large scale jobs that work with 100+
>>>>> TB of data. If anyone has any ideas I'd love to hear them.
>>>>>
>>>>> Thanks,
>>>>> Shannon Duncan
>>>>>
>>>>> On Wed, Sep 18, 2019 at 1:06 PM Shannon Duncan <
>>>>> joseph.duncan@liveramp.com> wrote:
>>>>>
>>>>>> We have been using Beam for a bit now. However we just turned on the
>>>>>> dataflow shuffle service and were very surprised that the shuffled data
>>>>>> amounts were quadruple the amounts we expected.
>>>>>>
>>>>>> Turns out that the file writing TextIO is doing shuffles within
>>>>>> itself.
>>>>>>
>>>>>> Is there a way to prevent shuffling in the writing phase?
>>>>>>
>>>>>> Thanks,
>>>>>> Shannon Duncan
>>>>>>
>>>>>

Re: Prevent Shuffling on Writing Files

Posted by Shannon Duncan <jo...@liveramp.com>.
I will attempt to do without sharding (though I believe we did do a run
without shards and it incurred the extra shuffle costs).

Pipeline is simple.

The only shuffle that is explicitly defined is the shuffle after merging
files together into a single PCollection (Flatten Transform).

So it's a Read > Flatten > Shuffle > Map (Format) > Write. We expected to
pay for the middle shuffle, but were surprised to see that the output data
from the Flatten was quadrupled in the shuffled GB reported by Dataflow,
which led me down this path of finding things.

[image: image.png]
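
The pipeline shape described above can be sketched as follows. Transform
names, the input variables (`readA`, `readB`), and the formatting lambda are
illustrative assumptions, not the actual job code:

```java
// Read > Flatten > Shuffle > Map (Format) > Write, as described above.
PCollection<String> merged = PCollectionList.of(readA).and(readB)
    .apply("Flatten", Flatten.pCollections());

merged
    // The one shuffle the job intends to pay for.
    .apply("Shuffle", Reshuffle.viaRandomKey())
    .apply("Format", MapElements
        .into(TypeDescriptors.strings())
        .via(line -> line.trim()))  // placeholder formatting step
    .apply("WriteLines", TextIO.write().to(options.getOutput()));
```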

On Wed, Sep 18, 2019 at 4:08 PM Reuven Lax <re...@google.com> wrote:

> In that case you should be able to leave sharding unspecified, and you
> won't incur the extra shuffle. Specifying explicit sharding is generally
> necessary only for streaming.
>
> On Wed, Sep 18, 2019 at 2:06 PM Shannon Duncan <jo...@liveramp.com>
> wrote:
>
>> batch on dataflowRunner.
>>
>> On Wed, Sep 18, 2019 at 4:05 PM Reuven Lax <re...@google.com> wrote:
>>
>>> Are you using streaming or batch? Also which runner are you using?
>>>
>>> On Wed, Sep 18, 2019 at 1:57 PM Shannon Duncan <
>>> joseph.duncan@liveramp.com> wrote:
>>>
>>>> So I followed up on why TextIO shuffles and dug into the code some. It
>>>> is using the shards and getting all the values into a keyed group to write
>>>> to a single file.
>>>>
>>>> However... I wonder if there is a way to just take the records that
>>>> are on a worker and write them out, thus not needing a shard number,
>>>> closer to how Hadoop handles writes.
>>>>
>>>> Maybe just a regular ParDo that creates a writer on bundle setup, and
>>>> whose processElement reuses that writer to write to the same file for
>>>> all elements within a bundle?
>>>>
>>>> I feel like this goes beyond scope of simple user mailing list so I'm
>>>> expanding it to dev as well.
>>>> +dev <de...@beam.apache.org>
>>>>
>>>> Finding a solution that prevents quadrupling shuffle costs when simply
>>>> writing out a file is a necessity for large scale jobs that work with 100+
>>>> TB of data. If anyone has any ideas I'd love to hear them.
>>>>
>>>> Thanks,
>>>> Shannon Duncan
>>>>
>>>> On Wed, Sep 18, 2019 at 1:06 PM Shannon Duncan <
>>>> joseph.duncan@liveramp.com> wrote:
>>>>
>>>>> We have been using Beam for a bit now. However we just turned on the
>>>>> dataflow shuffle service and were very surprised that the shuffled data
>>>>> amounts were quadruple the amounts we expected.
>>>>>
>>>>> Turns out that the file writing TextIO is doing shuffles within
>>>>> itself.
>>>>>
>>>>> Is there a way to prevent shuffling in the writing phase?
>>>>>
>>>>> Thanks,
>>>>> Shannon Duncan
>>>>>
>>>>

Re: Prevent Shuffling on Writing Files

Posted by Reuven Lax <re...@google.com>.
In that case you should be able to leave sharding unspecified, and you
won't incur the extra shuffle. Specifying explicit sharding is generally
necessary only for streaming.

On Wed, Sep 18, 2019 at 2:06 PM Shannon Duncan <jo...@liveramp.com>
wrote:

> batch on dataflowRunner.
>
> On Wed, Sep 18, 2019 at 4:05 PM Reuven Lax <re...@google.com> wrote:
>
>> Are you using streaming or batch? Also which runner are you using?
>>
>> On Wed, Sep 18, 2019 at 1:57 PM Shannon Duncan <
>> joseph.duncan@liveramp.com> wrote:
>>
>>> So I followed up on why TextIO shuffles and dug into the code some. It
>>> is using the shards and getting all the values into a keyed group to write
>>> to a single file.
>>>
>>> However... I wonder if there is a way to just take the records that are
>>> on a worker and write them out, thus not needing a shard number, closer
>>> to how Hadoop handles writes.
>>>
>>> Maybe just a regular ParDo that creates a writer on bundle setup, and
>>> whose processElement reuses that writer to write to the same file for
>>> all elements within a bundle?
>>>
>>> I feel like this goes beyond scope of simple user mailing list so I'm
>>> expanding it to dev as well.
>>> +dev <de...@beam.apache.org>
>>>
>>> Finding a solution that prevents quadrupling shuffle costs when simply
>>> writing out a file is a necessity for large scale jobs that work with 100+
>>> TB of data. If anyone has any ideas I'd love to hear them.
>>>
>>> Thanks,
>>> Shannon Duncan
>>>
>>> On Wed, Sep 18, 2019 at 1:06 PM Shannon Duncan <
>>> joseph.duncan@liveramp.com> wrote:
>>>
>>>> We have been using Beam for a bit now. However we just turned on the
>>>> dataflow shuffle service and were very surprised that the shuffled data
>>>> amounts were quadruple the amounts we expected.
>>>>
>>>> Turns out that the file writing TextIO is doing shuffles within itself.
>>>>
>>>> Is there a way to prevent shuffling in the writing phase?
>>>>
>>>> Thanks,
>>>> Shannon Duncan
>>>>
>>>

Re: Prevent Shuffling on Writing Files

Posted by Shannon Duncan <jo...@liveramp.com>.
batch on dataflowRunner.

On Wed, Sep 18, 2019 at 4:05 PM Reuven Lax <re...@google.com> wrote:

> Are you using streaming or batch? Also which runner are you using?
>
> On Wed, Sep 18, 2019 at 1:57 PM Shannon Duncan <jo...@liveramp.com>
> wrote:
>
>> So I followed up on why TextIO shuffles and dug into the code some. It is
>> using the shards and getting all the values into a keyed group to write to
>> a single file.
>>
>> However... I wonder if there is a way to just take the records that are
>> on a worker and write them out, thus not needing a shard number, closer
>> to how Hadoop handles writes.
>>
>> Maybe just a regular ParDo that creates a writer on bundle setup, and
>> whose processElement reuses that writer to write to the same file for
>> all elements within a bundle?
>>
>> I feel like this goes beyond scope of simple user mailing list so I'm
>> expanding it to dev as well.
>> +dev <de...@beam.apache.org>
>>
>> Finding a solution that prevents quadrupling shuffle costs when simply
>> writing out a file is a necessity for large scale jobs that work with 100+
>> TB of data. If anyone has any ideas I'd love to hear them.
>>
>> Thanks,
>> Shannon Duncan
>>
>> On Wed, Sep 18, 2019 at 1:06 PM Shannon Duncan <
>> joseph.duncan@liveramp.com> wrote:
>>
>>> We have been using Beam for a bit now. However we just turned on the
>>> dataflow shuffle service and were very surprised that the shuffled data
>>> amounts were quadruple the amounts we expected.
>>>
>>> Turns out that the file writing TextIO is doing shuffles within itself.
>>>
>>> Is there a way to prevent shuffling in the writing phase?
>>>
>>> Thanks,
>>> Shannon Duncan
>>>
>>

Re: Prevent Shuffling on Writing Files

Posted by Reuven Lax <re...@google.com>.
Are you using streaming or batch? Also which runner are you using?

On Wed, Sep 18, 2019 at 1:57 PM Shannon Duncan <jo...@liveramp.com>
wrote:

> So I followed up on why TextIO shuffles and dug into the code some. It is
> using the shards to gather all the values into a keyed group, which it then
> writes to a single file.
>
> However... I wonder if there is a way to just take the records that are on
> a worker and write them out, without needing a shard number at all. That
> would be closer to how Hadoop handles writes.
>
> Maybe just a regular ParDo that creates a writer when the bundle starts,
> and processElement reuses that writer to write all elements within the
> bundle to the same file?
>
> I feel like this goes beyond the scope of the user mailing list, so I'm
> expanding it to dev as well.
> +dev <de...@beam.apache.org>
>
> Finding a solution that prevents quadrupling shuffle costs when simply
> writing out a file is a necessity for large-scale jobs that work with 100+
> TB of data. If anyone has any ideas, I'd love to hear them.
>
> Thanks,
> Shannon Duncan
>
> On Wed, Sep 18, 2019 at 1:06 PM Shannon Duncan <jo...@liveramp.com>
> wrote:
>
>> We have been using Beam for a bit now. However we just turned on the
>> dataflow shuffle service and were very surprised that the shuffled data
>> amounts were quadruple the amounts we expected.
>>
>> Turns out that the file writing TextIO is doing shuffles within itself.
>>
>> Is there a way to prevent shuffling in the writing phase?
>>
>> Thanks,
>> Shannon Duncan
>>
>

Re: Prevent Shuffling on Writing Files

Posted by Chamikara Jayalath <ch...@google.com>.
Are you specifying the number of shards to write to:
https://github.com/apache/beam/blob/master/sdks/java/core/src/main/java/org/apache/beam/sdk/io/TextIO.java#L859

If so, this will incur an additional shuffle to redistribute the data
produced by all workers into the given number of shards before writing.
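
For reference, the two configurations look roughly like this in the Java
SDK. This is a sketch, not runnable as-is: it assumes beam-sdks-java-core
on the classpath, and the class name, output path, and shard count are all
made up for illustration.

```java
import org.apache.beam.sdk.io.TextIO;
import org.apache.beam.sdk.values.PCollection;

public class WriteSketches {
  // Fixed sharding: every element must be re-keyed and shuffled into
  // exactly 5 groups before any file is written.
  static void fixedShards(PCollection<String> lines) {
    lines.apply(TextIO.write().to("gs://my-bucket/out/part").withNumShards(5));
  }

  // Runner-determined sharding: omitting withNumShards() lets the runner
  // pick the sharding itself, avoiding the explicit redistribution step
  // (the service may still reshuffle internally, e.g. to break fusion
  // when finalizing files).
  static void runnerShards(PCollection<String> lines) {
    lines.apply(TextIO.write().to("gs://my-bucket/out/part"));
  }
}
```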

In addition to that, I think we also run Reshuffle transforms on the set of
files to break fusion when finalizing files, but that cost should not be
significant.

Posting a sketch of your pipeline would probably be helpful.

Thanks,
Cham

On Wed, Sep 18, 2019 at 1:57 PM Shannon Duncan <jo...@liveramp.com>
wrote:

> So I followed up on why TextIO shuffles and dug into the code some. It is
> using the shards and getting all the values into a keyed group to write to
> a single file.
>
> However... I wonder if there is way to just take the records that are on a
> worker and write them out. Thus not needing a shard number and doing this.
> Closer to how hadoop handle's writes.
>
> Maybe just a regular pardo and on bundleSetup it creates a writer and
> processElement reuses that writter to write to the same file for all
> elements within a bundle?
>
> I feel like this goes beyond scope of simple user mailing list so I'm
> expanding it to dev as well.
> +dev <de...@beam.apache.org>
>
> Finding a solution that prevents quadrupling shuffle costs when simply
> writing out a file is a necessity for large scale jobs that work with 100+
> TB of data. If anyone has any ideas I'd love to hear them.
>
> Thanks,
> Shannon Duncan
>
> On Wed, Sep 18, 2019 at 1:06 PM Shannon Duncan <jo...@liveramp.com>
> wrote:
>
>> We have been using Beam for a bit now. However we just turned on the
>> dataflow shuffle service and were very surprised that the shuffled data
>> amounts were quadruple the amounts we expected.
>>
>> Turns out that the file writing TextIO is doing shuffles within itself.
>>
>> Is there a way to prevent shuffling in the writing phase?
>>
>> Thanks,
>> Shannon Duncan
>>
>

Re: Prevent Shuffling on Writing Files

Posted by Shannon Duncan <jo...@liveramp.com>.
So I followed up on why TextIO shuffles and dug into the code some. It is
using the shards to gather all the values into a keyed group, which it then
writes to a single file.

However... I wonder if there is a way to just take the records that are on a
worker and write them out, without needing a shard number at all. That would
be closer to how Hadoop handles writes.

Maybe just a regular ParDo that creates a writer when the bundle starts, and
processElement reuses that writer to write all elements within the bundle to
the same file?
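
To make the idea concrete, here is a minimal plain-Java sketch. The method
names mirror the Beam DoFn lifecycle (@StartBundle / @ProcessElement /
@FinishBundle), but nothing below uses the Beam SDK; the class name, file
naming, and directory layout are all illustrative assumptions, not Beam's
actual implementation.

```java
import java.io.IOException;
import java.io.UncheckedIOException;
import java.io.Writer;
import java.nio.file.Files;
import java.nio.file.Path;
import java.util.List;
import java.util.UUID;

// Plain-Java sketch of the "one file per bundle" idea.
public class BundleWriter {
  private final Path outputDir;
  private Path bundleFile; // file owned by the current bundle
  private Writer writer;

  public BundleWriter(Path outputDir) {
    this.outputDir = outputDir;
  }

  // @StartBundle equivalent: open a uniquely named file so that bundles
  // running concurrently on other workers can never collide.
  public void startBundle() {
    try {
      bundleFile = outputDir.resolve("part-" + UUID.randomUUID() + ".txt");
      writer = Files.newBufferedWriter(bundleFile);
    } catch (IOException e) {
      throw new UncheckedIOException(e);
    }
  }

  // @ProcessElement equivalent: append the record to this bundle's file.
  // No key, no shard number, no shuffle.
  public void processElement(String record) {
    try {
      writer.write(record);
      writer.write('\n');
    } catch (IOException e) {
      throw new UncheckedIOException(e);
    }
  }

  // @FinishBundle equivalent: close the file. A production version would
  // write to a temp path and rename here, so a retried bundle cannot
  // leave a partial file behind.
  public Path finishBundle() {
    try {
      writer.close();
      return bundleFile;
    } catch (IOException e) {
      throw new UncheckedIOException(e);
    }
  }

  // Push two records through one simulated bundle and read the file back.
  public static List<String> demo() {
    try {
      Path dir = Files.createTempDirectory("bundles");
      BundleWriter fn = new BundleWriter(dir);
      fn.startBundle();
      fn.processElement("record-1");
      fn.processElement("record-2");
      return Files.readAllLines(fn.finishBundle());
    } catch (IOException e) {
      throw new UncheckedIOException(e);
    }
  }
}
```

The obvious caveat is that the runner controls bundle sizes, so on a runner
that produces many small bundles this would emit many small files.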

I feel like this goes beyond the scope of the user mailing list, so I'm
expanding it to dev as well.
+dev <de...@beam.apache.org>

Finding a solution that prevents quadrupling shuffle costs when simply
writing out a file is a necessity for large-scale jobs that work with 100+
TB of data. If anyone has any ideas, I'd love to hear them.

Thanks,
Shannon Duncan

On Wed, Sep 18, 2019 at 1:06 PM Shannon Duncan <jo...@liveramp.com>
wrote:

> We have been using Beam for a bit now. However we just turned on the
> dataflow shuffle service and were very surprised that the shuffled data
> amounts were quadruple the amounts we expected.
>
> Turns out that the file writing TextIO is doing shuffles within itself.
>
> Is there a way to prevent shuffling in the writing phase?
>
> Thanks,
> Shannon Duncan
>
