You are viewing a plain text version of this content. The canonical link for it is here.
Posted to dev@spark.apache.org by Matt Cheah <mc...@palantir.com> on 2015/09/22 02:34:39 UTC

DataFrames Aggregate does not spill?

Hi everyone,

I¹m debugging some slowness and apparent memory pressure + GC issues after I
ported some workflows from raw RDDs to Data Frames. In particular, I¹m
looking into an aggregation workflow that computes many aggregations per key
at once.

My workflow before was doing a fairly straightforward combineByKey call
where the aggregation would build up non-trivially sized objects in memory ­
I was computing numerous sums over various fields of the data at a time. In
particular, I noticed that there was spilling to disk on the map side of the
aggregation.

When I switched to using DataFrames aggregation ­ particularly
DataFrame.groupBy(some list of keys).agg(exprs) where I passed a large
number of ³Sum² exprs - the execution began to choke. I saw one of my
executors had a long GC pause and my job isn¹t able to recover. However when
I reduced the number of Sum expressions being computed in the aggregation,
the workflow started to work normally.

I have a hypothesis that I would like to run by everyone. In
org.apache.spark.sql.execution.Aggregate.scala, branch-1.5, I¹m looking at
the execution of Aggregation
<https://github.com/apache/spark/blob/branch-1.5/sql/core/src/main/scala/org
/apache/spark/sql/execution/Aggregate.scala> , which appears to build the
aggregation result in memory via updating a HashMap and iterating over the
rows. However this appears to be less robust than what would happen if
PairRDDFunctions.combineByKey were to be used. If combineByKey were used,
then instead of using two mapPartitions calls (assuming the aggregation is
partially-computable, as Sum is), it would use the ExternalSorter and
ExternalAppendOnlyMap objects to compute the aggregation. This would allow
the aggregated result to grow large as some of the aggregated result could
be spilled to disk. This especially seems bad if the aggregation reduction
factor is low; that is, if there are many unique keys in the dataset. In
particular, the Hash Map is holding O(# of keys * number of aggregated
results per key) items in memory at a time.

I was wondering what everyone¹s thought on this problem is. Did we
consciously think about the robustness implications when choosing to use an
in memory Hash Map to compute the aggregation? Is this an inherent
limitation of the aggregation implementation in Data Frames?

Thanks,

-Matt Cheah








Re: DataFrames Aggregate does not spill?

Posted by Matt Cheah <mc...@palantir.com>.
I was executing on Spark 1.4 so I didn¹t notice the Tungsten option would
make spilling happen in 1.5. I¹ll upgrade to 1.5 and see how that turns out.
Thanks!

From:  Reynold Xin <rx...@databricks.com>
Date:  Monday, September 21, 2015 at 5:36 PM
To:  Matt Cheah <mc...@palantir.com>
Cc:  "dev@spark.apache.org" <de...@spark.apache.org>, Mingyu Kim
<mk...@palantir.com>, Peter Faiman <pe...@palantir.com>
Subject:  Re: DataFrames Aggregate does not spill?

What's the plan if you run explain?

In 1.5 the default should be TungstenAggregate, which does spill (switching
from hash-based aggregation to sort-based aggregation).

