You are viewing a plain-text version of this content; the link to the canonical version is not preserved in this copy.
Posted to commits@systemml.apache.org by mb...@apache.org on 2017/09/02 23:30:23 UTC

[1/2] systemml git commit: [SYSTEMML-1879] Performance parfor remote spark (reuse shared inputs)

Repository: systemml
Updated Branches:
  refs/heads/master 912c65506 -> ba73291c9


[SYSTEMML-1879] Performance parfor remote spark (reuse shared inputs)

In remote Spark parfor jobs, each worker is initialized with its own
deserialized symbol table, which causes redundant reads of shared inputs
in every parfor worker and is unnecessarily memory-inefficient. This
patch introduces a principled approach to reusing shared inputs: we
reuse all variables except result variables and partitioned
matrices. By simply using common instances of matrix objects, the
sharing happens automatically through the buffer pool, as in local
parfor execution, and without additional pinned memory requirements. On
the perftest scenario MSVM 1M x 1K (sparse, 150 classes, 25
iterations), the end-to-end runtime (including read and Spark context
creation) improved from 94s to 72s.


Project: http://git-wip-us.apache.org/repos/asf/systemml/repo
Commit: http://git-wip-us.apache.org/repos/asf/systemml/commit/2c57cf77
Tree: http://git-wip-us.apache.org/repos/asf/systemml/tree/2c57cf77
Diff: http://git-wip-us.apache.org/repos/asf/systemml/diff/2c57cf77

Branch: refs/heads/master
Commit: 2c57cf779e3b1a583ee4062bbd268f592537ef05
Parents: 912c655
Author: Matthias Boehm <mb...@gmail.com>
Authored: Fri Sep 1 20:29:49 2017 -0700
Committer: Matthias Boehm <mb...@gmail.com>
Committed: Fri Sep 1 20:31:08 2017 -0700

----------------------------------------------------------------------
 .../org/apache/sysml/parser/DMLTranslator.java  |  14 +-
 .../controlprogram/LocalVariableMap.java        |   5 +
 .../controlprogram/ParForProgramBlock.java      | 464 ++++++++-----------
 .../context/ExecutionContext.java               |  45 +-
 .../parfor/CachedReuseVariables.java            |  59 +++
 .../controlprogram/parfor/ProgramConverter.java |   8 +-
 .../parfor/RemoteParForSpark.java               |  32 +-
 .../parfor/RemoteParForSparkWorker.java         |  33 +-
 .../parfor/TaskPartitionerFactoring.java        |  33 +-
 9 files changed, 356 insertions(+), 337 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/systemml/blob/2c57cf77/src/main/java/org/apache/sysml/parser/DMLTranslator.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/sysml/parser/DMLTranslator.java b/src/main/java/org/apache/sysml/parser/DMLTranslator.java
index e6b8590..f44c0a4 100644
--- a/src/main/java/org/apache/sysml/parser/DMLTranslator.java
+++ b/src/main/java/org/apache/sysml/parser/DMLTranslator.java
@@ -617,12 +617,11 @@ public class DMLTranslator
 			ForProgramBlock rtpb = null;
 			IterablePredicate iterPred = fsb.getIterPredicate();
 			
