Posted to dev@spark.apache.org by Koert Kuipers <ko...@tresata.com> on 2020/08/20 16:11:35 UTC

AQE effectiveness

we tend to have spark.sql.shuffle.partitions set very high by default
simply because some jobs need it to be high and it's easier to then just
set the default high instead of having people tune it manually per job. the
main downside is lots of part files, which leads to pressure on the driver,
and dynamic allocation becomes troublesome if every aggregation requires
thousands of tasks... even the simplest aggregation on tiny data will
demand all resources on the cluster.

because of these issues AQE appeals a lot to me: by automatically scaling
the reducer partitions we avoid these issues. so we have AQE turned on by
default. every once in a while i scan through our spark AMs and logs to see
how it's doing. i mostly look for stages that have a number of tasks equal
to spark.sql.shuffle.partitions, a sign to me that AQE isn't being
effective. unfortunately this seems to be the majority. i suspect it has to
do with caching/persisting which we use frequently. a simple reproduction
is below.

any idea why caching/persisting would interfere with AQE?

best, koert

$ hadoop fs -text fruits.csv
fruit,color,quantity
apple,red,5
grape,blue,50
pear,green,3

# works well using AQE, uses 1 to 3 tasks per job
$ spark-3.1.0-SNAPSHOT/bin/spark-shell --conf spark.sql.adaptive.enabled=true
scala> val data = spark.read.format("csv").option("header", true).load("fruits.csv").persist()
scala> data.groupBy("fruit").count().write.format("csv").save("out")

# does not work well using AQE, uses 200 tasks (i.e.
# spark.sql.shuffle.partitions) for certain jobs. the only difference is when
# persist is called.
$ spark-3.1.0-SNAPSHOT/bin/spark-shell --conf spark.sql.adaptive.enabled=true
scala> val data = spark.read.format("csv").option("header", true).load("fruits.csv").groupBy("fruit").count().persist()
scala> data.write.format("csv").save("out")
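
The manual check described in this post (looking for stages whose task count equals spark.sql.shuffle.partitions) could be sketched in plain Scala roughly as follows. This is only an illustration: StageSummary and suspectStages are hypothetical names, not part of Spark, and the task counts are assumed to have been copied by hand from the Spark UI or logs.

```scala
object AqeStageCheck {
  // Hypothetical summary of one stage, as read off the Spark UI or logs.
  final case class StageSummary(stageId: Int, numTasks: Int)

  // Heuristic from the post: a stage whose task count equals
  // spark.sql.shuffle.partitions was probably not coalesced by AQE.
  // A stage can legitimately need that many tasks, so treat hits as hints only.
  def suspectStages(stages: Seq[StageSummary], shufflePartitions: Int): Seq[StageSummary] =
    stages.filter(_.numTasks == shufflePartitions)

  def main(args: Array[String]): Unit = {
    val observed = Seq(StageSummary(0, 1), StageSummary(1, 200), StageSummary(2, 3))
    val suspects = suspectStages(observed, shufflePartitions = 200)
    // Prints the ids of the stages that ran with exactly 200 tasks.
    println(suspects.map(_.stageId).mkString(","))
  }
}
```

With the sample values above, only stage 1 is flagged, which matches the second reproduction, where persist() comes after the aggregation.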

Re: AQE effectiveness

Posted by Koert Kuipers <ko...@tresata.com>.
i have been doing tests with iterative algorithms that do caching/uncaching
at each iteration and i see improvements when i turn on AQE for cache.

now i am wondering... with an iterative algo using AQE it is true that the
output of every iteration can have a slightly different number of
partitions (which i observed), and this introduces extra shuffles in joins
between these outputs. so far this downside seems outweighed by the upsides
of AQE, so it's not a big deal. but it would be rather simple to remove many
unnecessary shuffles and still retain all the benefits of AQE if we only
slightly restricted AQE's choices for the number of partitions. for
example we could force it to use a power of 2? this way you get the
benefits of AQE but you don't have a frustrating situation where one
iteration has 67 partitions and the next has 68 partitions and then a join
introduces a shuffle.
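
To make the power-of-2 idea concrete, here is a minimal sketch in plain Scala. It is not how AQE picks partition counts today, and roundUpToPowerOfTwo is a hypothetical helper name. Rounding up rather than down is also an assumption; the proposal above does not say which direction to round.

```scala
object PartitionRounding {
  // Smallest power of two that is >= n, for 1 <= n <= 2^30.
  def roundUpToPowerOfTwo(n: Int): Int = {
    require(n >= 1 && n <= (1 << 30), "n must be in [1, 2^30]")
    if (n == 1) 1 else Integer.highestOneBit(n - 1) << 1
  }

  def main(args: Array[String]): Unit = {
    // The two iteration outputs from the example above end up with the same count.
    println(roundUpToPowerOfTwo(67)) // 128
    println(roundUpToPowerOfTwo(68)) // 128
  }
}
```

Counts on either side of a power of two would still differ (64 stays 64 while 65 becomes 128), so this would reduce the mismatches between iterations rather than eliminate them. Equal partition counts alone also do not guarantee that a join avoids a shuffle, since the partitioning keys have to match as well.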

On Fri, Aug 21, 2020 at 12:10 PM Maryann Xue <ma...@databricks.com>
wrote:

> It would break CachedTableSuite."A cached table preserves the partitioning
> and ordering of its cached SparkPlan" if AQE was turned on.
>
> Anyway, the chance of this outputPartitioning being useful is rather low
> and should not justify turning off AQE for SQL cache.

Re: AQE effectiveness

Posted by Maryann Xue <ma...@databricks.com>.
It would break CachedTableSuite."A cached table preserves the partitioning
and ordering of its cached SparkPlan" if AQE was turned on.

Anyway, the chance of this outputPartitioning being useful is rather low
and should not justify turning off AQE for SQL cache.

On Thu, Aug 20, 2020 at 10:54 PM Koert Kuipers <ko...@tresata.com> wrote:

> in our in-house spark version i changed this without trouble and it didn't
> even break any tests
> just some minor changes in CacheManager it seems

Re: AQE effectiveness

Posted by Maryann Xue <ma...@databricks.com>.
No. The worst case of enabling AQE in cached data is not losing the
opportunity of using/reusing the cache, but rather just an extra shuffle if
the outputPartitioning happens to match without AQE and not match after
AQE. The chance of this happening is rather low.

On Thu, Aug 20, 2020 at 12:09 PM Koert Kuipers <ko...@tresata.com> wrote:

> i see. it makes sense to maximize re-use of cached data. i didn't realize
> we have two potentially conflicting goals here.

Re: AQE effectiveness

Posted by Koert Kuipers <ko...@tresata.com>.
i see. it makes sense to maximize re-use of cached data. i didn't realize
we have two potentially conflicting goals here.


On Thu, Aug 20, 2020 at 12:41 PM Maryann Xue <ma...@databricks.com>
wrote:

> AQE has been turned off deliberately so that the `outputPartitioning` of
> the cached relation won't be changed by AQE partition coalescing or skew
> join optimization and the outputPartitioning can potentially be used by
> relations built on top of the cache.
>
> On second thought, we should probably add a config there and enable AQE
> by default.
>
>
> Thanks,
> Maryann

Re: AQE effectiveness

Posted by Maryann Xue <ma...@databricks.com>.
AQE has been turned off deliberately so that the `outputPartitioning` of
the cached relation won't be changed by AQE partition coalescing or skew
join optimization and the outputPartitioning can potentially be used by
relations built on top of the cache.

On second thought, we should probably add a config there and enable AQE
by default.


Thanks,
Maryann
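
For readers on later releases: as far as I know, Spark 3.2 and up expose a setting along the lines suggested here, spark.sql.optimizer.canChangeCachedPlanOutputPartitioning. The fragment below is a sketch assuming a spark-shell session on such a version, where `spark` is the SparkSession. The default value has differed between releases, so check the configuration docs for the version you run.

```scala
// Assumes spark-shell on Spark 3.2 or later; `spark` is the SparkSession.
spark.conf.set("spark.sql.adaptive.enabled", "true")

// Lets AQE change the output partitioning of a plan that is being cached.
// The trade-off discussed in this thread applies: a query built on the cache
// may occasionally need an extra shuffle.
spark.conf.set("spark.sql.optimizer.canChangeCachedPlanOutputPartitioning", "true")
```

Both settings can also be passed with --conf when launching spark-shell, as in the reproduction from the original post.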
