You are viewing a plain text version of this content. The canonical link for it is here.
Posted to user@spark.apache.org by Koert Kuipers <ko...@tresata.com> on 2018/08/08 19:39:04 UTC

groupBy and then coalesce impacts shuffle partitions in unintended way

hi,

i am reading data from files into a dataframe, then doing a groupBy for a
given column with a count, and finally i coalesce to a smaller number of
partitions before writing out to disk. so roughly:

spark.read.format(...).load(...).groupBy(column).count().coalesce(100).write.format(...).save(...)

i have this setting: spark.sql.shuffle.partitions=2048

i expect to see 2048 partitions in shuffle. what i am seeing instead is a
shuffle with only 100 partitions. it's like the coalesce has taken over the
partitioning of the groupBy.

any idea why?

i am doing coalesce because it is not helpful to write out 2048 files, lots
of files leads to pressure down the line on executors reading this data (i
am writing to just one partition of a larger dataset), and since i have
less than 100 executors i expect it to be efficient. so sounds like a good
idea, no?

but i do need 2048 partitions in my shuffle due to the operation i am doing
in the groupBy (in my real problem i am not just doing a count...).

thanks!
koert

Re: groupBy and then coalesce impacts shuffle partitions in unintended way

Posted by Koert Kuipers <ko...@tresata.com>.
thanks for that long reply jungtaek!

so when i set spark.sql.shuffle.partitions to 2048 i have 2048 data
partitions (or "partitions of state"). these are determined by a hashing
function. ok got it!

when i look at the application manager i also see 2048 "tasks" for the
relevant stage. so tasks here is not the same as parallelism, which is
defined by number of executors * number of cores. and i see these tasks are
being queued up to be processed. i have learned to watch the number of
tasks in shuffle carefully, since its the unit of work, and because when i
have memory issues (OOM) it usually means i need to split the data up more,
so increase the tasks.

so is it reasonable to assume for the shuffle that a task maps to a single
data partition being processed?

if so, then when i do a coalesce(100) after a shuffle i see only 100 tasks
for the stage of shuffle. what does this mean? does this mean a task no
longer maps to a single data partition being processed, and i still have
2048 data partitions? if so, does every task process multiple data
partitions sequentially? and does this not increase my chances of OOM
because the data partitions are processed sequentially within a task?


On Thu, Aug 9, 2018 at 3:10 AM, Jungtaek Lim <ka...@gmail.com> wrote:

