You are viewing a plain text version of this content. The canonical link for it is here.
Posted to dev@spark.apache.org by atootoonchian <al...@levyx.com> on 2016/04/21 19:29:53 UTC

[Spark-SQL] Reduce Shuffle Data by pushing filter toward storage

SQL query planner can have intelligence to push down filter commands towards
the storage layer. If we optimize the query planner such that the IO to the
storage is reduced at the cost of running multiple filters (i.e., compute),
this should be desirable when the system is IO bound. An example to prove
the case in point is below from TPCH test bench:Let’s look at query q19 of
TPCH test bench.select    sum(l_extendedprice* (1 - l_discount)) as
revenuefrom lineitem, partwhere      ( p_partkey = l_partkey        and
p_brand = 'Brand#12'        and p_container in ('SM CASE', 'SM BOX', 'SM
PACK', 'SM PKG')        and l_quantity >= 1 and l_quantity <= 1 + 10       
and p_size between 1 and 5        and l_shipmode in ('AIR', 'AIR REG')       
and l_shipinstruct = 'DELIVER IN PERSON')      or      ( p_partkey =
l_partkey        and p_brand = 'Brand#23'        and p_container in ('MED
BAG', 'MED BOX', 'MED PKG', 'MED PACK')        and l_quantity >= 10 and
l_quantity <= 10 + 10        and p_size between 1 and 10        and
l_shipmode in ('AIR', 'AIR REG')        and l_shipinstruct = 'DELIVER IN
PERSON')      or      ( p_partkey = l_partkey        and p_brand =
'Brand#34'        and p_container in ('LG CASE', 'LG BOX', 'LG PACK', 'LG
PKG')        and l_quantity >= 20 and l_quantity <= 20 + 10        and
p_size between 1 and 15        and l_shipmode in ('AIR', 'AIR REG')       
and l_shipinstruct = 'DELIVER IN PERSON')Latest version of Spark creates a
following planner (not exactly, more readable planner) to execute
q19.Aggregate [(sum(cast((l_extendedprice * (1.0 - l_discount))  Project
[l_extendedprice,l_discount]    Join Inner, Some(((p_partkey = l_partkey) &&
((((((   (p_brand = Brand#12) &&     p_container IN (SM CASE,SM BOX,SM
PACK,SM PKG)) &&    (l_quantity >= 1.0)) && (l_quantity <= 11.0)) &&   
(p_size <= 5)) || (((((p_brand = Brand#23) &&      p_container IN (MED
BAG,MED BOX,MED PKG,MED PACK)) &&     (l_quantity >= 10.0)) && (l_quantity
<= 20.0)) &&     (p_size <= 10))) || (((((p_brand = Brand#34) &&     
p_container IN (LG CASE,LG BOX,LG PACK,LG PKG)) &&     (l_quantity >= 20.0))
&& (l_quantity <= 30.0)) &&     (p_size <= 15)))))      Project [l_partkey,
l_quantity, l_extendedprice, l_discount]        Filter
((isnotnull(l_partkey) &&                 (isnotnull(l_shipinstruct) &&                
(l_shipmode IN (AIR,AIR REG) &&                 (l_shipinstruct = DELIVER IN
PERSON))))          LogicalRDD [l_orderkey, l_partkey, l_suppkey,
l_linenumber, l_quantity, l_extendedprice, l_discount, l_tax, l_returnflag,
l_linestatus, l_shipdate, l_commitdate, l_receiptdate, l_shipinstruct,
l_shipmode, l_comment], MapPartitionsRDD[316]       Project [p_partkey,
p_brand, p_size, p_container]        Filter ((isnotnull(p_partkey) &&    
(isnotnull(p_size) &&     (cast(cast(p_size as decimal(20,0)) as int) >=
1)))          LogicalRDD [p_partkey, p_name, p_mfgr, p_brand, p_type,
p_size, p_container, p_retailprice, p_comment], MapPartitionsRDD[314]    As
you see only three filter commands are pushed before join process is
executed.  l_shipmode IN (AIR,AIR REG)  l_shipinstruct = DELIVER IN PERSON 
(cast(cast(p_size as decimal(20,0)) as int) >= 1)
And the following filters are applied during the join process  p_brand =
Brand#12  p_container IN (SM CASE,SM BOX,SM PACK,SM PKG)   l_quantity >= 1.0
&& l_quantity <= 11.0   p_size <= 5    p_brand = Brand#23   p_container IN
(MED BAG,MED BOX,MED PKG,MED PACK)   l_quantity >= 10.0 && l_quantity <=
20.0   p_size <= 10   p_brand = Brand#34   p_container IN (LG CASE,LG BOX,LG
PACK,LG PKG)   l_quantity >= 20.0 && l_quantity <= 30.0  p_size <= 15Let’s
look at the following sequence of SQL commands which produce same result.val
partDfFilter = sqlContext.sql("""        |select p_brand, p_partkey from
part         |where        | (p_brand = 'Brand#12'        |   and
p_container in ('SM CASE', 'SM BOX', 'SM PACK', 'SM PKG')        |   and
p_size between 1 and 5)        | or        | (p_brand = 'Brand#23'        |  
and p_container in ('MED BAG', 'MED BOX', 'MED PKG', 'MED PACK')        |  
and p_size between 1 and 10)        | or        | (p_brand = 'Brand#34'       
|   and p_container in ('LG CASE', 'LG BOX', 'LG PACK', 'LG PKG')        |  
and p_size between 1 and 15)       """.stripMargin)val itemLineDfFilter =
sqlContext.sql("""        |select         | l_partkey, l_quantity,
l_extendedprice, l_discount from lineitem        |where        | (l_quantity
>= 1 and l_quantity <= 30        |   and l_shipmode in ('AIR', 'AIR REG')       
|   and l_shipinstruct = 'DELIVER IN PERSON')     
""".stripMargin)partDfFilter.registerTempTable("partFilter")itemLineDfFilter.registerTempTable("lineitemFilter")var
q19Query = """                 |select        | sum(l_extendedprice* (1 -
l_discount)) as revenue        |from        | lineitemFilter,        |
partFilter        |where        | (p_partkey = l_partkey        |   and
p_brand = 'Brand#12'        |   and l_quantity >= 1 and l_quantity <= 1 +
10)        | or        | ( p_partkey = l_partkey        |   and p_brand =
'Brand#23'        |   and l_quantity >= 10 and l_quantity <= 10 + 10)       
| or        | ( p_partkey = l_partkey        |   and p_brand = 'Brand#34'       
|   and l_quantity >= 20 and l_quantity <= 20 + 10)      """.stripMarginAnd
as following planner shows how spark will execute new q19 query.Aggregate
[(sum(cast((l_extendedprice * (1.0 - l_discount))  Project
[l_extendedprice,l_discount]    Join Inner, Some(((p_partkey = l_partkey) &&
(((((p_brand = Brand#12) &&     (l_quantity >= 1.0)) && (l_quantity <=
11.0)) ||   (((p_brand = Brand#23) &&     (l_quantity >= 10.0)) &&
(l_quantity <= 20.0))) ||   (((p_brand = Brand#34) &&     (l_quantity >=
20.0)) && (l_quantity <= 30.0)))))      Project [l_partkey, l_quantity,
l_extendedprice, l_discount]        Filter ((isnotnull(l_partkey) &&               
((isnotnull(l_shipinstruct) &&                  isnotnull(l_quantity)) &&              
(((cast(l_quantity as float) >= 1.0) &&                 (cast(l_quantity as
float) <= 30.0)) &&                 (l_shipmode IN (AIR,AIR REG) &&                
(l_shipinstruct = DELIVER IN PERSON)))))          LogicalRDD [l_orderkey,
l_partkey, l_suppkey, l_linenumber, l_quantity, l_extendedprice, l_discount,
l_tax, l_returnflag, l_linestatus, l_shipdate, l_commitdate, l_receiptdate,
l_shipinstruct, l_shipmode, l_comment], MapPartitionsRDD[316]           
Project [p_partkey, p_brand, p_size, p_container]              Filter
((isnotnull(p_partkey) && 		isnotnull(cast(cast(p_partkey as decimal(20,0))
as int))) && (isnotnull(p_size) &&             ((cast(cast(p_size as
decimal(20,0)) as int) >= 1) &&            (((((p_brand = Brand#12) &&                 
p_container IN (SM CASE,SM BOX,SM PACK,SM PKG)) && 		     (cast(cast(p_size
as decimal(20,0)) as int) <= 5)) ||   (((p_brand = Brand#23) &&     
p_container IN (MED BAG,MED BOX,MED PKG,MED PACK)) &&     (cast(cast(p_size
as decimal(20,0)) as int) <= 10))) ||   (((p_brand = Brand#34) &&     
p_container IN (LG CASE,LG BOX,LG PACK,LG PKG)) &&     (cast(cast(p_size as
decimal(20,0)) as int) <= 15))))))                  LogicalRDD [p_partkey,
p_name, p_mfgr, p_brand, p_type, p_size, p_container, p_retailprice,
p_comment], MapPartitionsRDD[314]With new approach all filter commands is
pushed down beyond join process   l_shipmode IN (AIR,AIR REG) 
l_shipinstruct = DELIVER IN PERSON   cast(cast(p_size as decimal(20,0)) as
int) >= 1)  p_brand = Brand#12  p_container IN (SM CASE,SM BOX,SM PACK,SM
PKG)  l_quantity >= 1.0 && l_quantity <= 11.0   p_size <= 5   p_brand =
Brand#23   p_container IN (MED BAG,MED BOX,MED PKG,MED PACK)   l_quantity >=
10.0 && l_quantity <= 20.0   p_size <= 10   p_brand = Brand#34   p_container
IN (LG CASE,LG BOX,LG PACK,LG PKG)   l_quantity >= 20.0 && l_quantity <=
30.0  p_size <= 15But still some filter commands needs to be executed during
join process to distinguish different sets of items. In other words some
filter commands are re-evaluated.  p_brand = Brand#12  l_quantity >= 1.0 &&
l_quantity <= 11.0   p_brand = Brand#23   l_quantity >= 10.0 && l_quantity
<= 20.0   p_brand = Brand#34   l_quantity#807 >= 20.0 && l_quantity#807 <=
30.0Our main goal to push down filter as much as possible is to minimize I/O
and maximize processor utilization. So let’s compare result of original q19
and modified q19 from I/O point of
view.+--------+--------+---------------------------------------------+--------------------------------------------+|
TPCH   | Stage  |              Q19                                       |                         
Q19 modified              || Scale   |          
+----------+---------------+----------------+----------+----------------+---------------+|
Factor  |           | Input     | Shuffle Read  | Shuffle Write  | Input    
| Shuffle Read   | Shuffle Write
|+--------+--------+----------+---------------+----------------+----------+----------------+---------------+|
1         | 1         | 724 MB  |                    | 4.2 MB           |
724 MB  |                      | 2.7 MB        
|+--------+--------+----------+---------------+----------------+----------+----------------+---------------+|
1         | 2         | 23.0 MB |                    | 4.0 MB           |
23.0 MB |                      | 22.9 KB       
|+--------+--------+----------+---------------+----------------+----------+----------------+---------------+|
1         | 3         |              | 8.2 MB         | 11.0 KB         |             
| 2.7 MB           | 11.0 KB       
|+--------+--------+----------+---------------+----------------+----------+----------------+---------------+|
1         | 4         |              | 11.0 KB        |                    
|              | 11.0 KB          |                   
|+--------+--------+----------+---------------+----------------+----------+----------------+---------------+|
10       | 1         | 7.2 GB   |                    | 43.5 MB         | 7.2
GB   |                      | 28.0 MB       
|+--------+--------+----------+---------------+----------------+----------+----------------+---------------+|
10       | 2         | 232 MB  |                    | 39.1 MB         | 232
MB  |                      | 146.2 KB      
|+--------+--------+----------+---------------+----------------+----------+----------------+---------------+|
10       | 3         |              | 82.5 MB       | 11.0 KB          |             
| 28.1 MB         | 11.0 KB       
|+--------+--------+----------+---------------+----------------+----------+----------------+---------------+|
10       | 4         |              | 11.0 KB        |                     
|              | 11.0 KB         |                   
|+--------+--------+----------+---------------+----------------+----------+----------------+---------------+|
100     | 1         | 74.1 GB |                     | 448 MB          | 74.1
GB |                      | 266 MB       
|+--------+--------+----------+---------------+----------------+----------+----------------+---------------+|
100     | 2         | 2.3 GB   |                     | 385 MB          | 2.3
GB   |                     | 1570 KB       
|+--------+--------+----------+---------------+----------------+----------+----------------+---------------+|
100     | 3         |              | 834 MB         | 11.0 KB         |             
| 288 MB          | 11.0 KB       
|+--------+--------+----------+---------------+----------------+----------+----------------+---------------+|
100     | 4         |              | 11.0 KB        |                      |             
| 11.0 KB         |                   
|+--------+--------+----------+---------------+----------------+----------+----------------+---------------+As
rate of read and write amplification reduction for each scale factor is
shown in the following
table.+--------------------+--------------------------+------------------------------+--------+|
TPCH Scale Facto  | Q19 Shuffle Data        | Q19 Modified Shuffle Data  |
Rate   
|+--------------------+--------------------------+------------------------------+--------+|
1                         |  8.211 MB                   | 2.733 MB                        
|  3.00   |    
+--------------------+--------------------------+------------------------------+--------+|
10                       |  82.611 MB                 | 28.157 MB                      
|  2.93   |    
+--------------------+--------------------------+------------------------------+--------+|
100                     |  834.311 MB               | 288.081 MB                     
|  2.89   |    
+--------------------+--------------------------+------------------------------+--------+So
as you see shuffle read and write can be reduced by factor of 3 if we can
push more intelligent toward of storage.



--
View this message in context: http://apache-spark-developers-list.1001551.n3.nabble.com/Spark-SQL-Reduce-Shuffle-Data-by-pushing-filter-toward-storage-tp17296.html
Sent from the Apache Spark Developers List mailing list archive at Nabble.com.