You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@systemml.apache.org by mb...@apache.org on 2017/02/25 19:50:49 UTC

[1/4] incubator-systemml git commit: [SYSTEMML-1350] Performance parfor spark datapartition-execute jobs

Repository: incubator-systemml
Updated Branches:
  refs/heads/master b6a46500d -> b78c12593


[SYSTEMML-1350] Performance parfor spark datapartition-execute jobs

This patch makes the following performance and robustness improvements
to the parfor spark datapartition-execute and datapartition jobs:

(1) Data-size-dependent number of reduce tasks. So far, we used at max
the number of cores to achieve best pre-aggregation of results. However,
on spark, a too small number of reduce tasks (and hence too large reduce
partitions) can lead to 2GB limit issues and OOMs due to increase memory
pressure. We now determine the number of reduce tasks in a more
considerate way.

(2) Reuse of matrix block partitions. For dense matrix block partitions,
we now reuse already allocated partitions in order to reduce GC
overheads.

(3) Incremental nnz maintenance. Finally, we now also incrementally
maintain the nnz during partition collect based on the block meta data
instead of recomputing it, which avoids an unnecessary scan per
partition.  

Together, these changes improved the runtime of perftest 80GB
univariate/bivariate from 379s/466s to 337s/376s on a small 1+5 node
cluster, while ensuring also much better robustness for larger datasets. 


Project: http://git-wip-us.apache.org/repos/asf/incubator-systemml/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-systemml/commit/baa70a15
Tree: http://git-wip-us.apache.org/repos/asf/incubator-systemml/tree/baa70a15
Diff: http://git-wip-us.apache.org/repos/asf/incubator-systemml/diff/baa70a15

Branch: refs/heads/master
Commit: baa70a150b7e01f81f72ac1ce0ca07fab9a18265
Parents: b6a4650
Author: Matthias Boehm <mb...@gmail.com>
Authored: Fri Feb 24 15:02:53 2017 -0800
Committer: Matthias Boehm <mb...@gmail.com>
Committed: Sat Feb 25 11:51:02 2017 -0800

----------------------------------------------------------------------
 .../parfor/DataPartitionerRemoteSpark.java      |  36 ++++---
 .../DataPartitionerRemoteSparkReducer.java      |  20 ++--
 .../parfor/RemoteDPParForSpark.java             |  28 +++--
 .../parfor/RemoteDPParForSparkWorker.java       | 107 ++++++++-----------
 4 files changed, 89 insertions(+), 102 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-systemml/blob/baa70a15/src/main/java/org/apache/sysml/runtime/controlprogram/parfor/DataPartitionerRemoteSpark.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/sysml/runtime/controlprogram/parfor/DataPartitionerRemoteSpark.java b/src/main/java/org/apache/sysml/runtime/controlprogram/parfor/DataPartitionerRemoteSpark.java
index 41fb235..be758d2 100644
--- a/src/main/java/org/apache/sysml/runtime/controlprogram/parfor/DataPartitionerRemoteSpark.java
+++ b/src/main/java/org/apache/sysml/runtime/controlprogram/parfor/DataPartitionerRemoteSpark.java
@@ -27,6 +27,7 @@ import org.apache.sysml.runtime.controlprogram.ParForProgramBlock.PDataPartition
 import org.apache.sysml.runtime.controlprogram.caching.MatrixObject;
 import org.apache.sysml.runtime.controlprogram.context.ExecutionContext;
 import org.apache.sysml.runtime.controlprogram.context.SparkExecutionContext;
+import org.apache.sysml.runtime.instructions.spark.utils.SparkUtils;
 import org.apache.sysml.runtime.matrix.MatrixCharacteristics;
 import org.apache.sysml.runtime.matrix.data.InputInfo;
 import org.apache.sysml.runtime.matrix.data.MatrixBlock;
@@ -66,26 +67,24 @@ public class DataPartitionerRemoteSpark extends DataPartitioner
 
 		try
 		{
-		    //cleanup existing output files
-		    MapReduceTool.deleteFileIfExistOnHDFS(fnameNew);
-	
-		    //determine degree of parallelism
-			int numRed = (int)determineNumReducers(rlen, clen, brlen, bclen, _numRed);
-	
+			//cleanup existing output files
+			MapReduceTool.deleteFileIfExistOnHDFS(fnameNew);
 			//get input rdd
 			JavaPairRDD<MatrixIndexes, MatrixBlock> inRdd = (JavaPairRDD<MatrixIndexes, MatrixBlock>) 
 					sec.getRDDHandleForMatrixObject(in, InputInfo.BinaryBlockInputInfo);
-			MatrixCharacteristics mc = in.getMatrixCharacteristics();
 			
+			//determine degree of parallelism
+			MatrixCharacteristics mc = in.getMatrixCharacteristics();
+			int numRed = (int)determineNumReducers(inRdd, mc, _numRed);
+	
 			//run spark remote data partition job 
 			DataPartitionerRemoteSparkMapper dpfun = new DataPartitionerRemoteSparkMapper(mc, ii, oi, _format);
 			DataPartitionerRemoteSparkReducer wfun = new DataPartitionerRemoteSparkReducer(fnameNew, oi);
 			inRdd.flatMapToPair(dpfun) //partition the input blocks
 			     .groupByKey(numRed)   //group partition blocks 		          
-			     .foreach( wfun );     //write partitions to hdfs 
+			     .foreach(wfun);       //write partitions to hdfs 
 		}