> I could be wrong so welcome anyone to correct me if I'm missing here.
>
> You can expect Spark operators in narrow dependency as applying wrapped
> functions to an iterator (like "op3(op2(op1(iter)))"), and with such
> expectation there's no way to achieve adjusting partitions. Each partition
> is independent from others and there's no communication between tasks.
>
> So if you have 1000 partitions (in terms of parallelism, not data
> partitions) and willing to reduce down (or scale out) to some arbitrary
> number of partitions, it would require moving of data and requires shuffle.
>
> The meaning of "spark.sql.shuffle.partitions" is especially important for
> structured streaming because it defines data partitions of state. For
> structured streaming, there're couple of operations which leverage state
> which is stored to the file system. The state is partitioned by key
> columns, and "spark.sql.shuffle.partitions" data partitions are generated.
> Due to the nature of hash function, once you run the streaming query,
> "spark.sql.shuffle.partitions" keeps unchanged (Spark doesn't allow
> reconfigure for the config).
>
> So the value of configuration represents data partitions of state, as well
> as max parallelism of stateful operators. If we want to have less
> parallelism (not same as number of partitions), we should apply coalesce to
> the operator and the number of partitions are still kept unchanged whereas
> it incurs less parallelism and also less tasks.
>
> We just can't apply coalesce to individual operator in narrow dependency.
>
> -Jungtaek Lim (HeartSaVioR)
> 2018년 8월 9일 (목) 오후 3:07, Koert Kuipers <ko...@tresata.com>님이 작성:
>
>> well an interesting side effect of this is that i can now control the
>> number of partitions for every shuffle in a dataframe job, as opposed to
>> having a single setting for number of partitions across all shuffles.
>>
>> basically i can set spark.sql.shuffle.partitions to some huge number, and
>> then for every groupByKey (or any other shuffle operation) follow it up
>> with a coalesce to set the number of partitions. its like i have
>> numPartitions back from those good old RDD shuffle methods :)
>>
>>
>> On Thu, Aug 9, 2018 at 1:38 AM, Koert Kuipers <ko...@tresata.com> wrote:
>>
>>> an new map task after a shuffle is also a narrow dependency, isnt it?
>>> its narrow because data doesn't need to move, e.g. every partition depends
>>> on single partition, preferably on same machine.
>>>
>>> modifying a previous shuffle to avoid a shuffle strikes me as odd, and
>>> can potentially make a mess of performance, especially when no shuffle is
>>> needed. just a new map task.
>>>
>>>
>>> On Thu, Aug 9, 2018 at 1:15 AM, Jungtaek Lim <ka...@gmail.com> wrote:
>>>
>>>> > shouldnt coalesce introduce a new map-phase with less tasks instead
>>>> of changing the previous shuffle?
>>>>
>>>> The javadoc of Dataset.coalesce [1] describes such behavior clearly. It
>>>> results in narrow dependency, hence no shuffle.
>>>>
>>>> So it is pretty clear that you need to use "repartition". Not sure
>>>> there's any available trick to achieve it without calling repartition.
>>>>
>>>> Thanks,
>>>> Jungtaek Lim (HeartSaVioR)
>>>>
>>>> 1. https://github.com/apache/spark/blob/a40806d2bd84e9a0308165f0d6c97e
>>>> 9cf00aa4a3/sql/core/src/main/scala/org/apache/spark/sql/
>>>> Dataset.scala#L2918-L2937
>>>>
>>>>
>>>> 2018년 8월 9일 (목) 오전 5:55, Koert Kuipers <ko...@tresata.com>님이 작성:
>>>>
>>>>> sorry i meant to say:
>>>>> wit a checkpoint i get a map phase with lots of tasks to read the
>>>>> data, then a reduce phase with 2048 reducers, and then finally a map phase
>>>>> with 100 tasks.
>>>>>
>>>>> On Wed, Aug 8, 2018 at 4:54 PM, Koert Kuipers <ko...@tresata.com>
>>>>> wrote:
>>>>>
>>>>>> the only thing that seems to stop this so far is a checkpoint.
>>>>>>
>>>>>> wit a checkpoint i get a map phase with lots of tasks to read the
>>>>>> data, then a reduce phase with 2048 reducers, and then finally a map phase
>>>>>> with 4 tasks.
>>>>>>
>>>>>> now i need to figure out how to do this without having to checkpoint.
>>>>>> i wish i could insert something like a dummy operation that logical steps
>>>>>> cannot jump over.
>>>>>>
>>>>>> On Wed, Aug 8, 2018 at 4:22 PM, Koert Kuipers <ko...@tresata.com>
>>>>>> wrote:
>>>>>>
>>>>>>> ok thanks.
>>>>>>>
>>>>>>> mhhhhh. that seems odd. shouldnt coalesce introduce a new map-phase
>>>>>>> with less tasks instead of changing the previous shuffle?
>>>>>>>
>>>>>>> using repartition seems too expensive just to keep the number of
>>>>>>> files down. so i guess i am back to looking for another solution.
>>>>>>>
>>>>>>>
>>>>>>>
>>>>>>> On Wed, Aug 8, 2018 at 4:13 PM, Vadim Semenov <va...@datadoghq.com>
>>>>>>> wrote:
>>>>>>>
>>>>>>>> `coalesce` sets the number of partitions for the last stage, so you
>>>>>>>> have to use `repartition` instead which is going to introduce an
>>>>>>>> extra
>>>>>>>> shuffle stage
>>>>>>>>
>>>>>>>> On Wed, Aug 8, 2018 at 3:47 PM Koert Kuipers <ko...@tresata.com>
>>>>>>>> wrote:
>>>>>>>> >
>>>>>>>> > one small correction: lots of files leads to pressure on the
>>>>>>>> spark driver program when reading this data in spark.
>>>>>>>> >
>>>>>>>> > On Wed, Aug 8, 2018 at 3:39 PM, Koert Kuipers <ko...@tresata.com>
>>>>>>>> wrote:
>>>>>>>> >>
>>>>>>>> >> hi,
>>>>>>>> >>
>>>>>>>> >> i am reading data from files into a dataframe, then doing a
>>>>>>>> groupBy for a given column with a count, and finally i coalesce to a
>>>>>>>> smaller number of partitions before writing out to disk. so roughly:
>>>>>>>> >>
>>>>>>>> >> spark.read.format(...).load(...).groupBy(column).count().
>>>>>>>> coalesce(100).write.format(...).save(...)
>>>>>>>> >>
>>>>>>>> >> i have this setting: spark.sql.shuffle.partitions=2048
>>>>>>>> >>
>>>>>>>> >> i expect to see 2048 partitions in shuffle. what i am seeing
>>>>>>>> instead is a shuffle with only 100 partitions. it's like the coalesce has
>>>>>>>> taken over the partitioning of the groupBy.
>>>>>>>> >>
>>>>>>>> >> any idea why?
>>>>>>>> >>
>>>>>>>> >> i am doing coalesce because it is not helpful to write out 2048
>>>>>>>> files, lots of files leads to pressure down the line on executors reading
>>>>>>>> this data (i am writing to just one partition of a larger dataset), and
>>>>>>>> since i have less than 100 executors i expect it to be efficient. so sounds
>>>>>>>> like a good idea, no?
>>>>>>>> >>
>>>>>>>> >> but i do need 2048 partitions in my shuffle due to the operation
>>>>>>>> i am doing in the groupBy (in my real problem i am not just doing a
>>>>>>>> count...).
>>>>>>>> >>
>>>>>>>> >> thanks!
>>>>>>>> >> koert
>>>>>>>> >>
>>>>>>>> >
>>>>>>>>
>>>>>>>>
>>>>>>>> --
>>>>>>>> Sent from my iPhone
>>>>>>>>
>>>>>>>
>>>>>>>
>>>>>>
>>>>>
>>>
>>

Re: groupBy and then coalesce impacts shuffle partitions in unintended way

Posted by Jungtaek Lim <ka...@gmail.com>.
I could be wrong so welcome anyone to correct me if I'm missing here.

You can expect Spark operators in narrow dependency as applying wrapped
functions to an iterator (like "op3(op2(op1(iter)))"), and with such
expectation there's no way to achieve adjusting partitions. Each partition
is independent from others and there's no communication between tasks.

So if you have 1000 partitions (in terms of parallelism, not data
partitions) and willing to reduce down (or scale out) to some arbitrary
number of partitions, it would require moving of data and requires shuffle.

