You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@systemml.apache.org by mb...@apache.org on 2017/04/19 05:08:12 UTC

incubator-systemml git commit: [SYSTEMML-1543] Fix parfor spark result merge w/ many result files

Repository: incubator-systemml
Updated Branches:
  refs/heads/master 2bf61b476 -> cd5499c54


[SYSTEMML-1543] Fix parfor spark result merge w/ many result files

So far, the parfor spark result merge essentially created RDDs for all
result files, concatenated them via union into one RDD, and finally
executed the actual merge operation. For many parfor tasks, i.e., many
result files, this led to stack overflow errors because the lineage for
binary union operations was too large. We now construct a single job
configuration with all filenames and hence a single input RDD for the
result merge, which exhibits much better scalability.


Project: http://git-wip-us.apache.org/repos/asf/incubator-systemml/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-systemml/commit/cd5499c5
Tree: http://git-wip-us.apache.org/repos/asf/incubator-systemml/tree/cd5499c5
Diff: http://git-wip-us.apache.org/repos/asf/incubator-systemml/diff/cd5499c5

Branch: refs/heads/master
Commit: cd5499c54895c8745884cd2c9d2476f3df46606a
Parents: 2bf61b4
Author: Matthias Boehm <mb...@gmail.com>
Authored: Tue Apr 18 22:09:49 2017 -0700
Committer: Matthias Boehm <mb...@gmail.com>
Committed: Tue Apr 18 22:09:49 2017 -0700

----------------------------------------------------------------------
 .../parfor/ResultMergeRemoteMR.java             |   3 +-
 .../parfor/ResultMergeRemoteSpark.java          | 108 ++++++++++---------
 2 files changed, 61 insertions(+), 50 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-systemml/blob/cd5499c5/src/main/java/org/apache/sysml/runtime/controlprogram/parfor/ResultMergeRemoteMR.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/sysml/runtime/controlprogram/parfor/ResultMergeRemoteMR.java b/src/main/java/org/apache/sysml/runtime/controlprogram/parfor/ResultMergeRemoteMR.java
index b285a13..98a063e 100644
--- a/src/main/java/org/apache/sysml/runtime/controlprogram/parfor/ResultMergeRemoteMR.java
+++ b/src/main/java/org/apache/sysml/runtime/controlprogram/parfor/ResultMergeRemoteMR.java
@@ -175,8 +175,7 @@ public class ResultMergeRemoteMR extends ResultMerge
 		String jobname = "ParFor-RMMR";
 		long t0 = DMLScript.STATISTICS ? System.nanoTime() : 0;
 		
-		JobConf job;
-		job = new JobConf( ResultMergeRemoteMR.class );
+		JobConf job = new JobConf( ResultMergeRemoteMR.class );
 		job.setJobName(jobname+_pfid);
 
 		//maintain dml script counters

http://git-wip-us.apache.org/repos/asf/incubator-systemml/blob/cd5499c5/src/main/java/org/apache/sysml/runtime/controlprogram/parfor/ResultMergeRemoteSpark.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/sysml/runtime/controlprogram/parfor/ResultMergeRemoteSpark.java b/src/main/java/org/apache/sysml/runtime/controlprogram/parfor/ResultMergeRemoteSpark.java
index 49293e4..d783977 100644
--- a/src/main/java/org/apache/sysml/runtime/controlprogram/parfor/ResultMergeRemoteSpark.java
+++ b/src/main/java/org/apache/sysml/runtime/controlprogram/parfor/ResultMergeRemoteSpark.java
@@ -20,6 +20,9 @@
 package org.apache.sysml.runtime.controlprogram.parfor;
 
 
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.mapred.FileInputFormat;
+import org.apache.hadoop.mapred.JobConf;
 import org.apache.spark.api.java.JavaPairRDD;
 
 import org.apache.sysml.api.DMLScript;
@@ -30,6 +33,7 @@ import org.apache.sysml.runtime.controlprogram.caching.MatrixObject;
 import org.apache.sysml.runtime.controlprogram.context.ExecutionContext;
 import org.apache.sysml.runtime.controlprogram.context.SparkExecutionContext;
 import org.apache.sysml.runtime.instructions.spark.data.RDDObject;
+import org.apache.sysml.runtime.instructions.spark.functions.CopyBlockPairFunction;
 import org.apache.sysml.runtime.instructions.spark.utils.RDDAggregateUtils;
 import org.apache.sysml.runtime.matrix.MatrixCharacteristics;
 import org.apache.sysml.runtime.matrix.MatrixFormatMetaData;
@@ -127,73 +131,81 @@ public class ResultMergeRemoteSpark extends ResultMerge
 
 		RDDObject ret = null;
 		
-	    //determine degree of parallelism
+		//determine degree of parallelism
 		int numRed = (int)determineNumReducers(rlen, clen, brlen, bclen, _numReducers);
