You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@systemml.apache.org by mb...@apache.org on 2017/05/07 08:17:42 UTC

incubator-systemml git commit: [SYSTEMML-1588] Fix parfor spark result merge w/ pending rdd operations

Repository: incubator-systemml
Updated Branches:
  refs/heads/master 0f8b19703 -> 0b57898cd


[SYSTEMML-1588] Fix parfor spark result merge w/ pending rdd operations

This patch fixes issues in the recently reworked parfor spark result
merge. Due to stack overflow issues, the spark result merge was changed
from an rdd union tree over all results to an export of all results and
a single rdd construction over these hdfs files. However, the export only
covered existing hdfs files and dirty in-memory matrices, but not pending
rdd operations. We now properly export the input matrices and guard
against eager cleanup by maintaining the lineage of the input rdds, which
still allows for lazy result merge evaluation.

Note: The eager cleanup issues with temporary result files did not show
up before because the result matrix objects of parfor remote spark are
not flagged as existing on hdfs.


Project: http://git-wip-us.apache.org/repos/asf/incubator-systemml/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-systemml/commit/0b57898c
Tree: http://git-wip-us.apache.org/repos/asf/incubator-systemml/tree/0b57898c
Diff: http://git-wip-us.apache.org/repos/asf/incubator-systemml/diff/0b57898c

Branch: refs/heads/master
Commit: 0b57898cddebe287350576948fdb1e13f9f3a813
Parents: 0f8b197
Author: Matthias Boehm <mb...@gmail.com>
Authored: Sat May 6 23:45:36 2017 -0700
Committer: Matthias Boehm <mb...@gmail.com>
Committed: Sun May 7 01:10:03 2017 -0700

----------------------------------------------------------------------
 .../parfor/ResultMergeRemoteSpark.java          | 23 ++++++++++++++++----
 1 file changed, 19 insertions(+), 4 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-systemml/blob/0b57898c/src/main/java/org/apache/sysml/runtime/controlprogram/parfor/ResultMergeRemoteSpark.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/sysml/runtime/controlprogram/parfor/ResultMergeRemoteSpark.java b/src/main/java/org/apache/sysml/runtime/controlprogram/parfor/ResultMergeRemoteSpark.java
index d783977..44b7dc2 100644
--- a/src/main/java/org/apache/sysml/runtime/controlprogram/parfor/ResultMergeRemoteSpark.java
+++ b/src/main/java/org/apache/sysml/runtime/controlprogram/parfor/ResultMergeRemoteSpark.java
@@ -24,7 +24,7 @@ import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.mapred.FileInputFormat;
 import org.apache.hadoop.mapred.JobConf;
 import org.apache.spark.api.java.JavaPairRDD;
-
+import org.apache.spark.api.java.JavaSparkContext;
 import org.apache.sysml.api.DMLScript;
 import org.apache.sysml.parser.Expression.DataType;
 import org.apache.sysml.parser.Expression.ValueType;
@@ -151,10 +151,12 @@ public class ResultMergeRemoteSpark extends ResultMerge
 			job.setInputFormat(ii.inputFormatClass);
 			Path[] paths = new Path[ inputs.length ];
 			for(int i=0; i<paths.length; i++) {
-				//ensure presence of hdfs if inputs come from memory
-				if( inputs[i].isDirty() )
-					inputs[i].exportData();
+				//ensure input exists on hdfs (e.g., if in-memory or RDD)
+				inputs[i].exportData();
 				paths[i] = new Path( inputs[i].getFileName() );
+				//update rdd handle to allow lazy evaluation by guarding 
+				//against cleanup of temporary result files
+				setRDDHandleForMerge(inputs[i], sec);
 			}
 			FileInputFormat.setInputPaths(job, paths);
 			
@@ -184,6 +186,8 @@ public class ResultMergeRemoteSpark extends ResultMerge
 		    
 			//Step 3: create output rdd handle w/ lineage
 			ret = new RDDObject(out, varname);
+			for(int i=0; i<paths.length; i++)
+				ret.addLineageChild(inputs[i].getRDDHandle());
 			if( withCompare )
 				ret.addLineageChild(compare.getRDDHandle());
 		}
@@ -208,4 +212,15 @@ public class ResultMergeRemoteSpark extends ResultMerge
 		
 		return ret; 	
 	}
+	
+	@SuppressWarnings("unchecked")
+	private void setRDDHandleForMerge(MatrixObject mo, SparkExecutionContext sec) {
+		InputInfo iinfo = InputInfo.BinaryBlockInputInfo;
+		JavaSparkContext sc = sec.getSparkContext();
+		JavaPairRDD<MatrixIndexes,MatrixBlock> rdd = (JavaPairRDD<MatrixIndexes,MatrixBlock>) 
+			sc.hadoopFile( mo.getFileName(), iinfo.inputFormatClass, iinfo.inputKeyClass, iinfo.inputValueClass);
+		RDDObject rddhandle = new RDDObject(rdd, mo.getVarName());
+		rddhandle.setHDFSFile(true);
+		mo.setRDDHandle(rddhandle);
+	}
 }