The meaning of "spark.sql.shuffle.partitions" is especially important for
structured streaming because it defines data partitions of state. For
structured streaming, there're couple of operations which leverage state
which is stored to the file system. The state is partitioned by key
columns, and "spark.sql.shuffle.partitions" data partitions are generated.
Due to the nature of hash function, once you run the streaming query,
"spark.sql.shuffle.partitions" keeps unchanged (Spark doesn't allow
reconfigure for the config).

So the value of configuration represents data partitions of state, as well
as max parallelism of stateful operators. If we want to have less
parallelism (not same as number of partitions), we should apply coalesce to
the operator and the number of partitions are still kept unchanged whereas
it incurs less parallelism and also less tasks.

We just can't apply coalesce to individual operator in narrow dependency.

-Jungtaek Lim (HeartSaVioR)
2018년 8월 9일 (목) 오후 3:07, Koert Kuipers <ko...@tresata.com>님이 작성:

> well an interesting side effect of this is that i can now control the
> number of partitions for every shuffle in a dataframe job, as opposed to
> having a single setting for number of partitions across all shuffles.
>
> basically i can set spark.sql.shuffle.partitions to some huge number, and
> then for every groupByKey (or any other shuffle operation) follow it up
> with a coalesce to set the number of partitions. its like i have
> numPartitions back from those good old RDD shuffle methods :)
>
>
> On Thu, Aug 9, 2018 at 1:38 AM, Koert Kuipers <ko...@tresata.com> wrote:
>
>> an new map task after a shuffle is also a narrow dependency, isnt it? its
>> narrow because data doesn't need to move, e.g. every partition depends on
>> single partition, preferably on same machine.
>>
>> modifying a previous shuffle to avoid a shuffle strikes me as odd, and
>> can potentially make a mess of performance, especially when no shuffle is
>> needed. just a new map task.
>>
>>
>> On Thu, Aug 9, 2018 at 1:15 AM, Jungtaek Lim <ka...@gmail.com> wrote:
>>
>>> > shouldnt coalesce introduce a new map-phase with less tasks instead of
>>> changing the previous shuffle?
>>>
>>> The javadoc of Dataset.coalesce [1] describes such behavior clearly. It
>>> results in narrow dependency, hence no shuffle.
>>>
>>> So it is pretty clear that you need to use "repartition". Not sure
>>> there's any available trick to achieve it without calling repartition.
>>>
>>> Thanks,
>>> Jungtaek Lim (HeartSaVioR)
>>>
>>> 1.
>>> https://github.com/apache/spark/blob/a40806d2bd84e9a0308165f0d6c97e9cf00aa4a3/sql/core/src/main/scala/org/apache/spark/sql/Dataset.scala#L2918-L2937
>>>
>>>
>>> 2018년 8월 9일 (목) 오전 5:55, Koert Kuipers <ko...@tresata.com>님이 작성:
>>>
>>>> sorry i meant to say:
>>>> wit a checkpoint i get a map phase with lots of tasks to read the data,
>>>> then a reduce phase with 2048 reducers, and then finally a map phase with
>>>> 100 tasks.
>>>>
>>>> On Wed, Aug 8, 2018 at 4:54 PM, Koert Kuipers <ko...@tresata.com>
>>>> wrote:
>>>>
>>>>> the only thing that seems to stop this so far is a checkpoint.
>>>>>
>>>>> wit a checkpoint i get a map phase with lots of tasks to read the
>>>>> data, then a reduce phase with 2048 reducers, and then finally a map phase
>>>>> with 4 tasks.
>>>>>
>>>>> now i need to figure out how to do this without having to checkpoint.
>>>>> i wish i could insert something like a dummy operation that logical steps
>>>>> cannot jump over.
>>>>>
>>>>> On Wed, Aug 8, 2018 at 4:22 PM, Koert Kuipers <ko...@tresata.com>
>>>>> wrote:
>>>>>
>>>>>> ok thanks.
>>>>>>
>>>>>> mhhhhh. that seems odd. shouldnt coalesce introduce a new map-phase
>>>>>> with less tasks instead of changing the previous shuffle?
>>>>>>
>>>>>> using repartition seems too expensive just to keep the number of
>>>>>> files down. so i guess i am back to looking for another solution.
>>>>>>
>>>>>>
>>>>>>
>>>>>> On Wed, Aug 8, 2018 at 4:13 PM, Vadim Semenov <va...@datadoghq.com>
>>>>>> wrote:
>>>>>>
>>>>>>> `coalesce` sets the number of partitions for the last stage, so you
>>>>>>> have to use `repartition` instead which is going to introduce an
>>>>>>> extra
>>>>>>> shuffle stage
>>>>>>>
>>>>>>> On Wed, Aug 8, 2018 at 3:47 PM Koert Kuipers <ko...@tresata.com>
>>>>>>> wrote:
>>>>>>> >
>>>>>>> > one small correction: lots of files leads to pressure on the spark
>>>>>>> driver program when reading this data in spark.
>>>>>>> >
>>>>>>> > On Wed, Aug 8, 2018 at 3:39 PM, Koert Kuipers <ko...@tresata.com>
>>>>>>> wrote:
>>>>>>> >>
>>>>>>> >> hi,
>>>>>>> >>
>>>>>>> >> i am reading data from files into a dataframe, then doing a
>>>>>>> groupBy for a given column with a count, and finally i coalesce to a
>>>>>>> smaller number of partitions before writing out to disk. so roughly:
>>>>>>> >>
>>>>>>> >>
>>>>>>> spark.read.format(...).load(...).groupBy(column).count().coalesce(100).write.format(...).save(...)
>>>>>>> >>
>>>>>>> >> i have this setting: spark.sql.shuffle.partitions=2048
>>>>>>> >>
>>>>>>> >> i expect to see 2048 partitions in shuffle. what i am seeing
>>>>>>> instead is a shuffle with only 100 partitions. it's like the coalesce has
>>>>>>> taken over the partitioning of the groupBy.
>>>>>>> >>
>>>>>>> >> any idea why?
>>>>>>> >>
>>>>>>> >> i am doing coalesce because it is not helpful to write out 2048
>>>>>>> files, lots of files leads to pressure down the line on executors reading
>>>>>>> this data (i am writing to just one partition of a larger dataset), and
>>>>>>> since i have less than 100 executors i expect it to be efficient. so sounds
>>>>>>> like a good idea, no?
>>>>>>> >>
>>>>>>> >> but i do need 2048 partitions in my shuffle due to the operation
>>>>>>> i am doing in the groupBy (in my real problem i am not just doing a
>>>>>>> count...).
>>>>>>> >>
>>>>>>> >> thanks!
>>>>>>> >> koert
>>>>>>> >>
>>>>>>> >
>>>>>>>
>>>>>>>
>>>>>>> --
>>>>>>> Sent from my iPhone
>>>>>>>
>>>>>>
>>>>>>
>>>>>
>>>>
>>
>

