Posted to dev@spark.apache.org by Nicholas Chammas <ni...@gmail.com> on 2018/05/01 03:17:08 UTC

Identifying specific persisted DataFrames via getPersistentRDDs()

This seems to be an underexposed part of the API. My use case is this: I
want to unpersist all DataFrames except a specific few. I want to do this
because I know at a specific point in my pipeline that I have a handful of
DataFrames that I need, and everything else is no longer needed.

The problem is that there doesn’t appear to be a way to identify specific
DataFrames (or rather, their underlying RDDs) via getPersistentRDDs(),
which is the only way I’m aware of to ask Spark for all currently persisted
RDDs:

>>> a = spark.range(10).persist()
>>> a.rdd.id()
8
>>> list(spark.sparkContext._jsc.getPersistentRDDs().items())
[(3, JavaObject id=o36)]

As you can see, the id of the persisted RDD, 8, doesn’t match the id
returned by getPersistentRDDs(), 3. So I can’t go through the RDDs returned
by getPersistentRDDs() and know which ones I want to keep.
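
(For what it's worth, a blanket unpersist through that same internal handle does
appear to be possible, since the values in the map are JavaRDDs and expose
unpersist(). Here is a rough, untested sketch; the point is that nothing in it
can tell me which entry belongs to the DataFrame a above, so I can release
everything but I can't selectively keep anything:)

# Rough, untested sketch, relying on the same _jsc internals as above.
# Every value in the map is a JavaRDD, so calling unpersist() on each one
# should release everything currently persisted -- but there is no way
# to skip specific DataFrames, because nothing here maps back to them.
for rdd_id, java_rdd in spark.sparkContext._jsc.getPersistentRDDs().items():
    java_rdd.unpersist()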

id() itself appears to be an undocumented method of the RDD API, and in
PySpark getPersistentRDDs() is buried behind the Java sub-objects
<https://issues.apache.org/jira/browse/SPARK-2141>, so I know I’m reaching
here. But is there a way to do what I want in PySpark without manually
tracking everything I’ve persisted myself?

And more broadly speaking, do we want to add additional APIs, or formalize
currently undocumented APIs like id(), to make this use case possible?

Nick

Re: Identifying specific persisted DataFrames via getPersistentRDDs()

Posted by Nicholas Chammas <ni...@gmail.com>.
That’s correct. I probably would have done better to title this thread
something like “How to effectively track and release persisted DataFrames”.

I jumped the gun in my initial email by referencing getPersistentRDDs() as
a potential solution, but in theory the desired API is something like
spark.unpersistAllExcept([list of DataFrames or RDDs]). This seems awkward,
but I suspect the underlying use case is common.
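
The closest user-land approximation I can see is to do the bookkeeping by hand,
along these lines (a sketch with made-up helper names, not an existing API):

# Hypothetical helpers: route every persist() through a small registry
# so that a keep-list can be honored later.
_persisted = []

def persist_tracked(df):
    # Persist a DataFrame and remember it for later release.
    df.persist()
    _persisted.append(df)
    return df

def unpersist_all_except(keep):
    # Unpersist every tracked DataFrame that is not in the keep list.
    keep_ids = {id(df) for df in keep}  # Python object identity, not RDD ids
    for df in _persisted:
        if id(df) not in keep_ids:
            df.unpersist()
    _persisted[:] = [df for df in _persisted if id(df) in keep_ids]

But that is exactly the kind of manual tracking I would rather not have to
maintain myself.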

An alternative or complementary approach would be to allow persistence (and
perhaps even checkpointing) to be explicitly scoped
<https://issues.apache.org/jira/browse/SPARK-16921>. In some circles this is
called “Scope-based Resource Management” or “Resource Acquisition Is
Initialization” (RAII). It would make it a lot easier to track and release
DataFrames or RDDs once they are no longer needed in the cache.
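
As a rough illustration of what scoped persistence could look like from the
PySpark side (just a sketch built on the existing persist()/unpersist() calls,
with placeholder names; not the proposed API itself):

from contextlib import contextmanager

@contextmanager
def persisted(df):
    # Cache the DataFrame for the duration of the with-block and
    # release it on exit, even if the block raises.
    df.persist()
    try:
        yield df
    finally:
        df.unpersist()

# Usage (expensive_df, step_one, step_two are placeholders):
# with persisted(expensive_df) as df:
#     df.count()   # materialize the cache once
#     step_one(df)
#     step_two(df)
# # the cache is released here, with no manual tracking required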

Nick

On Tue, May 8, 2018 at 1:32 PM, Mark Hamstra <ma...@clearstorydata.com> wrote:

> If I am understanding you correctly, you're just saying that the problem is
> that you know what you want to keep, not what you want to throw away, and
> that there is no unpersist DataFrames call based on that what-to-keep
> information.
>
> On Tue, May 8, 2018 at 6:00 AM, Nicholas Chammas <
> nicholas.chammas@gmail.com> wrote:
>
>> I certainly can, but the problem I’m facing is that of how best to track
>> all the DataFrames I no longer want to persist.
>>
>> I create and persist various DataFrames throughout my pipeline. Spark is
>> already tracking all this for me, and exposing some of that tracking
>> information via getPersistentRDDs(). So when I arrive at a point in my
>> program where I know, “I only need this DataFrame going forward”, I want to
>> be able to tell Spark “Please unpersist everything except this one
>> DataFrame”. If I cannot leverage the information about persisted DataFrames
>> that Spark is already tracking, then the alternative is for me to carefully
>> track and unpersist DataFrames when I no longer need them.
>>
>> I suppose the problem is similar at a high level to garbage collection.
>> Tracking and freeing DataFrames manually is analogous to malloc and free;
>> and full automation would be Spark automatically unpersisting DataFrames
>> when they were no longer referenced or needed. I’m looking for an
>> in-between solution that lets me leverage some of the persistence tracking
>> in Spark so I don’t have to do it all myself.
>>
>> Does this make more sense now, from a use case perspective as well as
>> from a desired API perspective?
>>
>> On Thu, May 3, 2018 at 10:26 PM Reynold Xin <rx...@databricks.com> wrote:
>>
>>> Why do you need the underlying RDDs? Can't you just unpersist the
>>> dataframes that you don't need?
>>>
>>>
>>> On Mon, Apr 30, 2018 at 8:17 PM Nicholas Chammas <
>>> nicholas.chammas@gmail.com> wrote:
>>>
>>>> This seems to be an underexposed part of the API. My use case is this:
>>>> I want to unpersist all DataFrames except a specific few. I want to do this
>>>> because I know at a specific point in my pipeline that I have a handful of
>>>> DataFrames that I need, and everything else is no longer needed.
>>>>
>>>> The problem is that there doesn’t appear to be a way to identify
>>>> specific DataFrames (or rather, their underlying RDDs) via
>>>> getPersistentRDDs(), which is the only way I’m aware of to ask Spark
>>>> for all currently persisted RDDs:
>>>>
>>>> >>> a = spark.range(10).persist()
>>>> >>> a.rdd.id()
>>>> 8
>>>> >>> list(spark.sparkContext._jsc.getPersistentRDDs().items())
>>>> [(3, JavaObject id=o36)]
>>>>
>>>> As you can see, the id of the persisted RDD, 8, doesn’t match the id
>>>> returned by getPersistentRDDs(), 3. So I can’t go through the RDDs
>>>> returned by getPersistentRDDs() and know which ones I want to keep.
>>>>
>>>> id() itself appears to be an undocumented method of the RDD API, and
>>>> in PySpark getPersistentRDDs() is buried behind the Java sub-objects
>>>> <https://issues.apache.org/jira/browse/SPARK-2141>, so I know I’m
>>>> reaching here. But is there a way to do what I want in PySpark without
>>>> manually tracking everything I’ve persisted myself?
>>>>
>>>> And more broadly speaking, do we want to add additional APIs, or
>>>> formalize currently undocumented APIs like id(), to make this use case
>>>> possible?
>>>>
>>>> Nick
>>>>
>>>

Re: Identifying specific persisted DataFrames via getPersistentRDDs()

Posted by Mark Hamstra <ma...@clearstorydata.com>.
If I am understanding you correctly, you're just saying that the problem is
that you know what you want to keep, not what you want to throw away, and
that there is no unpersist DataFrames call based on that what-to-keep
information.

On Tue, May 8, 2018 at 6:00 AM, Nicholas Chammas <nicholas.chammas@gmail.com> wrote:

> I certainly can, but the problem I’m facing is that of how best to track
> all the DataFrames I no longer want to persist.
>
> I create and persist various DataFrames throughout my pipeline. Spark is
> already tracking all this for me, and exposing some of that tracking
> information via getPersistentRDDs(). So when I arrive at a point in my
> program where I know, “I only need this DataFrame going forward”, I want to
> be able to tell Spark “Please unpersist everything except this one
> DataFrame”. If I cannot leverage the information about persisted DataFrames
> that Spark is already tracking, then the alternative is for me to carefully
> track and unpersist DataFrames when I no longer need them.
>
> I suppose the problem is similar at a high level to garbage collection.
> Tracking and freeing DataFrames manually is analogous to malloc and free;
> and full automation would be Spark automatically unpersisting DataFrames
> when they were no longer referenced or needed. I’m looking for an
> in-between solution that lets me leverage some of the persistence tracking
> in Spark so I don’t have to do it all myself.
>
> Does this make more sense now, from a use case perspective as well as from
> a desired API perspective?
>
> On Thu, May 3, 2018 at 10:26 PM Reynold Xin <rx...@databricks.com> wrote:
>
>> Why do you need the underlying RDDs? Can't you just unpersist the
>> dataframes that you don't need?
>>
>>
>> On Mon, Apr 30, 2018 at 8:17 PM Nicholas Chammas <
>> nicholas.chammas@gmail.com> wrote:
>>
>>> This seems to be an underexposed part of the API. My use case is this: I
>>> want to unpersist all DataFrames except a specific few. I want to do this
>>> because I know at a specific point in my pipeline that I have a handful of
>>> DataFrames that I need, and everything else is no longer needed.
>>>
>>> The problem is that there doesn’t appear to be a way to identify
>>> specific DataFrames (or rather, their underlying RDDs) via
>>> getPersistentRDDs(), which is the only way I’m aware of to ask Spark
>>> for all currently persisted RDDs:
>>>
>>> >>> a = spark.range(10).persist()
>>> >>> a.rdd.id()
>>> 8
>>> >>> list(spark.sparkContext._jsc.getPersistentRDDs().items())
>>> [(3, JavaObject id=o36)]
>>>
>>> As you can see, the id of the persisted RDD, 8, doesn’t match the id
>>> returned by getPersistentRDDs(), 3. So I can’t go through the RDDs
>>> returned by getPersistentRDDs() and know which ones I want to keep.
>>>
>>> id() itself appears to be an undocumented method of the RDD API, and in
>>> PySpark getPersistentRDDs() is buried behind the Java sub-objects
>>> <https://issues.apache.org/jira/browse/SPARK-2141>, so I know I’m
>>> reaching here. But is there a way to do what I want in PySpark without
>>> manually tracking everything I’ve persisted myself?
>>>
>>> And more broadly speaking, do we want to add additional APIs, or
>>> formalize currently undocumented APIs like id(), to make this use case
>>> possible?
>>>
>>> Nick
>>>
>>

Re: Identifying specific persisted DataFrames via getPersistentRDDs()

Posted by Nicholas Chammas <ni...@gmail.com>.
I certainly can, but the problem I’m facing is that of how best to track
all the DataFrames I no longer want to persist.

I create and persist various DataFrames throughout my pipeline. Spark is
already tracking all this for me, and exposing some of that tracking
information via getPersistentRDDs(). So when I arrive at a point in my
program where I know, “I only need this DataFrame going forward”, I want to
be able to tell Spark “Please unpersist everything except this one
DataFrame”. If I cannot leverage the information about persisted DataFrames
that Spark is already tracking, then the alternative is for me to carefully
track and unpersist DataFrames when I no longer need them.

I suppose the problem is similar at a high level to garbage collection.
Tracking and freeing DataFrames manually is analogous to malloc and free;
and full automation would be Spark automatically unpersisting DataFrames
when they were no longer referenced or needed. I’m looking for an
in-between solution that lets me leverage some of the persistence tracking
in Spark so I don’t have to do it all myself.

Does this make more sense now, from a use case perspective as well as from
a desired API perspective?

On Thu, May 3, 2018 at 10:26 PM Reynold Xin <rx...@databricks.com> wrote:

> Why do you need the underlying RDDs? Can't you just unpersist the
> dataframes that you don't need?
>
>
> On Mon, Apr 30, 2018 at 8:17 PM Nicholas Chammas <
> nicholas.chammas@gmail.com> wrote:
>
>> This seems to be an underexposed part of the API. My use case is this: I
>> want to unpersist all DataFrames except a specific few. I want to do this
>> because I know at a specific point in my pipeline that I have a handful of
>> DataFrames that I need, and everything else is no longer needed.
>>
>> The problem is that there doesn’t appear to be a way to identify specific
>> DataFrames (or rather, their underlying RDDs) via getPersistentRDDs(),
>> which is the only way I’m aware of to ask Spark for all currently persisted
>> RDDs:
>>
>> >>> a = spark.range(10).persist()
>> >>> a.rdd.id()
>> 8
>> >>> list(spark.sparkContext._jsc.getPersistentRDDs().items())
>> [(3, JavaObject id=o36)]
>>
>> As you can see, the id of the persisted RDD, 8, doesn’t match the id
>> returned by getPersistentRDDs(), 3. So I can’t go through the RDDs
>> returned by getPersistentRDDs() and know which ones I want to keep.
>>
>> id() itself appears to be an undocumented method of the RDD API, and in
>> PySpark getPersistentRDDs() is buried behind the Java sub-objects
>> <https://issues.apache.org/jira/browse/SPARK-2141>, so I know I’m
>> reaching here. But is there a way to do what I want in PySpark without
>> manually tracking everything I’ve persisted myself?
>>
>> And more broadly speaking, do we want to add additional APIs, or
>> formalize currently undocumented APIs like id(), to make this use case
>> possible?
>>
>> Nick
>>
>

Re: Identifying specific persisted DataFrames via getPersistentRDDs()

Posted by Reynold Xin <rx...@databricks.com>.
Why do you need the underlying RDDs? Can't you just unpersist the
dataframes that you don't need?
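
That is, for any DataFrame you still hold a reference to, something like:

# df1 and df2 stand in for DataFrames you no longer need.
df1.unpersist()
df2.unpersist(blocking=True)  # optionally wait until the cached blocks are dropped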


On Mon, Apr 30, 2018 at 8:17 PM Nicholas Chammas <ni...@gmail.com>
wrote:

> This seems to be an underexposed part of the API. My use case is this: I
> want to unpersist all DataFrames except a specific few. I want to do this
> because I know at a specific point in my pipeline that I have a handful of
> DataFrames that I need, and everything else is no longer needed.
>
> The problem is that there doesn’t appear to be a way to identify specific
> DataFrames (or rather, their underlying RDDs) via getPersistentRDDs(),
> which is the only way I’m aware of to ask Spark for all currently persisted
> RDDs:
>
> >>> a = spark.range(10).persist()
> >>> a.rdd.id()
> 8
> >>> list(spark.sparkContext._jsc.getPersistentRDDs().items())
> [(3, JavaObject id=o36)]
>
> As you can see, the id of the persisted RDD, 8, doesn’t match the id
> returned by getPersistentRDDs(), 3. So I can’t go through the RDDs
> returned by getPersistentRDDs() and know which ones I want to keep.
>
> id() itself appears to be an undocumented method of the RDD API, and in
> PySpark getPersistentRDDs() is buried behind the Java sub-objects
> <https://issues.apache.org/jira/browse/SPARK-2141>, so I know I’m
> reaching here. But is there a way to do what I want in PySpark without
> manually tracking everything I’ve persisted myself?
>
> And more broadly speaking, do we want to add additional APIs, or formalize
> currently undocumented APIs like id(), to make this use case possible?
>
> Nick
>