-		catch(Exception ex)
-		{
+		catch(Exception ex) {
 			throw new DMLRuntimeException(ex);
 		}
 		
@@ -97,12 +96,17 @@ public class DataPartitionerRemoteSpark extends DataPartitioner
 		}
 	}
 
-	private long determineNumReducers(long rlen, long clen, int brlen, int bclen, long numRed)
+	private long determineNumReducers(JavaPairRDD<MatrixIndexes,MatrixBlock> in,
+		MatrixCharacteristics mc, long numRed)
 	{
-		//set the number of mappers and reducers 
+		long rlen = mc.getRows();
+		long clen = mc.getCols();
+		int brlen = mc.getRowsPerBlock();
+		int bclen = mc.getColsPerBlock();
+		
+		//determine number of reducer groups 
 	    long reducerGroups = -1;
-	    switch( _format )
-	    {
+	    switch( _format ) {
 		    case ROW_WISE: reducerGroups = rlen; break;
 		    case COLUMN_WISE: reducerGroups = clen; break;
 		    case ROW_BLOCK_WISE: reducerGroups = (rlen/brlen)+((rlen%brlen==0)?0:1); break;
@@ -113,6 +117,8 @@ public class DataPartitionerRemoteSpark extends DataPartitioner
 				//do nothing
 	    }
 	    
-	    return (int)Math.min( numRed, reducerGroups); 	
+	  //compute number of reducers (to avoid OOMs and reduce memory pressure)
+	  int numParts = SparkUtils.getNumPreferredPartitions(mc, in);
+	  return Math.max(numRed, Math.min(numParts, reducerGroups));
 	}
 }

http://git-wip-us.apache.org/repos/asf/incubator-systemml/blob/baa70a15/src/main/java/org/apache/sysml/runtime/controlprogram/parfor/DataPartitionerRemoteSparkReducer.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/sysml/runtime/controlprogram/parfor/DataPartitionerRemoteSparkReducer.java b/src/main/java/org/apache/sysml/runtime/controlprogram/parfor/DataPartitionerRemoteSparkReducer.java
index 8caac98..d8bb04d 100644
--- a/src/main/java/org/apache/sysml/runtime/controlprogram/parfor/DataPartitionerRemoteSparkReducer.java
+++ b/src/main/java/org/apache/sysml/runtime/controlprogram/parfor/DataPartitionerRemoteSparkReducer.java
@@ -30,8 +30,8 @@ import org.apache.hadoop.io.Writable;
 import org.apache.spark.api.java.function.VoidFunction;
 
 import org.apache.sysml.conf.ConfigurationManager;
-import org.apache.sysml.runtime.DMLRuntimeException;
 import org.apache.sysml.runtime.controlprogram.parfor.util.PairWritableBlock;
+import org.apache.sysml.runtime.io.IOUtilFunctions;
 import org.apache.sysml.runtime.matrix.data.MatrixBlock;
 import org.apache.sysml.runtime.matrix.data.MatrixIndexes;
 import org.apache.sysml.runtime.matrix.data.OutputInfo;
@@ -40,16 +40,12 @@ import scala.Tuple2;
 
 public class DataPartitionerRemoteSparkReducer implements VoidFunction<Tuple2<Long, Iterable<Writable>>> 
 {
-	
 	private static final long serialVersionUID = -7149865018683261964L;
 	
 	private String _fnameNew = null;
 	
-	public DataPartitionerRemoteSparkReducer(String fnameNew, OutputInfo oi) 
-		throws DMLRuntimeException
-	{
+	public DataPartitionerRemoteSparkReducer(String fnameNew, OutputInfo oi) {
 		_fnameNew = fnameNew;
-		//_oi = oi;
 	}
 
 	@Override
@@ -69,17 +65,13 @@ public class DataPartitionerRemoteSparkReducer implements VoidFunction<Tuple2<Lo
 			FileSystem fs = FileSystem.get(job);
 			Path path = new Path(_fnameNew + File.separator + key);
 			writer = new SequenceFile.Writer(fs, job, path, MatrixIndexes.class, MatrixBlock.class);
-			while( valueList.hasNext() )
-			{
+			while( valueList.hasNext() ) {
 				PairWritableBlock pair = (PairWritableBlock) valueList.next();
 				writer.append(pair.indexes, pair.block);
 			}
 		} 
-		finally
-		{
-			if( writer != null )
-				writer.close();
+		finally {
+			IOUtilFunctions.closeSilently(writer);
 		}	
-	}
-	
+	}	
 }

http://git-wip-us.apache.org/repos/asf/incubator-systemml/blob/baa70a15/src/main/java/org/apache/sysml/runtime/controlprogram/parfor/RemoteDPParForSpark.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/sysml/runtime/controlprogram/parfor/RemoteDPParForSpark.java b/src/main/java/org/apache/sysml/runtime/controlprogram/parfor/RemoteDPParForSpark.java
index 8663038..0c4b570 100644
--- a/src/main/java/org/apache/sysml/runtime/controlprogram/parfor/RemoteDPParForSpark.java
+++ b/src/main/java/org/apache/sysml/runtime/controlprogram/parfor/RemoteDPParForSpark.java
@@ -36,6 +36,7 @@ import org.apache.sysml.runtime.controlprogram.ParForProgramBlock.PDataPartition
 import org.apache.sysml.runtime.controlprogram.caching.MatrixObject;
 import org.apache.sysml.runtime.controlprogram.context.ExecutionContext;
 import org.apache.sysml.runtime.controlprogram.context.SparkExecutionContext;
