You are viewing a plain text version of this content. The canonical link for it is here.
Posted to user@spark.apache.org by Andrii Biletskyi <an...@yahoo.com.INVALID> on 2017/05/23 19:14:05 UTC

Impact of coalesce operation before writing dataframe

Hi all,

I'm trying to understand the impact of coalesce operation on spark job
performance.

As a side note: were are using emrfs (i.e. aws s3) as source and a target
for the job.

Omitting unnecessary details job can be explained as: join 200M records
Dataframe stored in orc format on emrfs with another 200M records cached
Dataframe, the result of the join put back to emrfs. First DF is a set of
wide rows (Spark UI shows 300 GB) and the second is relatively small (Spark
shows 20 GB).

I have enough resources in my cluster to perform the job but I don't like
the fact that output datasource contains 200 part orc files (as
spark.sql.shuffle.partitions
defaults to 200) so before saving orc to emrfs I'm doing .coalesce(10).
From documentation coalesce looks like a quite harmless operations: no
repartitioning etc.

But with such setup my job fails to write dataset on the last stage. Right
now the error is OOM: GC overhead. When I change  .coalesce(10) to
.coalesce(100) the job runs much faster and finishes without errors.

So what's the impact of .coalesce in this case? And how to do in place
concatenation of files (not involving hive) to end up with smaller amount
of bigger files, as with .coalesce(100) job generates 100 orc snappy
encoded files ~300MB each.

Thanks,
Andrii

Re: Impact of coalesce operation before writing dataframe

Posted by Andrii Biletskyi <an...@yahoo.com.INVALID>.
Ah that's right. I didn't mention it: I have 10 executors in my cluster,
and so when I do .coalesce(10) and right after that saving orc to s3 - does
coalescing really affects parallelism? To me it looks like no, because we
went from 100 tasks that are executed in parallel by 10 executors to 10
tasks that are executed by same 10 executors. Now, I understand that there
may be some data skew that may result in uneven partitions but that's not
my case really (according to Spark UI).
Again I'm trying to understand first of all how coalescing dataset impacts
executor memory, gc etc. Maybe if coalesce is done before writing dataset,
each of the resulting partition needs to be evaluated and thus stored in
memory? - just a guess.

Andrii

2017-05-23 23:42 GMT+03:00 John Compitello <jo...@broadinstitute.org>:

> Spark is doing operations on each partition in parallel. If you decrease
> number of partitions, you’re potentially doing less work in parallel
> depending on your cluster setup.
>
> On May 23, 2017, at 4:23 PM, Andrii Biletskyi <andrii.biletskyi@yahoo.com.
> INVALID <an...@yahoo.com.invalid>> wrote:
>
>
> No, I didn't try to use repartition, how exactly it impacts the
> parallelism?
> In my understanding coalesce simply "unions" multiple partitions located
> on same executor "one on on top of the other", while repartition does
> hash-based shuffle decreasing the number of output partitions. So how this
> exactly affects the parallelism, which stage of the job?
>
> Thanks,
> Andrii
>
>
>
> On Tuesday, May 23, 2017 10:20 PM, Michael Armbrust <
> michael@databricks.com> wrote:
>
>
> coalesce is nice because it does not shuffle, but the consequence of
> avoiding a shuffle is it will also reduce parallelism of the preceding
> computation.  Have you tried using repartition instead?
>
> On Tue, May 23, 2017 at 12:14 PM, Andrii Biletskyi <
> andrii.biletskyi@yahoo.com.invalid> wrote:
>
> Hi all,
>
> I'm trying to understand the impact of coalesce operation on spark job
> performance.
>
> As a side note: were are using emrfs (i.e. aws s3) as source and a target
> for the job.
>
> Omitting unnecessary details job can be explained as: join 200M records
> Dataframe stored in orc format on emrfs with another 200M records cached
> Dataframe, the result of the join put back to emrfs. First DF is a set of
> wide rows (Spark UI shows 300 GB) and the second is relatively small (Spark
> shows 20 GB).
>
> I have enough resources in my cluster to perform the job but I don't like
> the fact that output datasource contains 200 part orc files (as spark.sql.shuffle.
> partitions defaults to 200) so before saving orc to emrfs I'm doing
> .coalesce(10). From documentation coalesce looks like a quite harmless
> operations: no repartitioning etc.
>
> But with such setup my job fails to write dataset on the last stage. Right
> now the error is OOM: GC overhead. When I change  .coalesce(10) to
> .coalesce(100) the job runs much faster and finishes without errors.
>
> So what's the impact of .coalesce in this case? And how to do in place
> concatenation of files (not involving hive) to end up with smaller amount
> of bigger files, as with .coalesce(100) job generates 100 orc snappy
> encoded files ~300MB each.
>
> Thanks,
> Andrii
>
>
>
>
>
>

