Posted to user@spark.apache.org by Swapnil Shinde <sw...@gmail.com> on 2017/02/08 17:02:41 UTC

Spark's execution plan debugging

Hello
       I am trying to figure out how Spark generates its execution plan
with and without caching. Here is a small example to illustrate what I am
doing:

val a = sc.parallelize(List(("a", 1), ("b", 2), ("c", 3))).toDF("c1", "c2")
val b = sc.parallelize(List(("a", 1), ("b", 2), ("c", 3))).toDF("d1", "d2")
val c = a.join(b, $"c1" === $"d1")   // shared intermediate, used by both d1 and d2
val d1 = c.map(x => { val c1 = x.getAs[String]("c1"); val c2 = x.getAs[Int]("c2"); (c1, c2 * 2) }).toDF("nC1", "nC2")
val d2 = c.map(x => { val d1 = x.getAs[String]("d1"); val d2 = x.getAs[Int]("d2"); (d1, d2 * 3) }).toDF("nD1", "nD2")
val x = d1.as("a").join(d2.as("b"), $"a.nC1" === $"b.nD1")

The generic DAG for dataframe 'x' would be something like this (Fig1):
        [image: Inline image 1]
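
In ASCII (in case the inline image does not come through), the shape I have
in mind is roughly this, reconstructed from the code above:

    a       b
     \     /
       c            (join on c1 === d1, computed once)
     /     \
   d1       d2
     \     /
       x            (join on nC1 === nD1)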

However, the physical plan (x.explain) generates something like this
*(without any caching)*:
[image: Inline image 2]

I am interpreting this as -
[image: Inline image 3]


As per my understanding, dataframe 'c' is used twice, so it should be good
to cache it. I was hoping that if I cache 'c', the execution plan would
look like the generic one (Fig1 above). However, I don't see it that way.
Correct me if my interpretation of the plan is wrong - *(here 'c' is
cached)*
[image: Inline image 4]

I don't think caching 'c' is helping at all. Basically, the input
dataframes 'a' & 'b' are being fetched twice. (In this example a and b are
dataframes generated from a local collection, but in the real world they
would be large files.)

*Questions:*
*    Why doesn't caching 'c' build a physical plan where 'a' & 'b' are
fetched only once, 'c' is generated, and then d1 and d2 are built in
parallel to provide the input for x (like Fig1)?*
*    I understand I may be missing something very basic about execution
plans, so please correct me if I am wrong anywhere.*
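
For reference, here is how I am comparing the two plans (a minimal sketch
in spark-shell; note that cache() alone is lazy, so I force materialization
with an action before re-checking):

x.explain()                               // physical plan without caching
c.cache()                                 // mark 'c' for caching (lazy; nothing is computed yet)
c.count()                                 // action: computes 'c' once and populates the cache
x.explain()                               // physical plan with 'c' cached
println(x.queryExecution.optimizedPlan)   // optimized logical plan, for comparison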


Thanks
Swapnil

Re: Spark's execution plan debugging

Posted by Swapnil Shinde <sw...@gmail.com>.
Thanks for your reply. I agree with your explanation of caching, and I can
see that it works as you describe. However, I am running the given snippet
on Spark 2.0.1, and even with caching, I can see it going back to
dataframes a & b.
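
As a sanity check on my side (just a sketch; I am assuming the optimized
plan mentions InMemoryRelation once the cache substitution kicks in):

scala> c.cache()
scala> x.queryExecution.optimizedPlan.toString.contains("InMemoryRelation")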

Re: Spark's execution plan debugging

Posted by Yong Zhang <ja...@hotmail.com>.
You may misunderstand what the cache means. Caching a DF just means that its data can be retrieved from memory directly, instead of going back to the parent dependencies to get the data. In your example, even if C is cached, having two DFs derived from it means that C will still be scanned 2 times in your application; both scans are simply served directly from memory instead of going back to the A/B DFs, the parents that C is derived from.
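
A quick way to see this in the shell (a minimal sketch, reusing the DFs from your example; the exact node names assume Spark 1.6):

scala> c.cache()     // mark C for caching; the cache is populated on the next action
scala> c.count()     // computes C from A/B exactly once and fills the cache
scala> d1.explain()  // should now show InMemoryColumnarTableScan instead of Scan ExistingRDD
scala> d2.explain()  // likewise: a second scan, but of the cache, not of A/B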


When building the execution plan, Spark can detect whether any DF in the chain is cached and generate the plan accordingly, as shown in the following example (tested with Spark 1.6.3). As you can see, if C is NOT cached, then your X has to go back to A/B (scanning the existing RDDs), but after C is cached, Spark reads from "InMemoryColumnarTableScan" instead. Caching has nothing to do with how many times the data will be scanned.


scala> x.explain
== Physical Plan ==
SortMergeJoin [c1#41], [c1#98]
:- SortMergeJoin [c1#41], [d1#45]
:  :- Sort [c1#41 ASC], false, 0
:  :  +- TungstenExchange hashpartitioning(c1#41,200), None
:  :     +- Project [_1#39 AS c1#41,_2#40 AS c2#42]
:  :        +- Filter (_1#39 = a)
:  :           +- Scan ExistingRDD[_1#39,_2#40]
:  +- Sort [d1#45 ASC], false, 0
:     +- TungstenExchange hashpartitioning(d1#45,200), None
:        +- Project [_1#43 AS d1#45,_2#44 AS d2#46]
:           +- Scan ExistingRDD[_1#43,_2#44]
+- SortMergeJoin [c1#98], [d1#102]
   :- Sort [c1#98 ASC], false, 0
   :  +- TungstenExchange hashpartitioning(c1#98,200), None
   :     +- Project [_1#39 AS c1#98,_2#40 AS c2#99]
   :        +- Scan ExistingRDD[_1#39,_2#40]
   +- Sort [d1#102 ASC], false, 0
      +- TungstenExchange hashpartitioning(d1#102,200), None
         +- Project [_1#43 AS d1#102,_2#44 AS d2#103]
            +- Filter (_1#43 = b)
               +- Scan ExistingRDD[_1#43,_2#44]

scala> c.cache
res17: c.type = [c1: string, c2: int, d1: string, d2: int]

scala> x.explain
== Physical Plan ==
SortMergeJoin [c1#41], [c1#98]
:- Filter (c1#41 = a)
:  +- InMemoryColumnarTableScan [c1#41,c2#42,d1#45,d2#46], [(c1#41 = a)], InMemoryRelation [c1#41,c2#42,d1#45,d2#46], true, 10000, StorageLevel(true, true, false, true, 1), SortMergeJoin [c1#41], [d1#45], None
+- Sort [c1#98 ASC], false, 0
   +- TungstenExchange hashpartitioning(c1#98,200), None
      +- Filter (d1#102 = b)
         +- InMemoryColumnarTableScan [c1#98,c2#99,d1#102,d2#103], [(d1#102 = b)], InMemoryRelation [c1#98,c2#99,d1#102,d2#103], true, 10000, StorageLevel(true, true, false, true, 1), SortMergeJoin [c1#41], [d1#45], None
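
Note that both branches of the top join now read from InMemoryColumnarTableScan: the cache is hit twice, which is exactly the "scanned 2 times" point above. If you want to check that programmatically, a quick sketch is to count the occurrences in the plan string (it should come out as 2 for the plan above):

scala> "InMemoryColumnarTableScan".r.findAllMatchIn(x.queryExecution.executedPlan.toString).length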

Re: Spark's execution plan debugging

Posted by Swapnil Shinde <sw...@gmail.com>.
Any suggestions, please..