-			if( sb instanceof ParForStatementBlock )
-			{
+			if( sb instanceof ParForStatementBlock ) {
 				sbName = "ParForStatementBlock";
-				rtpb = new ParForProgramBlock(prog, iterPred.getIterVar().getName(), iterPred.getParForParams());
+				rtpb = new ParForProgramBlock(prog, iterPred.getIterVar().getName(),
+					iterPred.getParForParams(), ((ParForStatementBlock)sb).getResultVariables());
 				ParForProgramBlock pfrtpb = (ParForProgramBlock)rtpb;
-				pfrtpb.setResultVariables( ((ParForStatementBlock)sb).getResultVariables() );
 				pfrtpb.setStatementBlock((ParForStatementBlock)sb); //used for optimization and creating unscoped variables
 			}
 			else {//ForStatementBlock
@@ -636,8 +635,8 @@ public class DMLTranslator
 			
 			// process the body of the for statement block
 			if (fsb.getNumStatements() > 1){
-				LOG.error(fsb.printBlockErrorLocation() + " "  + sbName + " should have 1 statement" );
-				throw new LopsException(fsb.printBlockErrorLocation() + " "  + sbName + " should have 1 statement" );
+				LOG.error(fsb.printBlockErrorLocation() + " " + sbName + " should have 1 statement" );
+				throw new LopsException(fsb.printBlockErrorLocation() + " " + sbName + " should have 1 statement" );
 			}
 			ForStatement fs = (ForStatement)fsb.getStatement(0);
 			for (StatementBlock sblock : fs.getBody()){
@@ -653,9 +652,6 @@ public class DMLTranslator
 			
 			retPB = rtpb;
 			
-			//post processing for generating missing instructions
-			//retPB = verifyAndCorrectProgramBlock(sb.liveIn(), sb.liveOut(), sb._kill, retPB);
-			
 			// add statement block
 			retPB.setStatementBlock(sb);
 			

http://git-wip-us.apache.org/repos/asf/systemml/blob/2c57cf77/src/main/java/org/apache/sysml/runtime/controlprogram/LocalVariableMap.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/sysml/runtime/controlprogram/LocalVariableMap.java b/src/main/java/org/apache/sysml/runtime/controlprogram/LocalVariableMap.java
index 94d85ad..7e59951 100644
--- a/src/main/java/org/apache/sysml/runtime/controlprogram/LocalVariableMap.java
+++ b/src/main/java/org/apache/sysml/runtime/controlprogram/LocalVariableMap.java
@@ -91,6 +91,11 @@ public class LocalVariableMap implements Cloneable
 		localMap.clear();
 	}
 	
+	public void removeAllIn(Set<String> blacklist) {
+		localMap.entrySet().removeIf(
+			e -> blacklist.contains(e.getKey()));
+	}
+	
 	public void removeAllNotIn(Set<String> blacklist) {
 		localMap.entrySet().removeIf(
 			e -> !blacklist.contains(e.getKey()));

http://git-wip-us.apache.org/repos/asf/systemml/blob/2c57cf77/src/main/java/org/apache/sysml/runtime/controlprogram/ParForProgramBlock.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/sysml/runtime/controlprogram/ParForProgramBlock.java b/src/main/java/org/apache/sysml/runtime/controlprogram/ParForProgramBlock.java
index ce8bbed..b393cd1 100644
--- a/src/main/java/org/apache/sysml/runtime/controlprogram/ParForProgramBlock.java
+++ b/src/main/java/org/apache/sysml/runtime/controlprogram/ParForProgramBlock.java
@@ -125,22 +125,22 @@ public class ParForProgramBlock extends ForProgramBlock
 {	
 	// execution modes
 	public enum PExecMode {
-		LOCAL,      //local (master) multi-core execution mode
-		REMOTE_MR,	//remote (MR cluster) execution mode
-		REMOTE_MR_DP,	//remote (MR cluster) execution mode, fused with data partitioning
-		REMOTE_SPARK,	//remote (Spark cluster) execution mode
+		LOCAL,          //local (master) multi-core execution mode
+		REMOTE_MR,      //remote (MR cluster) execution mode
+		REMOTE_MR_DP,   //remote (MR cluster) execution mode, fused with data partitioning
+		REMOTE_SPARK,   //remote (Spark cluster) execution mode
 		REMOTE_SPARK_DP,//remote (Spark cluster) execution mode, fused with data partitioning
 		UNSPECIFIED
 	}
 
 	// task partitioner
 	public enum PTaskPartitioner {
-		FIXED,      //fixed-sized task partitioner, uses tasksize 
-		NAIVE,      //naive task partitioner (tasksize=1)
-		STATIC,     //static task partitioner (numIterations/numThreads)
-		FACTORING,  //factoring task partitioner  
-		FACTORING_CMIN,  //constrained factoring task partitioner, uses tasksize as min constraint
-		FACTORING_CMAX,  //constrained factoring task partitioner, uses tasksize as max constraint
+		FIXED,          //fixed-sized task partitioner, uses tasksize 
+		NAIVE,          //naive task partitioner (tasksize=1)
+		STATIC,         //static task partitioner (numIterations/numThreads)
+		FACTORING,      //factoring task partitioner  
+		FACTORING_CMIN, //constrained factoring task partitioner, uses tasksize as min constraint
+		FACTORING_CMAX, //constrained factoring task partitioner, uses tasksize as max constraint
 		UNSPECIFIED
 	}
 	
@@ -259,10 +259,10 @@ public class ParForProgramBlock extends ForProgramBlock
 	}
 	
 	public enum PDataPartitioner {
-		NONE,       // no data partitioning
-		LOCAL,      // local file based partition split on master node
-		REMOTE_MR,  // remote partition split using a reblock MR job 
-		REMOTE_SPARK, // remote partition split using a spark job
+		NONE,            // no data partitioning
+		LOCAL,           // local file based partition split on master node
+		REMOTE_MR,       // remote partition split using a reblock MR job 
+		REMOTE_SPARK,    // remote partition split using a spark job
 		UNSPECIFIED, 
   	}
 
@@ -277,21 +277,21 @@ public class ParForProgramBlock extends ForProgramBlock
 	
 	//optimizer
 	public enum POptMode{
-		NONE,        //no optimization, use defaults and specified parameters
-		RULEBASED,   //rule-based rewritings with memory constraints 
-		CONSTRAINED, //same as rule-based but with given params as constraints
-		HEURISTIC,   //smae as rule-based but with time-based cost estimates
+		NONE,            //no optimization, use defaults and specified parameters
+		RULEBASED,       //rule-based rewritings with memory constraints 
+		CONSTRAINED,     //same as rule-based but with given params as constraints
+		HEURISTIC,       //same as rule-based but with time-based cost estimates
 	}
-		
+	
 	// internal parameters
-	public static final boolean OPTIMIZE                    = true;	// run all automatic optimizations on top-level parfor
+	public static final boolean OPTIMIZE                    = true; // run all automatic optimizations on top-level parfor
 	public static final boolean USE_PB_CACHE                = false; // reuse copied program blocks whenever possible, not there can be issues related to recompile
-	public static       boolean USE_RANGE_TASKS_IF_USEFUL   = true;   	// use range tasks whenever size>3, false, otherwise wrong split order in remote 
-	public static final boolean USE_STREAMING_TASK_CREATION = true;  	// start working while still creating tasks, prevents blocking due to too small task queue
-	public static final boolean ALLOW_NESTED_PARALLELISM	= true;    // if not, transparently change parfor to for on program conversions (local,remote)
-	public static       boolean ALLOW_REUSE_MR_JVMS         = true;    // potential benefits: less setup costs per task, NOTE> cannot be used MR4490 in Hadoop 1.0.3, still not fixed in 1.1.1
+	public static final boolean USE_RANGE_TASKS_IF_USEFUL   = true; // use range tasks whenever size>3, false, otherwise wrong split order in remote 
+	public static final boolean USE_STREAMING_TASK_CREATION = true; // start working while still creating tasks, prevents blocking due to too small task queue
+	public static final boolean ALLOW_NESTED_PARALLELISM	= true; // if not, transparently change parfor to for on program conversions (local,remote)
+	public static       boolean ALLOW_REUSE_MR_JVMS         = true; // potential benefits: less setup costs per task, NOTE> cannot be used MR4490 in Hadoop 1.0.3, still not fixed in 1.1.1
 	public static       boolean ALLOW_REUSE_MR_PAR_WORKER   = ALLOW_REUSE_MR_JVMS; //potential benefits: less initialization, reuse in-memory objects and result consolidation!
-	public static final boolean USE_PARALLEL_RESULT_MERGE   = false;    // if result merge is run in parallel or serial 
+	public static final boolean USE_PARALLEL_RESULT_MERGE   = false; // if result merge is run in parallel or serial 
 	public static final boolean USE_PARALLEL_RESULT_MERGE_REMOTE = true; // if remote result merge should be run in parallel for multiple result vars
 	public static final boolean ALLOW_DATA_COLOCATION       = true;
 	public static final boolean CREATE_UNSCOPED_RESULTVARS  = true;
@@ -299,7 +299,7 @@ public class ParForProgramBlock extends ForProgramBlock
 	public static final int     WRITE_REPLICATION_FACTOR    = 1;
 	public static final int     MAX_RETRYS_ON_ERROR         = 1;
 	public static final boolean FORCE_CP_ON_REMOTE_MR       = true; // compile body to CP if exec type forced to MR
-	public static final boolean LIVEVAR_AWARE_EXPORT        = true; //export only read variables according to live variable analysis
+	public static final boolean LIVEVAR_AWARE_EXPORT        = true; // export only read variables according to live variable analysis
 	public static final boolean RESET_RECOMPILATION_FLAGs   = true;
  	
  	public static final String PARFOR_FNAME_PREFIX          = "/parfor/"; 
@@ -311,69 +311,61 @@ public class ParForProgramBlock extends ForProgramBlock
 	public static final String PARFOR_COUNTER_GROUP_NAME    = "SystemML ParFOR Counters";
 	
 	// static ID generator sequences
-	private static IDSequence   _pfIDSeq        = null;
-	private static IDSequence   _pwIDSeq        = null;
+	private final static IDSequence _pfIDSeq = new IDSequence();
+	private final static IDSequence _pwIDSeq = new IDSequence();
 	
 	// runtime parameters
-	protected HashMap<String,String> _params    = null;
-	protected int              _numThreads      = -1;
-	protected PTaskPartitioner _taskPartitioner = null; 
-	protected long             _taskSize        = -1;
+	protected final HashMap<String,String> _params;
+	protected final boolean _monitor;
+	protected final Level _optLogLevel;
+	protected int _numThreads = -1;
+	protected long _taskSize = -1;
+	protected PTaskPartitioner _taskPartitioner = null;
 	protected PDataPartitioner _dataPartitioner = null;
-	protected PResultMerge     _resultMerge     = null;
-	protected PExecMode        _execMode        = null;
-	protected POptMode         _optMode         = null;
-	protected boolean          _monitor         = false;
-	protected Level            _optLogLevel     = null;
-	
+	protected PResultMerge _resultMerge = null;
+	protected PExecMode _execMode = null;
+	protected POptMode _optMode = null;
 	
 	//specifics used for optimization
-	protected long             _numIterations   = -1; 
+	protected long _numIterations = -1;
 	
 	//specifics used for data partitioning
 	protected LocalVariableMap _variablesDPOriginal = null;
-	protected LocalVariableMap _variablesDPReuse    = null;
-	protected String           _colocatedDPMatrix   = null;
-	protected boolean          _tSparseCol          = false;
-	protected int              _replicationDP       = WRITE_REPLICATION_FACTOR;
-	protected int              _replicationExport   = -1;
+	protected LocalVariableMap _variablesDPReuse = null;
+	protected String _colocatedDPMatrix = null;
+	protected boolean _tSparseCol = false;
+	protected int _replicationDP = WRITE_REPLICATION_FACTOR;
+	protected int _replicationExport = -1;
 	//specifics used for result partitioning
-	protected boolean          _jvmReuse            = true;
+	protected boolean _jvmReuse = true;
 	//specifics used for recompilation 
-	protected double           _oldMemoryBudget = -1;
-	protected double           _recompileMemoryBudget = -1;
+	protected double _oldMemoryBudget = -1;
+	protected double _recompileMemoryBudget = -1;
 	//specifics for caching
-	protected boolean          _enableCPCaching     = true;
-	protected boolean          _enableRuntimePiggybacking = false;
+	protected boolean _enableCPCaching = true;
+	protected boolean _enableRuntimePiggybacking = false;
 	//specifics for spark 
 	protected Collection<String> _variablesRP = null;
 	protected Collection<String> _variablesECache = null;
 	
 	// program block meta data
-	protected long                _ID           = -1;
-	protected int                 _IDPrefix     = -1;
-	protected ArrayList<String>  _resultVars      = null;
-	protected IDSequence         _resultVarsIDSeq = null;
-	protected IDSequence         _dpVarsIDSeq     = null;
-	protected boolean            _monitorReport   = false;
-	protected boolean            _hasFunctions    = true;
+	protected final ArrayList<String> _resultVars;
+	protected final IDSequence _resultVarsIDSeq;
+	protected final IDSequence _dpVarsIDSeq;
+	protected final boolean _hasFunctions;
+	
+	protected long _ID = -1;
+	protected int _IDPrefix = -1;
+	protected boolean _monitorReport = false;
 	
 	// local parworker data
-	protected long[] 		   	                    _pwIDs   = null;
 	protected HashMap<Long,ArrayList<ProgramBlock>> _pbcache = null;
+	protected long[] _pwIDs = null;
 	
-	
-	static
-	{
-		//init static ID sequence generators
-		_pfIDSeq = new IDSequence();
-		_pwIDSeq = new IDSequence();
-	}
-	
-	public ParForProgramBlock(Program prog, String iterPredVar, HashMap<String,String> params) 
+	public ParForProgramBlock(Program prog, String iterPredVar, HashMap<String,String> params, ArrayList<String> resultVars)
 		throws DMLRuntimeException 
 	{
-		this( -1, prog, iterPredVar, params);
+		this( -1, prog, iterPredVar, params, resultVars);
 	}
 	
 	/**
@@ -384,9 +376,10 @@ public class ParForProgramBlock extends ForProgramBlock
 	 * @param prog runtime program
 	 * @param iterPredVars ?
 	 * @param params map of parameters
+	 * @param resultVars list of result variable names
 	 * @throws DMLRuntimeException if DMLRuntimeException occurs
 	 */
-	public ParForProgramBlock(int ID, Program prog, String iterPredVar, HashMap<String,String> params) 
+	public ParForProgramBlock(int ID, Program prog, String iterPredVar, HashMap<String,String> params, ArrayList<String> resultVars) 
 	{
 		super(prog, iterPredVar);
 
@@ -395,6 +388,7 @@ public class ParForProgramBlock extends ForProgramBlock
 		
 		//ID generation and setting 
 		setParForProgramBlockIDs( ID );
+		_resultVars = resultVars;
 		_resultVarsIDSeq = new IDSequence();
 		_dpVarsIDSeq = new IDSequence();
 		
@@ -407,18 +401,18 @@ public class ParForProgramBlock extends ForProgramBlock
 			_dataPartitioner = PDataPartitioner.valueOf( getParForParam(ParForStatementBlock.DATA_PARTITIONER) );
 			_resultMerge     = PResultMerge.valueOf( getParForParam(ParForStatementBlock.RESULT_MERGE) );
 			_execMode        = PExecMode.valueOf( getParForParam(ParForStatementBlock.EXEC_MODE) );
-			_optMode         = POptMode.valueOf( getParForParam(ParForStatementBlock.OPT_MODE) );		
+			_optMode         = POptMode.valueOf( getParForParam(ParForStatementBlock.OPT_MODE) );
 			_optLogLevel     = Level.toLevel( getParForParam(ParForStatementBlock.OPT_LOG) );
 			_monitor         = (Integer.parseInt(getParForParam(ParForStatementBlock.PROFILE) ) == 1);
 		}
 		catch(Exception ex) {
 			throw new RuntimeException("Error parsing specified ParFOR parameters.",ex);
 		}
-			
+		
 		//reset the internal opt mode if optimization globally disabled.
 		if( !OPTIMIZE )
 			_optMode = POptMode.NONE;
-			
+		
 		_variablesDPOriginal = new LocalVariableMap();
 		_variablesDPReuse = new LocalVariableMap();
 		
@@ -453,19 +447,14 @@ public class ParForProgramBlock extends ForProgramBlock
 	
 	public String getParForParam(String key) {
 		String tmp = getParForParams().get(key);
-		return (tmp == null) ? null :  
+		return (tmp == null) ? null :
 			UtilFunctions.unquote(tmp).toUpperCase();
 	}
 
-	public ArrayList<String> getResultVariables()
-	{
+	public ArrayList<String> getResultVariables() {
 		return _resultVars;
 	}
 	
-	public void setResultVariables(ArrayList<String> resultVars) {
-		_resultVars = resultVars;
-	}
-	
 	public void disableOptimization() {
 		_optMode = POptMode.NONE;
 	}
@@ -574,7 +563,7 @@ public class ParForProgramBlock extends ForProgramBlock
 		ALLOW_REUSE_MR_PAR_WORKER = ALLOW_REUSE_MR_JVMS;
 	}
 	
-	@Override	
+	@Override
 	public void execute(ExecutionContext ec)
 		throws DMLRuntimeException
 	{
@@ -621,7 +610,7 @@ public class ParForProgramBlock extends ForProgramBlock
 		
 		if( _monitor ) 
 			StatisticMonitor.putPFStat(_ID, Stat.PARFOR_INIT_DATA_T, time.stop());
-			
+		
 		// initialize iter var to form value
 		IntObject iterVar = new IntObject(_iterPredVar, from.getLongValue() );
 		
@@ -630,8 +619,7 @@ public class ParForProgramBlock extends ForProgramBlock
 		///////
 		LOG.trace("EXECUTE PARFOR ID = "+_ID+" with mode = "+_execMode+", numThreads = "+_numThreads+", taskpartitioner = "+_taskPartitioner);
 		
-		if( _monitor )
-		{
+		if( _monitor ) {
 			StatisticMonitor.putPFStat(_ID, Stat.PARFOR_NUMTHREADS,      _numThreads);
 			StatisticMonitor.putPFStat(_ID, Stat.PARFOR_TASKSIZE,        _taskSize);
 			StatisticMonitor.putPFStat(_ID, Stat.PARFOR_TASKPARTITIONER, _taskPartitioner.ordinal());
@@ -644,13 +632,13 @@ public class ParForProgramBlock extends ForProgramBlock
 		HashMap<String, Boolean> varState = ec.pinVariables(varList);
 		
 		try 
-		{		
+		{
 			switch( _execMode )
 			{
 				case LOCAL: //create parworkers as local threads
 					executeLocalParFor(ec, iterVar, from, to, incr);
 					break;
-					
+				
 				case REMOTE_MR: // create parworkers as MR tasks (one job per parfor)
 					executeRemoteMRParFor(ec, iterVar, from, to, incr);
 					break;
@@ -669,10 +657,9 @@ public class ParForProgramBlock extends ForProgramBlock
 				
 				default:
 					throw new DMLRuntimeException("Undefined execution mode: '"+_execMode+"'.");
-			}	
+			}
 		}
-		catch(Exception ex)
-		{
+		catch(Exception ex) {
 			throw new DMLRuntimeException("PARFOR: Failed to execute loop in parallel.",ex);
 		}
 		
@@ -688,8 +675,7 @@ public class ParForProgramBlock extends ForProgramBlock
 		
 		//ensure that subsequent program blocks never see partitioned data (invalid plans!)
 		//we can replace those variables, because partitioning only applied for read-only matrices
-		for( String var : _variablesDPOriginal.keySet() )
-		{
+		for( String var : _variablesDPOriginal.keySet() ) {
 			//cleanup partitioned matrix (if not reused)
 			if( !_variablesDPReuse.keySet().contains(var) )
 				VariableCPInstruction.processRemoveVariableInstruction(ec, var); 
@@ -704,19 +690,19 @@ public class ParForProgramBlock extends ForProgramBlock
 	
 		//print profiling report (only if top-level parfor because otherwise in parallel context)
 		if( _monitorReport )
-		    LOG.info("\n"+StatisticMonitor.createReport());
+			LOG.info("\n"+StatisticMonitor.createReport());
 		
 		//reset flags/modifications made by optimizer
 		//TODO reset of hop parallelism constraint (e.g., ba+*)
 		for( String dpvar : _variablesDPOriginal.keySet() ) //release forced exectypes
-		    ProgramRecompiler.rFindAndRecompileIndexingHOP(sb, this, dpvar, ec, false);
+			ProgramRecompiler.rFindAndRecompileIndexingHOP(sb, this, dpvar, ec, false);
 		 //release forced exectypes for fused dp/exec
 		if( _execMode == PExecMode.REMOTE_MR_DP || _execMode == PExecMode.REMOTE_SPARK_DP )
 			ProgramRecompiler.rFindAndRecompileIndexingHOP(sb, this, _colocatedDPMatrix, ec, false); 
 		resetOptimizerFlags(); //after release, deletes dp_varnames
 		
 		//execute exit instructions (usually empty)
-		executeInstructions(_exitInstructions, ec);			
+		executeInstructions(_exitInstructions, ec);
 	}
 
 
@@ -814,8 +800,7 @@ public class ParForProgramBlock extends ForProgramBlock
 			
 			if( _monitor ) 
 				StatisticMonitor.putPFStat(_ID, Stat.PARFOR_WAIT_EXEC_T, time.stop());
-				
-				
+			
 			// Step 4) collecting results from each parallel worker
 			//obtain results and cleanup other intermediates before result merge
 			LocalVariableMap [] localVariables = new LocalVariableMap [_numThreads]; 
@@ -862,16 +847,15 @@ public class ParForProgramBlock extends ForProgramBlock
 				StatisticMonitor.putPFStat(_ID, Stat.PARFOR_NUMITERS, numExecutedIterations);
 			}
 		}
-	}	
+	}
 
 	private void executeRemoteMRParFor( ExecutionContext ec, IntObject itervar, IntObject from, IntObject to, IntObject incr ) 
 		throws DMLRuntimeException, IOException
 	{
 		/* Step 0) check and recompile MR inst
 		 * Step 1) serialize child PB and inst
-		 * Step 2) create tasks
-		 *         serialize tasks
-		 * Step 3) submit MR Jobs and wait for results                        
+		 * Step 2) create and serialize tasks
+		 * Step 3) submit MR Jobs and wait for results
 		 * Step 4) collect results from each parallel worker
 		 */
 		
@@ -884,10 +868,10 @@ public class ParForProgramBlock extends ForProgramBlock
 			//tid = 0  because replaced in remote parworker
 			flagForced = checkMRAndRecompileToCP(0);
 		}
-			
+		
 		// Step 1) init parallel workers (serialize PBs)
-		// NOTES: each mapper changes filenames with regard to his ID as we submit a single job,
-		//        cannot reuse serialized string, since variables are serialized as well.
+		// NOTES: each mapper changes filenames with regard to his ID as we submit a single
+		// job, cannot reuse serialized string, since variables are serialized as well.
 		ParForBody body = new ParForBody( _childBlocks, _resultVars, ec );
 		String program = ProgramConverter.serializeParForBody( body );
 		
@@ -902,64 +886,60 @@ public class ParForProgramBlock extends ForProgramBlock
 		long numIterations = partitioner.getNumIterations();
 		int maxDigits = (int)Math.log10(to.getLongValue()) + 1;
 		long numCreatedTasks = -1;
-		if( USE_STREAMING_TASK_CREATION )
-		{
+		if( USE_STREAMING_TASK_CREATION ) {
 			LocalTaskQueue<Task> queue = new LocalTaskQueue<Task>();
-
+			
 			//put tasks into queue and start writing to taskFile
 			numCreatedTasks = partitioner.createTasks(queue);
-			taskFile        = writeTasksToFile( taskFile, queue, maxDigits );				
+			taskFile = writeTasksToFile( taskFile, queue, maxDigits );
 		}
 		else
 		{
 			//sequentially create tasks and write to disk
 			List<Task> tasks = partitioner.createTasks();
-			numCreatedTasks  = tasks.size();
-		    taskFile         = writeTasksToFile( taskFile, tasks, maxDigits );				
+			numCreatedTasks = tasks.size();
+			taskFile = writeTasksToFile( taskFile, tasks, maxDigits );
 		}
-				
+		
 		if( _monitor )
 			StatisticMonitor.putPFStat(_ID, Stat.PARFOR_INIT_TASKS_T, time.stop());
 		
 		//write matrices to HDFS 
 		exportMatricesToHDFS(ec);
-				
+		
 		// Step 3) submit MR job (wait for finished work)
 		MatrixObject colocatedDPMatrixObj = (_colocatedDPMatrix!=null)? ec.getMatrixObject(_colocatedDPMatrix) : null;
-		RemoteParForJobReturn ret = RemoteParForMR.runJob(_ID, program, taskFile, resultFile, colocatedDPMatrixObj, _enableCPCaching,
-				                                          _numThreads, WRITE_REPLICATION_FACTOR, MAX_RETRYS_ON_ERROR, getMinMemory(ec),
-				                                          (ALLOW_REUSE_MR_JVMS & _jvmReuse) );
+		RemoteParForJobReturn ret = RemoteParForMR.runJob(_ID, program, taskFile, resultFile,
+			colocatedDPMatrixObj, _enableCPCaching,_numThreads, WRITE_REPLICATION_FACTOR, MAX_RETRYS_ON_ERROR, 
+			getMinMemory(ec), (ALLOW_REUSE_MR_JVMS & _jvmReuse) );
 		
 		if( _monitor ) 
 			StatisticMonitor.putPFStat(_ID, Stat.PARFOR_WAIT_EXEC_T, time.stop());
-			
-			
+		
 		// Step 4) collecting results from each parallel worker
 		int numExecutedTasks = ret.getNumExecutedTasks();
 		int numExecutedIterations = ret.getNumExecutedIterations();
 		
 		//consolidate results into global symbol table
-		consolidateAndCheckResults( ec, numIterations, numCreatedTasks, numExecutedIterations , numExecutedTasks, 
-				                    ret.getVariables() );
+		consolidateAndCheckResults( ec, numIterations, numCreatedTasks,
+			numExecutedIterations , numExecutedTasks, ret.getVariables() );
 		if( flagForced ) //see step 0
 			releaseForcedRecompile(0);
 		
-		if( _monitor ) 
-		{
+		if( _monitor ) {
 			StatisticMonitor.putPFStat(_ID, Stat.PARFOR_WAIT_RESULTS_T, time.stop());
 			StatisticMonitor.putPFStat(_ID, Stat.PARFOR_NUMTASKS, numExecutedTasks);
 			StatisticMonitor.putPFStat(_ID, Stat.PARFOR_NUMITERS, numExecutedIterations);
-		}			
-	}	
+		}
+	}
 
 	private void executeRemoteMRParForDP( ExecutionContext ec, IntObject itervar, IntObject from, IntObject to, IntObject incr ) 
 		throws DMLRuntimeException, IOException
 	{
 		/* Step 0) check and recompile MR inst
 		 * Step 1) serialize child PB and inst
-		 * Step 2) create tasks
-		 *         serialize tasks
-		 * Step 3) submit MR Jobs and wait for results                        
+		 * Step 2) create and serialize tasks
+		 * Step 3) submit MR Jobs and wait for results
 		 * Step 4) collect results from each parallel worker
 		 */
 		
@@ -975,8 +955,8 @@ public class ParForProgramBlock extends ForProgramBlock
 		inputMatrix.setPartitioned(inputDPF._dpf, inputDPF._N); //mark matrix var as partitioned  
 		
 		// Step 2) init parallel workers (serialize PBs)
-		// NOTES: each mapper changes filenames with regard to his ID as we submit a single job,
-		//        cannot reuse serialized string, since variables are serialized as well.
+		// NOTES: each mapper changes filenames with regard to his ID as we submit a single
+		// job, cannot reuse serialized string, since variables are serialized as well.
 		ParForBody body = new ParForBody( _childBlocks, _resultVars, ec );
 		String program = ProgramConverter.serializeParForBody( body );
 		
@@ -988,41 +968,40 @@ public class ParForProgramBlock extends ForProgramBlock
 		String resultFile = constructResultFileName();
 		long numIterations = partitioner.getNumIterations();
 		long numCreatedTasks = numIterations;//partitioner.createTasks().size();
-						
+		
 		if( _monitor )
 			StatisticMonitor.putPFStat(_ID, Stat.PARFOR_INIT_TASKS_T, time.stop());
 		
 		//write matrices to HDFS 
 		exportMatricesToHDFS(ec);
-				
+		
 		// Step 4) submit MR job (wait for finished work)
-		OutputInfo inputOI = ((inputMatrix.getSparsity()<0.1 && inputDPF==PartitionFormat.COLUMN_WISE)||
-				              (inputMatrix.getSparsity()<0.001 && inputDPF==PartitionFormat.ROW_WISE))? 
-				             OutputInfo.BinaryCellOutputInfo : OutputInfo.BinaryBlockOutputInfo;
+		OutputInfo inputOI = ((inputMatrix.getSparsity()<0.1 && inputDPF==PartitionFormat.COLUMN_WISE)
+			|| (inputMatrix.getSparsity()<0.001 && inputDPF==PartitionFormat.ROW_WISE)) ?
+			OutputInfo.BinaryCellOutputInfo : OutputInfo.BinaryBlockOutputInfo;
 		RemoteParForJobReturn ret = RemoteDPParForMR.runJob(_ID, itervar.getName(), _colocatedDPMatrix, program, resultFile, 
-				inputMatrix, inputDPF, inputOI, _tSparseCol, _enableCPCaching, _numThreads, _replicationDP );
+			inputMatrix, inputDPF, inputOI, _tSparseCol, _enableCPCaching, _numThreads, _replicationDP );
 		
-		if( _monitor ) 
+		if( _monitor )
 			StatisticMonitor.putPFStat(_ID, Stat.PARFOR_WAIT_EXEC_T, time.stop());
-			
+		
 		// Step 5) collecting results from each parallel worker
 		int numExecutedTasks = ret.getNumExecutedTasks();
 		int numExecutedIterations = ret.getNumExecutedIterations();
 		
 		//consolidate results into global symbol table
-		consolidateAndCheckResults( ec, numIterations, numCreatedTasks, numExecutedIterations, numExecutedTasks, 
-				                    ret.getVariables() );
+		consolidateAndCheckResults( ec, numIterations, numCreatedTasks,
+			numExecutedIterations, numExecutedTasks, ret.getVariables() );
 		
 		if( flagForced ) //see step 0
 			releaseForcedRecompile(0);
 		inputMatrix.unsetPartitioned();
 		
-		if( _monitor ) 
-		{
+		if( _monitor ) {
 			StatisticMonitor.putPFStat(_ID, Stat.PARFOR_WAIT_RESULTS_T, time.stop());
 			StatisticMonitor.putPFStat(_ID, Stat.PARFOR_NUMTASKS, numExecutedTasks);
 			StatisticMonitor.putPFStat(_ID, Stat.PARFOR_NUMITERS, numExecutedIterations);
-		}			
+		}
 	}
 
 	private void executeRemoteSparkParFor(ExecutionContext ec, IntObject itervar, IntObject from, IntObject to, IntObject incr) 
@@ -1032,15 +1011,15 @@ public class ParForProgramBlock extends ForProgramBlock
 		
 		// Step 0) check and compile to CP (if forced remote parfor)
 		boolean flagForced = false;
-		if( FORCE_CP_ON_REMOTE_MR && (_optMode == POptMode.NONE || (_optMode == POptMode.CONSTRAINED && _execMode==PExecMode.REMOTE_SPARK)) )
-		{
+		if( FORCE_CP_ON_REMOTE_MR && (_optMode == POptMode.NONE 
+			|| (_optMode == POptMode.CONSTRAINED && _execMode==PExecMode.REMOTE_SPARK)) ) {
 			//tid = 0  because replaced in remote parworker
 			flagForced = checkMRAndRecompileToCP(0); 
 		}
 			
 		// Step 1) init parallel workers (serialize PBs)
-		// NOTES: each mapper changes filenames with regard to his ID as we submit a single job,
-		//        cannot reuse serialized string, since variables are serialized as well.
+		// NOTES: each mapper changes filenames with regard to his ID as we submit a single 
+		// job, cannot reuse serialized string, since variables are serialized as well.
 		ParForBody body = new ParForBody(_childBlocks, _resultVars, ec);
 		HashMap<String, byte[]> clsMap = new HashMap<String, byte[]>();
 		String program = ProgramConverter.serializeParForBody(body, clsMap);
@@ -1055,37 +1034,35 @@ public class ParForProgramBlock extends ForProgramBlock
 		//sequentially create tasks as input to parfor job
 		List<Task> tasks = partitioner.createTasks();
 		long numCreatedTasks = tasks.size();
-				
+		
 		if( _monitor )
 			StatisticMonitor.putPFStat(_ID, Stat.PARFOR_INIT_TASKS_T, time.stop());
 		
 		//write matrices to HDFS 
 		exportMatricesToHDFS(ec);
-				
+		
 		// Step 3) submit Spark parfor job (no lazy evaluation, since collect on result)
 		//MatrixObject colocatedDPMatrixObj = (_colocatedDPMatrix!=null)? (MatrixObject)ec.getVariable(_colocatedDPMatrix) : null;
 		RemoteParForJobReturn ret = RemoteParForSpark.runJob(_ID, program, clsMap, tasks, ec, _enableCPCaching, _numThreads);
 		
 		if( _monitor ) 
 			StatisticMonitor.putPFStat(_ID, Stat.PARFOR_WAIT_EXEC_T, time.stop());
-			
-			
+		
 		// Step 4) collecting results from each parallel worker
 		int numExecutedTasks = ret.getNumExecutedTasks();
 		int numExecutedIterations = ret.getNumExecutedIterations();
 		
 		//consolidate results into global symbol table
-		consolidateAndCheckResults( ec, numIterations, numCreatedTasks, numExecutedIterations , numExecutedTasks, 
-				                    ret.getVariables() );
+		consolidateAndCheckResults( ec, numIterations, numCreatedTasks,
+			numExecutedIterations , numExecutedTasks, ret.getVariables() );
 		if( flagForced ) //see step 0
 			releaseForcedRecompile(0);
 		
-		if( _monitor ) 
-		{
+		if( _monitor ) {
 			StatisticMonitor.putPFStat(_ID, Stat.PARFOR_WAIT_RESULTS_T, time.stop());
 			StatisticMonitor.putPFStat(_ID, Stat.PARFOR_NUMTASKS, numExecutedTasks);
 			StatisticMonitor.putPFStat(_ID, Stat.PARFOR_NUMITERS, numExecutedIterations);
-		}			
+		}
 	}
 	
 	private void executeRemoteSparkParForDP( ExecutionContext ec, IntObject itervar, IntObject from, IntObject to, IntObject incr ) 
@@ -1096,15 +1073,15 @@ public class ParForProgramBlock extends ForProgramBlock
 		// Step 0) check and compile to CP (if forced remote parfor)
 		boolean flagForced = checkMRAndRecompileToCP(0);
 		
-		// Step 1) prepare partitioned input matrix (needs to happen before serializing the progam)
+		// Step 1) prepare partitioned input matrix (needs to happen before serializing the program)
 		ParForStatementBlock sb = (ParForStatementBlock) getStatementBlock();
 		MatrixObject inputMatrix = ec.getMatrixObject(_colocatedDPMatrix );
 		PartitionFormat inputDPF = sb.determineDataPartitionFormat( _colocatedDPMatrix );
-		inputMatrix.setPartitioned(inputDPF._dpf, inputDPF._N); //mark matrix var as partitioned  
+		inputMatrix.setPartitioned(inputDPF._dpf, inputDPF._N); //mark matrix var as partitioned
 		
 		// Step 2) init parallel workers (serialize PBs)
-		// NOTES: each mapper changes filenames with regard to his ID as we submit a single job,
-		//        cannot reuse serialized string, since variables are serialized as well.
+		// NOTES: each mapper changes filenames with regard to his ID as we submit a single
+		// job, cannot reuse serialized string, since variables are serialized as well.
 		ParForBody body = new ParForBody( _childBlocks, _resultVars, ec );
 		HashMap<String, byte[]> clsMap = new HashMap<String, byte[]>(); 
 		String program = ProgramConverter.serializeParForBody( body, clsMap );
@@ -1117,7 +1094,7 @@ public class ParForProgramBlock extends ForProgramBlock
 		String resultFile = constructResultFileName();
 		long numIterations = partitioner.getNumIterations();
 		long numCreatedTasks = numIterations;//partitioner.createTasks().size();
-						
+		
 		if( _monitor )
 			StatisticMonitor.putPFStat(_ID, Stat.PARFOR_INIT_TASKS_T, time.stop());
 		
@@ -1126,34 +1103,30 @@ public class ParForProgramBlock extends ForProgramBlock
 		
 		// Step 4) submit MR job (wait for finished work)
 		//TODO runtime support for binary cell partitioning 
-		//OutputInfo inputOI = ((inputMatrix.getSparsity()<0.1 && inputDPF==PDataPartitionFormat.COLUMN_WISE)||
-		//		              (inputMatrix.getSparsity()<0.001 && inputDPF==PDataPartitionFormat.ROW_WISE))? 
-		//		             OutputInfo.BinaryCellOutputInfo : OutputInfo.BinaryBlockOutputInfo;
 		OutputInfo inputOI = OutputInfo.BinaryBlockOutputInfo;
-		RemoteParForJobReturn ret = RemoteDPParForSpark.runJob(_ID, itervar.getName(), _colocatedDPMatrix, program, clsMap, 
-				resultFile, inputMatrix, ec, inputDPF, inputOI, _tSparseCol, _enableCPCaching, _numThreads );
+		RemoteParForJobReturn ret = RemoteDPParForSpark.runJob(_ID, itervar.getName(), _colocatedDPMatrix, program,
+			clsMap, resultFile, inputMatrix, ec, inputDPF, inputOI, _tSparseCol, _enableCPCaching, _numThreads );
 		
 		if( _monitor ) 
 			StatisticMonitor.putPFStat(_ID, Stat.PARFOR_WAIT_EXEC_T, time.stop());
-			
+		
 		// Step 5) collecting results from each parallel worker
 		int numExecutedTasks = ret.getNumExecutedTasks();
 		int numExecutedIterations = ret.getNumExecutedIterations();
 		
 		//consolidate results into global symbol table
-		consolidateAndCheckResults( ec, numIterations, numCreatedTasks, numExecutedIterations, numExecutedTasks, 
-				                    ret.getVariables() );
+		consolidateAndCheckResults( ec, numIterations, numCreatedTasks,
+			numExecutedIterations, numExecutedTasks, ret.getVariables() );
 		
 		if( flagForced ) //see step 0
 			releaseForcedRecompile(0);
 		inputMatrix.unsetPartitioned();
 		
-		if( _monitor ) 
-		{
+		if( _monitor ) {
 			StatisticMonitor.putPFStat(_ID, Stat.PARFOR_WAIT_RESULTS_T, time.stop());
 			StatisticMonitor.putPFStat(_ID, Stat.PARFOR_NUMTASKS, numExecutedTasks);
 			StatisticMonitor.putPFStat(_ID, Stat.PARFOR_NUMITERS, numExecutedIterations);
-		}			
+		}
 	}
 
 	private void handleDataPartitioning( ExecutionContext ec ) 
@@ -1161,7 +1134,7 @@ public class ParForProgramBlock extends ForProgramBlock
 	{
 		PDataPartitioner dataPartitioner = _dataPartitioner;
 		if( dataPartitioner != PDataPartitioner.NONE )
-		{			
+		{
 			ParForStatementBlock sb = (ParForStatementBlock) getStatementBlock();
 			if( sb == null )
 				throw new DMLRuntimeException("ParFor statement block required for reasoning about data partitioning.");
@@ -1171,19 +1144,20 @@ public class ParForProgramBlock extends ForProgramBlock
 			{
 				Data dat = ec.getVariable(var);
 				//skip non-existing input matrices (which are due to unknown sizes marked for
-				//partitioning but typically related branches are never executed)				
+				//partitioning but typically related branches are never executed)
 				if( dat != null && dat instanceof MatrixObject )
 				{
 					MatrixObject moVar = (MatrixObject) dat; //unpartitioned input
 					
 					PartitionFormat dpf = sb.determineDataPartitionFormat( var );
-					LOG.trace("PARFOR ID = "+_ID+", Partitioning read-only input variable "+var+" (format="+dpf+", mode="+_dataPartitioner+")");
+					LOG.trace("PARFOR ID = "+_ID+", Partitioning read-only input variable "
+						+ var + " (format="+dpf+", mode="+_dataPartitioner+")");
 					
 					if( dpf != PartitionFormat.NONE )
 					{
 						if( dataPartitioner != PDataPartitioner.REMOTE_SPARK && dpf.isBlockwise() ) {
 							LOG.warn("PARFOR ID = "+_ID+", Switching data partitioner from " + dataPartitioner + 
-									" to " + PDataPartitioner.REMOTE_SPARK.name()+" for blockwise-n partitioning.");
+								" to " + PDataPartitioner.REMOTE_SPARK.name()+" for blockwise-n partitioning.");
 							dataPartitioner = PDataPartitioner.REMOTE_SPARK;
 						}
 						
@@ -1214,9 +1188,8 @@ public class ParForProgramBlock extends ForProgramBlock
 						
 						//store original and partitioned matrix (for reuse if applicable)
 						_variablesDPOriginal.put(var, moVar);
-						if(    ALLOW_REUSE_PARTITION_VARS 
-							&& ProgramRecompiler.isApplicableForReuseVariable(sb.getDMLProg(), sb, var) ) 
-						{
+						if( ALLOW_REUSE_PARTITION_VARS 
+							&& ProgramRecompiler.isApplicableForReuseVariable(sb.getDMLProg(), sb, var) ) {
 							_variablesDPReuse.put(var, dpdatNew);
 						}
 						
@@ -1233,7 +1206,6 @@ public class ParForProgramBlock extends ForProgramBlock
 		if( OptimizerUtils.isSparkExecutionMode() &&
 			_variablesRP != null && !_variablesRP.isEmpty() ) {
 			SparkExecutionContext sec = (SparkExecutionContext) ec;
-			
 			for( String var : _variablesRP )
 				sec.repartitionAndCacheMatrixObject(var);
 		}
@@ -1245,7 +1217,6 @@ public class ParForProgramBlock extends ForProgramBlock
 		if( OptimizerUtils.isSparkExecutionMode() &&
 			_variablesECache != null && !_variablesECache.isEmpty() ) {
 			SparkExecutionContext sec = (SparkExecutionContext) ec;
-			
 			for( String var : _variablesECache )
 				sec.cacheMatrixObject(var);
 		}
@@ -1262,8 +1233,7 @@ public class ParForProgramBlock extends ForProgramBlock
 	private void cleanWorkerResultVariables(ExecutionContext ec, MatrixObject out, MatrixObject[] in) 
 		throws DMLRuntimeException
 	{
-		for( MatrixObject tmp : in )
-		{
+		for( MatrixObject tmp : in ) {
 			//check for empty inputs (no iterations executed)
 			if( tmp != null && tmp != out )
 				ec.cleanupMatrixObject(tmp);
@@ -1299,8 +1269,7 @@ public class ParForProgramBlock extends ForProgramBlock
 				switch( datatype )
 				{
 					case SCALAR:
-						switch( valuetype )
-						{
+						switch( valuetype ) {
 							case BOOLEAN: dataObj = new BooleanObject(var,false); break;
 							case INT:     dataObj = new IntObject(var,-1);        break;
 							case DOUBLE:  dataObj = new DoubleObject(var,-1d);    break;
@@ -1408,20 +1377,17 @@ public class ParForProgramBlock extends ForProgramBlock
 			HashSet<String> fnNames = new HashSet<String>();
 			if( USE_PB_CACHE )
 			{
-				if( _pbcache.containsKey(pwID) )
-				{
+				if( _pbcache.containsKey(pwID) ) {
 					cpChildBlocks = _pbcache.get(pwID);	
 				}
-				else
-				{
+				else {
 					cpChildBlocks = ProgramConverter.rcreateDeepCopyProgramBlocks(_childBlocks, pwID, _IDPrefix, new HashSet<String>(), fnNames, false, false); 
 					_pbcache.put(pwID, cpChildBlocks);
 				}
 			}
-			else
-			{
+			else {
 				cpChildBlocks = ProgramConverter.rcreateDeepCopyProgramBlocks(_childBlocks, pwID, _IDPrefix, new HashSet<String>(), fnNames, false, false); 
-			}             
+			}
 			
 			//deep copy execution context (including prepare parfor update-in-place)
 			ExecutionContext cpEc = ProgramConverter.createDeepCopyExecutionContext(ec);
@@ -1443,8 +1409,7 @@ public class ParForProgramBlock extends ForProgramBlock
 			pw = new LocalParWorker( pwID, queue, body, cconf, MAX_RETRYS_ON_ERROR, _monitor );
 			pw.setFunctionNames(fnNames);
 		}
-		catch(Exception ex)
-		{
+		catch(Exception ex) {
 			throw new DMLRuntimeException(ex);
 		}
 		
@@ -1519,7 +1484,7 @@ public class ParForProgramBlock extends ForProgramBlock
 		int maxNumRed = InfrastructureAnalyzer.getRemoteParallelReduceTasks();
 		//correction max number of reducers on yarn clusters
 		if( InfrastructureAnalyzer.isYarnEnabled() )
-			maxNumRed = (int)Math.max( maxNumRed, YarnClusterAnalyzer.getNumCores()/2 );				
+			maxNumRed = (int)Math.max( maxNumRed, YarnClusterAnalyzer.getNumCores()/2 );
 		int numRed = Math.min(numReducers,maxNumRed);
 		
 		//create data partitioner
@@ -1530,12 +1495,12 @@ public class ParForProgramBlock extends ForProgramBlock
 				break;
 			case REMOTE_MR:
 				dp = new DataPartitionerRemoteMR( dpf, _ID, numRed,
-						_replicationDP, ALLOW_REUSE_MR_JVMS, false );
+					_replicationDP, ALLOW_REUSE_MR_JVMS, false );
 				break;
 			case REMOTE_SPARK:
 				dp = new DataPartitionerRemoteSpark( dpf, ec, numRed,
-						_replicationDP, false );
-				break;	
+					_replicationDP, false );
+				break;
 			default:
 				throw new DMLRuntimeException("Unknown data partitioner: '" +dataPartitioner.name()+"'.");
 		}
@@ -1557,18 +1522,18 @@ public class ParForProgramBlock extends ForProgramBlock
 		else {
 			int numReducers = ConfigurationManager.getNumReducers();
 			maxMap = InfrastructureAnalyzer.getRemoteParallelMapTasks();
-			maxRed = Math.min(numReducers, 
+			maxRed = Math.min(numReducers,
 				InfrastructureAnalyzer.getRemoteParallelReduceTasks());
 			//correction max number of reducers on yarn clusters
-			if( InfrastructureAnalyzer.isYarnEnabled() ) {					
-				maxMap = (int)Math.max( maxMap, YarnClusterAnalyzer.getNumCores() );	
-				maxRed = (int)Math.max( maxRed, YarnClusterAnalyzer.getNumCores()/2 );	
+			if( InfrastructureAnalyzer.isYarnEnabled() ) {
+				maxMap = (int)Math.max( maxMap, YarnClusterAnalyzer.getNumCores() );
+				maxRed = (int)Math.max( maxRed, YarnClusterAnalyzer.getNumCores()/2 );
 			}
 		}
 		int numMap = Math.max(_numThreads, maxMap);
 		int numRed = maxRed;
 		
-		//create result merge implementation		
+		//create result merge implementation
 		switch( prm )
 		{
 			case LOCAL_MEM:
@@ -1581,10 +1546,9 @@ public class ParForProgramBlock extends ForProgramBlock
 				rm = new ResultMergeLocalAutomatic( out, in, fname );
 				break;
 			case REMOTE_MR:
-				rm = new ResultMergeRemoteMR( out, in, fname, _ID, numMap, numRed,
-					                          WRITE_REPLICATION_FACTOR, 
-					                          MAX_RETRYS_ON_ERROR, 
-					                          ALLOW_REUSE_MR_JVMS );
+				rm = new ResultMergeRemoteMR( out, in, fname, 
+					_ID, numMap, numRed, WRITE_REPLICATION_FACTOR,
+					MAX_RETRYS_ON_ERROR, ALLOW_REUSE_MR_JVMS );
 				break;
 			case REMOTE_SPARK:
 				rm = new ResultMergeRemoteSpark( out, in, fname, ec, numMap, numRed );
@@ -1628,8 +1592,8 @@ public class ParForProgramBlock extends ForProgramBlock
 	private void releaseForcedRecompile(long tid) 
 		throws DMLRuntimeException
 	{
-		HashSet<String> fnStack = new HashSet<String>();
-		Recompiler.recompileProgramBlockHierarchy2Forced(_childBlocks, tid, fnStack, null);
+		Recompiler.recompileProgramBlockHierarchy2Forced(
+			_childBlocks, tid, new HashSet<String>(), null);
 	}
 
 	private String writeTasksToFile(String fname, List<Task> tasks, int maxDigits)
@@ -1641,17 +1605,15 @@ public class ParForProgramBlock extends ForProgramBlock
 			Path path = new Path(fname);
 			FileSystem fs = IOUtilFunctions.getFileSystem(path);
 			br = new BufferedWriter(new OutputStreamWriter(fs.create(path,true)));
-	        
+			
 			boolean flagFirst = true; //workaround for keeping gen order
-			for( Task t : tasks )
-			{
+			for( Task t : tasks ) {
 				br.write( createTaskFileLine( t, maxDigits, flagFirst ) );
 				if( flagFirst )
 					flagFirst = false;
 			}
 		}
-		catch(Exception ex)
-		{
+		catch(Exception ex) {
 			throw new DMLRuntimeException("Error writing tasks to taskfile "+fname, ex);
 		}
 		finally {
@@ -1670,18 +1632,16 @@ public class ParForProgramBlock extends ForProgramBlock
 			Path path = new Path( fname );
 			FileSystem fs = IOUtilFunctions.getFileSystem(path);
 			br = new BufferedWriter(new OutputStreamWriter(fs.create(path,true)));
-	        
+			
 			Task t = null;
 			boolean flagFirst = true; //workaround for keeping gen order
-			while( (t = queue.dequeueTask()) != LocalTaskQueue.NO_MORE_TASKS )
-			{
+			while( (t = queue.dequeueTask()) != LocalTaskQueue.NO_MORE_TASKS ) {
 				br.write( createTaskFileLine( t, maxDigits, flagFirst ) );
 				if( flagFirst )
 					flagFirst = false;
 			}
 		}
-		catch(Exception ex)
-		{
+		catch(Exception ex) {
 			throw new DMLRuntimeException("Error writing tasks to taskfile "+fname, ex);
 		}
 		finally {
@@ -1691,11 +1651,9 @@ public class ParForProgramBlock extends ForProgramBlock
 		return fname;
 	}
 	
-	private String createTaskFileLine( Task t, int maxDigits, boolean flagFirst ) 
-	{
+	private String createTaskFileLine( Task t, int maxDigits, boolean flagFirst ) {
 		//always pad to max digits in order to preserve task order	
-		String ret = t.toCompactString(maxDigits) + (flagFirst?" ":"") + "\n";
-		return ret;
+		return t.toCompactString(maxDigits) + (flagFirst?" ":"") + "\n";
 	}
 
 	private void consolidateAndCheckResults(ExecutionContext ec, long expIters, long expTasks, long numIters, long numTasks, LocalVariableMap [] results) 
@@ -1752,20 +1710,20 @@ public class ParForProgramBlock extends ForProgramBlock
 					MatrixObject out = (MatrixObject) dat;
 					MatrixObject[] in = new MatrixObject[ results.length ];
 					for( int i=0; i< results.length; i++ )
-						in[i] = (MatrixObject) results[i].get( var ); 			
+						in[i] = (MatrixObject) results[i].get( var );
 					String fname = constructResultMergeFileName();
 					ResultMerge rm = createResultMerge(_resultMerge, out, in, fname, ec);
 					MatrixObject outNew = null;
 					if( USE_PARALLEL_RESULT_MERGE )
 						outNew = rm.executeParallelMerge( _numThreads );
 					else
-						outNew = rm.executeSerialMerge(); 		
+						outNew = rm.executeSerialMerge();
 					
 					//cleanup existing var
 					Data exdata = ec.removeVariable(var);
 					if( exdata != null && exdata != outNew && exdata instanceof MatrixObject )
 						ec.cleanupMatrixObject((MatrixObject)exdata);
-							
+					
 					//cleanup of intermediate result variables
 					cleanWorkerResultVariables( ec, out, in );
 					
@@ -1796,21 +1754,18 @@ public class ParForProgramBlock extends ForProgramBlock
 	 * 
 	 * @return true if ?
 	 */
-	private boolean checkParallelRemoteResultMerge()
-	{
-		return (USE_PARALLEL_RESULT_MERGE_REMOTE 
-			    && _resultVars.size() > 1
-			    && ( _resultMerge == PResultMerge.REMOTE_MR
-			       ||_resultMerge == PResultMerge.REMOTE_SPARK) );
+	private boolean checkParallelRemoteResultMerge() {
+		return (USE_PARALLEL_RESULT_MERGE_REMOTE && _resultVars.size() > 1
+			&& ( _resultMerge == PResultMerge.REMOTE_MR 
+				||_resultMerge == PResultMerge.REMOTE_SPARK) );
 	}
 
-	private void setParForProgramBlockIDs(int IDPrefix)
-	{
+	private void setParForProgramBlockIDs(int IDPrefix) {
 		_IDPrefix = IDPrefix;
 		if( _IDPrefix == -1 ) //not specified
 			_ID = _pfIDSeq.getNextID(); //generated new ID
 		else //remote case (further nested parfors are all in one JVM)
-		    _ID = IDHandler.concatIntIDsToLong(_IDPrefix, (int)_pfIDSeq.getNextID());	
+			_ID = IDHandler.concatIntIDsToLong(_IDPrefix, (int)_pfIDSeq.getNextID());	
 	}
 	
 	/**
@@ -1826,8 +1781,7 @@ public class ParForProgramBlock extends ForProgramBlock
 			
 		_pwIDs = new long[ _numThreads ];
 		
-		for( int i=0; i<_numThreads; i++ )
-		{
+		for( int i=0; i<_numThreads; i++ ) {
 			if(_IDPrefix == -1)
 				_pwIDs[i] = _pwIDSeq.getNextID();
 			else
@@ -1838,8 +1792,7 @@ public class ParForProgramBlock extends ForProgramBlock
 		}
 	}
 
-	private long computeNumIterations( IntObject from, IntObject to, IntObject incr )
-	{
+	private long computeNumIterations( IntObject from, IntObject to, IntObject incr ) {
 		return (long)Math.ceil(((double)(to.getLongValue() - from.getLongValue() + 1)) / incr.getLongValue()); 
 	}
 	
@@ -1849,10 +1802,8 @@ public class ParForProgramBlock extends ForProgramBlock
  	 * 
 	 * @return task file name
 	 */
-	private String constructTaskFileName()
-	{
+	private String constructTaskFileName() {
 		String scratchSpaceLoc = ConfigurationManager.getScratchSpace();
-	
 		StringBuilder sb = new StringBuilder();
 		sb.append(scratchSpaceLoc);
 		sb.append(Lop.FILE_SEPARATOR);
@@ -1869,10 +1820,8 @@ public class ParForProgramBlock extends ForProgramBlock
 	 * 
 	 * @return result file name
 	 */
-	private String constructResultFileName()
-	{
+	private String constructResultFileName() {
 		String scratchSpaceLoc = ConfigurationManager.getScratchSpace();
-		
 		StringBuilder sb = new StringBuilder();
 		sb.append(scratchSpaceLoc);
 		sb.append(Lop.FILE_SEPARATOR);
@@ -1880,13 +1829,11 @@ public class ParForProgramBlock extends ForProgramBlock
 		sb.append(DMLScript.getUUID());
 		sb.append(PARFOR_MR_RESULT_TMP_FNAME.replaceAll("%ID%", String.valueOf(_ID)));
 		
-		return sb.toString();   
+		return sb.toString();
 	}
 
-	private String constructResultMergeFileName()
-	{
+	private String constructResultMergeFileName() {
 		String scratchSpaceLoc = ConfigurationManager.getScratchSpace();
-		
 		String fname = PARFOR_MR_RESULTMERGE_FNAME;
 		fname = fname.replaceAll("%ID%", String.valueOf(_ID)); //replace workerID
 		fname = fname.replaceAll("%VAR%", String.valueOf(_resultVarsIDSeq.getNextID()));
@@ -1898,13 +1845,11 @@ public class ParForProgramBlock extends ForProgramBlock
 		sb.append(DMLScript.getUUID());
 		sb.append(fname);
 		
-		return sb.toString();   		
+		return sb.toString();
 	}
 
-	private String constructDataPartitionsFileName()
-	{
+	private String constructDataPartitionsFileName() {
 		String scratchSpaceLoc = ConfigurationManager.getScratchSpace();
-		
 		String fname = PARFOR_DATAPARTITIONS_FNAME;
 		fname = fname.replaceAll("%ID%", String.valueOf(_ID)); //replace workerID
 		fname = fname.replaceAll("%VAR%", String.valueOf(_dpVarsIDSeq.getNextID()));
@@ -1916,7 +1861,7 @@ public class ParForProgramBlock extends ForProgramBlock
 		sb.append(DMLScript.getUUID());
 		sb.append(fname);
 		
-		return sb.toString();   		
+		return sb.toString();
 	}
 
 	private long getMinMemory(ExecutionContext ec)
@@ -1946,10 +1891,8 @@ public class ParForProgramBlock extends ForProgramBlock
 		return ret;
 	}
 
-	private void setMemoryBudget()
-	{
-		if( _recompileMemoryBudget > 0 )
-		{
+	private void setMemoryBudget() {
+		if( _recompileMemoryBudget > 0 ) {
 			// store old budget for reset after exec
 			_oldMemoryBudget = (double)InfrastructureAnalyzer.getLocalMaxMemory();
 			
@@ -1959,16 +1902,12 @@ public class ParForProgramBlock extends ForProgramBlock
 		}
 	}
 	
-	private void resetMemoryBudget()
-	{
+	private void resetMemoryBudget() {
 		if( _recompileMemoryBudget > 0 )
-		{
 			InfrastructureAnalyzer.setLocalMaxMemory((long)_oldMemoryBudget);
-		}
 	}
 	
-	private void resetOptimizerFlags()
-	{
+	private void resetOptimizerFlags() {
 		//reset all state that was set but is not guaranteed to be overwritten by optimizer
 		_variablesDPOriginal.removeAll();
 		_colocatedDPMatrix         = null;
@@ -2021,7 +1960,7 @@ public class ParForProgramBlock extends ForProgramBlock
 					
 					MatrixObject[] in = new MatrixObject[ _refVars.length ];
 					for( int i=0; i< _refVars.length; i++ )
-						in[i] = (MatrixObject) _refVars[i].get( varname ); 			
+						in[i] = (MatrixObject) _refVars[i].get( varname ); 
 					String fname = constructResultMergeFileName();
 				
 					ResultMerge rm = createResultMerge(_resultMerge, out, in, fname, _ec);
@@ -2029,7 +1968,7 @@ public class ParForProgramBlock extends ForProgramBlock
 					if( USE_PARALLEL_RESULT_MERGE )
 						outNew = rm.executeParallelMerge( _numThreads );
 					else
-						outNew = rm.executeSerialMerge(); 	
+						outNew = rm.executeSerialMerge();
 					
 					synchronized( _ec.getVariables() ){
 						_ec.getVariables().put( varname, outNew);
@@ -2051,5 +1990,4 @@ public class ParForProgramBlock extends ForProgramBlock
 	public String printBlockErrorLocation(){
 		return "ERROR: Runtime error in parfor program block generated from parfor statement block between lines " + _beginLine + " and " + _endLine + " -- ";
 	}
-	
-}
\ No newline at end of file
+}

http://git-wip-us.apache.org/repos/asf/systemml/blob/2c57cf77/src/main/java/org/apache/sysml/runtime/controlprogram/context/ExecutionContext.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/sysml/runtime/controlprogram/context/ExecutionContext.java b/src/main/java/org/apache/sysml/runtime/controlprogram/context/ExecutionContext.java
index 0bd73d1..59d2589 100644
--- a/src/main/java/org/apache/sysml/runtime/controlprogram/context/ExecutionContext.java
+++ b/src/main/java/org/apache/sysml/runtime/controlprogram/context/ExecutionContext.java
@@ -544,23 +544,18 @@ public class ExecutionContext {
 		for( String var : varList )
 		{
 			Data dat = _variables.get(var);
-			if( dat instanceof MatrixObject )
-			{
+			if( dat instanceof MatrixObject ) {
 				MatrixObject mo = (MatrixObject)dat;
 				varsState.put( var, mo.isCleanupEnabled() );
-				//System.out.println("pre-pin "+var+" ("+mo.isCleanupEnabled()+")");
 			}
 		}
 		
 		//step 2) pin variables
-		for( String var : varList )
-		{
+		for( String var : varList ) {
 			Data dat = _variables.get(var);
-			if( dat instanceof MatrixObject )
-			{
+			if( dat instanceof MatrixObject ) {
 				MatrixObject mo = (MatrixObject)dat;
 				mo.enableCleanup(false); 
-				//System.out.println("pin "+var);
 			}
 		}
 		
@@ -583,11 +578,8 @@ public class ExecutionContext {
 	 * @param varList variable list
 	 * @param varsState variable state
 	 */
-	public void unpinVariables(ArrayList<String> varList, HashMap<String,Boolean> varsState)
-	{
-		for( String var : varList)
-		{
-			//System.out.println("unpin "+var+" ("+varsState.get(var)+")");
+	public void unpinVariables(ArrayList<String> varList, HashMap<String,Boolean> varsState) {
+		for( String var : varList) {
 			Data dat = _variables.get(var);
 			if( dat instanceof MatrixObject )
 				((MatrixObject)dat).enableCleanup(varsState.get(var));
@@ -597,15 +589,28 @@ public class ExecutionContext {
 	/**
 	 * NOTE: No order guaranteed, so keep same list for pin and unpin. 
 	 * 
-	 * @return variable list as strings
+	 * @return list of all variable names.
 	 */
-	public ArrayList<String> getVarList()
-	{
-		ArrayList<String> varlist = new ArrayList<String>();
-		varlist.addAll(_variables.keySet());	
-		return varlist;
+	public ArrayList<String> getVarList() {
+		return new ArrayList<>(_variables.keySet());
 	}
-
+	
+	/**
+	 * NOTE: No order guaranteed, so keep same list for pin and unpin. 
+	 * 
+	 * @return list of all variable names of partitioned matrices.
+	 */
+	public ArrayList<String> getVarListPartitioned() {
+		ArrayList<String> ret = new ArrayList<>();
+		for( String var : _variables.keySet() ) {
+			Data dat = _variables.get(var);
+			if( dat instanceof MatrixObject 
+				&& ((MatrixObject)dat).isPartitioned() )
+				ret.add(var);
+		}
+		return ret;
+	}
+	
 	public void cleanupMatrixObject(MatrixObject mo)
 		throws DMLRuntimeException 
 	{

http://git-wip-us.apache.org/repos/asf/systemml/blob/2c57cf77/src/main/java/org/apache/sysml/runtime/controlprogram/parfor/CachedReuseVariables.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/sysml/runtime/controlprogram/parfor/CachedReuseVariables.java b/src/main/java/org/apache/sysml/runtime/controlprogram/parfor/CachedReuseVariables.java
new file mode 100644
index 0000000..48ef883
--- /dev/null
+++ b/src/main/java/org/apache/sysml/runtime/controlprogram/parfor/CachedReuseVariables.java
@@ -0,0 +1,59 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * 
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.sysml.runtime.controlprogram.parfor;
+
+import java.lang.ref.SoftReference;
+import java.util.Collection;
+import java.util.HashMap;
+import java.util.HashSet;
+
+import org.apache.sysml.runtime.controlprogram.LocalVariableMap;
+
+public class CachedReuseVariables
+{
+	private final HashMap<Long, SoftReference<LocalVariableMap>> _data;
+	
+	public CachedReuseVariables() {
+		_data = new HashMap<>();
+	}
+	
+	public synchronized void reuseVariables(long pfid, LocalVariableMap vars, Collection<String> blacklist) {
+		//check for existing reuse map
+		LocalVariableMap tmp = null;
+		if( _data.containsKey(pfid) )
+			tmp = _data.get(pfid).get();
+		
+		//build reuse map if not created yet or evicted
+		if( tmp == null ) {
+			tmp = new LocalVariableMap(vars);
+			tmp.removeAllIn(new HashSet<>(blacklist));
+			_data.put(pfid, new SoftReference<>(tmp));
+		}
+		//reuse existing reuse map
+		else {
+			for( String varName : tmp.keySet() )
+				vars.put(varName, tmp.get(varName));
+		}
+	}
+
+	public synchronized void clearVariables(long pfid) {
+		_data.remove(pfid);
+	}
+}

http://git-wip-us.apache.org/repos/asf/systemml/blob/2c57cf77/src/main/java/org/apache/sysml/runtime/controlprogram/parfor/ProgramConverter.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/sysml/runtime/controlprogram/parfor/ProgramConverter.java b/src/main/java/org/apache/sysml/runtime/controlprogram/parfor/ProgramConverter.java
index 8dc86d4..6d0cd3a 100644
--- a/src/main/java/org/apache/sysml/runtime/controlprogram/parfor/ProgramConverter.java
+++ b/src/main/java/org/apache/sysml/runtime/controlprogram/parfor/ProgramConverter.java
@@ -328,16 +328,15 @@ public class ProgramConverter
 		ParForProgramBlock tmpPB = null;
 		
 		if( IDPrefix == -1 ) //still on master node
-			tmpPB = new ParForProgramBlock(prog,pfpb.getIterVar(), pfpb.getParForParams());
+			tmpPB = new ParForProgramBlock(prog,pfpb.getIterVar(), pfpb.getParForParams(), pfpb.getResultVariables());
 		else //child of remote ParWorker at any level
-			tmpPB = new ParForProgramBlock(IDPrefix, prog, pfpb.getIterVar(), pfpb.getParForParams());
+			tmpPB = new ParForProgramBlock(IDPrefix, prog, pfpb.getIterVar(), pfpb.getParForParams(), pfpb.getResultVariables());
 		
 		tmpPB.setStatementBlock( createForStatementBlockCopy( (ForStatementBlock) pfpb.getStatementBlock(), pid, plain, forceDeepCopy) );
 		tmpPB.setThreadID(pid);
 		
 		tmpPB.disableOptimization(); //already done in top-level parfor
 		tmpPB.disableMonitorReport(); //already done in top-level parfor
-		tmpPB.setResultVariables( pfpb.getResultVariables() );
 		
 		tmpPB.setFromInstructions( createDeepCopyInstructionSet(pfpb.getFromInstructions(), pid, IDPrefix, prog, fnStack, fnCreated, plain, true) );
 		tmpPB.setToInstructions( createDeepCopyInstructionSet(pfpb.getToInstructions(), pid, IDPrefix, prog, fnStack, fnCreated, plain, true) );
@@ -1514,9 +1513,8 @@ public class ProgramConverter
 		//program blocks //reset id to preinit state, replaced during exec
 		ArrayList<ProgramBlock> pbs = rParseProgramBlocks(st.nextToken(), prog, 0); 
 		
-		ParForProgramBlock pfpb = new ParForProgramBlock(id, prog, iterVar, params);
+		ParForProgramBlock pfpb = new ParForProgramBlock(id, prog, iterVar, params, resultVars);
 		pfpb.disableOptimization(); //already done in top-level parfor
-		pfpb.setResultVariables(resultVars);		
 		pfpb.setFromInstructions(from);
 		pfpb.setToInstructions(to);
 		pfpb.setIncrementInstructions(incr);

http://git-wip-us.apache.org/repos/asf/systemml/blob/2c57cf77/src/main/java/org/apache/sysml/runtime/controlprogram/parfor/RemoteParForSpark.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/sysml/runtime/controlprogram/parfor/RemoteParForSpark.java b/src/main/java/org/apache/sysml/runtime/controlprogram/parfor/RemoteParForSpark.java
index 49ac9db..10b44a2 100644
--- a/src/main/java/org/apache/sysml/runtime/controlprogram/parfor/RemoteParForSpark.java
+++ b/src/main/java/org/apache/sysml/runtime/controlprogram/parfor/RemoteParForSpark.java
@@ -34,6 +34,8 @@ import org.apache.sysml.runtime.DMLRuntimeException;
 import org.apache.sysml.runtime.controlprogram.LocalVariableMap;
 import org.apache.sysml.runtime.controlprogram.context.ExecutionContext;
 import org.apache.sysml.runtime.controlprogram.context.SparkExecutionContext;
+import org.apache.sysml.runtime.controlprogram.parfor.stat.InfrastructureAnalyzer;
+import org.apache.sysml.runtime.controlprogram.parfor.util.IDSequence;
 import org.apache.sysml.utils.Statistics;
 
 /**
@@ -51,12 +53,14 @@ import org.apache.sysml.utils.Statistics;
  */
 public class RemoteParForSpark 
 {
-	
 	protected static final Log LOG = LogFactory.getLog(RemoteParForSpark.class.getName());
-
-	public static RemoteParForJobReturn runJob(long pfid, String program, HashMap<String, byte[]> clsMap, 
+	
+	//globally unique id for parfor spark job instances (unique across spark contexts)
+	private static final IDSequence _jobID = new IDSequence();
+	
+	public static RemoteParForJobReturn runJob(long pfid, String prog, HashMap<String, byte[]> clsMap, 
 			List<Task> tasks, ExecutionContext ec, boolean cpCaching, int numMappers) 
-		throws DMLRuntimeException  
+		throws DMLRuntimeException
 	{
 		String jobname = "ParFor-ESP";
 		long t0 = DMLScript.STATISTICS ? System.nanoTime() : 0;
@@ -68,13 +72,16 @@ public class RemoteParForSpark
 		LongAccumulator aTasks = sc.sc().longAccumulator("tasks");
 		LongAccumulator aIters = sc.sc().longAccumulator("iterations");
 		
+		//reset cached shared inputs for correctness in local mode
+		long jobid = _jobID.getNextID();
+		if( InfrastructureAnalyzer.isLocalMode() )
+			RemoteParForSparkWorker.cleanupCachedVariables(jobid);
+		
 		//run remote_spark parfor job 
 		//(w/o lazy evaluation to fit existing parfor framework, e.g., result merge)
-		RemoteParForSparkWorker func = new RemoteParForSparkWorker(program, clsMap, cpCaching, aTasks, aIters);
-		List<Tuple2<Long,String>> out = sc
-				.parallelize(tasks, tasks.size()) //create rdd of parfor tasks
-				.flatMapToPair(func)              //execute parfor tasks 
-				.collect();                       //get output handles
+		List<Tuple2<Long,String>> out = sc.parallelize(tasks, tasks.size()) //create rdd of parfor tasks
+			.flatMapToPair(new RemoteParForSparkWorker(jobid, prog, clsMap, cpCaching, aTasks, aIters))
+			.collect(); //execute and get output handles
 		
 		//de-serialize results
 		LocalVariableMap[] results = RemoteParForUtils.getResults(out, LOG);
@@ -85,11 +92,10 @@ public class RemoteParForSpark
 		RemoteParForJobReturn ret = new RemoteParForJobReturn(true, numTasks, numIters, results);
 		
 		//maintain statistics
-	    Statistics.incrementNoOfCompiledSPInst();
-	    Statistics.incrementNoOfExecutedSPInst();
-	    if( DMLScript.STATISTICS ){
+		Statistics.incrementNoOfCompiledSPInst();
+		Statistics.incrementNoOfExecutedSPInst();
+		if( DMLScript.STATISTICS )
 			Statistics.maintainCPHeavyHitters(jobname, System.nanoTime()-t0);
-		}
 		
 		return ret;
 	}

http://git-wip-us.apache.org/repos/asf/systemml/blob/2c57cf77/src/main/java/org/apache/sysml/runtime/controlprogram/parfor/RemoteParForSparkWorker.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/sysml/runtime/controlprogram/parfor/RemoteParForSparkWorker.java b/src/main/java/org/apache/sysml/runtime/controlprogram/parfor/RemoteParForSparkWorker.java
index e1410da..033d398 100644
--- a/src/main/java/org/apache/sysml/runtime/controlprogram/parfor/RemoteParForSparkWorker.java
+++ b/src/main/java/org/apache/sysml/runtime/controlprogram/parfor/RemoteParForSparkWorker.java
@@ -21,10 +21,12 @@ package org.apache.sysml.runtime.controlprogram.parfor;
 
 import java.io.IOException;
 import java.util.ArrayList;
+import java.util.Collection;
 import java.util.HashMap;
 import java.util.Iterator;
 import java.util.Map.Entry;
 
+import org.apache.commons.collections.CollectionUtils;
 import org.apache.spark.TaskContext;
 import org.apache.spark.api.java.function.PairFlatMapFunction;
 import org.apache.spark.util.LongAccumulator;
@@ -40,8 +42,11 @@ import scala.Tuple2;
 public class RemoteParForSparkWorker extends ParWorker implements PairFlatMapFunction<Task, Long, String> 
 {
 	private static final long serialVersionUID = -3254950138084272296L;
-
-	private final String  _prog;
+	
+	private static final CachedReuseVariables reuseVars = new CachedReuseVariables();
+	
+	private final long _jobid;
+	private final String _prog;
 	private final HashMap<String, byte[]> _clsMap;
 	private boolean _initialized = false;
 	private boolean _caching = true;
@@ -49,9 +54,10 @@ public class RemoteParForSparkWorker extends ParWorker implements PairFlatMapFun
 	private final LongAccumulator _aTasks;
 	private final LongAccumulator _aIters;
 	
-	public RemoteParForSparkWorker(String program, HashMap<String, byte[]> clsMap, boolean cpCaching, LongAccumulator atasks, LongAccumulator aiters) 
+	public RemoteParForSparkWorker(long jobid, String program, HashMap<String, byte[]> clsMap, boolean cpCaching, LongAccumulator atasks, LongAccumulator aiters) 
 		throws DMLRuntimeException
 	{
+		_jobid = jobid;
 		_prog = program;
 		_clsMap = clsMap;
 		_initialized = false;
@@ -68,7 +74,7 @@ public class RemoteParForSparkWorker extends ParWorker implements PairFlatMapFun
 	{
 		//lazy parworker initialization
 		if( !_initialized )
-			configureWorker( TaskContext.get().taskAttemptId() );
+			configureWorker(TaskContext.get().taskAttemptId());
 		
 		//execute a single task
 		long numIter = getExecutedIterations();
@@ -88,10 +94,11 @@ public class RemoteParForSparkWorker extends ParWorker implements PairFlatMapFun
 		return ret.iterator();
 	}
 
-	private void configureWorker( long ID ) 
+	@SuppressWarnings("unchecked")
+	private void configureWorker(long taskID) 
 		throws DMLRuntimeException, IOException
 	{
-		_workerID = ID;
+		_workerID = taskID;
 		
 		//initialize codegen class cache (before program parsing)
 		synchronized( CodegenUtils.class ) {
@@ -106,7 +113,13 @@ public class RemoteParForSparkWorker extends ParWorker implements PairFlatMapFun
 		_resultVars  = body.getResultVarNames();
 		_numTasks    = 0;
 		_numIters    = 0;
-
+		
+		//reuse shared inputs (to read shared inputs once per process instead of once per core; 
+		//we reuse everything except result variables and partitioned input matrices)
+		_ec.pinVariables(_ec.getVarList()); //avoid cleanup of shared inputs
+		Collection<String> blacklist = CollectionUtils.union(_resultVars, _ec.getVarListPartitioned());
+		reuseVars.reuseVariables(_jobid, _ec.getVariables(), blacklist);
+		
 		//init and register-cleanup of buffer pool (in parfor spark, multiple tasks might 
 		//share the process-local, i.e., per executor, buffer pool; hence we synchronize 
 		//the initialization and immediately register the created directory for cleanup
@@ -121,7 +134,7 @@ public class RemoteParForSparkWorker extends ParWorker implements PairFlatMapFun
 						CacheableData.cacheEvictionLocalFilePrefix +"_" + _workerID; 
 				//register entire working dir for delete on shutdown
 				RemoteParForUtils.cleanupWorkingDirectoriesOnShutdown();
-			}	
+			}
 		}
 		
 		//ensure that resultvar files are not removed
@@ -134,4 +147,8 @@ public class RemoteParForSparkWorker extends ParWorker implements PairFlatMapFun
 		//mark as initialized
 		_initialized = true;
 	}
+	
+	public static void cleanupCachedVariables(long pfid) {
+		reuseVars.clearVariables(pfid);
+	}
 }

http://git-wip-us.apache.org/repos/asf/systemml/blob/2c57cf77/src/main/java/org/apache/sysml/runtime/controlprogram/parfor/TaskPartitionerFactoring.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/sysml/runtime/controlprogram/parfor/TaskPartitionerFactoring.java b/src/main/java/org/apache/sysml/runtime/controlprogram/parfor/TaskPartitionerFactoring.java
index d202e07..7dd25bf 100644
--- a/src/main/java/org/apache/sysml/runtime/controlprogram/parfor/TaskPartitionerFactoring.java
+++ b/src/main/java/org/apache/sysml/runtime/controlprogram/parfor/TaskPartitionerFactoring.java
@@ -57,14 +57,14 @@ public class TaskPartitionerFactoring extends TaskPartitioner
 	{
 		LinkedList<Task> tasks = new LinkedList<Task>();
 		
-		long lFrom  = _fromVal.getLongValue();
-		long lTo    = _toVal.getLongValue();
-		long lIncr  = _incrVal.getLongValue();
+		long lFrom = _fromVal.getLongValue();
+		long lTo = _toVal.getLongValue();
+		long lIncr = _incrVal.getLongValue();
 		
 		int P = _numThreads;  // number of parallel workers
-		long N = _numIter;     // total number of iterations
-		long R = N;            // remaining number of iterations
-		long K = -1;           // next _numThreads task sizes	
+		long N = _numIter;    // total number of iterations
+		long R = N;           // remaining number of iterations
+		long K = -1;          // next _numThreads task sizes
 		TaskType type = null; // type of iterations: range tasks (similar to run-length encoding) make only sense if taskSize>3
 		
 		for( long i = lFrom; i<=lTo;  )
@@ -73,7 +73,7 @@ public class TaskPartitionerFactoring extends TaskPartitioner
 			R -= (K * P);
 			
 			type = (ParForProgramBlock.USE_RANGE_TASKS_IF_USEFUL && K>3 ) ? 
-					   TaskType.RANGE : TaskType.SET;
+				TaskType.RANGE : TaskType.SET;
 			
 			//for each logical processor
 			for( int j=0; j<P; j++ )
@@ -86,16 +86,12 @@ public class TaskPartitionerFactoring extends TaskPartitioner
 				tasks.addLast(lTask);
 				
 				// add iterations to task 
-				if( type == TaskType.SET ) 
-				{
+				if( type == TaskType.SET ) {
 					//value based tasks
 					for( long k=0; k<K && i<=lTo; k++, i+=lIncr )
-					{
-						lTask.addIteration(new IntObject(_iterVarName, i));				
-					}				
+						lTask.addIteration(new IntObject(_iterVarName, i));
 				}
-				else 
-				{
+				else {
 					//determine end of task
 					long to = Math.min( i+(K-1)*lIncr, lTo );
 					
@@ -103,7 +99,6 @@ public class TaskPartitionerFactoring extends TaskPartitioner
 					lTask.addIteration(new IntObject(_iterVarName, i));	    //from
 					lTask.addIteration(new IntObject(_iterVarName, to));    //to
 					lTask.addIteration(new IntObject(_iterVarName, lIncr));	//increment
-					
 					i = to + lIncr;
 				}
 			}
@@ -122,11 +117,11 @@ public class TaskPartitionerFactoring extends TaskPartitioner
 		long lTo    = _toVal.getLongValue();
 		long lIncr  = _incrVal.getLongValue();
 		
-		int P = _numThreads;     // number of parallel workers
+		int P = _numThreads;   // number of parallel workers
 		long N = _numIter;     // total number of iterations
-		long R = N;               // remaining number of iterations
-		long K = -1;              //next _numThreads task sizes	
-	    TaskType type = null;    // type of iterations: range tasks (similar to run-length encoding) make only sense if taskSize>3
+		long R = N;            // remaining number of iterations
+		long K = -1;           //next _numThreads task sizes	
+	    TaskType type = null;  // type of iterations: range tasks (similar to run-length encoding) make only sense if taskSize>3
 		
 		try
 		{


[2/2] systemml git commit: [SYSTEMML-1881] Tuning parfor degree of parallelism (over-provisioning)

Posted by mb...@apache.org.
[SYSTEMML-1881] Tuning parfor degree of parallelism (over-provisioning)

This patch addresses issues of under-utilized CPU resources in parfor
contexts. For example, on Kmeans or MSVM with few runs or classes that
do not evenly divide the number of hardware threads, we assign the
remaining parallelism too conservatively. Consider Kmeans with 10 runs
and 16 hardware threads: in this case, we assign k=10 to parfor and k=1
to the operations in the parfor body, leaving 6 hardware threads idle.
This patch fine-tunes this assignment by slightly over-provisioning CPU
resources, which is usually a good idea due to barriers between
operations. We now assign the remaining operation parallelism with
k=round(maxK/parforK), e.g., k=round(16/10)=2 in the example above.

On the perftest Kmeans 100K x 1K scenario with 50 classes, 10 runs, and
16 hardware threads, this patch improved performance from 196s to 168s.


Project: http://git-wip-us.apache.org/repos/asf/systemml/repo
Commit: http://git-wip-us.apache.org/repos/asf/systemml/commit/ba73291c
Tree: http://git-wip-us.apache.org/repos/asf/systemml/tree/ba73291c
Diff: http://git-wip-us.apache.org/repos/asf/systemml/diff/ba73291c

Branch: refs/heads/master
Commit: ba73291c985d876eaeeb5719623461131bcc7f66
Parents: 2c57cf7
Author: Matthias Boehm <mb...@gmail.com>
Authored: Sat Sep 2 14:30:11 2017 -0700
Committer: Matthias Boehm <mb...@gmail.com>
Committed: Sat Sep 2 14:30:11 2017 -0700

----------------------------------------------------------------------
 .../parfor/opt/OptimizerRuleBased.java          | 34 +++++++++++++-------
 1 file changed, 22 insertions(+), 12 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/systemml/blob/ba73291c/src/main/java/org/apache/sysml/runtime/controlprogram/parfor/opt/OptimizerRuleBased.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/sysml/runtime/controlprogram/parfor/opt/OptimizerRuleBased.java b/src/main/java/org/apache/sysml/runtime/controlprogram/parfor/opt/OptimizerRuleBased.java
index b8da25a..9dada01 100644
--- a/src/main/java/org/apache/sysml/runtime/controlprogram/parfor/opt/OptimizerRuleBased.java
+++ b/src/main/java/org/apache/sysml/runtime/controlprogram/parfor/opt/OptimizerRuleBased.java
@@ -1247,12 +1247,12 @@ public class OptimizerRuleBased extends Optimizer
 			
 			//constrain max parfor parallelism by problem size
 			int parforK = (int)((_N<kMax)? _N : kMax);
-
-
+			
 			// if gpu mode is enabled, the amount of parallelism is set to
 			// the smaller of the number of iterations and the number of GPUs
 			// otherwise it default to the number of CPU cores and the
 			// operations are run in CP mode
+			//FIXME rework for nested parfor parallelism and body w/o gpu ops
 			if (DMLScript.USE_ACCELERATOR) {
 				long perGPUBudget = GPUContextPool.initialGPUMemBudget();
 				double maxMemUsage = getMaxCPOnlyBudget(n);
@@ -1264,15 +1264,14 @@ public class OptimizerRuleBased extends Optimizer
 							parforK + "]");
 				}
 			}
-
 			
 			//set parfor degree of parallelism
 			pfpb.setDegreeOfParallelism(parforK);
-			n.setK(parforK);	
+			n.setK(parforK);
 			
 			//distribute remaining parallelism 
-			int remainParforK = (int)Math.ceil(((double)(kMax-parforK+1))/parforK);
-			int remainOpsK = Math.max(_lkmaxCP / parforK, 1);
+			int remainParforK = getRemainingParallelismParFor(kMax, parforK);
+			int remainOpsK = getRemainingParallelismOps(_lkmaxCP, parforK);
 			rAssignRemainingParallelism( n, remainParforK, remainOpsK ); 
 		}
 		else // ExecType.MR/ExecType.SPARK
@@ -1334,13 +1333,13 @@ public class OptimizerRuleBased extends Optimizer
 					//set parfor degree of parallelism
 					long id = c.getID();
 					c.setK(tmpK);
-					ParForProgramBlock pfpb = (ParForProgramBlock) OptTreeConverter
-                                                  .getAbstractPlanMapping().getMappedProg(id)[1];
+					ParForProgramBlock pfpb = (ParForProgramBlock) 
+						OptTreeConverter.getAbstractPlanMapping().getMappedProg(id)[1];
 					pfpb.setDegreeOfParallelism(tmpK);
 					
-					//distribute remaining parallelism 
-					int remainParforK = (int)Math.ceil(((double)(parforK-tmpK+1))/tmpK);
-					int remainOpsK = Math.max(opsK / tmpK, 1);
+					//distribute remaining parallelism
+					int remainParforK = getRemainingParallelismParFor(parforK, tmpK);
+					int remainOpsK = getRemainingParallelismOps(opsK, tmpK);
 					rAssignRemainingParallelism(c, remainParforK, remainOpsK);
 				}
 				else if( c.getNodeType() == NodeType.HOP )
@@ -1387,7 +1386,18 @@ public class OptimizerRuleBased extends Optimizer
 			}
 		}
 	}
-
+	
+	private static int getRemainingParallelismParFor(int parforK, int tmpK) {
+		//compute max remaining parfor parallelism k such that k * tmpK <= parforK
+		return (int)Math.ceil((double)(parforK-tmpK+1) / tmpK);
+	}
+	
+	private static int getRemainingParallelismOps(int opsK, int tmpK) {
+		//compute max remaining operations parallelism k with slight over-provisioning 
+		//such that k * tmpK <= 1.5 * opsK; note that if parfor already exploits the
+		//maximum parallelism, this will not introduce any over-provisioning.
+		return (int)Math.max(Math.round((double)opsK / tmpK), 1);
+	}
 	
 	///////
 	//REWRITE set task partitioner