You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@systemml.apache.org by mb...@apache.org on 2017/05/07 08:17:42 UTC
incubator-systemml git commit: [SYSTEMML-1588] Fix parfor spark
result merge w/ pending rdd operations
Repository: incubator-systemml
Updated Branches:
refs/heads/master 0f8b19703 -> 0b57898cd
[SYSTEMML-1588] Fix parfor spark result merge w/ pending rdd operations
This patch fixes issues of the recently reworked parfor spark result
merge. Due to stack overflow issues, the spark result merge was changed
from an rdd union tree over all results to an export of all results and
a single rdd construction over these hdfs files. However, the export only
covered existing hdfs files or dirty in-memory matrices, but not pending
rdd operations. We now properly export the input matrices and guard
against eager cleanup by maintaining the lineage of input rdds which
still allows for lazy result merge evaluation.
Note: The eager cleanup issues of temporary result files did not show up
before because result matrix objects of parfor remote spark are not
flagged to exist on hdfs.
Project: http://git-wip-us.apache.org/repos/asf/incubator-systemml/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-systemml/commit/0b57898c
Tree: http://git-wip-us.apache.org/repos/asf/incubator-systemml/tree/0b57898c
Diff: http://git-wip-us.apache.org/repos/asf/incubator-systemml/diff/0b57898c
Branch: refs/heads/master
Commit: 0b57898cddebe287350576948fdb1e13f9f3a813
Parents: 0f8b197
Author: Matthias Boehm <mb...@gmail.com>
Authored: Sat May 6 23:45:36 2017 -0700
Committer: Matthias Boehm <mb...@gmail.com>
Committed: Sun May 7 01:10:03 2017 -0700
----------------------------------------------------------------------
.../parfor/ResultMergeRemoteSpark.java | 23 ++++++++++++++++----
1 file changed, 19 insertions(+), 4 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-systemml/blob/0b57898c/src/main/java/org/apache/sysml/runtime/controlprogram/parfor/ResultMergeRemoteSpark.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/sysml/runtime/controlprogram/parfor/ResultMergeRemoteSpark.java b/src/main/java/org/apache/sysml/runtime/controlprogram/parfor/ResultMergeRemoteSpark.java
index d783977..44b7dc2 100644
--- a/src/main/java/org/apache/sysml/runtime/controlprogram/parfor/ResultMergeRemoteSpark.java
+++ b/src/main/java/org/apache/sysml/runtime/controlprogram/parfor/ResultMergeRemoteSpark.java
@@ -24,7 +24,7 @@ import org.apache.hadoop.fs.Path;
import org.apache.hadoop.mapred.FileInputFormat;
import org.apache.hadoop.mapred.JobConf;
import org.apache.spark.api.java.JavaPairRDD;
-
+import org.apache.spark.api.java.JavaSparkContext;
import org.apache.sysml.api.DMLScript;
import org.apache.sysml.parser.Expression.DataType;
import org.apache.sysml.parser.Expression.ValueType;
@@ -151,10 +151,12 @@ public class ResultMergeRemoteSpark extends ResultMerge
job.setInputFormat(ii.inputFormatClass);
Path[] paths = new Path[ inputs.length ];
for(int i=0; i<paths.length; i++) {
- //ensure presence of hdfs if inputs come from memory
- if( inputs[i].isDirty() )
- inputs[i].exportData();
+ //ensure input exists on hdfs (e.g., if in-memory or RDD)
+ inputs[i].exportData();
paths[i] = new Path( inputs[i].getFileName() );
+ //update rdd handle to allow lazy evaluation by guarding
+ //against cleanup of temporary result files
+ setRDDHandleForMerge(inputs[i], sec);
}
FileInputFormat.setInputPaths(job, paths);
@@ -184,6 +186,8 @@ public class ResultMergeRemoteSpark extends ResultMerge
//Step 3: create output rdd handle w/ lineage
ret = new RDDObject(out, varname);
+ for(int i=0; i<paths.length; i++)
+ ret.addLineageChild(inputs[i].getRDDHandle());
if( withCompare )
ret.addLineageChild(compare.getRDDHandle());
}
@@ -208,4 +212,15 @@ public class ResultMergeRemoteSpark extends ResultMerge
return ret;
}
+
+ @SuppressWarnings("unchecked")
+ private void setRDDHandleForMerge(MatrixObject mo, SparkExecutionContext sec) {
+ InputInfo iinfo = InputInfo.BinaryBlockInputInfo;
+ JavaSparkContext sc = sec.getSparkContext();
+ JavaPairRDD<MatrixIndexes,MatrixBlock> rdd = (JavaPairRDD<MatrixIndexes,MatrixBlock>)
+ sc.hadoopFile( mo.getFileName(), iinfo.inputFormatClass, iinfo.inputKeyClass, iinfo.inputValueClass);
+ RDDObject rddhandle = new RDDObject(rdd, mo.getVarName());
+ rddhandle.setHDFSFile(true);
+ mo.setRDDHandle(rddhandle);
+ }
}