Re: Impact of coalesce operation before writing dataframe

Posted by John Compitello <jo...@broadinstitute.org>.
Spark is doing operations on each partition in parallel. If you decrease number of partitions, you’re potentially doing less work in parallel depending on your cluster setup. 

> On May 23, 2017, at 4:23 PM, Andrii Biletskyi <an...@yahoo.com.INVALID> wrote:
> 
>  
> No, I didn't try to use repartition, how exactly it impacts the parallelism?
> In my understanding coalesce simply "unions" multiple partitions located on same executor "one on on top of the other", while repartition does hash-based shuffle decreasing the number of output partitions. So how this exactly affects the parallelism, which stage of the job?
> 
> Thanks,
> Andrii
> 
> 
> 
> On Tuesday, May 23, 2017 10:20 PM, Michael Armbrust <mi...@databricks.com> wrote:
> 
> 
> coalesce is nice because it does not shuffle, but the consequence of avoiding a shuffle is it will also reduce parallelism of the preceding computation.  Have you tried using repartition instead?
> 
> On Tue, May 23, 2017 at 12:14 PM, Andrii Biletskyi <andrii.biletskyi@yahoo.com.invalid <ma...@yahoo.com.invalid>> wrote:
> Hi all,
> 
> I'm trying to understand the impact of coalesce operation on spark job performance.
> 
> As a side note: were are using emrfs (i.e. aws s3) as source and a target for the job.
> 
> Omitting unnecessary details job can be explained as: join 200M records Dataframe stored in orc format on emrfs with another 200M records cached Dataframe, the result of the join put back to emrfs. First DF is a set of wide rows (Spark UI shows 300 GB) and the second is relatively small (Spark shows 20 GB).
> 
> I have enough resources in my cluster to perform the job but I don't like the fact that output datasource contains 200 part orc files (as spark.sql.shuffle. partitions defaults to 200) so before saving orc to emrfs I'm doing .coalesce(10). From documentation coalesce looks like a quite harmless operations: no repartitioning etc.
> 
> But with such setup my job fails to write dataset on the last stage. Right now the error is OOM: GC overhead. When I change  .coalesce(10) to .coalesce(100) the job runs much faster and finishes without errors.
> 
> So what's the impact of .coalesce in this case? And how to do in place concatenation of files (not involving hive) to end up with smaller amount of bigger files, as with .coalesce(100) job generates 100 orc snappy encoded files ~300MB each.
> 
> Thanks,
> Andrii
> 
> 
> 


Re: Impact of coalesce operation before writing dataframe

Posted by Andrii Biletskyi <an...@yahoo.com.INVALID>.
 No, I didn't try to use repartition, how exactly it impacts the parallelism?In my understanding coalesce simply "unions" multiple partitions located on same executor "one on on top of the other", while repartition does hash-based shuffle decreasing the number of output partitions. So how this exactly affects the parallelism, which stage of the job?
Thanks,Andrii
 

    On Tuesday, May 23, 2017 10:20 PM, Michael Armbrust <mi...@databricks.com> wrote:
 

 coalesce is nice because it does not shuffle, but the consequence of avoiding a shuffle is it will also reduce parallelism of the preceding computation.  Have you tried using repartition instead?
On Tue, May 23, 2017 at 12:14 PM, Andrii Biletskyi <an...@yahoo.com.invalid> wrote:

Hi all,
I'm trying to understand the impact of coalesce operation on spark job performance.
As a side note: were are using emrfs (i.e. aws s3) as source and a target for the job.
Omitting unnecessary details job can be explained as: join 200M records Dataframe stored in orc format on emrfs with another 200M records cached Dataframe, the result of the join put back to emrfs. First DF is a set of wide rows (Spark UI shows 300 GB) and the second is relatively small (Spark shows 20 GB).
I have enough resources in my cluster to perform the job but I don't like the fact that output datasource contains 200 part orc files (as spark.sql.shuffle. partitions defaults to 200) so before saving orc to emrfs I'm doing .coalesce(10). From documentation coalesce looks like a quite harmless operations: no repartitioning etc.
But with such setup my job fails to write dataset on the last stage. Right now the error is OOM: GC overhead. When I change  .coalesce(10) to .coalesce(100) the job runs much faster and finishes without errors.
So what's the impact of .coalesce in this case? And how to do in place concatenation of files (not involving hive) to end up with smaller amount of bigger files, as with .coalesce(100) job generates 100 orc snappy encoded files ~300MB each.
Thanks,Andrii



   