Re: groupBy and then coalesce impacts shuffle partitions in unintended way

Posted by Koert Kuipers <ko...@tresata.com>.
well an interesting side effect of this is that i can now control the
number of partitions for every shuffle in a dataframe job, as opposed to
having a single setting for number of partitions across all shuffles.

basically i can set spark.sql.shuffle.partitions to some huge number, and
then for every groupByKey (or any other shuffle operation) follow it up
with a coalesce to set the number of partitions. its like i have
numPartitions back from those good old RDD shuffle methods :)


On Thu, Aug 9, 2018 at 1:38 AM, Koert Kuipers <ko...@tresata.com> wrote:

> an new map task after a shuffle is also a narrow dependency, isnt it? its
> narrow because data doesn't need to move, e.g. every partition depends on
> single partition, preferably on same machine.
>
> modifying a previous shuffle to avoid a shuffle strikes me as odd, and can
> potentially make a mess of performance, especially when no shuffle is
> needed. just a new map task.
>
>
> On Thu, Aug 9, 2018 at 1:15 AM, Jungtaek Lim <ka...@gmail.com> wrote:
>
>> > shouldnt coalesce introduce a new map-phase with less tasks instead of
>> changing the previous shuffle?
>>
>> The javadoc of Dataset.coalesce [1] describes such behavior clearly. It
>> results in narrow dependency, hence no shuffle.
>>
>> So it is pretty clear that you need to use "repartition". Not sure
>> there's any available trick to achieve it without calling repartition.
>>
>> Thanks,
>> Jungtaek Lim (HeartSaVioR)
>>
>> 1. https://github.com/apache/spark/blob/a40806d2bd84e9a03081
>> 65f0d6c97e9cf00aa4a3/sql/core/src/main/scala/org/apache/
>> spark/sql/Dataset.scala#L2918-L2937
>>
>>
>> 2018년 8월 9일 (목) 오전 5:55, Koert Kuipers <ko...@tresata.com>님이 작성:
>>
>>> sorry i meant to say:
>>> wit a checkpoint i get a map phase with lots of tasks to read the data,
>>> then a reduce phase with 2048 reducers, and then finally a map phase with
>>> 100 tasks.
>>>
>>> On Wed, Aug 8, 2018 at 4:54 PM, Koert Kuipers <ko...@tresata.com> wrote:
>>>
>>>> the only thing that seems to stop this so far is a checkpoint.
>>>>
>>>> wit a checkpoint i get a map phase with lots of tasks to read the data,
>>>> then a reduce phase with 2048 reducers, and then finally a map phase with 4
>>>> tasks.
>>>>
>>>> now i need to figure out how to do this without having to checkpoint. i
>>>> wish i could insert something like a dummy operation that logical steps
>>>> cannot jump over.
>>>>
>>>> On Wed, Aug 8, 2018 at 4:22 PM, Koert Kuipers <ko...@tresata.com>
>>>> wrote:
>>>>
>>>>> ok thanks.
>>>>>
>>>>> mhhhhh. that seems odd. shouldnt coalesce introduce a new map-phase
>>>>> with less tasks instead of changing the previous shuffle?
>>>>>
>>>>> using repartition seems too expensive just to keep the number of files
>>>>> down. so i guess i am back to looking for another solution.
>>>>>
>>>>>
>>>>>
>>>>> On Wed, Aug 8, 2018 at 4:13 PM, Vadim Semenov <va...@datadoghq.com>
>>>>> wrote:
>>>>>
>>>>>> `coalesce` sets the number of partitions for the last stage, so you
>>>>>> have to use `repartition` instead which is going to introduce an extra
>>>>>> shuffle stage
>>>>>>
>>>>>> On Wed, Aug 8, 2018 at 3:47 PM Koert Kuipers <ko...@tresata.com>
>>>>>> wrote:
>>>>>> >
>>>>>> > one small correction: lots of files leads to pressure on the spark
>>>>>> driver program when reading this data in spark.
>>>>>> >
>>>>>> > On Wed, Aug 8, 2018 at 3:39 PM, Koert Kuipers <ko...@tresata.com>
>>>>>> wrote:
>>>>>> >>
>>>>>> >> hi,
>>>>>> >>
>>>>>> >> i am reading data from files into a dataframe, then doing a
>>>>>> groupBy for a given column with a count, and finally i coalesce to a
>>>>>> smaller number of partitions before writing out to disk. so roughly:
>>>>>> >>
>>>>>> >> spark.read.format(...).load(...).groupBy(column).count().coa
>>>>>> lesce(100).write.format(...).save(...)
>>>>>> >>
>>>>>> >> i have this setting: spark.sql.shuffle.partitions=2048
>>>>>> >>
>>>>>> >> i expect to see 2048 partitions in shuffle. what i am seeing
>>>>>> instead is a shuffle with only 100 partitions. it's like the coalesce has
>>>>>> taken over the partitioning of the groupBy.
>>>>>> >>
>>>>>> >> any idea why?
>>>>>> >>
>>>>>> >> i am doing coalesce because it is not helpful to write out 2048
>>>>>> files, lots of files leads to pressure down the line on executors reading
>>>>>> this data (i am writing to just one partition of a larger dataset), and
>>>>>> since i have less than 100 executors i expect it to be efficient. so sounds
>>>>>> like a good idea, no?
>>>>>> >>
>>>>>> >> but i do need 2048 partitions in my shuffle due to the operation i
>>>>>> am doing in the groupBy (in my real problem i am not just doing a count...).
>>>>>> >>
>>>>>> >> thanks!
>>>>>> >> koert
>>>>>> >>
>>>>>> >
>>>>>>
>>>>>>
>>>>>> --
>>>>>> Sent from my iPhone
>>>>>>
>>>>>
>>>>>
>>>>
>>>
>

