Posted to dev@spark.apache.org by "wangzhenhua (G)" <wa...@huawei.com> on 2017/01/18 08:53:52 UTC

Re: Limit Query Performance Suggestion

How about this:
1. We can make LocalLimit shuffle to multiple partitions, i.e. create a new partitioner that dispatches the data uniformly:

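import org.apache.spark.Partitioner

// Dispatches records round-robin so each shuffle partition gets a near-equal share.
class LimitUniformPartitioner(partitions: Int) extends Partitioner {

  def numPartitions: Int = partitions

  // Per-instance counter; the key itself is ignored.
  private var num = 0

  def getPartition(key: Any): Int = {
    // Wrap while incrementing so the counter can never overflow to a negative value.
    num = (num + 1) % partitions
    num
  }

  override def equals(other: Any): Boolean = other match {
    // Compare against this partitioner type rather than HashPartitioner.
    case p: LimitUniformPartitioner =>
      p.numPartitions == numPartitions
    case _ =>
      false
  }

  override def hashCode: Int = numPartitions
}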

2. Then in GlobalLimit, we take only the first limit_number / num_of_shufflepartitions elements from each partition.
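
The two steps above can be simulated in plain Scala without Spark. This is only a rough sketch: the object and method names (UniformLimitSketch, dispatch, globalLimit) are made up for illustration, and the row assignment simply mirrors the counter logic of getPartition.

```scala
// Plain-Scala simulation of the two steps; no Spark dependency.
// All names in this object are hypothetical and exist only for illustration.
object UniformLimitSketch {

  // Step 1: round-robin dispatch. The i-th row (1-based) goes to
  // partition i % numPartitions, as getPartition does with its counter.
  def dispatch[T](rows: Seq[T], numPartitions: Int): Map[Int, Seq[T]] =
    rows.zipWithIndex
      .groupBy { case (_, i) => (i + 1) % numPartitions }
      .map { case (p, indexed) => p -> indexed.map(_._1) }

  // Step 2: each partition keeps only its share of the limit.
  // Integer division is used, so this assumes limit is divisible by numPartitions.
  def globalLimit[T](parts: Map[Int, Seq[T]], limit: Int, numPartitions: Int): Seq[T] = {
    val perPartition = limit / numPartitions
    parts.toSeq.sortBy(_._1).flatMap { case (_, rows) => rows.take(perPartition) }
  }
}
```

Because the per-partition share is computed with integer division, rows would be lost whenever the limit is not a multiple of the partition count, which is why the partition number has to be chosen as a factor of the limit.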

One remaining issue is how to decide the number of shuffle partitions.
We can have a config for the maximum number of elements each GlobalLimit task should process,
then factorize the limit to get a per-task count as close to that config as possible.
E.g. if the config is 2000:
if limit=10000, 10000 = 2000 * 5, so we shuffle to 5 partitions
if limit=9999, 9999 = 1111 * 9, so we shuffle to 9 partitions
if limit is a prime number, we just fall back to a single partition
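
One possible reading of this rule, sketched in Scala: pick the largest divisor of the limit that does not exceed the config, and use the quotient as the partition count. The names ShufflePartitionSketch, numShufflePartitions and maxPerTask are hypothetical, and treating the config as an upper bound is an assumption; the three cases above behave the same either way.

```scala
// Hypothetical helper; not part of Spark.
object ShufflePartitionSketch {

  // limit:      the LIMIT value of the query
  // maxPerTask: assumed config, the maximum rows one GlobalLimit task should process
  def numShufflePartitions(limit: Int, maxPerTask: Int): Int = {
    require(limit > 0 && maxPerTask > 0, "limit and maxPerTask must be positive")
    // Largest divisor of limit that does not exceed maxPerTask; 1 always qualifies.
    val perTask = (math.min(limit, maxPerTask) to 1 by -1).find(limit % _ == 0).get
    // A share of 1 means no useful factorization exists (e.g. a prime limit
    // larger than maxPerTask), so fall back to a single partition.
    if (perTask == 1) 1 else limit / perTask
  }
}
```

With maxPerTask = 2000 this gives 5 partitions for limit=10000, 9 partitions for limit=9999, and 1 partition for a prime such as 10007. The linear scan over candidate divisors is kept for readability; it runs once per query at planning time.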

best regards,
-zhenhua


-----Original Message-----
From: Liang-Chi Hsieh [mailto:viirya@gmail.com] 
Sent: January 18, 2017 15:48
To: dev@spark.apache.org
Subject: Re: Limit Query Performance Suggestion


Hi Sujith,

I saw your updated post. It makes sense to me now.

If you use a very large limit, the shuffle before `GlobalLimit` will of course be a performance bottleneck, even if it can eventually shuffle enough data to the single partition.

Unlike `CollectLimit`, I think there is no reason, as far as query execution is concerned, that `GlobalLimit` must shuffle all the limited data from all partitions to a single machine. In other words, I think we can avoid shuffling data in `GlobalLimit`.

I have an idea to improve this and may update here later if I can make it work.


sujith71955 wrote
> Dear Liang,
> 
> Thanks for your valuable feedback.
> 
> There was a mistake in my previous post, which I have corrected. As you
> mentioned, in `GlobalLimit` we only take the required number of
> rows from the input iterator, which pulls data from local blocks
> and remote blocks.
> But if the limit value is very high (>= 10000000), a shuffle exchange
> still happens between `GlobalLimit` and `LocalLimit`
> to retrieve data from all partitions into one partition; since the limit
> value is very large, the performance bottleneck still exists.
> 
> In my next post I will publish a test report with sample data, and I am
> also working out a solution for this problem.
> 
> Please let me know if you have any clarifications or suggestions regarding
> this issue.
> 
> Regards,
> Sujith





-----
Liang-Chi Hsieh | @viirya
Spark Technology Center
http://www.spark.tc/
--
View this message in context: http://apache-spark-developers-list.1001551.n3.nabble.com/Limit-Query-Performance-Suggestion-tp20570p20652.html
Sent from the Apache Spark Developers List mailing list archive at Nabble.com.

---------------------------------------------------------------------
To unsubscribe e-mail: dev-unsubscribe@spark.apache.org




Re: Re: Limit Query Performance Suggestion

Posted by Liang-Chi Hsieh <vi...@gmail.com>.
Hi zhenhua,

Thanks for the idea.

Actually, I think we can completely avoid shuffling the data in a limit
operation, whether in LocalLimit or GlobalLimit.








-----
Liang-Chi Hsieh | @viirya 
Spark Technology Center 
http://www.spark.tc/ 
--
View this message in context: http://apache-spark-developers-list.1001551.n3.nabble.com/Limit-Query-Performance-Suggestion-tp20570p20657.html