Posted to issues@systemml.apache.org by "Matthias Boehm (JIRA)" <ji...@apache.org> on 2017/03/10 08:20:04 UTC

[jira] [Comment Edited] (SYSTEMML-1378) Native dataset support in parfor spark datapartition-execute

    [ https://issues.apache.org/jira/browse/SYSTEMML-1378?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15904685#comment-15904685 ] 

Matthias Boehm edited comment on SYSTEMML-1378 at 3/10/17 8:19 AM:
-------------------------------------------------------------------

To answer your question regarding parfor task sizes: In this particular datapartition-execute job, the processing model is a single row/column at a time (and, once SYSTEMML-1310 is done, also single batches), because the (potential) grouping via hash partitioning might destroy contiguous ranges and there is an explicit in-memory hand-over of partitions. In general, however (e.g., in the parfor spark execute job), we do support different "task partitioners", and the optimizer chooses one according to the number of iterations and the body program. Right now we support the following task partitioners:
* Naive: parfor task = single iteration
* Fixed: parfor task = fixed number of iterations
* Static: parfor task = N/k iterations, where N is the total number of iterations and k is the degree of parallelism.
* Factoring: parfor tasks in waves of exponentially decreasing task sizes, e.g., N=101, k=4 -> (13,13,13,13, 7,7,7,7, 3,3,3,3, 2,2,2,2, 1) - this combines the advantages of static (small overhead, i.e., only on the order of k log N tasks) and naive (good load balance via few small tasks at the end); see the sketch below.
* Factoring max: like factoring, but with a maximum task-size constraint to bound memory requirements.
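
To make the factoring scheme above concrete, the following is a small, hypothetical sketch (not SystemML's actual task partitioner classes): each wave issues k tasks of size ceil(remaining/(2*k)); it reproduces the N=101, k=4 example from above.

{code}
object FactoringSketch {
  // returns the sequence of task sizes produced by factoring self-scheduling
  def taskSizes(N: Long, k: Int): Seq[Long] = {
    val sizes = scala.collection.mutable.ArrayBuffer[Long]()
    var remaining = N
    while (remaining > 0) {
      // wave task size: ceil(remaining / (2k)), at least one iteration per task
      val size = math.max(1L, math.ceil(remaining.toDouble / (2.0 * k)).toLong)
      var i = 0
      while (i < k && remaining > 0) {
        val s = math.min(size, remaining) // the last task may be smaller
        sizes += s
        remaining -= s
        i += 1
      }
    }
    sizes
  }

  def main(args: Array[String]): Unit = {
    // prints 13,13,13,13,7,7,7,7,3,3,3,3,2,2,2,2,1
    println(taskSizes(101, 4).mkString(","))
  }
}
{code}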


> Native dataset support in parfor spark datapartition-execute
> ------------------------------------------------------------
>
>                 Key: SYSTEMML-1378
>                 URL: https://issues.apache.org/jira/browse/SYSTEMML-1378
>             Project: SystemML
>          Issue Type: Sub-task
>          Components: APIs, Runtime
>            Reporter: Matthias Boehm
>            Assignee: Matthias Boehm
>             Fix For: SystemML 1.0
>
>
> This task aims for a deeper integration of Spark Datasets into SystemML. Consider the following example scenario, invoked through MLContext with X being a Dataset<Row>:
> {code}
> X = read(...)
> parfor( i in 1:nrow(X) ) {
>     Xi = X[i, ]
>     v[i, 1] = ... some computation over Xi
> }
> {code}
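> For reference, a rough sketch of how such a script might be driven through the MLContext API with a DataFrame/Dataset<Row> input (the DML file name and output variable are placeholders; treat this as an illustrative sketch, not as part of this task):
> {code}
> import org.apache.sysml.api.mlcontext._
> import org.apache.sysml.api.mlcontext.ScriptFactory._
>
> // assumes an existing SparkSession `spark` and a DataFrame `df` holding X
> val ml = new MLContext(spark)
> val script = dmlFromFile("scoring.dml").in("X", df).out("v")  // "scoring.dml" is a placeholder
> val v = ml.execute(script).getMatrix("v")
> {code}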
> Currently, we would convert the input dataset to binary block (1st shuffle) at the API level and subsequently pass it into SystemML. For large data, we would then compile a single parfor datapartition-execute job that slices row fragments, collects these fragments into partitions (2nd shuffle), and finally executes the parfor body per partition. 
> Native dataset support would allow us to avoid these two shuffles and compute the entire parfor in a data-local manner. In detail, this involves the following extensions:
> * API level: Keep lineage of input dataset leveraging our existing lineage mechanism in {{MatrixObject}}
> * Parfor datapartition-execute: SYSTEMML-1367 already introduced data-local processing for special cases (if ncol <= blocksize). Given the lineage, we can simply probe the input to datapartition-execute and, for row partitioning, directly use the dataset instead of the reblocked matrix RDD in a data-local manner. This avoids not just the 2nd shuffle but, due to lazy evaluation, also the 1st shuffle if no operation other than the parfor accesses X (except for a zipWithIndex if no ids are passed in, as this transformation triggers computation).
> * Cleanup: Prevent cleanup (unpersist) of lineage objects of type dataset as they are passed from outside.
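> Conceptually, the data-local path corresponds to something like the following plain-Spark sketch (the hypothetical helper {{processRow}} stands in for the compiled parfor body; this is not the actual parfor runtime code):
> {code}
> import org.apache.spark.sql.{Dataset, Row}
>
> // hypothetical stand-in for the parfor body: some computation over one row Xi
> def processRow(row: Row): Double = row.length.toDouble
>
> // ids come from zipWithIndex only if none are passed in; under lazy evaluation,
> // this is the transformation that triggers computation
> def dataLocalParfor(df: Dataset[Row]) =
>   df.rdd.zipWithIndex()                    // (row, rowIndex)
>     .mapPartitions { it =>                 // execute the body per partition,
>       it.map { case (row, i) => (i, processRow(row)) }  // without reblock or shuffle
>     }
> {code}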