Re: groupBy and then coalesce impacts shuffle partitions in unintended way

Posted by Koert Kuipers <ko...@tresata.com>.
an new map task after a shuffle is also a narrow dependency, isnt it? its
narrow because data doesn't need to move, e.g. every partition depends on
single partition, preferably on same machine.

modifying a previous shuffle to avoid a shuffle strikes me as odd, and can
potentially make a mess of performance, especially when no shuffle is
needed. just a new map task.


On Thu, Aug 9, 2018 at 1:15 AM, Jungtaek Lim <ka...@gmail.com> wrote:

> > shouldnt coalesce introduce a new map-phase with less tasks instead of
> changing the previous shuffle?
>
> The javadoc of Dataset.coalesce [1] describes such behavior clearly. It
> results in narrow dependency, hence no shuffle.
>
> So it is pretty clear that you need to use "repartition". Not sure there's
> any available trick to achieve it without calling repartition.
>
> Thanks,
> Jungtaek Lim (HeartSaVioR)
>
> 1. https://github.com/apache/spark/blob/a40806d2bd84e9a0308165f0d6c97e
> 9cf00aa4a3/sql/core/src/main/scala/org/apache/spark/sql/
> Dataset.scala#L2918-L2937
>
>
> 2018년 8월 9일 (목) 오전 5:55, Koert Kuipers <ko...@tresata.com>님이 작성:
>
>> sorry i meant to say:
>> wit a checkpoint i get a map phase with lots of tasks to read the data,
>> then a reduce phase with 2048 reducers, and then finally a map phase with
>> 100 tasks.
>>
>> On Wed, Aug 8, 2018 at 4:54 PM, Koert Kuipers <ko...@tresata.com> wrote:
>>
>>> the only thing that seems to stop this so far is a checkpoint.
>>>
>>> wit a checkpoint i get a map phase with lots of tasks to read the data,
>>> then a reduce phase with 2048 reducers, and then finally a map phase with 4
>>> tasks.
>>>
>>> now i need to figure out how to do this without having to checkpoint. i
>>> wish i could insert something like a dummy operation that logical steps
>>> cannot jump over.
>>>
>>> On Wed, Aug 8, 2018 at 4:22 PM, Koert Kuipers <ko...@tresata.com> wrote:
>>>
>>>> ok thanks.
>>>>
>>>> mhhhhh. that seems odd. shouldnt coalesce introduce a new map-phase
>>>> with less tasks instead of changing the previous shuffle?
>>>>
>>>> using repartition seems too expensive just to keep the number of files
>>>> down. so i guess i am back to looking for another solution.
>>>>
>>>>
>>>>
>>>> On Wed, Aug 8, 2018 at 4:13 PM, Vadim Semenov <va...@datadoghq.com>
>>>> wrote:
>>>>
>>>>> `coalesce` sets the number of partitions for the last stage, so you
>>>>> have to use `repartition` instead which is going to introduce an extra
>>>>> shuffle stage
>>>>>
>>>>> On Wed, Aug 8, 2018 at 3:47 PM Koert Kuipers <ko...@tresata.com>
>>>>> wrote:
>>>>> >
>>>>> > one small correction: lots of files leads to pressure on the spark
>>>>> driver program when reading this data in spark.
>>>>> >
>>>>> > On Wed, Aug 8, 2018 at 3:39 PM, Koert Kuipers <ko...@tresata.com>
>>>>> wrote:
>>>>> >>
>>>>> >> hi,
>>>>> >>
>>>>> >> i am reading data from files into a dataframe, then doing a groupBy
>>>>> for a given column with a count, and finally i coalesce to a smaller number
>>>>> of partitions before writing out to disk. so roughly:
>>>>> >>
>>>>> >> spark.read.format(...).load(...).groupBy(column).count().
>>>>> coalesce(100).write.format(...).save(...)
>>>>> >>
>>>>> >> i have this setting: spark.sql.shuffle.partitions=2048
>>>>> >>
>>>>> >> i expect to see 2048 partitions in shuffle. what i am seeing
>>>>> instead is a shuffle with only 100 partitions. it's like the coalesce has
>>>>> taken over the partitioning of the groupBy.
>>>>> >>
>>>>> >> any idea why?
>>>>> >>
>>>>> >> i am doing coalesce because it is not helpful to write out 2048
>>>>> files, lots of files leads to pressure down the line on executors reading
>>>>> this data (i am writing to just one partition of a larger dataset), and
>>>>> since i have less than 100 executors i expect it to be efficient. so sounds
>>>>> like a good idea, no?
>>>>> >>
>>>>> >> but i do need 2048 partitions in my shuffle due to the operation i
>>>>> am doing in the groupBy (in my real problem i am not just doing a count...).
>>>>> >>
>>>>> >> thanks!
>>>>> >> koert
>>>>> >>
>>>>> >
>>>>>
>>>>>
>>>>> --
>>>>> Sent from my iPhone
>>>>>
>>>>
>>>>
>>>
>>

