Posted to dev@spark.apache.org by kaklakariada <ch...@gmail.com> on 2015/09/07 10:02:18 UTC

groupByKey() and keys with many values

Hi,

I already posted this question on the users mailing list
(http://apache-spark-user-list.1001560.n3.nabble.com/Using-groupByKey-with-many-values-per-key-td24538.html)
but did not get a reply. Maybe this is the correct forum to ask.

My problem is that calling groupByKey().mapToPair() loads all values for a
key into memory, which fails when the values don't fit into memory.
This was not a problem with Hadoop map/reduce, as the Iterable passed to the
reducer reads from disk.

In Spark, the Iterable passed to mapToPair() is backed by a CompactBuffer
containing all values.

Is it possible to change this behavior without modifying Spark, or is there
a plan to change this?

Thank you very much for your help!
Christoph.



--
View this message in context: http://apache-spark-developers-list.1001551.n3.nabble.com/groupByKey-and-keys-with-many-values-tp13985.html
Sent from the Apache Spark Developers List mailing list archive at Nabble.com.

---------------------------------------------------------------------
To unsubscribe, e-mail: dev-unsubscribe@spark.apache.org
For additional commands, e-mail: dev-help@spark.apache.org


Re: groupByKey() and keys with many values

Posted by Reynold Xin <rx...@databricks.com>.
On Tue, Sep 8, 2015 at 6:51 AM, Antonio Piccolboni <an...@piccolboni.info>
wrote:

> As far as the DB writes, remember Spark can retry a computation, so your
> writes have to be idempotent (see this thread
> <https://groups.google.com/forum/#!topic/spark-users/oM-IzQs0Z2s>, in
> which Reynold is a bit more optimistic about failures than I am comfortable
> with, but who am I to question Reynold?)
>

I'm wrong all the time so please do question me :)

One thing is that apps should be using something like an output committer
to enforce idempotency. Maybe that's some API we can provide in Spark
itself to make it easier to write applications.

Re: groupByKey() and keys with many values

Posted by Antonio Piccolboni <an...@piccolboni.info>.
You may also consider selecting the distinct keys and fetching from the
database first, then joining on key with the values. This applies in case
Sean's approach is not viable, that is, in case you need to have the DB data
before the first reduce call. By not revealing your problem, you are forcing
us to make guesses, which are less useful.

Imagine you want to compute a binning of the values on a per-key basis. The
bin definitions are in the database. Then the reduce would be updating counts
per bin. You could let the reduce initialize the bin counts from the DB when
empty. This will result in multiple database accesses and connections per
key, and the higher the degree of parallelism, the bigger the cost (see this
<https://gist.github.com/tdhopper/0e5b53b5692f1e371534> elementary
example), which is something you should avoid if you want to write code
with some durability to it. If you use the join approach, you can select
the keys, deduplicate them and perform the database access to obtain the bin
definitions. Now join the data file with the bin file on key. Then pass this
through a reduceByKey to update the bin counts.

A different application: you want to compute max and min values per key,
compare them with the previously recorded max and min, then store the overall
max and min. Then you don't need the database values during the reduce. You
just fetch them in the foreachPartition, before each write.
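
The join approach for the binning example can be sketched in plain Python,
without Spark, to show the shape of the computation. Everything named here is
an assumption for illustration: fetch_bin_edges is a hypothetical stand-in for
a single batched database query, and the sample records and bin edges are
made up. In Spark the last step would be a reduceByKey over ((key, bin), 1)
pairs; here a Counter plays that role.

```python
from bisect import bisect_right
from collections import Counter


def fetch_bin_edges(keys):
    """Hypothetical stand-in for ONE batched database query returning the
    lower bin edges for each requested key."""
    table = {"a": [0, 10, 20], "b": [0, 5]}
    return {k: table[k] for k in keys}


# (key, value) records, as they might come from the data file.
records = [("a", 3), ("a", 12), ("b", 7), ("a", 15), ("b", 1)]

# 1. Select the keys and deduplicate them.
distinct_keys = {k for k, _ in records}

# 2. A single database access to obtain the bin definitions.
edges = fetch_bin_edges(distinct_keys)

# 3. "Join" each record with its bin definition and emit ((key, bin), 1).
binned = (((k, bisect_right(edges[k], v) - 1), 1) for k, v in records)

# 4. Reduce by (key, bin): one running count per bin, never the full group.
counts = Counter()
for key_bin, one in binned:
    counts[key_bin] += one
```

The database is hit once for all keys rather than once per key per reducer,
and the reduce step only ever holds one counter per (key, bin) pair.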

As far as the DB writes, remember Spark can retry a computation, so your
writes have to be idempotent (see this thread
<https://groups.google.com/forum/#!topic/spark-users/oM-IzQs0Z2s>, in which
Reynold is a bit more optimistic about failures than I am comfortable with, but
who am I to question Reynold?)
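
One way to get idempotent writes is a keyed overwrite, so that a retried task
leaves the table in the same state as a single successful run. The sketch
below is a minimal illustration using Python's built-in sqlite3 module; the
table name, the column names and the write_results helper are assumptions
made for this example, not anything from Spark or from this thread.

```python
import sqlite3

conn = sqlite3.connect(":memory:")
conn.execute("CREATE TABLE totals (key TEXT PRIMARY KEY, total INTEGER)")


def write_results(conn, results):
    """Overwrite the row for each key. Running this twice with the same
    input leaves the same rows, unlike an 'UPDATE ... SET total = total + ?'."""
    with conn:  # commits on success, rolls back on an exception
        conn.executemany(
            "INSERT OR REPLACE INTO totals (key, total) VALUES (?, ?)",
            results,
        )


results = [("a", 4), ("b", 6)]
write_results(conn, results)
write_results(conn, results)  # simulated retry of the same task

rows = conn.execute("SELECT key, total FROM totals ORDER BY key").fetchall()
```

This only holds if the retried computation produces the same values for the
same keys; an increment-style write would double count on a retry.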







Re: groupByKey() and keys with many values

Posted by Sean Owen <so...@cloudera.com>.
I think groupByKey is intended for cases where you do want the values
in memory; for one-pass use cases, it's more efficient to use
reduceByKey, or aggregateByKey if lower-level operations are needed.

For your case, you probably want to do your reduceByKey, then perform
the expensive per-key lookups once per key. You also probably want to
do this in foreachPartition, not foreach, in order to pay DB
connection costs just once per partition.
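
The pattern described above can be sketched in plain Python to make the cost
model visible. This is not Spark code: reduce_by_key and write_partition
mimic what reduceByKey and a foreachPartition body would do, FakeDb is a
hypothetical stand-in for a database client, and the split into two
partitions is arbitrary.

```python
def reduce_by_key(pairs, combine):
    """Fold values per key one at a time, keeping one accumulator per key
    instead of materializing every value of a group."""
    acc = {}
    for key, value in pairs:
        acc[key] = combine(acc[key], value) if key in acc else value
    return acc


class FakeDb:
    """Hypothetical database client that counts connections and lookups."""

    def __init__(self, stored):
        self.stored = dict(stored)
        self.connections = 0
        self.lookups = 0

    def connect(self):
        self.connections += 1
        return self

    def fetch(self, key):
        self.lookups += 1
        return self.stored.get(key, 0)

    def store(self, key, value):
        self.stored[key] = value


def write_partition(db, partition):
    """Body of a foreachPartition-style call: one connection per partition."""
    conn = db.connect()
    for key, total in partition:
        previous = conn.fetch(key)         # expensive lookup, once per key
        conn.store(key, previous + total)  # all data for the key in one write


pairs = [("a", 1), ("b", 2), ("a", 3), ("b", 4), ("c", 5)]
reduced = reduce_by_key(pairs, lambda x, y: x + y)

items = sorted(reduced.items())
partitions = [items[:2], items[2:]]  # arbitrary split into two partitions

db = FakeDb({"a": 10})
for partition in partitions:
    write_partition(db, partition)
```

With five input records, three keys and two partitions, the database sees two
connections and three lookups, independent of how many values each key had.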



Re: groupByKey() and keys with many values

Posted by kaklakariada <ch...@gmail.com>.
Hi Antonio!

Thank you very much for your answer!
You are right in that in my case the computation could be replaced by a
reduceByKey. The thing is that my computation also involves database
queries:

1. Fetch key-specific data from the database into memory. This is expensive
and I only want to do this once per key.
2. Process each value using this data and update the common data.
3. Store the modified data to the database. Here it is important to write all
data for a key in one go.

Is there a pattern for implementing something like this with reduceByKey?

Out of curiosity: I understand why you want to discourage people from using
groupByKey. But is there a technical reason why the Iterable is implemented
the way it is?

Kind regards,
Christoph.





Re: groupByKey() and keys with many values

Posted by Antonio Piccolboni <an...@piccolboni.info>.
To expand on what Sean said, I would look into replacing groupByKey with
reduceByKey. Also take a look at this doc
<http://databricks.gitbooks.io/databricks-spark-knowledge-base/content/best_practices/prefer_reducebykey_over_groupbykey.html>.
I happen to have designed a library that was subject to the same criticism
when compared to the Java MapReduce API with respect to the use of iterables,
but neither we nor the critics could ever find a natural example of a problem
where you can express a computation as a single pass through each group
while using a constant amount of memory, yet cannot convert it to use a
combiner (MapReduce jargon; called a reduce in Spark and most functional
circles). If you have found such an example, while an obstacle for you, it
would be of some interest to know what it is.
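
As an illustration of that claim, a per-key mean is a single pass with
constant memory over each group, and it converts to a combiner by carrying a
(sum, count) pair. The sketch below is plain Python, not Spark: the
aggregate_by_key helper is written for this example and only mimics the
zero value / per-partition fold / cross-partition merge shape of
aggregateByKey, and the data and partitioning are made up.

```python
def aggregate_by_key(pairs, zero, seq_op, comb_op, num_partitions=2):
    """Fold values into a per-key accumulator inside each partition, then
    merge the per-partition accumulators. No group is ever held in full."""
    parts = [pairs[i::num_partitions] for i in range(num_partitions)]

    partials = []
    for part in parts:
        acc = {}
        for key, value in part:
            acc[key] = seq_op(acc.get(key, zero), value)
        partials.append(acc)

    merged = {}
    for acc in partials:
        for key, partial in acc.items():
            merged[key] = comb_op(merged[key], partial) if key in merged else partial
    return merged


pairs = [("a", 2.0), ("b", 1.0), ("a", 4.0), ("a", 6.0), ("b", 3.0)]

sums = aggregate_by_key(
    pairs,
    zero=(0.0, 0),
    seq_op=lambda acc, v: (acc[0] + v, acc[1] + 1),   # add one value
    comb_op=lambda x, y: (x[0] + y[0], x[1] + y[1]),  # merge two partials
)
means = {key: total / count for key, (total, count) in sums.items()}
```

The accumulator stays a fixed-size tuple no matter how many values a key has,
which is the property that groupByKey gives up.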



Re: groupByKey() and keys with many values

Posted by Sean Owen <so...@cloudera.com>.
That's how it's intended to work; if it's a problem, you probably need
to re-design your computation to not use groupByKey. Usually you can
do so.
