Posted to dev@spark.apache.org by Ali Tootoonchian <al...@levyx.com> on 2016/04/25 21:50:27 UTC

Cache Shuffle Based Operation Before Sort

Caching a shuffle RDD before a sort improves performance. The SQL planner
could be made smart enough to cache the output of a join, aggregate, or sort
before executing the next sort.
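
To make the planner idea concrete, here is a minimal sketch of such a rewrite
on a toy plan tree. The case classes and the rewrite function are hypothetical
stand-ins for illustration, not Catalyst's classes, and every join is assumed
to be shuffle-based (a broadcast join would not be).

```scala
// Toy plan tree for illustration only; these are NOT Catalyst's classes.
sealed trait Plan
case class Scan(table: String) extends Plan
case class Join(left: Plan, right: Plan) extends Plan // assumed shuffle-based
case class Aggregate(child: Plan) extends Plan        // assumed shuffle-based
case class Sort(child: Plan) extends Plan             // shuffle-based
case class Cache(child: Plan) extends Plan

object CacheBeforeSort {
  // True when the operator's output is produced by the reduce side of a shuffle.
  def isShuffleBased(p: Plan): Boolean = p match {
    case _: Join | _: Aggregate | _: Sort => true
    case _                                => false
  }

  // Rewrite bottom-up, inserting a Cache between a Sort and a shuffle-based child.
  def rewrite(p: Plan): Plan = p match {
    case Sort(child) =>
      val c = rewrite(child)
      if (isShuffleBased(c)) Sort(Cache(c)) else Sort(c)
    case Join(l, r)   => Join(rewrite(l), rewrite(r))
    case Aggregate(c) => Aggregate(rewrite(c))
    case Cache(c)     => Cache(rewrite(c))
    case s: Scan      => s
  }
}
```

A sort directly over a scan is left alone, and applying the rewrite twice
gives the same plan as applying it once.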

For any sort, Spark creates two jobs: the first produces the range
boundaries for the shuffle partitions, and the second completes the sort by
creating a new shuffle RDD.
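
The two jobs can be sketched in plain Scala, without Spark. This is a
simplified model under my own assumptions: the object and function names are
made up, and the "sample" is the whole input, whereas Spark only samples it.

```scala
object RangeSortSketch {
  // Job 1: pick (numPartitions - 1) boundaries from a sorted sample of the keys.
  def rangeBounds(sample: Seq[Int], numPartitions: Int): Vector[Int] = {
    val sorted = sample.sorted.toVector
    if (sorted.isEmpty) Vector.empty
    else (1 until numPartitions).map(i => sorted(i * sorted.length / numPartitions)).toVector
  }

  // A key belongs to the range whose index equals the number of boundaries below it.
  def partitionOf(key: Int, bounds: Vector[Int]): Int = bounds.count(_ < key)

  // Job 2: send every record to its range, then sort inside each range.
  def rangeSort(data: Seq[Int], numPartitions: Int): Vector[Vector[Int]] = {
    val bounds = rangeBounds(data, numPartitions) // sample = whole input here
    val grouped = data.groupBy(k => partitionOf(k, bounds))
    Vector.tabulate(numPartitions)(p => grouped.getOrElse(p, Seq.empty[Int]).sorted.toVector)
  }
}
```

Concatenating the partitions in index order gives the globally sorted output.
Note that the input is traversed once to find the boundaries and once more
to do the shuffle.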

When the input of a sort is the output of another shuffle, the reduce side
of that shuffle RDD is re-evaluated and the intermediate shuffle data is
read twice. If the input shuffle RDD (an exchange-based data frame) is
cached, the sort can complete faster. Note that Spark SQL caches a data
frame in a compressed in-memory columnar format, so the cached copy is
usually smaller than the original data.
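
The double read can be modelled with a small counter, again in plain Scala
and not with Spark itself. ShuffleOutput, sort and cached are hypothetical
names; each call to compute() stands for one full read of the intermediate
shuffle data.

```scala
object ReevaluationSketch {
  // Stand-in for the reduce side of an upstream shuffle (for example a join).
  final class ShuffleOutput(rows: Vector[Int]) {
    var reads = 0
    def compute(): Vector[Int] = { reads += 1; rows }
  }

  // A sort pulls its input twice: job 1 scans it to find range boundaries,
  // job 2 scans it again to shuffle and sort the records.
  def sort(input: () => Vector[Int]): Vector[Int] = {
    input()        // job 1: boundary computation (result ignored in this model)
    input().sorted // job 2: the actual sort
  }

  // Caching: evaluate the upstream once and serve later requests from memory.
  def cached(upstream: ShuffleOutput): () => Vector[Int] = {
    lazy val materialized = upstream.compute()
    () => materialized
  }
}
```

Sorting an uncached ShuffleOutput leaves reads at 2, while sorting through
cached(...) leaves it at 1, which is the factor of 2 discussed in this post.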

Let’s look at an example.
The following query is a modified version of Q3 from the TPC-H benchmark.
val tpchQuery =
       """
        |select *
        |from
        | customer,
        | orders,
        | lineitem
        |where
        | c_mktsegment = 'MACHINERY'
        | and c_custkey = o_custkey
        | and l_orderkey = o_orderkey
        | and o_orderdate < '1995-03-15'
        | and l_shipdate > '1995-03-15'
        |order by
        | o_orderdate
       """.stripMargin

The current Spark SQL planner executes this query in one step. The
alternative is to execute it in two steps:
    1. Compute and cache the output of the join.
    2. Execute the order by.
The following commands show how the second approach can be implemented:

val tpchQuery =
      """
        |select *
        |from
        | customer,
        | orders,
        | lineitem
        |where
        | c_mktsegment = 'MACHINERY'
        | and c_custkey = o_custkey
        | and l_orderkey = o_orderkey
        | and o_orderdate < '1995-03-15'
        | and l_shipdate > '1995-03-15'
      """.stripMargin
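// cache() is lazy: the join output is materialized by the first action on it
val joinDf = sqlContext.sql(tpchQuery).cache()
// The sort can then reuse the cached join output
val queryRes = joinDf.sort("o_orderdate")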

Let’s look at details of execution for 10 and 100 scale factor input


By comparing stages 4, 9, 10 and 15, 20, 21 of the two approaches, you can
see that the amount of data read during the sort can be reduced by a factor
of 2.



--
View this message in context: http://apache-spark-developers-list.1001551.n3.nabble.com/Cache-Shuffle-Based-Operation-Before-Sort-tp17331.html
Sent from the Apache Spark Developers List mailing list archive at Nabble.com.

---------------------------------------------------------------------
To unsubscribe, e-mail: dev-unsubscribe@spark.apache.org
For additional commands, e-mail: dev-help@spark.apache.org


Re: Cache Shuffle Based Operation Before Sort

Posted by Takeshi Yamamuro <li...@gmail.com>.
Hi,

Looks interesting.
This optimisation also seems effective when simply loading and sorting a
df:
val df = sqlCtx.read.load(path)
df.cache.sort("some column")

How much does this optimisation affect actual performance?
If the effect is large, it would be better to open a JIRA.

// maropu

On Mon, May 9, 2016 at 2:21 PM, Ted Yu <yu...@gmail.com> wrote:

> I assume there were supposed to be images following this line (which I
> don't see in the email thread):
>
> bq. Let’s look at details of execution for 10 and 100 scale factor input
>
> Consider using 3rd party image site.
>
> On Sun, May 8, 2016 at 5:17 PM, Ali Tootoonchian <al...@levyx.com> wrote:
>
>> Thanks for your comment.
>> Which image or chart are you pointing?


-- 
---
Takeshi Yamamuro

Re: Cache Shuffle Based Operation Before Sort

Posted by Ted Yu <yu...@gmail.com>.
I assume there were supposed to be images following this line (which I
don't see in the email thread):

bq. Let’s look at details of execution for 10 and 100 scale factor input

Consider using a third-party image hosting site.

On Sun, May 8, 2016 at 5:17 PM, Ali Tootoonchian <al...@levyx.com> wrote:

> Thanks for your comment.
> Which image or chart are you pointing?

Re: Cache Shuffle Based Operation Before Sort

Posted by Ali Tootoonchian <al...@levyx.com>.
Thanks for your comment.
Which image or chart are you referring to?





Re: Cache Shuffle Based Operation Before Sort

Posted by Ted Yu <yu...@gmail.com>.
Interesting.

bq. details of execution for 10 and 100 scale factor input

Looks like some chart (or image) didn't go through.

FYI