Re: groupBy and then coalesce impacts shuffle partitions in unintended way

Posted by Jungtaek Lim <ka...@gmail.com>.
> shouldnt coalesce introduce a new map-phase with less tasks instead of
changing the previous shuffle?

The javadoc of Dataset.coalesce [1] describes such behavior clearly. It
results in narrow dependency, hence no shuffle.

So it is pretty clear that you need to use "repartition". Not sure there's
any available trick to achieve it without calling repartition.

Thanks,
Jungtaek Lim (HeartSaVioR)

1.
https://github.com/apache/spark/blob/a40806d2bd84e9a0308165f0d6c97e9cf00aa4a3/sql/core/src/main/scala/org/apache/spark/sql/Dataset.scala#L2918-L2937


2018년 8월 9일 (목) 오전 5:55, Koert Kuipers <ko...@tresata.com>님이 작성:

> sorry i meant to say:
> wit a checkpoint i get a map phase with lots of tasks to read the data,
> then a reduce phase with 2048 reducers, and then finally a map phase with
> 100 tasks.
>
> On Wed, Aug 8, 2018 at 4:54 PM, Koert Kuipers <ko...@tresata.com> wrote:
>
>> the only thing that seems to stop this so far is a checkpoint.
>>
>> wit a checkpoint i get a map phase with lots of tasks to read the data,
>> then a reduce phase with 2048 reducers, and then finally a map phase with 4
>> tasks.
>>
>> now i need to figure out how to do this without having to checkpoint. i
>> wish i could insert something like a dummy operation that logical steps
>> cannot jump over.
>>
>> On Wed, Aug 8, 2018 at 4:22 PM, Koert Kuipers <ko...@tresata.com> wrote:
>>
>>> ok thanks.
>>>
>>> mhhhhh. that seems odd. shouldnt coalesce introduce a new map-phase with
>>> less tasks instead of changing the previous shuffle?
>>>
>>> using repartition seems too expensive just to keep the number of files
>>> down. so i guess i am back to looking for another solution.
>>>
>>>
>>>
>>> On Wed, Aug 8, 2018 at 4:13 PM, Vadim Semenov <va...@datadoghq.com>
>>> wrote:
>>>
>>>> `coalesce` sets the number of partitions for the last stage, so you
>>>> have to use `repartition` instead which is going to introduce an extra
>>>> shuffle stage
>>>>
>>>> On Wed, Aug 8, 2018 at 3:47 PM Koert Kuipers <ko...@tresata.com> wrote:
>>>> >
>>>> > one small correction: lots of files leads to pressure on the spark
>>>> driver program when reading this data in spark.
>>>> >
>>>> > On Wed, Aug 8, 2018 at 3:39 PM, Koert Kuipers <ko...@tresata.com>
>>>> wrote:
>>>> >>
>>>> >> hi,
>>>> >>
>>>> >> i am reading data from files into a dataframe, then doing a groupBy
>>>> for a given column with a count, and finally i coalesce to a smaller number
>>>> of partitions before writing out to disk. so roughly:
>>>> >>
>>>> >>
>>>> spark.read.format(...).load(...).groupBy(column).count().coalesce(100).write.format(...).save(...)
>>>> >>
>>>> >> i have this setting: spark.sql.shuffle.partitions=2048
>>>> >>
>>>> >> i expect to see 2048 partitions in shuffle. what i am seeing instead
>>>> is a shuffle with only 100 partitions. it's like the coalesce has taken
>>>> over the partitioning of the groupBy.
>>>> >>
>>>> >> any idea why?
>>>> >>
>>>> >> i am doing coalesce because it is not helpful to write out 2048
>>>> files, lots of files leads to pressure down the line on executors reading
>>>> this data (i am writing to just one partition of a larger dataset), and
>>>> since i have less than 100 executors i expect it to be efficient. so sounds
>>>> like a good idea, no?
>>>> >>
>>>> >> but i do need 2048 partitions in my shuffle due to the operation i
>>>> am doing in the groupBy (in my real problem i am not just doing a count...).
>>>> >>
>>>> >> thanks!
>>>> >> koert
>>>> >>
>>>> >
>>>>
>>>>
>>>> --
>>>> Sent from my iPhone
>>>>
>>>
>>>
>>
>

Re: groupBy and then coalesce impacts shuffle partitions in unintended way

Posted by Koert Kuipers <ko...@tresata.com>.
sorry i meant to say:
wit a checkpoint i get a map phase with lots of tasks to read the data,
then a reduce phase with 2048 reducers, and then finally a map phase with
100 tasks.