On Mon, Sep 21, 2015 at 5:34 PM, Matt Cheah <mc...@palantir.com> wrote:
> Hi everyone,
> 
> I¹m debugging some slowness and apparent memory pressure + GC issues after I
> ported some workflows from raw RDDs to Data Frames. In particular, I¹m looking
> into an aggregation workflow that computes many aggregations per key at once.
> 
> My workflow before was doing a fairly straightforward combineByKey call where
> the aggregation would build up non-trivially sized objects in memory ­ I was
> computing numerous sums over various fields of the data at a time. In
> particular, I noticed that there was spilling to disk on the map side of the
> aggregation.
> 
> When I switched to using DataFrames aggregation ­ particularly
> DataFrame.groupBy(some list of keys).agg(exprs) where I passed a large number
> of ³Sum² exprs - the execution began to choke. I saw one of my executors had a
> long GC pause and my job isn¹t able to recover. However when I reduced the
> number of Sum expressions being computed in the aggregation, the workflow
> started to work normally.
> 
> I have a hypothesis that I would like to run by everyone. In
> org.apache.spark.sql.execution.Aggregate.scala, branch-1.5, I¹m looking at the
> execution of Aggregation
> <https://urldefense.proofpoint.com/v2/url?u=https-3A__github.com_apache_spark_
> blob_branch-2D1.5_sql_core_src_main_scala_org_apache_spark_sql_execution_Aggre
> gate.scala&d=BQMFaQ&c=izlc9mHr637UR4lpLEZLFFS3Vn2UXBrZ4tFb6oOnmz8&r=hzwIMNQ9E9
> 9EMYGuqHI0kXhVbvX3nU3OSDadUnJxjAs&m=MY0QvbkaVGKP6m7L6daL19eak5Q_ByWt_84mRZfff8
> k&s=2f8iTPkA6bsre40-juWK2Q5xA-v_5y6f3ucP4cKKa1s&e=> , which appears to build
> the aggregation result in memory via updating a HashMap and iterating over the
> rows. However this appears to be less robust than what would happen if
> PairRDDFunctions.combineByKey were to be used. If combineByKey were used, then
> instead of using two mapPartitions calls (assuming the aggregation is
> partially-computable, as Sum is), it would use the ExternalSorter and
> ExternalAppendOnlyMap objects to compute the aggregation. This would allow the
> aggregated result to grow large as some of the aggregated result could be
> spilled to disk. This especially seems bad if the aggregation reduction factor
> is low; that is, if there are many unique keys in the dataset. In particular,
> the Hash Map is holding O(# of keys * number of aggregated results per key)
> items in memory at a time.
> 
> I was wondering what everyone¹s thought on this problem is. Did we consciously
> think about the robustness implications when choosing to use an in memory Hash
> Map to compute the aggregation? Is this an inherent limitation of the
> aggregation implementation in Data Frames?
> 
> Thanks,
> 
> -Matt Cheah
> 
> 
> 
> 
> 




Re: DataFrames Aggregate does not spill?

Posted by Reynold Xin <rx...@databricks.com>.
What's the plan if you run explain?

In 1.5 the default should be TungstenAggregate, which does spill (switching
from hash-based aggregation to sort-based aggregation).

On Mon, Sep 21, 2015 at 5:34 PM, Matt Cheah <mc...@palantir.com> wrote:

> Hi everyone,
>
> I’m debugging some slowness and apparent memory pressure + GC issues after
> I ported some workflows from raw RDDs to Data Frames. In particular, I’m
> looking into an aggregation workflow that computes many aggregations per
> key at once.
>
> My workflow before was doing a fairly straightforward combineByKey call
> where the aggregation would build up non-trivially sized objects in memory
> – I was computing numerous sums over various fields of the data at a time.
> In particular, I noticed that there was spilling to disk on the map side of
> the aggregation.
>
> When I switched to using DataFrames aggregation – particularly
> DataFrame.groupBy(some list of keys).agg(exprs) where I passed a large
> number of “Sum” exprs - the execution began to choke. I saw one of my
> executors had a long GC pause and my job isn’t able to recover. However
> when I reduced the number of Sum expressions being computed in the
> aggregation, the workflow started to work normally.
>
> I have a hypothesis that I would like to run by everyone. In
> org.apache.spark.sql.execution.Aggregate.scala, branch-1.5, I’m looking at
> the execution of Aggregation
> <https://github.com/apache/spark/blob/branch-1.5/sql/core/src/main/scala/org/apache/spark/sql/execution/Aggregate.scala>,
> which appears to build the aggregation result in memory via updating a
> HashMap and iterating over the rows. However this appears to be less robust
> than what would happen if PairRDDFunctions.combineByKey were to be used. If
> combineByKey were used, then instead of using two mapPartitions calls
> (assuming the aggregation is partially-computable, as Sum is), it would use
> the ExternalSorter and ExternalAppendOnlyMap objects to compute the
> aggregation. This would allow the aggregated result to grow large as some
> of the aggregated result could be spilled to disk. This especially seems
> bad if the aggregation reduction factor is low; that is, if there are many
> unique keys in the dataset. In particular, the Hash Map is holding O(# of
> keys * number of aggregated results per key) items in memory at a time.
>
> I was wondering what everyone’s thought on this problem is. Did we
> consciously think about the robustness implications when choosing to use an
> in memory Hash Map to compute the aggregation? Is this an inherent
> limitation of the aggregation implementation in Data Frames?
>
> Thanks,
>
> -Matt Cheah
>
>
>
>
>
>