Re: Impact of coalesce operation before writing dataframe

Posted by Andrii Biletskyi <an...@yahoo.com.INVALID>.
No, I didn't try to use repartition, how exactly it impacts the parallelism?
In my understanding coalesce simply "unions" multiple partitions located on
same executor "one on on top of the other", while repartition does
hash-based shuffle decreasing the number of output partitions. So how this
exactly affects the parallelism, which stage of the job?

Thanks,
Andrii

2017-05-23 22:19 GMT+03:00 Michael Armbrust <mi...@databricks.com>:

> coalesce is nice because it does not shuffle, but the consequence of
> avoiding a shuffle is it will also reduce parallelism of the preceding
> computation.  Have you tried using repartition instead?
>
> On Tue, May 23, 2017 at 12:14 PM, Andrii Biletskyi <
> andrii.biletskyi@yahoo.com.invalid> wrote:
>
>> Hi all,
>>
>> I'm trying to understand the impact of coalesce operation on spark job
>> performance.
>>
>> As a side note: were are using emrfs (i.e. aws s3) as source and a target
>> for the job.
>>
>> Omitting unnecessary details job can be explained as: join 200M records
>> Dataframe stored in orc format on emrfs with another 200M records cached
>> Dataframe, the result of the join put back to emrfs. First DF is a set of
>> wide rows (Spark UI shows 300 GB) and the second is relatively small (Spark
>> shows 20 GB).
>>
>> I have enough resources in my cluster to perform the job but I don't like
>> the fact that output datasource contains 200 part orc files (as
>> spark.sql.shuffle.partitions defaults to 200) so before saving orc to
>> emrfs I'm doing .coalesce(10). From documentation coalesce looks like a
>> quite harmless operations: no repartitioning etc.
>>
>> But with such setup my job fails to write dataset on the last stage.
>> Right now the error is OOM: GC overhead. When I change  .coalesce(10) to
>> .coalesce(100) the job runs much faster and finishes without errors.
>>
>> So what's the impact of .coalesce in this case? And how to do in place
>> concatenation of files (not involving hive) to end up with smaller amount
>> of bigger files, as with .coalesce(100) job generates 100 orc snappy
>> encoded files ~300MB each.
>>
>> Thanks,
>> Andrii
>>
>
>

Re: Impact of coalesce operation before writing dataframe

Posted by Michael Armbrust <mi...@databricks.com>.
coalesce is nice because it does not shuffle, but the consequence of
avoiding a shuffle is it will also reduce parallelism of the preceding
computation.  Have you tried using repartition instead?

On Tue, May 23, 2017 at 12:14 PM, Andrii Biletskyi <
andrii.biletskyi@yahoo.com.invalid> wrote:

> Hi all,
>
> I'm trying to understand the impact of coalesce operation on spark job
> performance.
>
> As a side note: were are using emrfs (i.e. aws s3) as source and a target
> for the job.
>
> Omitting unnecessary details job can be explained as: join 200M records
> Dataframe stored in orc format on emrfs with another 200M records cached
> Dataframe, the result of the join put back to emrfs. First DF is a set of
> wide rows (Spark UI shows 300 GB) and the second is relatively small (Spark
> shows 20 GB).
>
> I have enough resources in my cluster to perform the job but I don't like
> the fact that output datasource contains 200 part orc files (as
> spark.sql.shuffle.partitions defaults to 200) so before saving orc to
> emrfs I'm doing .coalesce(10). From documentation coalesce looks like a
> quite harmless operations: no repartitioning etc.
>
> But with such setup my job fails to write dataset on the last stage. Right
> now the error is OOM: GC overhead. When I change  .coalesce(10) to
> .coalesce(100) the job runs much faster and finishes without errors.
>
> So what's the impact of .coalesce in this case? And how to do in place
> concatenation of files (not involving hive) to end up with smaller amount
> of bigger files, as with .coalesce(100) job generates 100 orc snappy
> encoded files ~300MB each.
>
> Thanks,
> Andrii
>