On Wed, Aug 8, 2018 at 4:54 PM, Koert Kuipers <ko...@tresata.com> wrote:

> the only thing that seems to stop this so far is a checkpoint.
>
> wit a checkpoint i get a map phase with lots of tasks to read the data,
> then a reduce phase with 2048 reducers, and then finally a map phase with 4
> tasks.
>
> now i need to figure out how to do this without having to checkpoint. i
> wish i could insert something like a dummy operation that logical steps
> cannot jump over.
>
> On Wed, Aug 8, 2018 at 4:22 PM, Koert Kuipers <ko...@tresata.com> wrote:
>
>> ok thanks.
>>
>> mhhhhh. that seems odd. shouldnt coalesce introduce a new map-phase with
>> less tasks instead of changing the previous shuffle?
>>
>> using repartition seems too expensive just to keep the number of files
>> down. so i guess i am back to looking for another solution.
>>
>>
>>
>> On Wed, Aug 8, 2018 at 4:13 PM, Vadim Semenov <va...@datadoghq.com>
>> wrote:
>>
>>> `coalesce` sets the number of partitions for the last stage, so you
>>> have to use `repartition` instead which is going to introduce an extra
>>> shuffle stage
>>>
>>> On Wed, Aug 8, 2018 at 3:47 PM Koert Kuipers <ko...@tresata.com> wrote:
>>> >
>>> > one small correction: lots of files leads to pressure on the spark
>>> driver program when reading this data in spark.
>>> >
>>> > On Wed, Aug 8, 2018 at 3:39 PM, Koert Kuipers <ko...@tresata.com>
>>> wrote:
>>> >>
>>> >> hi,
>>> >>
>>> >> i am reading data from files into a dataframe, then doing a groupBy
>>> for a given column with a count, and finally i coalesce to a smaller number
>>> of partitions before writing out to disk. so roughly:
>>> >>
>>> >> spark.read.format(...).load(...).groupBy(column).count().coa
>>> lesce(100).write.format(...).save(...)
>>> >>
>>> >> i have this setting: spark.sql.shuffle.partitions=2048
>>> >>
>>> >> i expect to see 2048 partitions in shuffle. what i am seeing instead
>>> is a shuffle with only 100 partitions. it's like the coalesce has taken
>>> over the partitioning of the groupBy.
>>> >>
>>> >> any idea why?
>>> >>
>>> >> i am doing coalesce because it is not helpful to write out 2048
>>> files, lots of files leads to pressure down the line on executors reading
>>> this data (i am writing to just one partition of a larger dataset), and
>>> since i have less than 100 executors i expect it to be efficient. so sounds
>>> like a good idea, no?
>>> >>
>>> >> but i do need 2048 partitions in my shuffle due to the operation i am
>>> doing in the groupBy (in my real problem i am not just doing a count...).
>>> >>
>>> >> thanks!
>>> >> koert
>>> >>
>>> >
>>>
>>>
>>> --
>>> Sent from my iPhone
>>>
>>
>>
>

Re: groupBy and then coalesce impacts shuffle partitions in unintended way

Posted by Koert Kuipers <ko...@tresata.com>.
the only thing that seems to stop this so far is a checkpoint.

wit a checkpoint i get a map phase with lots of tasks to read the data,
then a reduce phase with 2048 reducers, and then finally a map phase with 4
tasks.

now i need to figure out how to do this without having to checkpoint. i
wish i could insert something like a dummy operation that logical steps
cannot jump over.

On Wed, Aug 8, 2018 at 4:22 PM, Koert Kuipers <ko...@tresata.com> wrote:

> ok thanks.
>
> mhhhhh. that seems odd. shouldnt coalesce introduce a new map-phase with
> less tasks instead of changing the previous shuffle?
>
> using repartition seems too expensive just to keep the number of files
> down. so i guess i am back to looking for another solution.
>
>
>
> On Wed, Aug 8, 2018 at 4:13 PM, Vadim Semenov <va...@datadoghq.com> wrote:
>
>> `coalesce` sets the number of partitions for the last stage, so you
>> have to use `repartition` instead which is going to introduce an extra
>> shuffle stage
>>
>> On Wed, Aug 8, 2018 at 3:47 PM Koert Kuipers <ko...@tresata.com> wrote:
>> >
>> > one small correction: lots of files leads to pressure on the spark
>> driver program when reading this data in spark.
>> >
>> > On Wed, Aug 8, 2018 at 3:39 PM, Koert Kuipers <ko...@tresata.com>
>> wrote:
>> >>
>> >> hi,
>> >>
>> >> i am reading data from files into a dataframe, then doing a groupBy
>> for a given column with a count, and finally i coalesce to a smaller number
>> of partitions before writing out to disk. so roughly:
>> >>
>> >> spark.read.format(...).load(...).groupBy(column).count().coa
>> lesce(100).write.format(...).save(...)
>> >>
>> >> i have this setting: spark.sql.shuffle.partitions=2048
>> >>
>> >> i expect to see 2048 partitions in shuffle. what i am seeing instead
>> is a shuffle with only 100 partitions. it's like the coalesce has taken
>> over the partitioning of the groupBy.
>> >>
>> >> any idea why?
>> >>
>> >> i am doing coalesce because it is not helpful to write out 2048 files,
>> lots of files leads to pressure down the line on executors reading this
>> data (i am writing to just one partition of a larger dataset), and since i
>> have less than 100 executors i expect it to be efficient. so sounds like a
>> good idea, no?
>> >>
>> >> but i do need 2048 partitions in my shuffle due to the operation i am
>> doing in the groupBy (in my real problem i am not just doing a count...).
>> >>
>> >> thanks!
>> >> koert
>> >>
>> >
>>
>>
>> --
>> Sent from my iPhone
>>
>
>