+import org.apache.sysml.runtime.instructions.spark.utils.SparkUtils;
 import org.apache.sysml.runtime.matrix.MatrixCharacteristics;
 import org.apache.sysml.runtime.matrix.MatrixDimensionsMetaData;
 import org.apache.sysml.runtime.matrix.data.InputInfo;
@@ -55,10 +56,9 @@ public class RemoteDPParForSpark
 	
 	protected static final Log LOG = LogFactory.getLog(RemoteDPParForSpark.class.getName());
 
-	public static RemoteParForJobReturn runJob(long pfid, String itervar, String matrixvar, String program, String resultFile, MatrixObject input, 
-			                                   ExecutionContext ec,
-			                                   PDataPartitionFormat dpf, OutputInfo oi, boolean tSparseCol, //config params
-			                                   boolean enableCPCaching, int numReducers )  //opt params
+	public static RemoteParForJobReturn runJob(long pfid, String itervar, String matrixvar, String program, String resultFile, 
+			MatrixObject input, ExecutionContext ec, PDataPartitionFormat dpf, OutputInfo oi, boolean tSparseCol, //config params
+			boolean enableCPCaching, int numReducers )  //opt params
 		throws DMLRuntimeException
 	{
 		String jobname = "ParFor-DPESP";
@@ -71,20 +71,26 @@ public class RemoteDPParForSpark
 		MatrixDimensionsMetaData md = (MatrixDimensionsMetaData) input.getMetaData();
 		MatrixCharacteristics mc = md.getMatrixCharacteristics();
 		InputInfo ii = InputInfo.BinaryBlockInputInfo;
-				
-		//initialize accumulators for tasks/iterations
+
+		//initialize accumulators for tasks/iterations, and inputs
+		JavaPairRDD<MatrixIndexes,MatrixBlock> in = sec.getBinaryBlockRDDHandleForVariable(matrixvar);
 		LongAccumulator aTasks = sc.sc().longAccumulator("tasks");
 		LongAccumulator aIters = sc.sc().longAccumulator("iterations");
+
+		//compute number of reducers (to avoid OOMs and reduce memory pressure)
+		int numParts = SparkUtils.getNumPreferredPartitions(mc, in);
+		int numParts2 = (int)((dpf==PDataPartitionFormat.ROW_BLOCK_WISE) ? mc.getRows() : mc.getCols()); 
+		int numReducers2 = Math.max(numReducers, Math.min(numParts, numParts2));
 		
-		JavaPairRDD<MatrixIndexes,MatrixBlock> in = sec.getBinaryBlockRDDHandleForVariable(matrixvar);
+		//core parfor datapartition-execute
 		DataPartitionerRemoteSparkMapper dpfun = new DataPartitionerRemoteSparkMapper(mc, ii, oi, dpf);
 		RemoteDPParForSparkWorker efun = new RemoteDPParForSparkWorker(program, matrixvar, itervar, 
 				          enableCPCaching, mc, tSparseCol, dpf, oi, aTasks, aIters);
 		List<Tuple2<Long,String>> out = 
-				in.flatMapToPair(dpfun)         //partition the input blocks
-		          .groupByKey(numReducers)      //group partition blocks 		          
-		          .mapPartitionsToPair( efun )  //execute parfor tasks, incl cleanup
-		          .collect();                   //get output handles
+				in.flatMapToPair(dpfun)       //partition the input blocks
+		          .groupByKey(numReducers2)   //group partition blocks 		          
+		          .mapPartitionsToPair(efun)  //execute parfor tasks, incl cleanup
+		          .collect();                 //get output handles
 		
 		//de-serialize results
 		LocalVariableMap[] results = RemoteParForUtils.getResults(out, LOG);

http://git-wip-us.apache.org/repos/asf/incubator-systemml/blob/baa70a15/src/main/java/org/apache/sysml/runtime/controlprogram/parfor/RemoteDPParForSparkWorker.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/sysml/runtime/controlprogram/parfor/RemoteDPParForSparkWorker.java b/src/main/java/org/apache/sysml/runtime/controlprogram/parfor/RemoteDPParForSparkWorker.java
index e12d010..ad0fbf8 100644
--- a/src/main/java/org/apache/sysml/runtime/controlprogram/parfor/RemoteDPParForSparkWorker.java
+++ b/src/main/java/org/apache/sysml/runtime/controlprogram/parfor/RemoteDPParForSparkWorker.java
@@ -47,27 +47,25 @@ public class RemoteDPParForSparkWorker extends ParWorker implements PairFlatMapF
 {
 	private static final long serialVersionUID = 30223759283155139L;
 	
-	private String  _prog = null;
-	private boolean _caching = true;
-	private String _inputVar = null;
-	private String _iterVar = null;
+	private final String  _prog;
+	private final boolean _caching;
+	private final String _inputVar;
+	private final String _iterVar;
 	
-	private OutputInfo _oinfo = null;
-	private int _rlen = -1;
-	private int _clen = -1;
-	private int _brlen = -1;
-	private int _bclen = -1;
-	private boolean _tSparseCol = false;
-	private PDataPartitionFormat _dpf = null;
+	private final OutputInfo _oinfo;
+	private final int _rlen;
+	private final int _clen;
+	private final int _brlen;
+	private final int _bclen;
+	private final boolean _tSparseCol;
+	private final PDataPartitionFormat _dpf;
 	
-	private LongAccumulator _aTasks = null;
-	private LongAccumulator _aIters = null;
+	private final LongAccumulator _aTasks;
+	private final LongAccumulator _aIters;
 	
 	public RemoteDPParForSparkWorker(String program, String inputVar, String iterVar, boolean cpCaching, MatrixCharacteristics mc, boolean tSparseCol, PDataPartitionFormat dpf, OutputInfo oinfo, LongAccumulator atasks, LongAccumulator aiters) 
 		throws DMLRuntimeException
 	{
-		//keep inputs (unfortunately, spark does not expose task ids and it would be implementation-dependent
-		//when this constructor is actually called; hence, we do lazy initialization on task execution)
 		_prog = program;
 		_caching = cpCaching;
 		_inputVar = inputVar;
@@ -78,18 +76,13 @@ public class RemoteDPParForSparkWorker extends ParWorker implements PairFlatMapF
 		_aTasks = atasks;
 		_aIters = aiters;
 		
-		//setup matrixblock partition and meta data
-		_rlen = (int)mc.getRows();
-		_clen = (int)mc.getCols();
+		//setup matrix block partition meta data
+		_rlen = (dpf != PDataPartitionFormat.ROW_WISE) ? (int)mc.getRows() : 1;
+		_clen = (dpf != PDataPartitionFormat.COLUMN_WISE) ? (int)mc.getCols() : 1;
 		_brlen = mc.getRowsPerBlock();
 		_bclen = mc.getColsPerBlock();
 		_tSparseCol = tSparseCol;
 		_dpf = dpf;
-		switch( _dpf ) { //create matrix partition for reuse
-			case ROW_WISE:    _rlen = 1; break;
-			case COLUMN_WISE: _clen = 1; break;
-			default:  throw new RuntimeException("Partition format not yet supported in fused partition-execute: "+dpf);
-		}
 	}
 	
 	@Override 
@@ -102,14 +95,14 @@ public class RemoteDPParForSparkWorker extends ParWorker implements PairFlatMapF
 		configureWorker( TaskContext.get().taskAttemptId() ); //requires Spark 1.3
 	
 		//process all matrix partitions of this data partition
+		MatrixBlock partition = null;
 		while( arg0.hasNext() )
 		{
 			Tuple2<Long,Iterable<Writable>> larg = arg0.next();
 			
 			//collect input partition (check via equals because oinfo deserialized instance)
-			MatrixBlock partition = null;
 			if( _oinfo.equals(OutputInfo.BinaryBlockOutputInfo) )
-				partition = collectBinaryBlock( larg._2() );
+				partition = collectBinaryBlock( larg._2(), partition );
 			else
 				partition = collectBinaryCellInput( larg._2() );
 			
@@ -178,42 +171,44 @@ public class RemoteDPParForSparkWorker extends ParWorker implements PairFlatMapF
 	 * will overwrite the result.
 	 * 
 	 * @param valueList iterable writables
+	 * @param reuse matrix block partition for reuse
 	 * @return matrix block
 	 * @throws IOException if IOException occurs
 	 */
-	private MatrixBlock collectBinaryBlock( Iterable<Writable> valueList ) 
+	private MatrixBlock collectBinaryBlock( Iterable<Writable> valueList, MatrixBlock reuse ) 
 		throws IOException 
 	{
-		MatrixBlock partition = null;
+		MatrixBlock partition = reuse;
 		
 		try
 		{
 			//reset reuse block, keep configured representation
 			if( _tSparseCol )
 				partition = new MatrixBlock(_clen, _rlen, true);
+			else if( partition!=null )
+				partition.reset(_rlen, _clen, false);
 			else
 				partition = new MatrixBlock(_rlen, _clen, false);
 
-			for( Writable val : valueList )
-			{
-				PairWritableBlock pairValue = (PairWritableBlock) val;
-				int row_offset = (int)(pairValue.indexes.getRowIndex()-1)*_brlen;
-				int col_offset = (int)(pairValue.indexes.getColumnIndex()-1)*_bclen;
-				MatrixBlock block = pairValue.block;
+			long lnnz = 0;
+			for( Writable val : valueList ) {
+				PairWritableBlock pval = (PairWritableBlock) val;
+				int row_offset = (int)(pval.indexes.getRowIndex()-1)*_brlen;
+				int col_offset = (int)(pval.indexes.getColumnIndex()-1)*_bclen;
 				if( !partition.isInSparseFormat() ) //DENSE
-				{
-					partition.copy( row_offset, row_offset+block.getNumRows()-1, 
-							   col_offset, col_offset+block.getNumColumns()-1,
-							   pairValue.block, false ); 
-				}
+					partition.copy( row_offset, row_offset+pval.block.getNumRows()-1, 
+							   col_offset, col_offset+pval.block.getNumColumns()-1,
+							   pval.block, false ); 
 				else //SPARSE 
-				{
-					partition.appendToSparse(pairValue.block, row_offset, col_offset);
-				}
+					partition.appendToSparse(pval.block, row_offset, col_offset);
+				lnnz += pval.block.getNonZeros();
 			}
 
-			//final partition cleanup
-			cleanupCollectedMatrixPartition( partition, partition.isInSparseFormat() );
+			//post-processing: cleanups if required
+			if( partition.isInSparseFormat() && _clen>_bclen )
+				partition.sortSparseRows();
+			partition.setNonZeros(lnnz);
+			partition.examSparsity();
 		}
 		catch(DMLRuntimeException ex)
 		{
@@ -273,29 +268,17 @@ public class RemoteDPParForSparkWorker extends ParWorker implements PairFlatMapF
 				throw new IOException("Partition format not yet supported in fused partition-execute: "+_dpf);
 		}
 		
-		//final partition cleanup
-		cleanupCollectedMatrixPartition(partition, _tSparseCol);
-		
-		return partition;
-	}
-
-	private void cleanupCollectedMatrixPartition(MatrixBlock partition, boolean sort) 
-		throws IOException
-	{
-		//sort sparse row contents if required
-		if( partition.isInSparseFormat() && sort )
-			partition.sortSparseRows();
-
-		//ensure right number of nnz
-		if( !partition.isInSparseFormat() )
-			partition.recomputeNonZeros();
-			
-		//exam and switch dense/sparse representation
+		//post-processing: cleanups if required
 		try {
+			if( partition.isInSparseFormat() && _tSparseCol )
+				partition.sortSparseRows();
+			partition.recomputeNonZeros();
 			partition.examSparsity();
 		}
-		catch(Exception ex){
+		catch(DMLRuntimeException ex) {
 			throw new IOException(ex);
 		}
+			
+		return partition;
 	}
 }


[2/4] incubator-systemml git commit: [SYSTEMML-1126] Fix parfor spark/mr handling of custom compiler configs

Posted by mb...@apache.org.
[SYSTEMML-1126] Fix parfor spark/mr handling of custom compiler configs

This patch fixes an issue that has been introduced with SYSTEMML-584 and
affects, in the context of parfor spark/mr execute jobs, all
configurations that are taken over from the dml config into our compiler
config (i.e., optimization level and blocksize).


Project: http://git-wip-us.apache.org/repos/asf/incubator-systemml/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-systemml/commit/e82de90b
Tree: http://git-wip-us.apache.org/repos/asf/incubator-systemml/tree/e82de90b
Diff: http://git-wip-us.apache.org/repos/asf/incubator-systemml/diff/e82de90b

Branch: refs/heads/master
Commit: e82de90b21df44a3194a86ce0e087dc710cd34af
Parents: baa70a1
Author: Matthias Boehm <mb...@gmail.com>
Authored: Fri Feb 24 17:50:16 2017 -0800
Committer: Matthias Boehm <mb...@gmail.com>
Committed: Sat Feb 25 11:51:03 2017 -0800

----------------------------------------------------------------------
 .../runtime/controlprogram/parfor/ProgramConverter.java      | 8 ++++++--
 1 file changed, 6 insertions(+), 2 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-systemml/blob/e82de90b/src/main/java/org/apache/sysml/runtime/controlprogram/parfor/ProgramConverter.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/sysml/runtime/controlprogram/parfor/ProgramConverter.java b/src/main/java/org/apache/sysml/runtime/controlprogram/parfor/ProgramConverter.java
index f94be23..5cc9ca1 100644
--- a/src/main/java/org/apache/sysml/runtime/controlprogram/parfor/ProgramConverter.java
+++ b/src/main/java/org/apache/sysml/runtime/controlprogram/parfor/ProgramConverter.java
@@ -31,9 +31,11 @@ import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.mapred.JobConf;
 import org.apache.sysml.api.DMLScript;
 import org.apache.sysml.conf.CompilerConfig.ConfigType;
+import org.apache.sysml.conf.CompilerConfig;
 import org.apache.sysml.conf.ConfigurationManager;
 import org.apache.sysml.conf.DMLConfig;
 import org.apache.sysml.hops.Hop;
+import org.apache.sysml.hops.OptimizerUtils;
 import org.apache.sysml.hops.recompile.Recompiler;
 import org.apache.sysml.parser.DMLProgram;
 import org.apache.sysml.parser.DataIdentifier;
@@ -1345,8 +1347,10 @@ public class ProgramConverter
 		JobConf job = ConfigurationManager.getCachedJobConf();
 		if( !InfrastructureAnalyzer.isLocalMode(job) ) {
 			if( confStr != null && !confStr.trim().isEmpty() ) {
-				DMLConfig config = DMLConfig.parseDMLConfig(confStr);
-				ConfigurationManager.setLocalConfig(config);
+				DMLConfig dmlconf = DMLConfig.parseDMLConfig(confStr);
+				CompilerConfig cconf = OptimizerUtils.constructCompilerConfig(dmlconf);
+				ConfigurationManager.setLocalConfig(dmlconf);
+				ConfigurationManager.setLocalConfig(cconf);
 			}
 			//init internal configuration w/ parsed or default config
 			ParForProgramBlock.initInternalConfigurations(


[4/4] incubator-systemml git commit: [SYSTEMML-1309] Fix parfor spark working dir delete on shutdown

Posted by mb...@apache.org.
[SYSTEMML-1309] Fix parfor spark working dir delete on shutdown

Project: http://git-wip-us.apache.org/repos/asf/incubator-systemml/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-systemml/commit/b78c1259
Tree: http://git-wip-us.apache.org/repos/asf/incubator-systemml/tree/b78c1259
Diff: http://git-wip-us.apache.org/repos/asf/incubator-systemml/diff/b78c1259

Branch: refs/heads/master
Commit: b78c125934fa7a947a5118f0c08473afa926fa5d
Parents: b028e6c
Author: Matthias Boehm <mb...@gmail.com>
Authored: Fri Feb 24 22:00:33 2017 -0800
Committer: Matthias Boehm <mb...@gmail.com>
Committed: Sat Feb 25 11:51:06 2017 -0800

----------------------------------------------------------------------
 .../parfor/RemoteDPParForSparkWorker.java       | 28 ++++++++-----
 .../parfor/RemoteParForSparkWorker.java         | 43 +++++++++++---------
 .../parfor/RemoteParForUtils.java               | 26 ++++++++++--
 .../sysml/runtime/util/LocalFileUtils.java      |  2 +-
 4 files changed, 65 insertions(+), 34 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-systemml/blob/b78c1259/src/main/java/org/apache/sysml/runtime/controlprogram/parfor/RemoteDPParForSparkWorker.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/sysml/runtime/controlprogram/parfor/RemoteDPParForSparkWorker.java b/src/main/java/org/apache/sysml/runtime/controlprogram/parfor/RemoteDPParForSparkWorker.java
index ad0fbf8..458f149 100644
--- a/src/main/java/org/apache/sysml/runtime/controlprogram/parfor/RemoteDPParForSparkWorker.java
+++ b/src/main/java/org/apache/sysml/runtime/controlprogram/parfor/RemoteDPParForSparkWorker.java
@@ -32,6 +32,7 @@ import org.apache.sysml.runtime.controlprogram.ParForProgramBlock.PDataPartition
 import org.apache.sysml.runtime.controlprogram.caching.CacheableData;
 import org.apache.sysml.runtime.controlprogram.caching.MatrixObject;
 import org.apache.sysml.runtime.controlprogram.parfor.Task.TaskType;
+import org.apache.sysml.runtime.controlprogram.parfor.stat.InfrastructureAnalyzer;
 import org.apache.sysml.runtime.controlprogram.parfor.util.IDHandler;
 import org.apache.sysml.runtime.controlprogram.parfor.util.PairWritableBlock;
 import org.apache.sysml.runtime.controlprogram.parfor.util.PairWritableCell;
@@ -145,21 +146,28 @@ public class RemoteDPParForSparkWorker extends ParWorker implements PairFlatMapF
 		_numTasks    = 0;
 		_numIters    = 0;
 
-		//init local cache manager 
-		if( !CacheableData.isCachingActive() ) {
-			String uuid = IDHandler.createDistributedUniqueID();
-			LocalFileUtils.createWorkingDirectoryWithUUID( uuid );
-			CacheableData.initCaching( uuid ); //incl activation, cache dir creation (each map task gets its own dir for simplified cleanup)
-		}		
-		if( !CacheableData.cacheEvictionLocalFilePrefix.contains("_") ){ //account for local mode
-			CacheableData.cacheEvictionLocalFilePrefix = CacheableData.cacheEvictionLocalFilePrefix +"_" + _workerID; 
+		//init and register-cleanup of buffer pool (in parfor spark, multiple tasks might 
+		//share the process-local, i.e., per executor, buffer pool; hence we synchronize 
+		//the initialization and immediately register the created directory for cleanup
+		//on process exit, i.e., executor exit, including any files created in the future.
+		synchronized( CacheableData.class ) {
+			if( !CacheableData.isCachingActive() && !InfrastructureAnalyzer.isLocalMode() ) { 
+				//create id, executor working dir, and cache dir
+				String uuid = IDHandler.createDistributedUniqueID();
+				LocalFileUtils.createWorkingDirectoryWithUUID( uuid );
+				CacheableData.initCaching( uuid ); //incl activation and cache dir creation
+				CacheableData.cacheEvictionLocalFilePrefix = 
+						CacheableData.cacheEvictionLocalFilePrefix +"_" + _workerID; 
+				//register entire working dir for delete on shutdown
+				RemoteParForUtils.cleanupWorkingDirectoriesOnShutdown();
+			}	
 		}
 		
 		//ensure that resultvar files are not removed
 		super.pinResultVariables();
 		
-		//enable/disable caching (if required)
-		if( !_caching )
+		//enable/disable caching (if required and not in CP process)
+		if( !_caching && !InfrastructureAnalyzer.isLocalMode() )
 			CacheableData.disableCaching();
 	}
 	

http://git-wip-us.apache.org/repos/asf/incubator-systemml/blob/b78c1259/src/main/java/org/apache/sysml/runtime/controlprogram/parfor/RemoteParForSparkWorker.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/sysml/runtime/controlprogram/parfor/RemoteParForSparkWorker.java b/src/main/java/org/apache/sysml/runtime/controlprogram/parfor/RemoteParForSparkWorker.java
index e12376a..2ea802d 100644
--- a/src/main/java/org/apache/sysml/runtime/controlprogram/parfor/RemoteParForSparkWorker.java
+++ b/src/main/java/org/apache/sysml/runtime/controlprogram/parfor/RemoteParForSparkWorker.java
@@ -28,6 +28,7 @@ import org.apache.spark.api.java.function.PairFlatMapFunction;
 import org.apache.spark.util.LongAccumulator;
 import org.apache.sysml.runtime.DMLRuntimeException;
 import org.apache.sysml.runtime.controlprogram.caching.CacheableData;
+import org.apache.sysml.runtime.controlprogram.parfor.stat.InfrastructureAnalyzer;
 import org.apache.sysml.runtime.controlprogram.parfor.util.IDHandler;
 import org.apache.sysml.runtime.util.LocalFileUtils;
 
@@ -35,23 +36,20 @@ import scala.Tuple2;
 
 public class RemoteParForSparkWorker extends ParWorker implements PairFlatMapFunction<Task, Long, String> 
 {
-	
 	private static final long serialVersionUID = -3254950138084272296L;
 
+	private final String  _prog;
 	private boolean _initialized = false;
-	private String  _prog = null;
 	private boolean _caching = true;
 	
-	private LongAccumulator _aTasks = null;
-	private LongAccumulator _aIters = null;
+	private final LongAccumulator _aTasks;
+	private final LongAccumulator _aIters;
 	
 	public RemoteParForSparkWorker(String program, boolean cpCaching, LongAccumulator atasks, LongAccumulator aiters) 
 		throws DMLRuntimeException
 	{
-		//keep inputs (unfortunately, spark does not expose task ids and it would be implementation-dependent
-		//when this constructor is actually called; hence, we do lazy initialization on task execution)
-		_initialized = false;
 		_prog = program;
+		_initialized = false;
 		_caching = cpCaching;
 		
 		//setup spark accumulators
@@ -65,7 +63,7 @@ public class RemoteParForSparkWorker extends ParWorker implements PairFlatMapFun
 	{
 		//lazy parworker initialization
 		if( !_initialized )
-			configureWorker( TaskContext.get().taskAttemptId() ); //requires Spark 1.3
+			configureWorker( TaskContext.get().taskAttemptId() );
 		
 		//execute a single task
 		long numIter = getExecutedIterations();
@@ -98,24 +96,31 @@ public class RemoteParForSparkWorker extends ParWorker implements PairFlatMapFun
 		_numTasks    = 0;
 		_numIters    = 0;
 
-		//init local cache manager 
-		if( !CacheableData.isCachingActive() ) {
-			String uuid = IDHandler.createDistributedUniqueID();
-			LocalFileUtils.createWorkingDirectoryWithUUID( uuid );
-			CacheableData.initCaching( uuid ); //incl activation, cache dir creation (each map task gets its own dir for simplified cleanup)
-		}		
-		if( !CacheableData.cacheEvictionLocalFilePrefix.contains("_") ){ //account for local mode
-			CacheableData.cacheEvictionLocalFilePrefix = CacheableData.cacheEvictionLocalFilePrefix +"_" + _workerID; 
+		//init and register-cleanup of buffer pool (in parfor spark, multiple tasks might 
+		//share the process-local, i.e., per executor, buffer pool; hence we synchronize 
+		//the initialization and immediately register the created directory for cleanup
+		//on process exit, i.e., executor exit, including any files created in the future.
+		synchronized( CacheableData.class ) {
+			if( !CacheableData.isCachingActive() && !InfrastructureAnalyzer.isLocalMode() ) { 
+				//create id, executor working dir, and cache dir
+				String uuid = IDHandler.createDistributedUniqueID();
+				LocalFileUtils.createWorkingDirectoryWithUUID( uuid );
+				CacheableData.initCaching( uuid ); //incl activation and cache dir creation
+				CacheableData.cacheEvictionLocalFilePrefix = 
+						CacheableData.cacheEvictionLocalFilePrefix +"_" + _workerID; 
+				//register entire working dir for delete on shutdown
+				RemoteParForUtils.cleanupWorkingDirectoriesOnShutdown();
+			}	
 		}
 		
 		//ensure that resultvar files are not removed
 		super.pinResultVariables();
 		
-		//enable/disable caching (if required)
-		if( !_caching )
+		//enable/disable caching (if required and not in CP process)
+		if( !_caching && !InfrastructureAnalyzer.isLocalMode() )
 			CacheableData.disableCaching();
 		
-		//make as lazily intialized
+		//mark as initialized
 		_initialized = true;
 	}
 }

http://git-wip-us.apache.org/repos/asf/incubator-systemml/blob/b78c1259/src/main/java/org/apache/sysml/runtime/controlprogram/parfor/RemoteParForUtils.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/sysml/runtime/controlprogram/parfor/RemoteParForUtils.java b/src/main/java/org/apache/sysml/runtime/controlprogram/parfor/RemoteParForUtils.java
index 7b3ecb1..fd99429 100644
--- a/src/main/java/org/apache/sysml/runtime/controlprogram/parfor/RemoteParForUtils.java
+++ b/src/main/java/org/apache/sysml/runtime/controlprogram/parfor/RemoteParForUtils.java
@@ -190,12 +190,9 @@ public class RemoteParForUtils
 		
 		return ret;
 	}
-		
 	
 	/**
-	 * Cleanup all temporary files created by this SystemML process
-	 * instance.
-	 * 
+	 * Cleanup all temporary files created by this SystemML process.
 	 */
 	public static void cleanupWorkingDirectories()
 	{
@@ -216,6 +213,15 @@ public class RemoteParForUtils
 		}
 	}
 
+	/**
+	 * Cleanup all temporary files created by this SystemML process,
+	 * on shutdown via exit or interrupt.
+	 */
+	public static void cleanupWorkingDirectoriesOnShutdown() {
+		Runtime.getRuntime().addShutdownHook(
+				new DeleteWorkingDirectoriesTask());
+	}
+	
 	public static LocalVariableMap[] getResults( List<Tuple2<Long,String>> out, Log LOG ) 
 		throws DMLRuntimeException
 	{
@@ -241,4 +247,16 @@ public class RemoteParForUtils
 		//create return array
 		return tmp.values().toArray(new LocalVariableMap[0]);	
 	}
+	
+	/**
+	 * Task to be registered as shutdown hook in order to delete the 
+	 * all working directories, including any remaining files, which 
+	 * might not have been created  at time of registration.
+	 */
+	private static class DeleteWorkingDirectoriesTask extends Thread {
+		@Override
+		public void run() {
+			cleanupWorkingDirectories();
+		}
+	}
 }

http://git-wip-us.apache.org/repos/asf/incubator-systemml/blob/b78c1259/src/main/java/org/apache/sysml/runtime/util/LocalFileUtils.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/sysml/runtime/util/LocalFileUtils.java b/src/main/java/org/apache/sysml/runtime/util/LocalFileUtils.java
index c868f60..0086f8f 100644
--- a/src/main/java/org/apache/sysml/runtime/util/LocalFileUtils.java
+++ b/src/main/java/org/apache/sysml/runtime/util/LocalFileUtils.java
@@ -305,7 +305,7 @@ public class LocalFileUtils
 		return createWorkingDirectoryWithUUID( DMLScript.getUUID() );
 	}
 
-	public static synchronized String createWorkingDirectoryWithUUID( String uuid )
+	public static String createWorkingDirectoryWithUUID( String uuid )
 		throws DMLRuntimeException 
 	{
 		//create local tmp dir if not existing


[3/4] incubator-systemml git commit: [SYSTEMML-1350] Avoid unnecessary RDD export on parfor spark dpesp

Posted by mb...@apache.org.
[SYSTEMML-1350] Avoid unnecessary RDD export on parfor spark dpesp

Project: http://git-wip-us.apache.org/repos/asf/incubator-systemml/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-systemml/commit/b028e6ce
Tree: http://git-wip-us.apache.org/repos/asf/incubator-systemml/tree/b028e6ce
Diff: http://git-wip-us.apache.org/repos/asf/incubator-systemml/diff/b028e6ce

Branch: refs/heads/master
Commit: b028e6cee12d8cc9bb4e1728fffc852cef7282c1
Parents: e82de90
Author: Matthias Boehm <mb...@gmail.com>
Authored: Fri Feb 24 19:03:56 2017 -0800
Committer: Matthias Boehm <mb...@gmail.com>
Committed: Sat Feb 25 11:51:05 2017 -0800

----------------------------------------------------------------------
 .../controlprogram/ParForProgramBlock.java      | 33 +++++++++-----------
 1 file changed, 15 insertions(+), 18 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-systemml/blob/b028e6ce/src/main/java/org/apache/sysml/runtime/controlprogram/ParForProgramBlock.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/sysml/runtime/controlprogram/ParForProgramBlock.java b/src/main/java/org/apache/sysml/runtime/controlprogram/ParForProgramBlock.java
index af3a0d1..4cdfa7b 100644
--- a/src/main/java/org/apache/sysml/runtime/controlprogram/ParForProgramBlock.java
+++ b/src/main/java/org/apache/sysml/runtime/controlprogram/ParForProgramBlock.java
@@ -23,6 +23,7 @@ import java.io.BufferedWriter;
 import java.io.IOException;
 import java.io.OutputStreamWriter;
 import java.util.ArrayList;
+import java.util.Arrays;
 import java.util.Collection;
 import java.util.HashMap;
 import java.util.HashSet;
@@ -1063,8 +1064,8 @@ public class ParForProgramBlock extends ForProgramBlock
 		if( _monitor )
 			StatisticMonitor.putPFStat(_ID, Stat.PARFOR_INIT_TASKS_T, time.stop());
 		
-		//write matrices to HDFS 
-		exportMatricesToHDFS(ec);
+		//write matrices to HDFS, except DP matrix which is the input to the RemoteDPParForSpark job
+		exportMatricesToHDFS(ec, _colocatedDPMatrix);
 				
 		// Step 4) submit MR job (wait for finished work)
 		OutputInfo inputOI = ((inputMatrix.getSparsity()<0.1 && inputDPF==PDataPartitionFormat.COLUMN_WISE)||
@@ -1258,37 +1259,33 @@ public class ParForProgramBlock extends ForProgramBlock
 			}
 	}
 
-	private void exportMatricesToHDFS( ExecutionContext ec ) 
+	private void exportMatricesToHDFS(ExecutionContext ec, String... blacklistNames) 
 		throws CacheException 
 	{
 		ParForStatementBlock sb = (ParForStatementBlock)getStatementBlock();
+		HashSet<String> blacklist = new HashSet<String>(Arrays.asList(blacklistNames));
 		
 		if( LIVEVAR_AWARE_EXPORT && sb != null)
 		{
 			//optimization to prevent unnecessary export of matrices
 			//export only variables that are read in the body
 			VariableSet varsRead = sb.variablesRead();
-			for (String key : ec.getVariables().keySet() ) 
-			{
-				Data d = ec.getVariable(key);
-				if (    d.getDataType() == DataType.MATRIX
-					 && varsRead.containsVariable(key)  )
-				{
-					MatrixObject mo = (MatrixObject)d;
-					mo.exportData( _replicationExport );
+			for (String key : ec.getVariables().keySet() ) {
+				if( varsRead.containsVariable(key) && !blacklist.contains(key) ) {
+					Data d = ec.getVariable(key);
+					if( d.getDataType() == DataType.MATRIX )
+						((MatrixObject)d).exportData(_replicationExport);
 				}
 			}
 		}
 		else
 		{
 			//export all matrices in symbol table
-			for (String key : ec.getVariables().keySet() ) 
-			{
-				Data d = ec.getVariable(key);
-				if ( d.getDataType() == DataType.MATRIX )
-				{
-					MatrixObject mo = (MatrixObject)d;
-					mo.exportData( _replicationExport );
+			for (String key : ec.getVariables().keySet() ) {
+				if( !blacklist.contains(key) ) {
+					Data d = ec.getVariable(key);
+					if( d.getDataType() == DataType.MATRIX )
+						((MatrixObject)d).exportData(_replicationExport);
 				}
 			}
 		}