-	
+		
 		//sanity check for empty src files
 		if( inputs == null || inputs.length==0  )
 			throw new DMLRuntimeException("Execute merge should never be called with no inputs.");
 		
 		try
 		{
-		    //Step 1: union over all results
-		    JavaPairRDD<MatrixIndexes, MatrixBlock> rdd = (JavaPairRDD<MatrixIndexes, MatrixBlock>) 
-		    		sec.getRDDHandleForMatrixObject(_inputs[0], InputInfo.BinaryBlockInputInfo);
-		    for( int i=1; i<_inputs.length; i++ ) {
-			    JavaPairRDD<MatrixIndexes, MatrixBlock> rdd2 = (JavaPairRDD<MatrixIndexes, MatrixBlock>) 
-			    		sec.getRDDHandleForMatrixObject(_inputs[i], InputInfo.BinaryBlockInputInfo);
-			    rdd = rdd.union(rdd2);
-		    }
-		
-		    //Step 2a: merge with compare
-		    JavaPairRDD<MatrixIndexes, MatrixBlock> out = null;
-		    if( withCompare )
-		    {
-		    	JavaPairRDD<MatrixIndexes, MatrixBlock> compareRdd = (JavaPairRDD<MatrixIndexes, MatrixBlock>) 
-			    		sec.getRDDHandleForMatrixObject(compare, InputInfo.BinaryBlockInputInfo);
-			    
-		    	//merge values which differ from compare values
-		    	ResultMergeRemoteSparkWCompare cfun = new ResultMergeRemoteSparkWCompare();
-		    	out = rdd.groupByKey(numRed) //group all result blocks per key
-		    	         .join(compareRdd)   //join compare block and result blocks 
-		    	         .mapToPair(cfun);   //merge result blocks w/ compare
-		    }
-		    //Step 2b: merge without compare
-		    else
-		    {
-		    	//direct merge in any order (disjointness guaranteed)
-		    	out = RDDAggregateUtils.mergeByKey(rdd, false);
-		    }
+			//note: initial implementation via union over all result rdds discarded due to 
+			//stack overflow errors with many parfor tasks, and thus many rdds
+			
+			//Step 1: construct input rdd from all result files of parfor workers
+			//a) construct job conf with all files
+			InputInfo ii = InputInfo.BinaryBlockInputInfo;
+			JobConf job = new JobConf( ResultMergeRemoteMR.class );
+			job.setJobName(jobname);
+			job.setInputFormat(ii.inputFormatClass);
+			Path[] paths = new Path[ inputs.length ];
+			for(int i=0; i<paths.length; i++) {
+				//ensure presence of hdfs if inputs come from memory
+				if( inputs[i].isDirty() )
+					inputs[i].exportData();
+				paths[i] = new Path( inputs[i].getFileName() );
+			}
+			FileInputFormat.setInputPaths(job, paths);
+			
+			//b) create rdd from input files w/ deep copy of keys and blocks
+			JavaPairRDD<MatrixIndexes, MatrixBlock> rdd = sec.getSparkContext()
+					.hadoopRDD(job, ii.inputFormatClass, ii.inputKeyClass, ii.inputValueClass)
+					.mapPartitionsToPair(new CopyBlockPairFunction(true), true);
+			
+			//Step 2a: merge with compare
+			JavaPairRDD<MatrixIndexes, MatrixBlock> out = null;
+			if( withCompare )
+			{
+				JavaPairRDD<MatrixIndexes, MatrixBlock> compareRdd = (JavaPairRDD<MatrixIndexes, MatrixBlock>) 
+						sec.getRDDHandleForMatrixObject(compare, InputInfo.BinaryBlockInputInfo);
+		    	
+				//merge values which differ from compare values
+				ResultMergeRemoteSparkWCompare cfun = new ResultMergeRemoteSparkWCompare();
+				out = rdd.groupByKey(numRed) //group all result blocks per key
+		    			.join(compareRdd)   //join compare block and result blocks 
+		    			.mapToPair(cfun);   //merge result blocks w/ compare
+			}
+			//Step 2b: merge without compare
+			else {
+				//direct merge in any order (disjointness guaranteed)
+				out = RDDAggregateUtils.mergeByKey(rdd, false);
+			}
 		    
-		    //Step 3: create output rdd handle w/ lineage
-		    ret = new RDDObject(out, varname);
-		    for( int i=0; i<_inputs.length; i++ ) {
-		    	//child rdd handles guaranteed to exist
-		    	RDDObject child = _inputs[i].getRDDHandle();
-				ret.addLineageChild(child);
-		    }
+			//Step 3: create output rdd handle w/ lineage
+			ret = new RDDObject(out, varname);
+			if( withCompare )
+				ret.addLineageChild(compare.getRDDHandle());
 		}
-		catch( Exception ex )
-		{
+		catch( Exception ex ) {
 			throw new DMLRuntimeException(ex);
 		}	    
 		
 		//maintain statistics
-	    Statistics.incrementNoOfCompiledSPInst();
-	    Statistics.incrementNoOfExecutedSPInst();
-	    if( DMLScript.STATISTICS ){
+		Statistics.incrementNoOfCompiledSPInst();
+		Statistics.incrementNoOfExecutedSPInst();
+		if( DMLScript.STATISTICS ){
 			Statistics.maintainCPHeavyHitters(jobname, System.nanoTime()-t0);
 		}
-	    
+		
 		return ret;
 	}
 
-	private int determineNumReducers(long rlen, long clen, int brlen, int bclen, long numRed)
-	{
+	private int determineNumReducers(long rlen, long clen, int brlen, int bclen, long numRed) {
 		//set the number of mappers and reducers 
-	    long reducerGroups = Math.max(rlen/brlen,1) * Math.max(clen/bclen, 1);
+		long reducerGroups = Math.max(rlen/brlen,1) * Math.max(clen/bclen, 1);
 		int ret = (int)Math.min( numRed, reducerGroups );
-	    
-	    return ret; 	
+		
+		return ret; 	
 	}
 }