You are viewing a plain text version of this content. The canonical link for it is here.
Posted to reviews@spark.apache.org by sddyljsx <gi...@git.apache.org> on 2018/08/02 10:28:27 UTC

[GitHub] spark issue #21859: [SPARK-24900][SQL]speed up sort when the dataset is smal...

Github user sddyljsx commented on the issue:

    https://github.com/apache/spark/pull/21859
  
    @felixcheung 
    Thanks for review.
    
    **1. How small is 'small':**
    
      This optimazition works when the sampled data of the RangePartitioner covers all the data to sort.
    
      The  size of the sampled data is :
    
    ```
    // Cast to double to avoid overflowing ints or longs
    val sampleSize = math.min(samplePointsPerPartitionHint.toDouble * partitions, 1e6)
    // Assume the input partitions are roughly balanced and over-sample a little bit.
    val sampleSizePerPartition = math.ceil(3.0 * sampleSize / rdd.partitions.length).toInt
    ```
    
    ```
    val SHUFFLE_PARTITIONS = buildConf("spark.sql.shuffle.partitions")
      .doc("The default number of partitions to use when shuffling data for joins or aggregations.")
      .intConf
      .createWithDefault(200)
    ```
    
    ```
    val RANGE_EXCHANGE_SAMPLE_SIZE_PER_PARTITION =
        buildConf("spark.sql.execution.rangeExchange.sampleSizePerPartition")
          .internal()
          .doc("Number of points to sample per partition in order to determine the range boundaries" +
              " for range partitioning, typically used in global sorting (without limit).")
          .intConf
          .createWithDefault(100)
    ```
       
    The default value of the SHUFFLE_PARTITIONS is 200, and the default value of the RANGE_EXCHANGE_SAMPLE_SIZE_PER_PARTITION is 100, so the default size of the total sampled data is  200x100x3 = 60,000.
        
    **So when the size of the data to sort  in each partition is less than 60,000/partitions.length, this optimization works well.**
    
      In my case, 'select * from order where order_status = 9 order by order_id',  the original dataset has 10 partitions and the total size  is  20,808,930 ,  but after filtering  'order_status = 9', the total remaining size is only 18,610 (1,861 on each partition). The size of the sampled data in each partition is 60,000 / 10 = 6,000. 6,000 is larger than 1,861, so the sampled data will cover all the data to sort.
    
      The original logic will  execute the filescan twice, the first for calculating the RangePartitioner's rangeBounds  info by sampling the data, and the second for  getting the whole data, But in this case, we have got the whole data in the first filescan, so **there is no need to do the second one**. This is the purpose of optimization.
    
    **2. Tests**
    
       It can be tested  by executing sql like 'select * from a order by a.b' on a dataset with a small size.
    
       Unit Test has been added in SQLQuerySuite.
    
    **3. Benchmark**
    
       This optimazition works well when the original dataset is large, but the data to sort is small after filtering. Compared to the original logic, It will save the time on the second filescan and filter theoretically.
    
       Benchmark has been added in SmallDataSortBenchmark.
    
       The json dataset has 100,000,000 rows, the schema is (key, value). The key is in range (0, 100,000), each key has values (0, 1000), so the sql 'select * from src where key = $key order by value' will get 1000 rows.
    
       before optimization:
       
       ```
    Java HotSpot(TM) 64-Bit Server VM 1.8.0_91-b14 on Mac OS X 10.13.6
    Intel(R) Core(TM) i5-5257U CPU @ 2.70GHz
    
    speed up sort when the dataset is small: Best/Avg Time(ms)    Rate(M/s)   Per Row(ns)   Relative
    ------------------------------------------------------------------------------------------------
    sort                                      179881 / 182216          0.6        1798.8       1.0X
    
    ```
       after optimization:
    ```
    Java HotSpot(TM) 64-Bit Server VM 1.8.0_91-b14 on Mac OS X 10.13.6
    Intel(R) Core(TM) i5-5257U CPU @ 2.70GHz
    
    speed up sort when the dataset is small: Best/Avg Time(ms)    Rate(M/s)   Per Row(ns)   Relative
    ------------------------------------------------------------------------------------------------
    sort                                      127424 / 132435          0.8        1274.2       1.0X
    ```
    the sql speeds ​​up by 30% with this optimization.
    
       
    
       
    
       
    
       
    
    
    
     
    



---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org