Re: groupBy and then coalesce impacts shuffle partitions in unintended way

Posted by Koert Kuipers <ko...@tresata.com>.
ok thanks.

mhhhhh. that seems odd. shouldnt coalesce introduce a new map-phase with
less tasks instead of changing the previous shuffle?

using repartition seems too expensive just to keep the number of files
down. so i guess i am back to looking for another solution.



On Wed, Aug 8, 2018 at 4:13 PM, Vadim Semenov <va...@datadoghq.com> wrote:

> `coalesce` sets the number of partitions for the last stage, so you
> have to use `repartition` instead which is going to introduce an extra
> shuffle stage
>
> On Wed, Aug 8, 2018 at 3:47 PM Koert Kuipers <ko...@tresata.com> wrote:
> >
> > one small correction: lots of files leads to pressure on the spark
> driver program when reading this data in spark.
> >
> > On Wed, Aug 8, 2018 at 3:39 PM, Koert Kuipers <ko...@tresata.com> wrote:
> >>
> >> hi,
> >>
> >> i am reading data from files into a dataframe, then doing a groupBy for
> a given column with a count, and finally i coalesce to a smaller number of
> partitions before writing out to disk. so roughly:
> >>
> >> spark.read.format(...).load(...).groupBy(column).count().
> coalesce(100).write.format(...).save(...)
> >>
> >> i have this setting: spark.sql.shuffle.partitions=2048
> >>
> >> i expect to see 2048 partitions in shuffle. what i am seeing instead is
> a shuffle with only 100 partitions. it's like the coalesce has taken over
> the partitioning of the groupBy.
> >>
> >> any idea why?
> >>
> >> i am doing coalesce because it is not helpful to write out 2048 files,
> lots of files leads to pressure down the line on executors reading this
> data (i am writing to just one partition of a larger dataset), and since i
> have less than 100 executors i expect it to be efficient. so sounds like a
> good idea, no?
> >>
> >> but i do need 2048 partitions in my shuffle due to the operation i am
> doing in the groupBy (in my real problem i am not just doing a count...).
> >>
> >> thanks!
> >> koert
> >>
> >
>
>
> --
> Sent from my iPhone
>

Re: groupBy and then coalesce impacts shuffle partitions in unintended way

Posted by Vadim Semenov <va...@datadoghq.com>.
`coalesce` sets the number of partitions for the last stage, so you
have to use `repartition` instead which is going to introduce an extra
shuffle stage

On Wed, Aug 8, 2018 at 3:47 PM Koert Kuipers <ko...@tresata.com> wrote:
>
> one small correction: lots of files leads to pressure on the spark driver program when reading this data in spark.
>
> On Wed, Aug 8, 2018 at 3:39 PM, Koert Kuipers <ko...@tresata.com> wrote:
>>
>> hi,
>>
>> i am reading data from files into a dataframe, then doing a groupBy for a given column with a count, and finally i coalesce to a smaller number of partitions before writing out to disk. so roughly:
>>
>> spark.read.format(...).load(...).groupBy(column).count().coalesce(100).write.format(...).save(...)
>>
>> i have this setting: spark.sql.shuffle.partitions=2048
>>
>> i expect to see 2048 partitions in shuffle. what i am seeing instead is a shuffle with only 100 partitions. it's like the coalesce has taken over the partitioning of the groupBy.
>>
>> any idea why?
>>
>> i am doing coalesce because it is not helpful to write out 2048 files, lots of files leads to pressure down the line on executors reading this data (i am writing to just one partition of a larger dataset), and since i have less than 100 executors i expect it to be efficient. so sounds like a good idea, no?
>>
>> but i do need 2048 partitions in my shuffle due to the operation i am doing in the groupBy (in my real problem i am not just doing a count...).
>>
>> thanks!
>> koert
>>
>


-- 
Sent from my iPhone

---------------------------------------------------------------------
To unsubscribe e-mail: user-unsubscribe@spark.apache.org


Re: groupBy and then coalesce impacts shuffle partitions in unintended way

Posted by Koert Kuipers <ko...@tresata.com>.
one small correction: lots of files leads to pressure on the spark driver
program when reading this data in spark.

On Wed, Aug 8, 2018 at 3:39 PM, Koert Kuipers <ko...@tresata.com> wrote:

> hi,
>
> i am reading data from files into a dataframe, then doing a groupBy for a
> given column with a count, and finally i coalesce to a smaller number of
> partitions before writing out to disk. so roughly:
>
> spark.read.format(...).load(...).groupBy(column).count().
> coalesce(100).write.format(...).save(...)
>
> i have this setting: spark.sql.shuffle.partitions=2048
>
> i expect to see 2048 partitions in shuffle. what i am seeing instead is a
> shuffle with only 100 partitions. it's like the coalesce has taken over the
> partitioning of the groupBy.
>
> any idea why?
>
> i am doing coalesce because it is not helpful to write out 2048 files,
> lots of files leads to pressure down the line on executors reading this
> data (i am writing to just one partition of a larger dataset), and since i
> have less than 100 executors i expect it to be efficient. so sounds like a
> good idea, no?
>
> but i do need 2048 partitions in my shuffle due to the operation i am
> doing in the groupBy (in my real problem i am not just doing a count...).
>
> thanks!
> koert
>
>