Posted to dev@spark.apache.org by Joseph Bradley <jo...@databricks.com> on 2016/05/13 19:38:20 UTC

Re: Shrinking the DataFrame lineage

Here's a JIRA for it: https://issues.apache.org/jira/browse/SPARK-13346

I don't have a great method currently, but hacks can get around it: convert
the DataFrame to an RDD and back to truncate the query plan lineage.
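
Roughly, the hack looks like this (just a sketch, assuming the spark-shell
sqlContext is in scope; truncateLineage is a made-up helper name):

import org.apache.spark.sql.DataFrame

// Round-trip the DataFrame through its RDD: the resulting DataFrame's
// logical plan is a single scan over those rows rather than the full
// query lineage that produced them.
def truncateLineage(df: DataFrame): DataFrame =
  sqlContext.createDataFrame(df.rdd, df.schema)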

Joseph

On Wed, May 11, 2016 at 12:46 PM, Ulanov, Alexander <
alexander.ulanov@hpe.com> wrote:

> Dear Spark developers,
>
>
>
> Recently, I was trying to switch my code from RDDs to DataFrames in order
> to compare the performance. The code computes an RDD in a loop. I use
> RDD.persist followed by RDD.count to force Spark to compute the RDD and
> cache it, so that it does not need to be recomputed on each iteration.
> However, this does not seem to work for a DataFrame:
>
>
>
> import scala.util.Random
>
> val rdd = sc.parallelize(1 to 10, 2).map(x => (Random.nextInt(5), Random.nextInt(5)))
>
> val edges = sqlContext.createDataFrame(rdd).toDF("from", "to")
>
> val vertices =
> edges.select("from").unionAll(edges.select("to")).distinct().cache()
>
> vertices.count
>
> [Stage 34:=================>                                     (65 + 4) / 200]
> [Stage 34:========================>                              (90 + 5) / 200]
> [Stage 34:==============================>                       (114 + 4) / 200]
> [Stage 34:====================================>                 (137 + 4) / 200]
> [Stage 34:==========================================>           (157 + 4) / 200]
> [Stage 34:=================================================>    (182 + 4) / 200]
>
>
>
> res25: Long = 5
>
> If I run count again, it recomputes the DataFrame instead of using the
> cached result:
>
> scala> vertices.count
>
> [Stage 37:=============>                                         (49 + 4) / 200]
> [Stage 37:==================>                                    (66 + 4) / 200]
> [Stage 37:========================>                              (90 + 4) / 200]
> [Stage 37:=============================>                        (110 + 4) / 200]
> [Stage 37:===================================>                  (133 + 4) / 200]
> [Stage 37:==========================================>           (157 + 4) / 200]
> [Stage 37:================================================>     (178 + 5) / 200]
>
> res26: Long = 5
>
>
>
> Could you suggest how to shrink the DataFrame lineage?
>
>
>
> Best regards, Alexander
>

Re: Shrinking the DataFrame lineage

Posted by Joseph Bradley <jo...@databricks.com>.
Sorry for the slow response.  I agree with Hamel on #1.
GraphFrames are mostly wrappers for GraphX algorithms.  There are a few
which are not:
* BFS: This is an iterative DataFrame algorithm.  Though it has unit tests, I
have not pushed its scaling to see how far it can go.
* Belief Propagation example: This uses the conversion to and from an RDD.
Not great, but it's really just an example for now.

I definitely want to get this issue fixed ASAP!
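
For an iterative DataFrame algorithm like BFS, the interim pattern looks
roughly like this (only a sketch; initialDF, updateStep, numIterations and
checkpointInterval are made-up names, and it assumes the spark-shell
sqlContext):

var current = initialDF.cache()
for (i <- 1 to numIterations) {
  val next = updateStep(current)  // each iteration grows the logical plan
  if (i % checkpointInterval == 0) {
    // Periodically truncate the plan by round-tripping through an RDD,
    // then cache and materialize the fresh DataFrame.
    current = sqlContext.createDataFrame(next.rdd, next.schema).cache()
    current.count()
  } else {
    current = next
  }
}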

On Sun, May 15, 2016 at 7:15 AM, Hamel Kothari <ha...@gmail.com>
wrote:

> I don't know about the second one, but for question #1:
> When you convert from a cached DF to an RDD (via a map function or the
> "rdd" value), the types are converted from the off-heap representation to
> on-heap types. If your rows are fairly large/complex, this can have a
> pretty big performance impact, so I would watch out for that.
>
> On Fri, May 13, 2016 at 5:29 PM Ulanov, Alexander <
> alexander.ulanov@hpe.com> wrote:
>
>> Hi Joseph,
>>
>>
>>
>> Thank you for the link! Two follow-up questions:
>>
>> 1) Suppose I have the original DataFrame in Tungsten, i.e. with Catalyst
>> types and cached in the off-heap store. It might be quite useful for
>> iterative workloads due to lower GC overhead. Then I convert it to an RDD
>> and then back to a DF. Will the resulting DF remain off-heap, or will it be
>> on-heap like a regular RDD?
>>
>> 2) How is the mentioned problem handled in GraphFrames? Suppose I want to
>> use aggregateMessages in an iterative loop, for implementing PageRank.
>>
>>
>>
>> Best regards, Alexander
>>
>>
>>
>> *From:* Joseph Bradley [mailto:joseph@databricks.com]
>> *Sent:* Friday, May 13, 2016 12:38 PM
>> *To:* Ulanov, Alexander <al...@hpe.com>
>> *Cc:* dev@spark.apache.org
>> *Subject:* Re: Shrinking the DataFrame lineage
>>
>>
>>
>> Here's a JIRA for it: https://issues.apache.org/jira/browse/SPARK-13346
>>
>>
>>
>> I don't have a great method currently, but hacks can get around it:
>> convert the DataFrame to an RDD and back to truncate the query plan lineage.
>>
>>
>>
>> Joseph
>>
>>
>>
>> On Wed, May 11, 2016 at 12:46 PM, Ulanov, Alexander <
>> alexander.ulanov@hpe.com> wrote:
>>
>> Dear Spark developers,
>>
>>
>>
>> Recently, I was trying to switch my code from RDDs to DataFrames in order
>> to compare the performance. The code computes an RDD in a loop. I use
>> RDD.persist followed by RDD.count to force Spark to compute the RDD and
>> cache it, so that it does not need to be recomputed on each iteration.
>> However, this does not seem to work for a DataFrame:
>>
>>
>>
>> import scala.util.Random
>>
>> val rdd = sc.parallelize(1 to 10, 2).map(x => (Random.nextInt(5), Random.nextInt(5)))
>>
>> val edges = sqlContext.createDataFrame(rdd).toDF("from", "to")
>>
>> val vertices =
>> edges.select("from").unionAll(edges.select("to")).distinct().cache()
>>
>> vertices.count
>>
>> [Stage 34:=================>                                     (65 + 4) / 200]
>> [Stage 34:========================>                              (90 + 5) / 200]
>> [Stage 34:==============================>                       (114 + 4) / 200]
>> [Stage 34:====================================>                 (137 + 4) / 200]
>> [Stage 34:==========================================>           (157 + 4) / 200]
>> [Stage 34:=================================================>    (182 + 4) / 200]
>>
>>
>>
>> res25: Long = 5
>>
>> If I run count again, it recomputes the DataFrame instead of using the
>> cached result:
>>
>> scala> vertices.count
>>
>> [Stage 37:=============>                                         (49 + 4) / 200]
>> [Stage 37:==================>                                    (66 + 4) / 200]
>> [Stage 37:========================>                              (90 + 4) / 200]
>> [Stage 37:=============================>                        (110 + 4) / 200]
>> [Stage 37:===================================>                  (133 + 4) / 200]
>> [Stage 37:==========================================>           (157 + 4) / 200]
>> [Stage 37:================================================>     (178 + 5) / 200]
>>
>> res26: Long = 5
>>
>>
>>
>> Could you suggest how to shrink the DataFrame lineage?
>>
>>
>>
>> Best regards, Alexander
>>
>>
>>
>

Re: Shrinking the DataFrame lineage

Posted by Hamel Kothari <ha...@gmail.com>.
I don't know about the second one, but for question #1:
When you convert from a cached DF to an RDD (via a map function or the
"rdd" value), the types are converted from the off-heap representation to
on-heap types. If your rows are fairly large/complex, this can have a
pretty big performance impact, so I would watch out for that.
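
If you want to eyeball that conversion cost in the shell, a quick and
admittedly non-rigorous timing sketch along these lines works (using the
vertices DataFrame from the original mail):

def time[T](label: String)(body: => T): T = {
  val start = System.nanoTime()
  val result = body
  println(s"$label: ${(System.nanoTime() - start) / 1e6} ms")
  result
}

time("count on the DataFrame") { vertices.count() }      // no per-row conversion
time("count via .rdd")         { vertices.rdd.count() }  // converts every row to on-heap Row objects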

On Fri, May 13, 2016 at 5:29 PM Ulanov, Alexander <al...@hpe.com>
wrote:

> Hi Joseph,
>
>
>
> Thank you for the link! Two follow-up questions:
>
> 1) Suppose I have the original DataFrame in Tungsten, i.e. with Catalyst
> types and cached in the off-heap store. It might be quite useful for
> iterative workloads due to lower GC overhead. Then I convert it to an RDD
> and then back to a DF. Will the resulting DF remain off-heap, or will it be
> on-heap like a regular RDD?
>
> 2) How is the mentioned problem handled in GraphFrames? Suppose I want to
> use aggregateMessages in an iterative loop, for implementing PageRank.
>
>
>
> Best regards, Alexander
>
>
>
> *From:* Joseph Bradley [mailto:joseph@databricks.com]
> *Sent:* Friday, May 13, 2016 12:38 PM
> *To:* Ulanov, Alexander <al...@hpe.com>
> *Cc:* dev@spark.apache.org
> *Subject:* Re: Shrinking the DataFrame lineage
>
>
>
> Here's a JIRA for it: https://issues.apache.org/jira/browse/SPARK-13346
>
>
>
> I don't have a great method currently, but hacks can get around it:
> convert the DataFrame to an RDD and back to truncate the query plan lineage.
>
>
>
> Joseph
>
>
>
> On Wed, May 11, 2016 at 12:46 PM, Ulanov, Alexander <
> alexander.ulanov@hpe.com> wrote:
>
> Dear Spark developers,
>
>
>
> Recently, I was trying to switch my code from RDDs to DataFrames in order
> to compare the performance. The code computes an RDD in a loop. I use
> RDD.persist followed by RDD.count to force Spark to compute the RDD and
> cache it, so that it does not need to be recomputed on each iteration.
> However, this does not seem to work for a DataFrame:
>
>
>
> import scala.util.Random
>
> val rdd = sc.parallelize(1 to 10, 2).map(x => (Random.nextInt(5), Random.nextInt(5)))
>
> val edges = sqlContext.createDataFrame(rdd).toDF("from", "to")
>
> val vertices =
> edges.select("from").unionAll(edges.select("to")).distinct().cache()
>
> vertices.count
>
> [Stage 34:=================>                                     (65 + 4) / 200]
> [Stage 34:========================>                              (90 + 5) / 200]
> [Stage 34:==============================>                       (114 + 4) / 200]
> [Stage 34:====================================>                 (137 + 4) / 200]
> [Stage 34:==========================================>           (157 + 4) / 200]
> [Stage 34:=================================================>    (182 + 4) / 200]
>
>
>
> res25: Long = 5
>
> If I run count again, it recomputes the DataFrame instead of using the
> cached result:
>
> scala> vertices.count
>
> [Stage 37:=============>                                         (49 + 4) / 200]
> [Stage 37:==================>                                    (66 + 4) / 200]
> [Stage 37:========================>                              (90 + 4) / 200]
> [Stage 37:=============================>                        (110 + 4) / 200]
> [Stage 37:===================================>                  (133 + 4) / 200]
> [Stage 37:==========================================>           (157 + 4) / 200]
> [Stage 37:================================================>     (178 + 5) / 200]
>
> res26: Long = 5
>
>
>
> Could you suggest how to shrink the DataFrame lineage?
>
>
>
> Best regards, Alexander
>
>
>

RE: Shrinking the DataFrame lineage

Posted by "Ulanov, Alexander" <al...@hpe.com>.
Hi Joseph,

Thank you for the link! Two follow-up questions:
1) Suppose I have the original DataFrame in Tungsten, i.e. with Catalyst types and cached in the off-heap store. It might be quite useful for iterative workloads due to lower GC overhead. Then I convert it to an RDD and then back to a DF. Will the resulting DF remain off-heap, or will it be on-heap like a regular RDD?
2) How is the mentioned problem handled in GraphFrames? Suppose I want to use aggregateMessages in an iterative loop, for implementing PageRank.

Best regards, Alexander

From: Joseph Bradley [mailto:joseph@databricks.com]
Sent: Friday, May 13, 2016 12:38 PM
To: Ulanov, Alexander <al...@hpe.com>
Cc: dev@spark.apache.org
Subject: Re: Shrinking the DataFrame lineage

Here's a JIRA for it: https://issues.apache.org/jira/browse/SPARK-13346

I don't have a great method currently, but hacks can get around it: convert the DataFrame to an RDD and back to truncate the query plan lineage.

Joseph

On Wed, May 11, 2016 at 12:46 PM, Ulanov, Alexander <al...@hpe.com>> wrote:
Dear Spark developers,

Recently, I was trying to switch my code from RDDs to DataFrames in order to compare the performance. The code computes an RDD in a loop. I use RDD.persist followed by RDD.count to force Spark to compute the RDD and cache it, so that it does not need to be recomputed on each iteration. However, this does not seem to work for a DataFrame:

import scala.util.Random
val rdd = sc.parallelize(1 to 10, 2).map(x => (Random.nextInt(5), Random.nextInt(5)))
val edges = sqlContext.createDataFrame(rdd).toDF("from", "to")
val vertices = edges.select("from").unionAll(edges.select("to")).distinct().cache()
vertices.count
[Stage 34:=================>                                     (65 + 4) / 200]
[Stage 34:========================>                              (90 + 5) / 200]
[Stage 34:==============================>                       (114 + 4) / 200]
[Stage 34:====================================>                 (137 + 4) / 200]
[Stage 34:==========================================>           (157 + 4) / 200]
[Stage 34:=================================================>    (182 + 4) / 200]

res25: Long = 5
If I run count again, it recomputes the DataFrame instead of using the cached result:
scala> vertices.count
[Stage 37:=============>                                         (49 + 4) / 200]
[Stage 37:==================>                                    (66 + 4) / 200]
[Stage 37:========================>                              (90 + 4) / 200]
[Stage 37:=============================>                        (110 + 4) / 200]
[Stage 37:===================================>                  (133 + 4) / 200]
[Stage 37:==========================================>           (157 + 4) / 200]
[Stage 37:================================================>     (178 + 5) / 200]
res26: Long = 5

Could you suggest how to shrink the DataFrame lineage?

Best regards, Alexander