Posted to reviews@spark.apache.org by ZunwenYou <gi...@git.apache.org> on 2017/02/20 14:47:05 UTC

[GitHub] spark pull request #17000: [SPARK-18946][ML] sliceAggregate which is a new a...

GitHub user ZunwenYou opened a pull request:

    https://github.com/apache/spark/pull/17000

    [SPARK-18946][ML] sliceAggregate which is a new aggregate operator for high-dimensional data

    In many machine learning applications, the driver has to aggregate high-dimensional vectors/arrays from executors.
    `treeAggregate` is a good solution for aggregating vectors to the driver, and you can increase the depth of the tree when the data is large.
    However, `treeAggregate` can still fail when the partition number of the RDD and the dimension of the vector grow.
    
    We propose a new RDD operator, named `sliceAggregate`, which splits the vector into **_n_** slices and assigns each slice a key (from 0 to **_n-1_**). The RDD[(key, slice)] is then transformed to RDD[slice] using the reduceByKey operator.
    Finally, the driver collects and composes the **_n_** slices to obtain the result.
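    
    A minimal sketch of the slicing scheme (assuming the aggregation buffer is a plain `Array[Double]` and that `combOp` works elementwise, so each slice can be reduced independently; the name `sliceAggregateSketch` and its signature are illustrative, not the PR's actual API):
    
    ```scala
    import org.apache.spark.rdd.RDD
    
    // Sketch only: aggregate per-partition Array[Double] buffers by slicing
    // them into numSlices keyed chunks, reducing each key in parallel with
    // reduceByKey, and reassembling the full vector on the driver.
    def sliceAggregateSketch(
        buffers: RDD[Array[Double]],
        dim: Int,
        numSlices: Int)(
        combOp: (Array[Double], Array[Double]) => Array[Double]): Array[Double] = {
      val sliceLen = (dim + numSlices - 1) / numSlices  // ceiling division
      // Assign each slice a key from 0 to numSlices - 1 before the shuffle,
      // so each shuffled record is only sliceLen long instead of dim long.
      val keyed = buffers.flatMap { vec =>
        (0 until numSlices).iterator.map { i =>
          val from = i * sliceLen
          (i, vec.slice(from, math.min(from + sliceLen, dim)))
        }
      }
      // Reduce each slice independently, then collect the n small results
      // and stitch them back together on the driver.
      val result = new Array[Double](dim)
      keyed.reduceByKey(combOp).collect().foreach { case (i, slice) =>
        System.arraycopy(slice, 0, result, i * sliceLen, slice.length)
      }
      result
    }
    ```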
    
    ![qq 20170220214746](https://cloud.githubusercontent.com/assets/4936059/23129541/0f462474-f7be-11e6-997e-e494b98eee69.png)
    
    
    I ran an experiment that computes statistics over the features.
    The number of samples is 1000 and the feature dimension ranges from 10K to 20M. The comparison of time cost between treeAggregate and sliceAggregate is shown below. When the feature dimension reaches 20 million, treeAggregate fails.
    
    Time cost (ms) of sliceAggregate vs. treeAggregate:
    
    | feature dimension | sliceAggregate | treeAggregate |
    | ---- | ---- | ---- |
    | 10K  | 617   | 607    |
    | 100K | 1470  | 967    |
    | 1M   | 4019  | 4731   |
    | 2.5M | 7679  | 13348  |
    | 5M   | 14722 | 22858  |
    | 7.5M | 20821 | 36472  |
    | 10M  | 28526 | 50184  |
    | 20M  | 47014 | failed |
     
    ![image](https://cloud.githubusercontent.com/assets/4936059/23129580/32e50260-f7be-11e6-9152-badc6423c356.png)
    
    The code related to this experiment is [here](https://github.com/ZunwenYou/spark/blob/slice-aggregate-experiment/mllib/src/main/scala/org/apache/spark/ml/classification/SliceAggregate.scala).

You can merge this pull request into a Git repository by running:

    $ git pull https://github.com/ZunwenYou/spark slice-aggregate1

Alternatively you can review and apply these changes as the patch at:

    https://github.com/apache/spark/pull/17000.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

    This closes #17000
    
----
commit 0e1ef6fdbc8e89c9eefecdc22d7957959117f747
Author: kevinzwyou <ke...@tencent.com>
Date:   2017-02-14T12:38:02Z

    slice aggregte

commit d480c4fc9d0f70e6ff93aeb691ce4afb3f28c4c4
Author: kevinzwyou <ke...@tencent.com>
Date:   2017-02-20T11:49:29Z

    remove sample files.

commit a86892b3cf05ba6959491fbd3409c978bb8afe8b
Author: kevinzwyou <ke...@tencent.com>
Date:   2017-02-20T11:51:41Z

    add an end line

commit f39cfad04cabd414bb48126fced9560f2e3883c0
Author: kevinzwyou <ke...@tencent.com>
Date:   2017-02-20T14:12:20Z

    fix an import style.

----




[GitHub] spark issue #17000: [SPARK-18946][ML] sliceAggregate which is a new aggregat...

Posted by MLnick <gi...@git.apache.org>.
Github user MLnick commented on the issue:

    https://github.com/apache/spark/pull/17000
  
    @ZunwenYou yes, I understand that `sliceAggregate` is different from SPARK-19634 and more comparable to `treeAggregate`. But I'm not sure whether, if we plan to port the vector summary to a `DataFrame`-based UDAF, we can still incorporate the benefit of `sliceAggregate`.
    
    So my point would be to first see how much benefit accrues from (a) using the UDAF mechanism and (b) not computing unnecessary things. Then we can compare that to the benefit here and decide.




[GitHub] spark issue #17000: [SPARK-18946][ML] sliceAggregate which is a new aggregat...

Posted by MLnick <gi...@git.apache.org>.
Github user MLnick commented on the issue:

    https://github.com/apache/spark/pull/17000
  
    I'm not totally certain there will be some huge benefit from porting the vector summary to the UDAF framework. But there are API-level benefits to doing so. Perhaps there is a way to incorporate the `sliceAggregate` idea into the summarizer, or into Catalyst operations that work with arrays...




[GitHub] spark issue #17000: [SPARK-18946][ML] sliceAggregate which is a new aggregat...

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the issue:

    https://github.com/apache/spark/pull/17000
  
    Can one of the admins verify this patch?




[GitHub] spark issue #17000: [SPARK-18946][ML] sliceAggregate which is a new aggregat...

Posted by MLnick <gi...@git.apache.org>.
Github user MLnick commented on the issue:

    https://github.com/apache/spark/pull/17000
  
    cc @yanboliang - it seems actually similar in effect to the VL-BFGS work with RDD-based coefficients?




[GitHub] spark issue #17000: [SPARK-18946][ML] sliceAggregate which is a new aggregat...

Posted by ZunwenYou <gi...@git.apache.org>.
Github user ZunwenYou commented on the issue:

    https://github.com/apache/spark/pull/17000
  
    Hi, @MLnick
    Firstly, `sliceAggregate` is a general aggregate for array-like data. Besides the `MultivariateOnlineSummarizer` case, it can be used in many large-scale machine learning cases. I chose `MultivariateOnlineSummarizer` for our experiment simply because it is a real bottleneck of `LogisticRegression` in the ml package.
    
    [This](https://issues.apache.org/jira/browse/SPARK-19634) is a good improvement for `MultivariateOnlineSummarizer`, but I do not think it makes sense to compare these two improvements directly. In my opinion, it is more reasonable to compare `sliceAggregate` to `treeAggregate`.




[GitHub] spark issue #17000: [SPARK-18946][ML] sliceAggregate which is a new aggregat...

Posted by WeichenXu123 <gi...@git.apache.org>.
Github user WeichenXu123 commented on the issue:

    https://github.com/apache/spark/pull/17000
  
    @MLnick It looks like VF-LBFGS targets a different scenario. In VF algorithms, the vectors are too large to store in driver memory, so we slice the vectors across different machines (stored as `RDD[Vector]`, using the partition ID as the slice key).
    Also, in VF-LBFGS only a few large vectors (usually 4-10) need to be aggregated together, so what this PR does looks different from VF-LBFGS.




[GitHub] spark issue #17000: [SPARK-18946][ML] sliceAggregate which is a new aggregat...

Posted by MLnick <gi...@git.apache.org>.
Github user MLnick commented on the issue:

    https://github.com/apache/spark/pull/17000
  
    Is the speedup coming mostly from the `MultivariateOnlineSummarizer` stage?
    
    See https://issues.apache.org/jira/browse/SPARK-19634, which is about porting this operation to a DataFrame UDAF and computing only the required metrics (instead of forcing computation of all of them, as is done currently). I wonder how that will compare?
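    
    For reference, a hypothetical sketch of what the UDAF mechanism looks like in the Spark 2.x API, using an elementwise sum over an array column as a stand-in for the real summarizer (the class name and choice of metric are illustrative only; SPARK-19634's actual summarizer is richer):
    
    ```scala
    import org.apache.spark.sql.Row
    import org.apache.spark.sql.expressions.{MutableAggregationBuffer, UserDefinedAggregateFunction}
    import org.apache.spark.sql.types._
    
    // Elementwise sum over an ArrayType(DoubleType) column of fixed dimension.
    class ElementwiseSum(dim: Int) extends UserDefinedAggregateFunction {
      override def inputSchema: StructType =
        StructType(StructField("features", ArrayType(DoubleType)) :: Nil)
      override def bufferSchema: StructType =
        StructType(StructField("sum", ArrayType(DoubleType)) :: Nil)
      override def dataType: DataType = ArrayType(DoubleType)
      override def deterministic: Boolean = true
    
      override def initialize(buffer: MutableAggregationBuffer): Unit =
        buffer(0) = Seq.fill(dim)(0.0)
    
      // Fold one input row into the buffer (the seqOp analogue).
      override def update(buffer: MutableAggregationBuffer, input: Row): Unit = {
        val acc = buffer.getSeq[Double](0)
        val x = input.getSeq[Double](0)
        buffer(0) = acc.zip(x).map { case (a, b) => a + b }
      }
    
      // Merge two partial buffers (the combOp analogue).
      override def merge(buffer1: MutableAggregationBuffer, buffer2: Row): Unit = {
        val a = buffer1.getSeq[Double](0)
        val b = buffer2.getSeq[Double](0)
        buffer1(0) = a.zip(b).map { case (x, y) => x + y }
      }
    
      override def evaluate(buffer: Row): Any = buffer.getSeq[Double](0)
    }
    
    // Usage: df.agg(new ElementwiseSum(dim)(df("features")))
    ```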




[GitHub] spark issue #17000: [SPARK-18946][ML] sliceAggregate which is a new aggregat...

Posted by hhbyyh <gi...@git.apache.org>.
Github user hhbyyh commented on the issue:

    https://github.com/apache/spark/pull/17000
  
    Hi @ZunwenYou, do you know the reason that treeAggregate failed when the feature dimension reached 20 million?
    I think this could potentially help with the 2G shuffle spill limit (to be verified).
    Also, we should evaluate the extra memory consumption due to the slicing and copying.





[GitHub] spark issue #17000: [SPARK-18946][ML] sliceAggregate which is a new aggregat...

Posted by ZunwenYou <gi...@git.apache.org>.
Github user ZunwenYou commented on the issue:

    https://github.com/apache/spark/pull/17000
  
    ping @yanboliang, please have a look at this improvement.




[GitHub] spark issue #17000: [SPARK-18946][ML] sliceAggregate which is a new aggregat...

Posted by ZunwenYou <gi...@git.apache.org>.
Github user ZunwenYou commented on the issue:

    https://github.com/apache/spark/pull/17000
  
    Hi, @MLnick
    You are right, sliceAggregate splits an array into smaller chunks before the shuffle.
    It has three advantages.
    First, less data is shuffled than with treeAggregate over the whole transformation.
    Second, as you describe, it allows more concurrency, not only during the driver's collect operation but also while running **_seqOp_** and **_combOp_**.
    Third, as I have observed, when a record is larger than about 1 GB (an array of 100 million dimensions), the shuffle among executors becomes less efficient while the rest of the executors sit idle. I am not clear on the reason for this.
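    
    As a usage-level illustration of the difference (reusing the hypothetical `sliceAggregateSketch` from the PR description; `buffers: RDD[Array[Double]]` and `dim` are assumed from there, and `treeAggregate` is the real RDD API):
    
    ```scala
    // Elementwise in-place addition, used as both seqOp and combOp.
    val addInto = (a: Array[Double], b: Array[Double]) => {
      var i = 0; while (i < a.length) { a(i) += b(i); i += 1 }; a
    }
    
    // treeAggregate: every shuffled record is one full dim-sized array.
    val summedTree: Array[Double] =
      buffers.treeAggregate(new Array[Double](dim))(addInto, addInto, depth = 5)
    
    // sliceAggregateSketch: every shuffled record is only about
    // dim / numSlices long, and the 100 slices reduce concurrently.
    val summedSlice: Array[Double] =
      sliceAggregateSketch(buffers, dim, numSlices = 100)(addInto)
    ```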




[GitHub] spark issue #17000: [SPARK-18946][ML] sliceAggregate which is a new aggregat...

Posted by ZunwenYou <gi...@git.apache.org>.
Github user ZunwenYou commented on the issue:

    https://github.com/apache/spark/pull/17000
  
    Hi, @hhbyyh
    
    In our experiment, the class **_MultivariateOnlineSummarizer_** contains 8 arrays, so at a dimension of 20 million a single MultivariateOnlineSummarizer takes 1280 MB of memory (8 arrays * 20M elements * 8 bytes).
    
    The experiment configuration is as follows:
    spark.driver.maxResultSize 6g
    spark.kryoserializer.buffer.max 2047m
    driver-memory 20g
    num-executors 100
    executor-cores 2
    executor-memory 15g
    
    RDD and aggregate parameters:
    RDD partition number 300
    treeAggregate depth 5
    With this configuration, treeAggregate runs in four stages of 300, 75, 18, and 4 tasks respectively (see the sketch after this comment).
    At the last stage of treeAggregate, tasks get killed because the executors throw _**java.lang.OutOfMemoryError: Requested array size exceeds VM limit**_.
    Even with treeAggregate depth=7 and executor-memory=30g, the last stage still failed.
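    
    To illustrate where those stage sizes come from, here is a rough sketch of how treeAggregate shrinks the partition count level by level (simplified from the logic in `RDD.treeAggregate`; the helper name is mine):
    
    ```scala
    // Task count per treeAggregate level for a given partition count and depth.
    def treeAggregateStages(numPartitions: Int, depth: Int): Seq[Int] = {
      val scale = math.max(math.ceil(math.pow(numPartitions, 1.0 / depth)).toInt, 2)
      var n = numPartitions
      val stages = scala.collection.mutable.ArrayBuffer(n)
      while (n > scale + math.ceil(n.toDouble / scale).toInt) {
        n /= scale   // each level folds `scale` partitions into one
        stages += n
      }
      stages.toSeq
    }
    
    // treeAggregateStages(300, 5) == Seq(300, 75, 18, 4): each of the last
    // 4 tasks must merge several full ~1.28 GB summarizers, hence the OOM.
    ```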




[GitHub] spark issue #17000: [SPARK-18946][ML] sliceAggregate which is a new aggregat...

Posted by MLnick <gi...@git.apache.org>.
Github user MLnick commented on the issue:

    https://github.com/apache/spark/pull/17000
  
    Just to be clear - this is essentially just splitting an array up into smaller chunks so that the overall communication is more efficient? It would be good to look at why Spark is not doing a good job with one big array. Is the bottleneck really the executor communication (the shuffle part)? Or is it collecting the big array back at the end of the tree aggregation (i.e. this patch sort of allows more concurrency in the `collect` operation)?
    
    cc @dbtsai @sethah @yanboliang who were looking at linear model scalability recently.

