You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@systemml.apache.org by lr...@apache.org on 2015/12/03 19:45:41 UTC

[10/78] [abbrv] [partial] incubator-systemml git commit: Move files to new package folder structure

http://git-wip-us.apache.org/repos/asf/incubator-systemml/blob/276d9257/src/main/java/com/ibm/bi/dml/runtime/controlprogram/ParForProgramBlock.java
----------------------------------------------------------------------
diff --git a/src/main/java/com/ibm/bi/dml/runtime/controlprogram/ParForProgramBlock.java b/src/main/java/com/ibm/bi/dml/runtime/controlprogram/ParForProgramBlock.java
deleted file mode 100644
index faf85c9..0000000
--- a/src/main/java/com/ibm/bi/dml/runtime/controlprogram/ParForProgramBlock.java
+++ /dev/null
@@ -1,2126 +0,0 @@
-/**
- * (C) Copyright IBM Corp. 2010, 2015
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- * 
- */
-
-package com.ibm.bi.dml.runtime.controlprogram;
-
-import java.io.BufferedWriter;
-import java.io.IOException;
-import java.io.OutputStreamWriter;
-import java.util.ArrayList;
-import java.util.Collection;
-import java.util.HashMap;
-import java.util.HashSet;
-import java.util.List;
-
-import org.apache.hadoop.fs.FileSystem;
-import org.apache.hadoop.fs.Path;
-import org.apache.log4j.Level;
-
-import com.ibm.bi.dml.api.DMLScript;
-import com.ibm.bi.dml.api.DMLScript.RUNTIME_PLATFORM;
-import com.ibm.bi.dml.conf.ConfigurationManager;
-import com.ibm.bi.dml.conf.DMLConfig;
-import com.ibm.bi.dml.hops.OptimizerUtils;
-import com.ibm.bi.dml.hops.recompile.Recompiler;
-import com.ibm.bi.dml.lops.Lop;
-import com.ibm.bi.dml.lops.LopProperties.ExecType;
-import com.ibm.bi.dml.parser.DMLProgram;
-import com.ibm.bi.dml.parser.DataIdentifier;
-import com.ibm.bi.dml.parser.Expression.DataType;
-import com.ibm.bi.dml.parser.Expression.ValueType;
-import com.ibm.bi.dml.parser.ParForStatementBlock;
-import com.ibm.bi.dml.parser.StatementBlock;
-import com.ibm.bi.dml.parser.VariableSet;
-import com.ibm.bi.dml.runtime.DMLRuntimeException;
-import com.ibm.bi.dml.runtime.DMLUnsupportedOperationException;
-import com.ibm.bi.dml.runtime.controlprogram.caching.CacheException;
-import com.ibm.bi.dml.runtime.controlprogram.caching.MatrixObject;
-import com.ibm.bi.dml.runtime.controlprogram.context.ExecutionContext;
-import com.ibm.bi.dml.runtime.controlprogram.context.SparkExecutionContext;
-import com.ibm.bi.dml.runtime.controlprogram.parfor.DataPartitioner;
-import com.ibm.bi.dml.runtime.controlprogram.parfor.DataPartitionerLocal;
-import com.ibm.bi.dml.runtime.controlprogram.parfor.DataPartitionerRemoteMR;
-import com.ibm.bi.dml.runtime.controlprogram.parfor.DataPartitionerRemoteSpark;
-import com.ibm.bi.dml.runtime.controlprogram.parfor.LocalParWorker;
-import com.ibm.bi.dml.runtime.controlprogram.parfor.LocalTaskQueue;
-import com.ibm.bi.dml.runtime.controlprogram.parfor.ParForBody;
-import com.ibm.bi.dml.runtime.controlprogram.parfor.ProgramConverter;
-import com.ibm.bi.dml.runtime.controlprogram.parfor.RemoteDPParForMR;
-import com.ibm.bi.dml.runtime.controlprogram.parfor.RemoteDPParForSpark;
-import com.ibm.bi.dml.runtime.controlprogram.parfor.RemoteParForJobReturn;
-import com.ibm.bi.dml.runtime.controlprogram.parfor.RemoteParForMR;
-import com.ibm.bi.dml.runtime.controlprogram.parfor.RemoteParForSpark;
-import com.ibm.bi.dml.runtime.controlprogram.parfor.ResultMerge;
-import com.ibm.bi.dml.runtime.controlprogram.parfor.ResultMergeLocalAutomatic;
-import com.ibm.bi.dml.runtime.controlprogram.parfor.ResultMergeLocalFile;
-import com.ibm.bi.dml.runtime.controlprogram.parfor.ResultMergeLocalMemory;
-import com.ibm.bi.dml.runtime.controlprogram.parfor.ResultMergeRemoteMR;
-import com.ibm.bi.dml.runtime.controlprogram.parfor.ResultMergeRemoteSpark;
-import com.ibm.bi.dml.runtime.controlprogram.parfor.Task;
-import com.ibm.bi.dml.runtime.controlprogram.parfor.TaskPartitioner;
-import com.ibm.bi.dml.runtime.controlprogram.parfor.TaskPartitionerFactoring;
-import com.ibm.bi.dml.runtime.controlprogram.parfor.TaskPartitionerFactoringCmax;
-import com.ibm.bi.dml.runtime.controlprogram.parfor.TaskPartitionerFactoringCmin;
-import com.ibm.bi.dml.runtime.controlprogram.parfor.TaskPartitionerFixedsize;
-import com.ibm.bi.dml.runtime.controlprogram.parfor.TaskPartitionerNaive;
-import com.ibm.bi.dml.runtime.controlprogram.parfor.TaskPartitionerStatic;
-import com.ibm.bi.dml.runtime.controlprogram.parfor.mqo.RuntimePiggybacking;
-import com.ibm.bi.dml.runtime.controlprogram.parfor.opt.CostEstimator;
-import com.ibm.bi.dml.runtime.controlprogram.parfor.opt.CostEstimatorHops;
-import com.ibm.bi.dml.runtime.controlprogram.parfor.opt.OptTree;
-import com.ibm.bi.dml.runtime.controlprogram.parfor.opt.OptTreeConverter;
-import com.ibm.bi.dml.runtime.controlprogram.parfor.opt.OptimizationWrapper;
-import com.ibm.bi.dml.runtime.controlprogram.parfor.opt.OptimizerRuleBased;
-import com.ibm.bi.dml.runtime.controlprogram.parfor.opt.PerfTestTool.TestMeasure;
-import com.ibm.bi.dml.runtime.controlprogram.parfor.opt.ProgramRecompiler;
-import com.ibm.bi.dml.runtime.controlprogram.parfor.stat.InfrastructureAnalyzer;
-import com.ibm.bi.dml.runtime.controlprogram.parfor.stat.Stat;
-import com.ibm.bi.dml.runtime.controlprogram.parfor.stat.StatisticMonitor;
-import com.ibm.bi.dml.runtime.controlprogram.parfor.stat.Timing;
-import com.ibm.bi.dml.runtime.controlprogram.parfor.util.IDHandler;
-import com.ibm.bi.dml.runtime.controlprogram.parfor.util.IDSequence;
-import com.ibm.bi.dml.runtime.instructions.cp.BooleanObject;
-import com.ibm.bi.dml.runtime.instructions.cp.Data;
-import com.ibm.bi.dml.runtime.instructions.cp.DoubleObject;
-import com.ibm.bi.dml.runtime.instructions.cp.IntObject;
-import com.ibm.bi.dml.runtime.instructions.cp.StringObject;
-import com.ibm.bi.dml.runtime.instructions.cp.VariableCPInstruction;
-import com.ibm.bi.dml.runtime.matrix.data.OutputInfo;
-import com.ibm.bi.dml.utils.Statistics;
-import com.ibm.bi.dml.yarn.ropt.YarnClusterAnalyzer;
-
-
-
-/**
- * The ParForProgramBlock has the same execution semantics as a ForProgamBlock but executes
- * the independent iterations in parallel. See ParForStatementBlock for the loop dependency
- * analysis. At runtime level, iterations are guaranteed to be completely independent.
- * 
- * NEW FUNCTIONALITIES (not for BI 2.0 release)
- * TODO: reduction variables (operations: +=, -=, /=, *=, min, max)
- * TODO: papply(A,1:2,FUN) language construct (compiled to ParFOR) via DML function repository => modules OK, but second-order functions required
- *
- */
-public class ParForProgramBlock extends ForProgramBlock 
-{	
-	// execution modes
-	public enum PExecMode {
-		LOCAL,      //local (master) multi-core execution mode
-		REMOTE_MR,	//remote (MR cluster) execution mode
-		REMOTE_MR_DP,	//remote (MR cluster) execution mode, fused with data partitioning
-		REMOTE_SPARK,	//remote (Spark cluster) execution mode
-		REMOTE_SPARK_DP,//remote (Spark cluster) execution mode, fused with data partitioning
-		UNSPECIFIED
-	}
-
-	// task partitioner
-	public enum PTaskPartitioner {
-		FIXED,      //fixed-sized task partitioner, uses tasksize 
-		NAIVE,      //naive task partitioner (tasksize=1)
-		STATIC,     //static task partitioner (numIterations/numThreads)
-		FACTORING,  //factoring task partitioner  
-		FACTORING_CMIN,  //constrained factoring task partitioner, uses tasksize as min constraint
-		FACTORING_CMAX,  //constrained factoring task partitioner, uses tasksize as max constraint
-		UNSPECIFIED
-	}
-	
-	public enum PDataPartitionFormat {
-		NONE,
-		ROW_WISE,
-		ROW_BLOCK_WISE,
-		ROW_BLOCK_WISE_N,
-		COLUMN_WISE,
-		COLUMN_BLOCK_WISE,
-		COLUMN_BLOCK_WISE_N,
-		BLOCK_WISE_M_N,
-		UNSPECIFIED;
-
-		/**
-		 * Note: Robust version of valueOf in order to return NONE without exception
-		 * if misspelled or non-existing and for case-insensitivity.
-		 * 
-		 * @param s
-		 * @return
-		 */
-		public static PDataPartitionFormat parsePDataPartitionFormat(String s) {
-			if (s.equalsIgnoreCase("ROW_WISE"))
-				return ROW_WISE;
-			else if (s.equalsIgnoreCase("ROW_BLOCK_WISE"))
-				return ROW_BLOCK_WISE;
-			else if (s.equalsIgnoreCase("ROW_BLOCK_WISE_N"))
-				return ROW_BLOCK_WISE_N;
-			else if (s.equalsIgnoreCase("COLUMN_WISE"))
-				return COLUMN_WISE;
-			else if (s.equalsIgnoreCase("COLUMN_BLOCK_WISE"))
-				return COLUMN_BLOCK_WISE;
-			else if (s.equalsIgnoreCase("COLUMN_BLOCK_WISE_N"))
-				return COLUMN_BLOCK_WISE_N;
-			else if (s.equalsIgnoreCase("BLOCK_WISE_M_N"))
-				return BLOCK_WISE_M_N;
-			else
-				return NONE;
-		}
-	}
-	
-	public enum PDataPartitioner {
-		NONE,       // no data partitioning
-		LOCAL,      // local file based partition split on master node
-		REMOTE_MR,  // remote partition split using a reblock MR job 
-		REMOTE_SPARK, // remote partition split using a spark job
-		UNSPECIFIED, 
-  	}
-
-	public enum PResultMerge {
-		LOCAL_MEM,       // in-core (in-memory) result merge (output and one input at a time)
-		LOCAL_FILE,      // out-of-core result merge (file format dependent)
-		LOCAL_AUTOMATIC, // decides between MEM and FILE based on the size of the output matrix 
-		REMOTE_MR,       // remote MR parallel result merge
-		REMOTE_SPARK,    // remote Spark parallel result merge
-		UNSPECIFIED,
-	}
-	
-	//optimizer
-	public enum POptMode{
-		NONE,       //no optimization, use defaults and specified parameters
-		RULEBASED, //some simple rule-based rewritings (affects only parfor PB) - similar to HEURISTIC but no exec time estimates
-		CONSTRAINED, //same as rule-based but with given params as constraints
-		HEURISTIC, //some simple cost-based rewritings (affects only parfor PB)
-		GREEDY,     //greedy cost-based optimization algorithm (potentially local optimum, affects all instructions)
-		FULL_DP,    //full cost-based optimization algorithm (global optimum, affects all instructions)				
-	}
-		
-	// internal parameters
-	public static final boolean OPTIMIZE                    = true;	// run all automatic optimizations on top-level parfor
-	public static final boolean USE_PB_CACHE                = false; // reuse copied program blocks whenever possible, not there can be issues related to recompile
-	public static       boolean USE_RANGE_TASKS_IF_USEFUL   = true;   	// use range tasks whenever size>3, false, otherwise wrong split order in remote 
-	public static final boolean USE_STREAMING_TASK_CREATION = true;  	// start working while still creating tasks, prevents blocking due to too small task queue
-	public static final boolean ALLOW_NESTED_PARALLELISM	= true;    // if not, transparently change parfor to for on program conversions (local,remote)
-	public static       boolean ALLOW_REUSE_MR_JVMS         = true;    // potential benefits: less setup costs per task, NOTE> cannot be used MR4490 in Hadoop 1.0.3, still not fixed in 1.1.1
-	public static       boolean ALLOW_REUSE_MR_PAR_WORKER   = ALLOW_REUSE_MR_JVMS; //potential benefits: less initialization, reuse in-memory objects and result consolidation!
-	public static final boolean USE_FLEX_SCHEDULER_CONF     = false;
-	public static final boolean USE_PARALLEL_RESULT_MERGE   = false;    // if result merge is run in parallel or serial 
-	public static final boolean USE_PARALLEL_RESULT_MERGE_REMOTE = true; // if remote result merge should be run in parallel for multiple result vars
-	public static final boolean ALLOW_DATA_COLOCATION       = true;
-	public static final boolean CREATE_UNSCOPED_RESULTVARS  = true;
-	public static       boolean ALLOW_REUSE_PARTITION_VARS  = true; //reuse partition input matrices, applied only if read-only in surrounding loops
-	public static final int     WRITE_REPLICATION_FACTOR    = 1;
-	public static final int     MAX_RETRYS_ON_ERROR         = 1;
-	public static final boolean FORCE_CP_ON_REMOTE_MR       = true; // compile body to CP if exec type forced to MR
-	public static final boolean LIVEVAR_AWARE_EXPORT        = true; //export only read variables according to live variable analysis
- 	public static final boolean LIVEVAR_AWARE_CLEANUP       = true; //cleanup pinned variables according to live variable analysis
-	public static final boolean RESET_RECOMPILATION_FLAGs   = true;
- 	
- 	public static final String PARFOR_FNAME_PREFIX          = "/parfor/"; 
-	public static final String PARFOR_MR_TASKS_TMP_FNAME    = PARFOR_FNAME_PREFIX + "%ID%_MR_taskfile"; 
-	public static final String PARFOR_MR_RESULT_TMP_FNAME   = PARFOR_FNAME_PREFIX + "%ID%_MR_results"; 
-	public static final String PARFOR_MR_RESULTMERGE_FNAME  = PARFOR_FNAME_PREFIX + "%ID%_resultmerge%VAR%"; 
-	public static final String PARFOR_DATAPARTITIONS_FNAME  = PARFOR_FNAME_PREFIX + "%ID%_datapartitions%VAR%"; 
-	
-	public static final String PARFOR_COUNTER_GROUP_NAME    = "SystemML ParFOR Counters";
-	
-	// static ID generator sequences
-	private static IDSequence   _pfIDSeq        = null;
-	private static IDSequence   _pwIDSeq        = null;
-	
-	// runtime parameters
-	protected HashMap<String,String> _params    = null;
-	protected int              _numThreads      = -1;
-	protected PTaskPartitioner _taskPartitioner = null; 
-	protected long             _taskSize        = -1;
-	protected PDataPartitioner _dataPartitioner = null;
-	protected PResultMerge     _resultMerge     = null;
-	protected PExecMode        _execMode        = null;
-	protected POptMode         _optMode         = null;
-	protected boolean          _monitor         = false;
-	protected Level            _optLogLevel     = null;
-	
-	
-	//specifics used for optimization
-	protected long             _numIterations   = -1; 
-	protected String[]         _iterablePredicateVarsOriginal = null;
-	
-	//specifics used for data partitioning
-	protected LocalVariableMap _variablesDPOriginal = null;
-	protected LocalVariableMap _variablesDPReuse    = null;
-	protected String           _colocatedDPMatrix   = null;
-	protected boolean          _tSparseCol          = false;
-	protected int              _replicationDP       = WRITE_REPLICATION_FACTOR;
-	protected int              _replicationExport   = -1;
-	//specifics used for result partitioning
-	protected boolean          _jvmReuse            = true;
-	//specifics used for recompilation 
-	protected double           _oldMemoryBudget = -1;
-	protected double           _recompileMemoryBudget = -1;
-	//specifics for caching
-	protected boolean          _enableCPCaching     = true;
-	protected boolean          _enableRuntimePiggybacking = false;
-	//specifics for spark 
-	protected Collection<String> _variablesRP = null;
-	protected Collection<String> _variablesECache = null;
-	
-	// program block meta data
-	protected long                _ID           = -1;
-	protected int                 _IDPrefix     = -1;
-	protected ArrayList<String>  _resultVars      = null;
-	protected IDSequence         _resultVarsIDSeq = null;
-	protected IDSequence         _dpVarsIDSeq     = null;
-	protected boolean            _monitorReport   = false;
-	protected boolean            _hasFunctions    = true;
-	
-	// local parworker data
-	protected long[] 		   	                    _pwIDs   = null;
-	protected HashMap<Long,ArrayList<ProgramBlock>> _pbcache = null;
-	
-	
-	static
-	{
-		//init static ID sequence generators
-		_pfIDSeq = new IDSequence();
-		_pwIDSeq = new IDSequence();
-	}
-	
-	public ParForProgramBlock(Program prog, String[] iterPredVars, HashMap<String,String> params) 
-		throws DMLRuntimeException 
-	{
-		this( -1, prog, iterPredVars, params);
-	}
-	
-	/**
-	 * ParForProgramBlock constructor. It reads the specified parameter settings, where defaults for non-specified parameters
-	 * have been set in ParForStatementBlock.validate(). Furthermore, it generates the IDs for the ParWorkers.
-	 * 
-	 * @param prog
-	 * @param iterPred
-	 * @throws DMLRuntimeException 
-	 */
-	public ParForProgramBlock(int ID, Program prog, String[] iterPredVars, HashMap<String,String> params) 
-		throws DMLRuntimeException  
-	{
-		super(prog, iterPredVars);
-
-		//init internal flags according to DML config
-		initInternalConfigurations(ConfigurationManager.getConfig());
-		
-		//ID generation and setting 
-		setParForProgramBlockIDs( ID );
-		_resultVarsIDSeq = new IDSequence();
-		_dpVarsIDSeq = new IDSequence();
-		
-		//parse and use internal parameters (already set to default if not specified)
-		_params = params;
-		try
-		{
-			_numThreads      = Integer.parseInt( _params.get(ParForStatementBlock.PAR) );
-			_taskPartitioner = PTaskPartitioner.valueOf( _params.get(ParForStatementBlock.TASK_PARTITIONER).toUpperCase() );
-			_taskSize        = Integer.parseInt( _params.get(ParForStatementBlock.TASK_SIZE) );
-			_dataPartitioner = PDataPartitioner.valueOf( _params.get(ParForStatementBlock.DATA_PARTITIONER).toUpperCase() );
-			_resultMerge     = PResultMerge.valueOf( _params.get(ParForStatementBlock.RESULT_MERGE).toUpperCase() );
-			_execMode        = PExecMode.valueOf( _params.get(ParForStatementBlock.EXEC_MODE).toUpperCase() );
-			_optMode         = POptMode.valueOf( _params.get(ParForStatementBlock.OPT_MODE).toUpperCase());		
-			_optLogLevel     = Level.toLevel( _params.get(ParForStatementBlock.OPT_LOG));
-			_monitor         = (Integer.parseInt(_params.get(ParForStatementBlock.PROFILE) ) == 1);
-		}
-		catch(Exception ex)
-		{
-			//runtime exception in order to keep signature of program block
-			throw new RuntimeException("Error parsing specified ParFOR parameters.",ex);
-		}
-			
-		//reset the internal opt mode if optimization globally disabled.
-		if( !OPTIMIZE )
-			_optMode = POptMode.NONE;
-			
-		_variablesDPOriginal = new LocalVariableMap();
-		_variablesDPReuse = new LocalVariableMap();
-		
-		//create IDs for all parworkers
-		if( _execMode == PExecMode.LOCAL /*&& _optMode==POptMode.NONE*/ )
-			setLocalParWorkerIDs();
-	
-		//initialize program block cache if necessary
-		if( USE_PB_CACHE ) 
-			_pbcache = new HashMap<Long, ArrayList<ProgramBlock>>();
-		
-		//created profiling report after parfor exec
-		_monitorReport = _monitor;
-		
-		//materialized meta data (reused for all invocations)
-		_hasFunctions = ProgramRecompiler.containsAtLeastOneFunction(this);
-		
-		LOG.trace("PARFOR: ParForProgramBlock created with mode = "+_execMode+", optmode = "+_optMode+", numThreads = "+_numThreads);
-	}
-	
-	public long getID()
-	{
-		return _ID;
-	}
-	
-	public PExecMode getExecMode()
-	{
-		return _execMode;
-	}
-	
-	public HashMap<String,String> getParForParams()
-	{
-		return _params;
-	}
-
-	public ArrayList<String> getResultVariables()
-	{
-		return _resultVars;
-	}
-	
-	public void setResultVariables(ArrayList<String> resultVars)
-	{
-		_resultVars = resultVars;
-	}
-	
-	public void disableOptimization()
-	{
-		_optMode = POptMode.NONE;
-	}
-	
-	public POptMode getOptimizationMode()
-	{
-		return _optMode;
-	}
-	
-	public int getDegreeOfParallelism()
-	{
-		return _numThreads;
-	}
-	
-	public void setDegreeOfParallelism(int k)
-	{
-		_numThreads = k;
-		_params.put(ParForStatementBlock.PAR, String.valueOf(_numThreads)); //kept up-to-date for copies
-		setLocalParWorkerIDs();
-	}
-
-	public void setCPCaching(boolean flag)
-	{
-		_enableCPCaching = flag;
-	}
-	
-	public void setRuntimePiggybacking(boolean flag)
-	{
-		_enableRuntimePiggybacking = flag;
-	}
-	
-	public void setExecMode( PExecMode mode )
-	{
-		_execMode = mode;
-		_params.put(ParForStatementBlock.EXEC_MODE, String.valueOf(_execMode)); //kept up-to-date for copies
-	}
-	
-	public void setTaskPartitioner( PTaskPartitioner partitioner )
-	{
-		_taskPartitioner = partitioner;
-		_params.put(ParForStatementBlock.TASK_PARTITIONER, String.valueOf(_taskPartitioner)); //kept up-to-date for copies
-	}
-	
-	public void setTaskSize( long tasksize )
-	{
-		_taskSize = tasksize;
-		_params.put(ParForStatementBlock.TASK_SIZE, String.valueOf(_taskSize)); //kept up-to-date for copies
-	}
-	
-	public void setDataPartitioner(PDataPartitioner partitioner) 
-	{
-		_dataPartitioner = partitioner;
-		_params.put(ParForStatementBlock.DATA_PARTITIONER, String.valueOf(_dataPartitioner)); //kept up-to-date for copies
-	}
-	
-	public void enableColocatedPartitionedMatrix( String varname )
-	{
-		//only called from optimizer
-		_colocatedDPMatrix = varname;
-	}
-	
-	public void setTransposeSparseColumnVector( boolean flag )
-	{
-		_tSparseCol = flag;
-	}
-	
-	public void setPartitionReplicationFactor( int rep )
-	{
-		//only called from optimizer
-		_replicationDP = rep;
-	}
-	
-	public void setExportReplicationFactor( int rep )
-	{
-		//only called from optimizer
-		_replicationExport = rep;
-	}
-	
-	public void disableJVMReuse() 
-	{
-		//only called from optimizer
-		_jvmReuse = false;
-	}
-	
-	public void disableMonitorReport()
-	{
-		_monitorReport = false;
-	}
-	
-	public void setResultMerge(PResultMerge merge) 
-	{
-		_resultMerge = merge;
-		_params.put(ParForStatementBlock.RESULT_MERGE, String.valueOf(_resultMerge)); //kept up-to-date for copies
-	}
-	
-	public void setRecompileMemoryBudget( double localMem )
-	{
-		_recompileMemoryBudget = localMem;
-	}
-	
-	public void setSparkRepartitionVariables(Collection<String> vars) {
-		_variablesRP = vars;
-	}
-	
-	public Collection<String> getSparkRepartitionVariables() {
-		return _variablesRP;
-	}
-	
-	public void setSparkEagerCacheVariables(Collection<String> vars) {
-		_variablesECache = vars;
-	}
-	
-	public long getNumIterations()
-	{
-		return _numIterations;
-	}
-	
-	public boolean hasFunctions() {
-		return _hasFunctions;
-	}
-
-	public static void initInternalConfigurations( DMLConfig conf )
-	{
-		ALLOW_REUSE_MR_JVMS = conf.getBooleanValue(DMLConfig.JVM_REUSE);
-		ALLOW_REUSE_MR_PAR_WORKER = ALLOW_REUSE_MR_JVMS;
-	}
-	
-	@Override	
-	public void execute(ExecutionContext ec)
-		throws DMLRuntimeException, DMLUnsupportedOperationException
-	{	
-		ParForStatementBlock sb = (ParForStatementBlock)getStatementBlock();
-		
-		// add the iterable predicate variable to the variable set
-		String iterVarName = _iterablePredicateVars[0];
-
-		// evaluate from, to, incr only once (assumption: known at for entry)
-		IntObject from = executePredicateInstructions( 1, _fromInstructions, ec );
-		IntObject to   = executePredicateInstructions( 2, _toInstructions, ec );
-		IntObject incr = executePredicateInstructions( 3, _incrementInstructions, ec );
-		
-		if ( incr.getLongValue() <= 0 ) //would produce infinite loop
-			throw new DMLRuntimeException(this.printBlockErrorLocation() + "Expression for increment of variable '" + iterVarName + "' must evaluate to a positive value.");
-		
-		//early exit on num iterations = zero
-		if( computeNumIterations(from, to, incr) <= 0 )
-			return; //avoid unnecessary optimization/initialization
-		
-		///////
-		//OPTIMIZATION of ParFOR body (incl all child parfor PBs)
-		///////
-		if( _optMode != POptMode.NONE )
-		{
-			updateIterablePredicateVars( iterVarName, from, to, incr );
-			OptimizationWrapper.setLogLevel(_optLogLevel); //set optimizer log level
-			OptimizationWrapper.optimize( _optMode, sb, this, ec, _monitor ); //core optimize
-			
-			//take changed iterable predicate into account
-			iterVarName = _iterablePredicateVars[0];
-			from = executePredicateInstructions( 1, _fromInstructions, ec );
-			to   = executePredicateInstructions( 2, _toInstructions, ec );
-			incr = executePredicateInstructions( 3, _incrementInstructions, ec );
-		}
-		
-		///////
-		//DATA PARTITIONING of read-only parent variables of type (matrix,unpartitioned)
-		///////
-		Timing time = _monitor ? new Timing(true) : null;
-		
-		//partitioning on demand (note: for fused data partitioning and execute the optimizer set 
-		//the data partitioner to NONE in order to prevent any side effects)
-		handleDataPartitioning( ec ); 
-	
-		//repartitioning of variables for spark cpmm/zipmm in order prevent unnecessary shuffle
-		handleSparkRepartitioning( ec );
-		
-		//eager rdd caching of variables for spark in order prevent read/write contention
-		handleSparkEagerCaching( ec );
-		
-		if( _monitor ) 
-			StatisticMonitor.putPFStat(_ID, Stat.PARFOR_INIT_DATA_T, time.stop());
-			
-		// initialize iter var to form value
-		IntObject iterVar = new IntObject(iterVarName, from.getLongValue() );
-		
-		///////
-		//begin PARALLEL EXECUTION of (PAR)FOR body
-		///////
-		LOG.trace("EXECUTE PARFOR ID = "+_ID+" with mode = "+_execMode+", numThreads = "+_numThreads+", taskpartitioner = "+_taskPartitioner);
-		
-		if( _monitor )
-		{
-			StatisticMonitor.putPFStat(_ID, Stat.PARFOR_NUMTHREADS,      _numThreads);
-			StatisticMonitor.putPFStat(_ID, Stat.PARFOR_TASKSIZE,        _taskSize);
-			StatisticMonitor.putPFStat(_ID, Stat.PARFOR_TASKPARTITIONER, _taskPartitioner.ordinal());
-			StatisticMonitor.putPFStat(_ID, Stat.PARFOR_DATAPARTITIONER, _dataPartitioner.ordinal());
-			StatisticMonitor.putPFStat(_ID, Stat.PARFOR_EXECMODE,        _execMode.ordinal());
-		}
-		
-		//preserve shared input/result variables of cleanup
-		ArrayList<String> varList = ec.getVarList();
-		HashMap<String, Boolean> varState = ec.pinVariables(varList);
-		
-		try 
-		{		
-			switch( _execMode )
-			{
-				case LOCAL: //create parworkers as local threads
-					executeLocalParFor(ec, iterVar, from, to, incr);
-					break;
-					
-				case REMOTE_MR: // create parworkers as MR tasks (one job per parfor)
-					executeRemoteMRParFor(ec, iterVar, from, to, incr);
-					break;
-				
-				case REMOTE_MR_DP: // create parworkers as MR tasks (one job per parfor)
-					executeRemoteMRParForDP(ec, iterVar, from, to, incr);
-					break;
-				
-				case REMOTE_SPARK: // create parworkers as Spark tasks (one job per parfor)
-					executeRemoteSparkParFor(ec, iterVar, from, to, incr);
-					break;
-				
-				case REMOTE_SPARK_DP: // create parworkers as Spark tasks (one job per parfor)
-					executeRemoteSparkParForDP(ec, iterVar, from, to, incr);
-					break;
-				
-				default:
-					throw new DMLRuntimeException("Undefined execution mode: '"+_execMode+"'.");
-			}	
-		}
-		catch(Exception ex)
-		{
-			throw new DMLRuntimeException("PARFOR: Failed to execute loop in parallel.",ex);
-		}
-		
-		//reset state of shared input/result variables 
-		ec.unpinVariables(varList, varState);
-		
-		//cleanup unpinned shared variables
-		cleanupSharedVariables(ec, varState);
-		
-		//set iteration var to TO value (+ increment) for FOR equivalence
-		iterVar = new IntObject( iterVarName, to.getLongValue() ); //consistent with for
-		ec.setVariable(iterVarName, iterVar);
-		
-		//ensure that subsequent program blocks never see partitioned data (invalid plans!)
-		//we can replace those variables, because partitioning only applied for read-only matrices
-		for( String var : _variablesDPOriginal.keySet() )
-		{
-			//cleanup partitioned matrix (if not reused)
-			if( !_variablesDPReuse.keySet().contains(var) )
-				VariableCPInstruction.processRemoveVariableInstruction(ec, var); 
-			//reset to original matrix
-			MatrixObject mo = (MatrixObject) _variablesDPOriginal.get( var );
-			ec.setVariable(var, mo); 
-		}
-		
-		///////
-		//end PARALLEL EXECUTION of (PAR)FOR body
-		///////
-	
-		//print profiling report (only if top-level parfor because otherwise in parallel context)
-		if( _monitorReport )
-		    LOG.info("\n"+StatisticMonitor.createReport());
-		
-		//reset flags/modifications made by optimizer
-		//TODO reset of hop parallelism constraint (e.g., ba+*)
-		for( String dpvar : _variablesDPOriginal.keySet() ) //release forced exectypes
-		    ProgramRecompiler.rFindAndRecompileIndexingHOP(sb, this, dpvar, ec, false);
-		 //release forced exectypes for fused dp/exec
-		if( _execMode == PExecMode.REMOTE_MR_DP || _execMode == PExecMode.REMOTE_SPARK_DP )
-			ProgramRecompiler.rFindAndRecompileIndexingHOP(sb, this, _colocatedDPMatrix, ec, false); 
-		resetIterablePredicateVars();
-		resetOptimizerFlags(); //after release, deletes dp_varnames
-		
-		//execute exit instructions (usually empty)
-		executeInstructions(_exitInstructions, ec);			
-	}
-
-
-	/**
-	 * Executes the parfor locally, i.e., the parfor is realized with numThreads local threads that drive execution.
-	 * This execution mode allows for arbitrary nested local parallelism and nested invocations of MR jobs. See
-	 * below for details of the realization.
-	 * 
-	 * @param ec
-	 * @param itervar
-	 * @param from
-	 * @param to
-	 * @param incr
-	 * @throws DMLUnsupportedOperationException
-	 * @throws DMLRuntimeException
-	 * @throws InterruptedException 
-	 */
-	private void executeLocalParFor( ExecutionContext ec, IntObject itervar, IntObject from, IntObject to, IntObject incr ) 
-		throws DMLUnsupportedOperationException, DMLRuntimeException, InterruptedException
-	{
-		/* Step 1) init parallel workers, task queue and threads
-		 *         start threads (from now on waiting for tasks)
-		 * Step 2) create tasks
-		 *         put tasks into queue
-		 *         mark end of task input stream
-		 * Step 3) join all threads (wait for finished work)
-		 * Step 4) collect results from each parallel worker
-		 */
-
-		Timing time = new Timing(true);
-
-		int numExecutedTasks = 0;
-		int numExecutedIterations = 0;
-		
-		//restrict recompilation to thread local memory
-		setMemoryBudget();
-		
-		//enable runtime piggybacking if required
-		if( _enableRuntimePiggybacking )
-			RuntimePiggybacking.start( _numThreads ); //default piggybacking worker
-		
-		try
-		{
-			// Step 1) init parallel workers, task queue and threads
-			LocalTaskQueue<Task> queue = new LocalTaskQueue<Task>();
-			Thread[] threads         = new Thread[_numThreads];
-			LocalParWorker[] workers = new LocalParWorker[_numThreads];
-			for( int i=0; i<_numThreads; i++ )
-			{
-				//create parallel workers as (lazy) deep copies
-				workers[i] = createParallelWorker( _pwIDs[i], queue, ec ); 
-				threads[i] = new Thread( workers[i] );
-				threads[i].setPriority(Thread.MAX_PRIORITY); 
-			}
-			
-			// start threads (from now on waiting for tasks)
-			for( Thread thread : threads )
-				thread.start();
-			
-			//maintain statistics
-			long tinit = (long) time.stop();
-			if( DMLScript.STATISTICS )
-				Statistics.incrementParForInitTime(tinit);
-			if( _monitor ) 
-				StatisticMonitor.putPFStat(_ID, Stat.PARFOR_INIT_PARWRK_T, tinit);
-			
-			// Step 2) create tasks 
-			TaskPartitioner partitioner = createTaskPartitioner(from, to, incr);
-			long numIterations = partitioner.getNumIterations();
-			long numCreatedTasks = -1;
-			if( USE_STREAMING_TASK_CREATION )
-			{
-				//put tasks into queue (parworker start work on first tasks while creating tasks) 
-				numCreatedTasks = partitioner.createTasks(queue);		
-			}
-			else
-			{
-				List<Task> tasks = partitioner.createTasks();
-				numCreatedTasks = tasks.size();
-				
-				// put tasks into queue
-				for( Task t : tasks )
-					queue.enqueueTask( t );
-				
-				// mark end of task input stream
-				queue.closeInput();		
-			}
-			if( _monitor )
-				StatisticMonitor.putPFStat(_ID, Stat.PARFOR_INIT_TASKS_T, time.stop());
-			
-			// Step 3) join all threads (wait for finished work)
-			for( Thread thread : threads )
-				thread.join();
-			
-			if( _monitor ) 
-				StatisticMonitor.putPFStat(_ID, Stat.PARFOR_WAIT_EXEC_T, time.stop());
-				
-				
-			// Step 4) collecting results from each parallel worker
-			//obtain results
-			LocalVariableMap [] localVariables = new LocalVariableMap [_numThreads]; 
-			for( int i=0; i<_numThreads; i++ )
-			{
-				localVariables[i] = workers[i].getVariables();
-				numExecutedTasks += workers[i].getExecutedTasks();
-				numExecutedIterations += workers[i].getExecutedIterations();			
-			}
-			//consolidate results into global symbol table
-			consolidateAndCheckResults( ec, numIterations, numCreatedTasks, numExecutedIterations, numExecutedTasks, 
-					                    localVariables );
-			
-			// Step 5) cleanup local parworkers (e.g., remove created functions)
-			for( int i=0; i<_numThreads; i++ )
-			{
-				Collection<String> fnNames = workers[i].getFunctionNames();
-				if( fnNames!=null && !fnNames.isEmpty() )
-					for( String fn : fnNames ) {
-						String[] parts = DMLProgram.splitFunctionKey(fn);
-						_prog.removeFunctionProgramBlock(parts[0], parts[1]);
-					}
-			}
-		}
-		finally 
-		{
-			//remove thread-local memory budget (reset to original budget)
-			//(in finally to prevent error side effects for multiple scripts in one jvm)
-			resetMemoryBudget();
-		
-			//disable runtime piggybacking
-			if( _enableRuntimePiggybacking )
-				RuntimePiggybacking.stop();
-			
-			if( _monitor )  {
-				StatisticMonitor.putPFStat(_ID, Stat.PARFOR_WAIT_RESULTS_T, time.stop());
-				StatisticMonitor.putPFStat(_ID, Stat.PARFOR_NUMTASKS, numExecutedTasks);
-				StatisticMonitor.putPFStat(_ID, Stat.PARFOR_NUMITERS, numExecutedIterations);
-			}
-		}
-	}	
-	
-	/**
-	 * 
-	 * @param ec
-	 * @param itervar
-	 * @param from
-	 * @param to
-	 * @param incr
-	 * @throws DMLUnsupportedOperationException
-	 * @throws DMLRuntimeException
-	 * @throws IOException 
-	 */
-	private void executeRemoteMRParFor( ExecutionContext ec, IntObject itervar, IntObject from, IntObject to, IntObject incr ) 
-		throws DMLUnsupportedOperationException, DMLRuntimeException, IOException
-	{
-		/* Step 0) check and recompile MR inst
-		 * Step 1) serialize child PB and inst
-		 * Step 2) create tasks
-		 *         serialize tasks
-		 * Step 3) submit MR Jobs and wait for results                        
-		 * Step 4) collect results from each parallel worker
-		 */
-		
-		Timing time = ( _monitor ? new Timing(true) : null );
-		
-		// Step 0) check and compile to CP (if forced remote parfor)
-		boolean flagForced = false;
-		if( FORCE_CP_ON_REMOTE_MR && (_optMode == POptMode.NONE || (_optMode == POptMode.CONSTRAINED && _execMode==PExecMode.REMOTE_MR)) )
-		{
-			//tid = 0  because replaced in remote parworker
-			flagForced = checkMRAndRecompileToCP(0);
-		}
-			
-		// Step 1) init parallel workers (serialize PBs)
-		// NOTES: each mapper changes filenames with regard to his ID as we submit a single job,
-		//        cannot reuse serialized string, since variables are serialized as well.
-		ParForBody body = new ParForBody( _childBlocks, _resultVars, ec );
-		String program = ProgramConverter.serializeParForBody( body );
-		
-		if( _monitor ) 
-			StatisticMonitor.putPFStat(_ID, Stat.PARFOR_INIT_PARWRK_T, time.stop());
-		
-		// Step 2) create tasks 
-		TaskPartitioner partitioner = createTaskPartitioner(from, to, incr);
-		String taskFile = constructTaskFileName();
-		String resultFile = constructResultFileName();
-		
-		long numIterations = partitioner.getNumIterations();
-		int maxDigits = (int)Math.log10(to.getLongValue()) + 1;
-		long numCreatedTasks = -1;
-		if( USE_STREAMING_TASK_CREATION )
-		{
-			LocalTaskQueue<Task> queue = new LocalTaskQueue<Task>();
-
-			//put tasks into queue and start writing to taskFile
-			numCreatedTasks = partitioner.createTasks(queue);
-			taskFile        = writeTasksToFile( taskFile, queue, maxDigits );				
-		}
-		else
-		{
-			//sequentially create tasks and write to disk
-			List<Task> tasks = partitioner.createTasks();
-			numCreatedTasks  = tasks.size();
-		    taskFile         = writeTasksToFile( taskFile, tasks, maxDigits );				
-		}
-				
-		if( _monitor )
-			StatisticMonitor.putPFStat(_ID, Stat.PARFOR_INIT_TASKS_T, time.stop());
-		
-		//write matrices to HDFS 
-		exportMatricesToHDFS(ec);
-				
-		// Step 3) submit MR job (wait for finished work)
-		MatrixObject colocatedDPMatrixObj = (_colocatedDPMatrix!=null)? (MatrixObject)ec.getVariable(_colocatedDPMatrix) : null;
-		RemoteParForJobReturn ret = RemoteParForMR.runJob(_ID, program, taskFile, resultFile, colocatedDPMatrixObj, _enableCPCaching,
-				                                          _numThreads, WRITE_REPLICATION_FACTOR, MAX_RETRYS_ON_ERROR, getMinMemory(ec),
-				                                          (ALLOW_REUSE_MR_JVMS & _jvmReuse) );
-		
-		if( _monitor ) 
-			StatisticMonitor.putPFStat(_ID, Stat.PARFOR_WAIT_EXEC_T, time.stop());
-			
-			
-		// Step 4) collecting results from each parallel worker
-		int numExecutedTasks = ret.getNumExecutedTasks();
-		int numExecutedIterations = ret.getNumExecutedIterations();
-		
-		//consolidate results into global symbol table
-		consolidateAndCheckResults( ec, numIterations, numCreatedTasks, numExecutedIterations , numExecutedTasks, 
-				                    ret.getVariables() );
-		if( flagForced ) //see step 0
-			releaseForcedRecompile(0);
-		
-		if( _monitor ) 
-		{
-			StatisticMonitor.putPFStat(_ID, Stat.PARFOR_WAIT_RESULTS_T, time.stop());
-			StatisticMonitor.putPFStat(_ID, Stat.PARFOR_NUMTASKS, numExecutedTasks);
-			StatisticMonitor.putPFStat(_ID, Stat.PARFOR_NUMITERS, numExecutedIterations);
-		}			
-	}	
-	
-	/**
-	 * 
-	 * @param ec
-	 * @param itervar
-	 * @param from
-	 * @param to
-	 * @param incr
-	 * @throws DMLUnsupportedOperationException
-	 * @throws DMLRuntimeException
-	 * @throws IOException
-	 */
-	private void executeRemoteMRParForDP( ExecutionContext ec, IntObject itervar, IntObject from, IntObject to, IntObject incr ) 
-		throws DMLUnsupportedOperationException, DMLRuntimeException, IOException
-	{
-		/* Step 0) check and recompile MR inst
-		 * Step 1) serialize child PB and inst
-		 * Step 2) create tasks
-		 *         serialize tasks
-		 * Step 3) submit MR Jobs and wait for results                        
-		 * Step 4) collect results from each parallel worker
-		 */
-		
-		Timing time = ( _monitor ? new Timing(true) : null );
-		
-		// Step 0) check and compile to CP (if forced remote parfor)
-		boolean flagForced = checkMRAndRecompileToCP(0);
-		
-		// Step 1) prepare partitioned input matrix (needs to happen before serializing the progam)
-		ParForStatementBlock sb = (ParForStatementBlock) getStatementBlock();
-		MatrixObject inputMatrix = (MatrixObject)ec.getVariable(_colocatedDPMatrix );
-		PDataPartitionFormat inputDPF = sb.determineDataPartitionFormat( _colocatedDPMatrix );
-		inputMatrix.setPartitioned(inputDPF, 1); //mark matrix var as partitioned (for reducers) 
-		
-		// Step 2) init parallel workers (serialize PBs)
-		// NOTES: each mapper changes filenames with regard to his ID as we submit a single job,
-		//        cannot reuse serialized string, since variables are serialized as well.
-		ParForBody body = new ParForBody( _childBlocks, _resultVars, ec );
-		String program = ProgramConverter.serializeParForBody( body );
-		
-		if( _monitor ) 
-			StatisticMonitor.putPFStat(_ID, Stat.PARFOR_INIT_PARWRK_T, time.stop());
-		
-		// Step 3) create tasks 
-		TaskPartitioner partitioner = createTaskPartitioner(from, to, incr);
-		String resultFile = constructResultFileName();
-		long numIterations = partitioner.getNumIterations();
-		long numCreatedTasks = numIterations;//partitioner.createTasks().size();
-						
-		if( _monitor )
-			StatisticMonitor.putPFStat(_ID, Stat.PARFOR_INIT_TASKS_T, time.stop());
-		
-		//write matrices to HDFS 
-		exportMatricesToHDFS(ec);
-				
-		// Step 4) submit MR job (wait for finished work)
-		OutputInfo inputOI = ((inputMatrix.getSparsity()<0.1 && inputDPF==PDataPartitionFormat.COLUMN_WISE)||
-				              (inputMatrix.getSparsity()<0.001 && inputDPF==PDataPartitionFormat.ROW_WISE))? 
-				             OutputInfo.BinaryCellOutputInfo : OutputInfo.BinaryBlockOutputInfo;
-		RemoteParForJobReturn ret = RemoteDPParForMR.runJob(_ID, itervar.getName(), _colocatedDPMatrix, program, resultFile, 
-				inputMatrix, inputDPF, inputOI, _tSparseCol, _enableCPCaching, _numThreads, _replicationDP, MAX_RETRYS_ON_ERROR );
-		
-		if( _monitor ) 
-			StatisticMonitor.putPFStat(_ID, Stat.PARFOR_WAIT_EXEC_T, time.stop());
-			
-		// Step 5) collecting results from each parallel worker
-		int numExecutedTasks = ret.getNumExecutedTasks();
-		int numExecutedIterations = ret.getNumExecutedIterations();
-		
-		//consolidate results into global symbol table
-		consolidateAndCheckResults( ec, numIterations, numCreatedTasks, numExecutedIterations, numExecutedTasks, 
-				                    ret.getVariables() );
-		
-		if( flagForced ) //see step 0
-			releaseForcedRecompile(0);
-		inputMatrix.unsetPartitioned();
-		
-		if( _monitor ) 
-		{
-			StatisticMonitor.putPFStat(_ID, Stat.PARFOR_WAIT_RESULTS_T, time.stop());
-			StatisticMonitor.putPFStat(_ID, Stat.PARFOR_NUMTASKS, numExecutedTasks);
-			StatisticMonitor.putPFStat(_ID, Stat.PARFOR_NUMITERS, numExecutedIterations);
-		}			
-	}
-	
-	/**
-	 * 
-	 * @param ec
-	 * @param itervar
-	 * @param from
-	 * @param to
-	 * @param incr
-	 * @throws DMLRuntimeException 
-	 * @throws DMLUnsupportedOperationException 
-	 */
-	private void executeRemoteSparkParFor(ExecutionContext ec, IntObject itervar, IntObject from, IntObject to, IntObject incr) 
-		throws DMLRuntimeException, DMLUnsupportedOperationException
-	{
-		Timing time = ( _monitor ? new Timing(true) : null );
-		
-		// Step 0) check and compile to CP (if forced remote parfor)
-		boolean flagForced = false;
-		if( FORCE_CP_ON_REMOTE_MR && (_optMode == POptMode.NONE || (_optMode == POptMode.CONSTRAINED && _execMode==PExecMode.REMOTE_SPARK)) )
-		{
-			//tid = 0  because replaced in remote parworker
-			flagForced = checkMRAndRecompileToCP(0); 
-		}
-			
-		// Step 1) init parallel workers (serialize PBs)
-		// NOTES: each mapper changes filenames with regard to his ID as we submit a single job,
-		//        cannot reuse serialized string, since variables are serialized as well.
-		ParForBody body = new ParForBody( _childBlocks, _resultVars, ec );
-		String program = ProgramConverter.serializeParForBody( body );
-		
-		if( _monitor ) 
-			StatisticMonitor.putPFStat(_ID, Stat.PARFOR_INIT_PARWRK_T, time.stop());
-		
-		// Step 2) create tasks 
-		TaskPartitioner partitioner = createTaskPartitioner(from, to, incr);
-		long numIterations = partitioner.getNumIterations();
-		
-		//sequentially create tasks as input to parfor job
-		List<Task> tasks = partitioner.createTasks();
-		long numCreatedTasks = tasks.size();
-				
-		if( _monitor )
-			StatisticMonitor.putPFStat(_ID, Stat.PARFOR_INIT_TASKS_T, time.stop());
-		
-		//write matrices to HDFS 
-		exportMatricesToHDFS(ec);
-				
-		// Step 3) submit Spark parfor job (no lazy evaluation, since collect on result)
-		//MatrixObject colocatedDPMatrixObj = (_colocatedDPMatrix!=null)? (MatrixObject)ec.getVariable(_colocatedDPMatrix) : null;
-		RemoteParForJobReturn ret = RemoteParForSpark.runJob(_ID, program, tasks, ec, _enableCPCaching, _numThreads);
-		
-		if( _monitor ) 
-			StatisticMonitor.putPFStat(_ID, Stat.PARFOR_WAIT_EXEC_T, time.stop());
-			
-			
-		// Step 4) collecting results from each parallel worker
-		int numExecutedTasks = ret.getNumExecutedTasks();
-		int numExecutedIterations = ret.getNumExecutedIterations();
-		
-		//consolidate results into global symbol table
-		consolidateAndCheckResults( ec, numIterations, numCreatedTasks, numExecutedIterations , numExecutedTasks, 
-				                    ret.getVariables() );
-		if( flagForced ) //see step 0
-			releaseForcedRecompile(0);
-		
-		if( _monitor ) 
-		{
-			StatisticMonitor.putPFStat(_ID, Stat.PARFOR_WAIT_RESULTS_T, time.stop());
-			StatisticMonitor.putPFStat(_ID, Stat.PARFOR_NUMTASKS, numExecutedTasks);
-			StatisticMonitor.putPFStat(_ID, Stat.PARFOR_NUMITERS, numExecutedIterations);
-		}			
-	}
-	
-	private void executeRemoteSparkParForDP( ExecutionContext ec, IntObject itervar, IntObject from, IntObject to, IntObject incr ) 
-		throws DMLUnsupportedOperationException, DMLRuntimeException, IOException
-	{
-		Timing time = ( _monitor ? new Timing(true) : null );
-		
-		// Step 0) check and compile to CP (if forced remote parfor)
-		boolean flagForced = checkMRAndRecompileToCP(0);
-		
-		// Step 1) prepare partitioned input matrix (needs to happen before serializing the progam)
-		ParForStatementBlock sb = (ParForStatementBlock) getStatementBlock();
-		MatrixObject inputMatrix = (MatrixObject)ec.getVariable(_colocatedDPMatrix );
-		PDataPartitionFormat inputDPF = sb.determineDataPartitionFormat( _colocatedDPMatrix );
-		inputMatrix.setPartitioned(inputDPF, 1); //mark matrix var as partitioned (for reducers) 
-		
-		// Step 2) init parallel workers (serialize PBs)
-		// NOTES: each mapper changes filenames with regard to his ID as we submit a single job,
-		//        cannot reuse serialized string, since variables are serialized as well.
-		ParForBody body = new ParForBody( _childBlocks, _resultVars, ec );
-		String program = ProgramConverter.serializeParForBody( body );
-		
-		if( _monitor ) 
-			StatisticMonitor.putPFStat(_ID, Stat.PARFOR_INIT_PARWRK_T, time.stop());
-		
-		// Step 3) create tasks 
-		TaskPartitioner partitioner = createTaskPartitioner(from, to, incr);
-		String resultFile = constructResultFileName();
-		long numIterations = partitioner.getNumIterations();
-		long numCreatedTasks = numIterations;//partitioner.createTasks().size();
-						
-		if( _monitor )
-			StatisticMonitor.putPFStat(_ID, Stat.PARFOR_INIT_TASKS_T, time.stop());
-		
-		//write matrices to HDFS 
-		exportMatricesToHDFS(ec);
-				
-		// Step 4) submit MR job (wait for finished work)
-		OutputInfo inputOI = ((inputMatrix.getSparsity()<0.1 && inputDPF==PDataPartitionFormat.COLUMN_WISE)||
-				              (inputMatrix.getSparsity()<0.001 && inputDPF==PDataPartitionFormat.ROW_WISE))? 
-				             OutputInfo.BinaryCellOutputInfo : OutputInfo.BinaryBlockOutputInfo;
-		RemoteParForJobReturn ret = RemoteDPParForSpark.runJob(_ID, itervar.getName(), _colocatedDPMatrix, program, resultFile, 
-				inputMatrix, ec, inputDPF, inputOI, _tSparseCol, _enableCPCaching, _numThreads );
-		
-		if( _monitor ) 
-			StatisticMonitor.putPFStat(_ID, Stat.PARFOR_WAIT_EXEC_T, time.stop());
-			
-		// Step 5) collecting results from each parallel worker
-		int numExecutedTasks = ret.getNumExecutedTasks();
-		int numExecutedIterations = ret.getNumExecutedIterations();
-		
-		//consolidate results into global symbol table
-		consolidateAndCheckResults( ec, numIterations, numCreatedTasks, numExecutedIterations, numExecutedTasks, 
-				                    ret.getVariables() );
-		
-		if( flagForced ) //see step 0
-			releaseForcedRecompile(0);
-		inputMatrix.unsetPartitioned();
-		
-		if( _monitor ) 
-		{
-			StatisticMonitor.putPFStat(_ID, Stat.PARFOR_WAIT_RESULTS_T, time.stop());
-			StatisticMonitor.putPFStat(_ID, Stat.PARFOR_NUMTASKS, numExecutedTasks);
-			StatisticMonitor.putPFStat(_ID, Stat.PARFOR_NUMITERS, numExecutedIterations);
-		}			
-	}
-	
-	/**
-	 * 
-	 * @param ec
-	 * @throws DMLRuntimeException 
-	 * @throws DMLUnsupportedOperationException 
-	 */
-	private void handleDataPartitioning( ExecutionContext ec ) 
-		throws DMLRuntimeException, DMLUnsupportedOperationException
-	{
-		if( _dataPartitioner != PDataPartitioner.NONE )
-		{			
-			ParForStatementBlock sb = (ParForStatementBlock) getStatementBlock();
-			if( sb == null )
-				throw new DMLRuntimeException("ParFor statement block required for reasoning about data partitioning.");
-			
-			ArrayList<String> vars = sb.getReadOnlyParentVars();
-			for( String var : vars )
-			{
-				Data dat = ec.getVariable(var);
-				//skip non-existing input matrices (which are due to unknown sizes marked for
-				//partitioning but typically related branches are never executed)				
-				if( dat != null && dat instanceof MatrixObject )
-				{
-					MatrixObject moVar = (MatrixObject) dat; //unpartitioned input
-					
-					PDataPartitionFormat dpf = sb.determineDataPartitionFormat( var );
-					//dpf = (_optMode != POptMode.NONE) ? OptimizerRuleBased.decideBlockWisePartitioning(moVar, dpf) : dpf;
-					LOG.trace("PARFOR ID = "+_ID+", Partitioning read-only input variable "+var+" (format="+dpf+", mode="+_dataPartitioner+")");
-					
-					if( dpf != PDataPartitionFormat.NONE )
-					{
-						Timing ltime = new Timing(true);
-						
-						//input data partitioning (reuse if possible)
-						Data dpdatNew = _variablesDPReuse.get(var);
-						if( dpdatNew == null ) //no reuse opportunity
-						{
-							DataPartitioner dp = createDataPartitioner( dpf, _dataPartitioner, ec );
-							//disable binary cell for sparse if consumed by MR jobs
-							if(    !OptimizerRuleBased.allowsBinaryCellPartitions(moVar, dpf )
-								|| OptimizerUtils.isSparkExecutionMode() ) //TODO support for binarycell
-							{
-								dp.disableBinaryCell();
-							}
-							MatrixObject moVarNew = dp.createPartitionedMatrixObject(moVar, constructDataPartitionsFileName());
-							dpdatNew = moVarNew;
-							
-							//skip remaining partitioning logic if not partitioned (e.g., too small)
-							if( moVar == moVarNew ) 
-								continue; //skip to next
-						}
-						ec.setVariable(var, dpdatNew);
-						
-						//recompile parfor body program
-						ProgramRecompiler.rFindAndRecompileIndexingHOP(sb,this,var,ec,true);
-						
-						//store original and partitioned matrix (for reuse if applicable)
-						_variablesDPOriginal.put(var, moVar);
-						if(    ALLOW_REUSE_PARTITION_VARS 
-							&& ProgramRecompiler.isApplicableForReuseVariable(sb.getDMLProg(), sb, var) ) 
-						{
-							_variablesDPReuse.put(var, dpdatNew);
-						}
-						
-						LOG.trace("Partitioning and recompilation done in "+ltime.stop()+"ms");
-					}
-				}
-			}
-		}
-	}
-	
-	/**
-	 * 
-	 * @param ec
-	 * @throws DMLRuntimeException
-	 * @throws DMLUnsupportedOperationException
-	 */
-	private void handleSparkRepartitioning( ExecutionContext ec ) 
-		throws DMLRuntimeException, DMLUnsupportedOperationException
-	{
-		if( OptimizerUtils.isSparkExecutionMode() &&
-			_variablesRP != null && !_variablesRP.isEmpty() ) {
-			SparkExecutionContext sec = (SparkExecutionContext) ec;
-			
-			for( String var : _variablesRP )
-				sec.repartitionAndCacheMatrixObject(var);
-		}
-	}
-	
-	/**
-	 * 
-	 * @param ec
-	 * @throws DMLRuntimeException
-	 * @throws DMLUnsupportedOperationException
-	 */
-	private void handleSparkEagerCaching( ExecutionContext ec ) 
-		throws DMLRuntimeException, DMLUnsupportedOperationException
-	{
-		if( OptimizerUtils.isSparkExecutionMode() &&
-			_variablesECache != null && !_variablesECache.isEmpty() ) {
-			SparkExecutionContext sec = (SparkExecutionContext) ec;
-			
-			for( String var : _variablesECache )
-				sec.cacheMatrixObject(var);
-		}
-	}
-	
-	/**
-	 * Cleanup result variables of parallel workers after result merge.
-	 * @param in 
-	 * @param out 
-	 * @throws DMLRuntimeException 
-	 */
-	private void cleanWorkerResultVariables(ExecutionContext ec, MatrixObject out, MatrixObject[] in) 
-		throws DMLRuntimeException
-	{
-		for( MatrixObject tmp : in )
-		{
-			//check for empty inputs (no iterations executed)
-			if( tmp != null && tmp != out )
-				ec.cleanupMatrixObject(tmp);
-		}
-	}
-	
-	/**
-	 * Create empty matrix objects and scalars for all unscoped vars 
-	 * (created within the parfor).
-	 * 
-	 * NOTE: parfor gives no guarantees on the values of those objects - hence
-	 * we return -1 for sclars and empty matrix objects.
-	 * 
-	 * @param out
-	 * @param sb
-	 * @throws DMLRuntimeException 
-	 */
-	private void createEmptyUnscopedVariables( LocalVariableMap out, StatementBlock sb ) 
-		throws DMLRuntimeException
-	{
-		VariableSet updated = sb.variablesUpdated();
-		VariableSet livein = sb.liveIn();
-		
-		//for all vars IN <updated> AND NOT IN <livein>
-		for( String var : updated.getVariableNames() )
-			if( !livein.containsVariable(var) )
-			{
-				//create empty output
-				DataIdentifier dat = updated.getVariable(var);
-				DataType datatype = dat.getDataType();
-				ValueType valuetype = dat.getValueType();
-				Data dataObj = null;
-				switch( datatype )
-				{
-					case SCALAR:
-						switch( valuetype )
-						{
-							case BOOLEAN: dataObj = new BooleanObject(var,false); break;
-							case INT:     dataObj = new IntObject(var,-1);        break;
-							case DOUBLE:  dataObj = new DoubleObject(var,-1d);    break;
-							case STRING:  dataObj = new StringObject(var,"-1");   break;
-							default:
-								throw new DMLRuntimeException("Value type not supported: "+valuetype);
-						}
-						break;
-					case MATRIX:
-						//currently we do not create any unscoped matrix object outputs
-						//because metadata (e.g., outputinfo) not known at this place.
-						break;
-					case UNKNOWN:
-						break;
-					default:
-						throw new DMLRuntimeException("Data type not supported: "+datatype);
-				}
-				
-				if( dataObj != null )
-					out.put(var, dataObj);
-			}
-	}
-	
-	/**
-	 * 
-	 * @throws CacheException
-	 */
-	private void exportMatricesToHDFS( ExecutionContext ec ) 
-		throws CacheException 
-	{
-		ParForStatementBlock sb = (ParForStatementBlock)getStatementBlock();
-		
-		if( LIVEVAR_AWARE_EXPORT && sb != null)
-		{
-			//optimization to prevent unnecessary export of matrices
-			//export only variables that are read in the body
-			VariableSet varsRead = sb.variablesRead();
-			for (String key : ec.getVariables().keySet() ) 
-			{
-				Data d = ec.getVariable(key);
-				if (    d.getDataType() == DataType.MATRIX
-					 && varsRead.containsVariable(key)  )
-				{
-					MatrixObject mo = (MatrixObject)d;
-					mo.exportData( _replicationExport );
-				}
-			}
-		}
-		else
-		{
-			//export all matrices in symbol table
-			for (String key : ec.getVariables().keySet() ) 
-			{
-				Data d = ec.getVariable(key);
-				if ( d.getDataType() == DataType.MATRIX )
-				{
-					MatrixObject mo = (MatrixObject)d;
-					mo.exportData( _replicationExport );
-				}
-			}
-		}
-	}
-	
-	/**
-	 * 
-	 * @param ec
-	 * @param varState
-	 * @throws DMLRuntimeException
-	 */
-	private void cleanupSharedVariables( ExecutionContext ec, HashMap<String,Boolean> varState ) 
-		throws DMLRuntimeException 
-	{
-		//TODO needs as precondition a systematic treatment of persistent read information.
-		/*
-		if( LIVEVAR_AWARE_CLEANUP && _sb != null)
-		{
-			//cleanup shared variables after they are unpinned
-			VariableSet liveout = _sb.liveOut();
-			for( Entry<String, Boolean> var : varState.entrySet() ) 
-			{
-				String varname = var.getKey();
-				boolean unpinned = var.getValue();
-				String fprefix = ConfigurationManager.getConfig().getTextValue("scratch") 
-						         + Lop.FILE_SEPARATOR + Lop.PROCESS_PREFIX + DMLScript.getUUID();
-				
-				//delete unpinned vars if not in liveout (similar like rmvar) and not persistent input
-				if( unpinned && !liveout.containsVariable(varname) )
-					      
-				{
-					VariableCPInstruction.processRemoveVariableInstruction(ec,varname);
-				}
-			}
-		}
-		*/
-	}
-	
-	/**
-	 * Creates a new or partially recycled instance of a parallel worker. Therefore the symbol table, and child
-	 * program blocks are deep copied. Note that entries of the symbol table are not deep copied because they are replaced 
-	 * anyway on the next write. In case of recycling the deep copies of program blocks are recycled from previous 
-	 * executions of this parfor.
-	 * 
-	 * 
-	 * @param pwID
-	 * @param queue
-	 * @param ec
-	 * @return
-	 * @throws InstantiationException
-	 * @throws IllegalAccessException
-	 * @throws DMLUnsupportedOperationException
-	 * @throws DMLRuntimeException
-	 * @throws CloneNotSupportedException
-	 */
-	private LocalParWorker createParallelWorker(long pwID, LocalTaskQueue<Task> queue, ExecutionContext ec) 
-		throws DMLRuntimeException
-	{
-		LocalParWorker pw = null; 
-		
-		try
-		{
-			//create deep copies of required elements
-			//child blocks
-			ArrayList<ProgramBlock> cpChildBlocks = null;	
-			HashSet<String> fnNames = new HashSet<String>();
-			if( USE_PB_CACHE )
-			{
-				if( _pbcache.containsKey(pwID) )
-				{
-					cpChildBlocks = _pbcache.get(pwID);	
-				}
-				else
-				{
-					cpChildBlocks = ProgramConverter.rcreateDeepCopyProgramBlocks(_childBlocks, pwID, _IDPrefix, new HashSet<String>(), fnNames, false, false); 
-					_pbcache.put(pwID, cpChildBlocks);
-				}
-			}
-			else
-			{
-				cpChildBlocks = ProgramConverter.rcreateDeepCopyProgramBlocks(_childBlocks, pwID, _IDPrefix, new HashSet<String>(), fnNames, false, false); 
-			}             
-			
-			// Deep copy Execution Context
-			ExecutionContext cpEc = ProgramConverter.createDeepCopyExecutionContext(ec);
-			
-			//create the actual parallel worker
-			ParForBody body = new ParForBody( cpChildBlocks, _resultVars, cpEc );
-			pw = new LocalParWorker( pwID, queue, body, MAX_RETRYS_ON_ERROR, _monitor );
-			pw.setFunctionNames(fnNames);
-		}
-		catch(Exception ex)
-		{
-			throw new DMLRuntimeException(ex);
-		}
-		
-		return pw;
-	}
-	
-	/**
-	 * Creates a new task partitioner according to the specified runtime parameter.
-	 * 
-	 * @param from
-	 * @param to
-	 * @param incr
-	 * @return
-	 * @throws DMLRuntimeException
-	 */
-	private TaskPartitioner createTaskPartitioner( IntObject from, IntObject to, IntObject incr ) 
-		throws DMLRuntimeException
-	{
-		TaskPartitioner tp = null;
-		
-		switch( _taskPartitioner )
-		{
-			case FIXED:
-				tp = new TaskPartitionerFixedsize( _taskSize, _iterablePredicateVars[0],
-	                    					   from, to, incr );
-				break;
-			case NAIVE:
-				tp = new TaskPartitionerNaive( _taskSize, _iterablePredicateVars[0],
-                        					   from, to, incr );
-				break;
-			case STATIC:
-				tp = new TaskPartitionerStatic( _taskSize, _numThreads, _iterablePredicateVars[0],
-                        					   from, to, incr );
-				break;
-			case FACTORING:
-				tp = new TaskPartitionerFactoring( _taskSize,_numThreads, _iterablePredicateVars[0],
-							                       from, to, incr );
-				break;
-			case FACTORING_CMIN:
-				//for constrained factoring the tasksize is used as the minimum constraint
-				tp = new TaskPartitionerFactoringCmin( _taskSize,_numThreads, _taskSize, _iterablePredicateVars[0],
-							                       from, to, incr );
-				break;
-
-			case FACTORING_CMAX:
-				//for constrained factoring the tasksize is used as the minimum constraint
-				tp = new TaskPartitionerFactoringCmax( _taskSize,_numThreads, _taskSize, _iterablePredicateVars[0],
-							                       from, to, incr );
-				break;	
-			default:
-				throw new DMLRuntimeException("Undefined task partitioner: '"+_taskPartitioner+"'.");
-		}
-		
-		return tp;
-	}
-	
-	/**
-	 * Creates a new data partitioner according to the specified runtime parameter.
-	 * 
-	 * @param dpf
-	 * @param dataPartitioner
-	 * @param ec 
-	 * @return
-	 * @throws DMLRuntimeException 
-	 */
-	private DataPartitioner createDataPartitioner(PDataPartitionFormat dpf, PDataPartitioner dataPartitioner, ExecutionContext ec) 
-		throws DMLRuntimeException 
-	{
-		DataPartitioner dp = null;
-		
-		//determine max degree of parallelism
-		int numReducers = ConfigurationManager.getConfig().getIntValue(DMLConfig.NUM_REDUCERS);
-		int maxNumRed = InfrastructureAnalyzer.getRemoteParallelReduceTasks();
-		//correction max number of reducers on yarn clusters
-		if( InfrastructureAnalyzer.isYarnEnabled() )
-			maxNumRed = (int)Math.max( maxNumRed, YarnClusterAnalyzer.getNumCores()/2 );				
-		int numRed = Math.min(numReducers,maxNumRed);
-		
-		//create data partitioner
-		switch( dataPartitioner )
-		{
-			case LOCAL:
-				dp = new DataPartitionerLocal(dpf, -1, _numThreads);
-				break;
-			case REMOTE_MR:
-				dp = new DataPartitionerRemoteMR( dpf, -1, _ID, numRed,
-						                          _replicationDP, 
-						                          MAX_RETRYS_ON_ERROR, 
-						                          ALLOW_REUSE_MR_JVMS, false );
-				break;
-			case REMOTE_SPARK:
-				dp = new DataPartitionerRemoteSpark( dpf, -1, ec, numRed, false );
-				break;	
-			default:
-				throw new DMLRuntimeException("Undefined data partitioner: '" +dataPartitioner.toString()+"'.");
-		}
-		
-		return dp;
-	}
-	
-	/**
-	 * 
-	 * @param prm
-	 * @param out
-	 * @param in
-	 * @param fname
-	 * @return
-	 * @throws DMLRuntimeException
-	 */
-	private ResultMerge createResultMerge( PResultMerge prm, MatrixObject out, MatrixObject[] in, String fname, ExecutionContext ec ) 
-		throws DMLRuntimeException 
-	{
-		ResultMerge rm = null;
-		
-		//determine degree of parallelism
-		int numReducers = ConfigurationManager.getConfig().getIntValue(DMLConfig.NUM_REDUCERS);
-		int maxMap = InfrastructureAnalyzer.getRemoteParallelMapTasks();
-		int maxRed = InfrastructureAnalyzer.getRemoteParallelReduceTasks();
-		//correction max number of reducers on yarn clusters
-		if( InfrastructureAnalyzer.isYarnEnabled() ) {					
-			maxMap = (int)Math.max( maxMap, YarnClusterAnalyzer.getNumCores() );	
-			maxRed = (int)Math.max( maxRed, YarnClusterAnalyzer.getNumCores()/2 );	
-		}
-		int numMap = Math.max(_numThreads, maxMap);
-		int numRed = Math.min(numReducers, maxRed);
-		
-		//create result merge implementation		
-		switch( prm )
-		{
-			case LOCAL_MEM:
-				rm = new ResultMergeLocalMemory( out, in, fname );
-				break;
-			case LOCAL_FILE:
-				rm = new ResultMergeLocalFile( out, in, fname );
-				break;
-			case LOCAL_AUTOMATIC:
-				rm = new ResultMergeLocalAutomatic( out, in, fname );
-				break;
-			case REMOTE_MR:
-				rm = new ResultMergeRemoteMR( out, in, fname, _ID, numMap, numRed,
-					                          WRITE_REPLICATION_FACTOR, 
-					                          MAX_RETRYS_ON_ERROR, 
-					                          ALLOW_REUSE_MR_JVMS );
-				break;
-			case REMOTE_SPARK:
-				rm = new ResultMergeRemoteSpark( out, in, fname, ec, numMap, numRed );
-				break;
-				
-			default:
-				throw new DMLRuntimeException("Undefined result merge: '" +prm.toString()+"'.");
-		}
-		
-		return rm;
-	}
-	
-	/**
-	 * Recompile program block hierarchy to forced CP if MR instructions or functions.
-	 * Returns true if recompile was necessary and possible
-	 * 
-	 * @param tid
-	 * @return
-	 * @throws DMLRuntimeException
-	 */
-	private boolean checkMRAndRecompileToCP(long tid) 
-		throws DMLRuntimeException
-	{
-		//no MR instructions, ok
-		if( !OptTreeConverter.rContainsMRJobInstruction(this, true) )
-			return false;
-		
-		//no statement block, failed
-		ParForStatementBlock sb = (ParForStatementBlock)getStatementBlock();
-		if( sb == null ) {
-			LOG.warn("Missing parfor statement block for recompile.");
-			return false;
-		}
-		
-		//try recompile MR instructions to CP
-		HashSet<String> fnStack = new HashSet<String>();
-		Recompiler.recompileProgramBlockHierarchy2Forced(_childBlocks, tid, fnStack, ExecType.CP);
-		return true;
-	}
-	
-	/**
-	 * 
-	 * @param tid
-	 * @throws DMLRuntimeException
-	 */
-	private void releaseForcedRecompile(long tid) 
-		throws DMLRuntimeException
-	{
-		HashSet<String> fnStack = new HashSet<String>();
-		Recompiler.recompileProgramBlockHierarchy2Forced(_childBlocks, tid, fnStack, null);
-	}
-	
-	
-	/**
-	 * 
-	 * @param fname
-	 * @param tasks
-	 * @return
-	 * @throws DMLRuntimeException
-	 * @throws IOException
-	 */
-	private String writeTasksToFile(String fname, List<Task> tasks, int maxDigits)
-		throws DMLRuntimeException, IOException
-	{
-		BufferedWriter br = null;
-		try
-		{
-			Path path = new Path(fname);
-			FileSystem fs = FileSystem.get(ConfigurationManager.getCachedJobConf());
-			br = new BufferedWriter(new OutputStreamWriter(fs.create(path,true)));
-	        
-			boolean flagFirst = true; //workaround for keeping gen order
-			for( Task t : tasks )
-			{
-				br.write( createTaskFileLine( t, maxDigits, flagFirst ) );
-				if( flagFirst )
-					flagFirst = false;
-			}
-		}
-		catch(Exception ex)
-		{
-			throw new DMLRuntimeException("Error writing tasks to taskfile "+fname, ex);
-		}
-		finally
-		{
-			if( br !=null )
-				br.close();
-		}
-		
-		return fname;
-	}
-	
-	/**
-	 * 
-	 * @param fname
-	 * @param queue
-	 * @return
-	 * @throws DMLRuntimeException
-	 * @throws IOException
-	 */
-	private String writeTasksToFile(String fname, LocalTaskQueue<Task> queue, int maxDigits)
-		throws DMLRuntimeException, IOException
-	{
-		BufferedWriter br = null;
-		try
-		{
-			Path path = new Path( fname );
-			FileSystem fs = FileSystem.get(ConfigurationManager.getCachedJobConf());
-			br = new BufferedWriter(new OutputStreamWriter(fs.create(path,true)));
-	        
-			Task t = null;
-			boolean flagFirst = true; //workaround for keeping gen order
-			while( (t = queue.dequeueTask()) != LocalTaskQueue.NO_MORE_TASKS )
-			{
-				br.write( createTaskFileLine( t, maxDigits, flagFirst ) );
-				if( flagFirst )
-					flagFirst = false;
-			}
-		}
-		catch(Exception ex)
-		{
-			throw new DMLRuntimeException("Error writing tasks to taskfile "+fname, ex);
-		}
-		finally
-		{
-			if( br !=null )
-				br.close();
-		}
-		
-		return fname;
-	}
-	
-	private String createTaskFileLine( Task t, int maxDigits, boolean flagFirst ) 
-	{
-		//always pad to max digits in order to preserve task order	
-		String ret = t.toCompactString(maxDigits) + (flagFirst?" ":"") + "\n";
-		return ret;
-	}
-	
-	/**
-	 * 
-	 * @param expIters
-	 * @param expTasks
-	 * @param numIters
-	 * @param numTasks
-	 * @param results
-	 * @throws DMLRuntimeException
-	 */
-	private void consolidateAndCheckResults(ExecutionContext ec, long expIters, long expTasks, long numIters, long numTasks, LocalVariableMap [] results) 
-		throws DMLRuntimeException
-	{
-		Timing time = new Timing(true);
-		
-		//result merge
-		if( checkParallelRemoteResultMerge() )
-		{
-			//execute result merge in parallel for all result vars
-			int par = Math.min( _resultVars.size(), 
-					            InfrastructureAnalyzer.getLocalParallelism() );
-			
-			try
-			{
-				//enqueue all result vars as tasks
-				LocalTaskQueue<String> q = new LocalTaskQueue<String>();
-				for( String var : _resultVars ) //foreach non-local write
-					if( ec.getVariable(var) instanceof MatrixObject ) //robustness scalars
-						q.enqueueTask(var);
-				q.closeInput();
-				
-				//run result merge workers
-				Thread[] rmWorkers = new Thread[par];
-				for( int i=0; i<par; i++ )
-					rmWorkers[i] = new Thread(new ResultMergeWorker(q, results, ec));
-				for( int i=0; i<par; i++ ) //start all
-					rmWorkers[i].start();
-				for( int i=0; i<par; i++ ) //wait for all
-					rmWorkers[i].join();
-			}
-			catch(Exception ex)
-			{
-				throw new DMLRuntimeException(ex);
-			}
-		}
-		else
-		{
-			//execute result merge sequentially for all result vars
-			for( String var : _resultVars ) //foreach non-local write
-			{			
-				Data dat = ec.getVariable(var);
-				if( dat instanceof MatrixObject ) //robustness scalars
-				{
-					MatrixObject out = (MatrixObject) dat;
-					MatrixObject[] in = new MatrixObject[ results.length ];
-					for( int i=0; i< results.length; i++ )
-						in[i] = (MatrixObject) results[i].get( var ); 			
-					String fname = constructResultMergeFileName();
-					ResultMerge rm = createResultMerge(_resultMerge, out, in, fname, ec);
-					MatrixObject outNew = null;
-					if( USE_PARALLEL_RESULT_MERGE )
-						outNew = rm.executeParallelMerge( _numThreads );
-					else
-						outNew = rm.executeSerialMerge(); 		
-					
-					//cleanup existing var
-					Data exdata = ec.removeVariable(var);
-					if( exdata != null && exdata != outNew && exdata instanceof MatrixObject )
-						ec.cleanupMatrixObject((MatrixObject)exdata);
-							
-					//cleanup of intermediate result variables
-					cleanWorkerResultVariables( ec, out, in );
-					
-					//set merged result variable
-					ec.setVariable(var, outNew);
-				}
-			}
-		}
-		
-		//handle unscoped variables (vars created in parfor, but potentially used afterwards)
-		ParForStatementBlock sb = (ParForStatementBlock)getStatementBlock();
-		if( CREATE_UNSCOPED_RESULTVARS && sb != null && ec.getVariables() != null ) //sb might be null for nested parallelism
-			createEmptyUnscopedVariables( ec.getVariables(), sb );
-		
-		//check expected counters
-		if( numTasks != expTasks || numIters !=expIters ) //consistency check
-			throw new DMLRuntimeException("PARFOR: Number of executed tasks does not match the number of created tasks: tasks "+numTasks+"/"+expTasks+", iters "+numIters+"/"+expIters+".");
-	
-		if( DMLScript.STATISTICS )
-			Statistics.incrementParForMergeTime((long) time.stop());
-	}
-	
-	/**
-	 * NOTE: Currently we use a fixed rule (multiple results AND REMOTE_MR -> only selected by the optimizer
-	 * if mode was REMOTE_MR as well). 
-	 * 
-	 * TODO The optimizer should explicitly decide about parallel result merge and its degree of parallelism.
-	 * 
-	 * @return
-	 */
-	private boolean checkParallelRemoteResultMerge()
-	{
-		return (USE_PARALLEL_RESULT_MERGE_REMOTE 
-			    && _resultVars.size() > 1
-			    && ( _resultMerge == PResultMerge.REMOTE_MR
-			       ||_resultMerge == PResultMerge.REMOTE_SPARK) );
-	}
-	
-	/**
-	 * 
-	 * @param IDPrefix
-	 */
-	private void setParForProgramBlockIDs(int IDPrefix)
-	{
-		_IDPrefix = IDPrefix;
-		if( _IDPrefix == -1 ) //not specified
-			_ID = _pfIDSeq.getNextID(); //generated new ID
-		else //remote case (further nested parfors are all in one JVM)
-		    _ID = IDHandler.concatIntIDsToLong(_IDPrefix, (int)_pfIDSeq.getNextID());	
-	}
-	
-	/**
-	 * TODO rework id handling in order to enable worker reuse
-	 * 
-	 */
-	private void setLocalParWorkerIDs()
-	{
-		if( _numThreads<=0 )
-			return;
-		
-		//set all parworker IDs required if PExecMode.LOCAL is used
-			
-		_pwIDs = new long[ _numThreads ];
-		
-		for( int i=0; i<_numThreads; i++ )
-		{
-			if(_IDPrefix == -1)
-				_pwIDs[i] = _pwIDSeq.getNextID();
-			else
-				_pwIDs[i] = IDHandler.concatIntIDsToLong(_IDPrefix,(int)_pwIDSeq.getNextID());
-			
-			if( _monitor ) 
-				StatisticMonitor.putPfPwMapping(_ID, _pwIDs[i]);
-		}
-	}
-
-	/**
-	 * 
-	 * @param from
-	 * @param to
-	 * @param incr
-	 */
-	private long computeNumIterations( IntObject from, IntObject to, IntObject incr )
-	{
-		return (long)Math.ceil(((double)(to.getLongValue() - from.getLongValue() + 1)) / incr.getLongValue()); 
-	}
-	
-	/**
-	 * 
-	 * @param iterVarName
-	 * @param from
-	 * @param to
-	 * @param incr
-	 */
-	private void updateIterablePredicateVars(String iterVarName, IntObject from, IntObject to, IntObject incr) 
-	{
-		_numIterations = computeNumIterations(from, to, incr); 
-		
-		//keep original iterable predicate
-		_iterablePredicateVarsOriginal = new String[4];
-		System.arraycopy(_iterablePredicateVars, 0, _iterablePredicateVarsOriginal, 0, 4);
-		
-		_iterablePredicateVars[0] = iterVarName;
-		_iterablePredicateVars[1] = from.getStringValue();
-		_iterablePredicateVars[2] = to.getStringValue();
-		_iterablePredicateVars[3] = incr.getStringValue();
-	}
-	
-	/**
-	 * 
-	 */
-	private void resetIterablePredicateVars()
-	{
-		//reset of modified for optimization (opt!=NONE)
-		if( _iterablePredicateVarsOriginal!=null ) 
-			System.arraycopy(_iterablePredicateVarsOriginal, 0, _iterablePredicateVars, 0, 4);
-	}
-	
-	/**
-	 * NOTE: Only required for remote parfor. Hence, there is no need to transfer DMLConfig to
-	 * the remote workers (MR job) since nested remote parfor is not supported.
- 	 * 
-	 * @return
-	 */
-	private String constructTaskFileName()
-	{
-		String scratchSpaceLoc = ConfigurationManager.getConfig()
-        							.getTextValue(DMLConfig.SCRATCH_SPACE);
-	
-		StringBuilder sb = new StringBuilder();
-		sb.append(scratchSpaceLoc);
-		sb.append(Lop.FILE_SEPARATOR);
-		sb.append(Lop.PROCESS_PREFIX);
-		sb.append(DMLScript.getUUID());
-		sb.append(PARFOR_MR_TASKS_TMP_FNAME.replaceAll("%ID%", String.valueOf(_ID)));
-		
-		return sb.toString();   
-	}
-	
-	/**
-	 * NOTE: Only required for remote parfor. Hence, there is no need to transfer DMLConfig to
-	 * the remote workers (MR job) since nested remote parfor is not supported.
-	 * 
-	 * @return
-	 */
-	private String constructResultFileName()
-	{
-		String scratchSpaceLoc = ConfigurationManager.getConfig()
-									.getTextValue(DMLConfig.SCRATCH_SPACE);
-		
-		StringBuilder sb = new StringBuilder();
-		sb.append(scratchSpaceLoc);
-		sb.append(Lop.FILE_SEPARATOR);
-		sb.append(Lop.PROCESS_PREFIX);
-		sb.append(DMLScript.getUUID());
-		sb.append(PARFOR_MR_RESULT_TMP_FNAME.replaceAll("%ID%", String.valueOf(_ID)));
-		
-		return sb.toString();   
-	}
-
-	/**
-	 * 
-	 * @return
-	 */
-	private String constructResultMergeFileName()
-	{
-		String scratchSpaceLoc = ConfigurationManager.getConfig()
-		                             .getTextValue(DMLConfig.SCRATCH_SPACE);
-		
-		String fname = PARFOR_MR_RESULTMERGE_FNAME;
-		fname = fname.replaceAll("%ID%", String.valueOf(_ID)); //replace workerID
-		fname = fname.replaceAll("%VAR%", String.valueOf(_resultVarsIDSeq.getNextID()));
-		
-		StringBuilder sb = new StringBuilder();
-		sb.append(scratchSpaceLoc);
-		sb.append(Lop.FILE_SEPARATOR);
-		sb.append(Lop.PROCESS_PREFIX);
-		sb.append(DMLScript.getUUID());
-		sb.append(fname);
-		
-		return sb.toString();   		
-	}
-	
-	/**
-	 * 
-	 * @return
-	 */
-	private String constructDataPartitionsFileName()
-	{
-		String scratchSpaceLoc = ConfigurationManager.getConfig()
-		                             .getTextValue(DMLConfig.SCRATCH_SPACE);
-		
-		String fname = PARFOR_DATAPARTITIONS_FNAME;
-		fname = fname.replaceAll("%ID%", String.valueOf(_ID)); //replace workerID
-		fname = fname.replaceAll("%VAR%", String.valueOf(_dpVarsIDSeq.getNextID()));
-		
-		StringBuilder sb = new StringBuilder();
-		sb.append(scratchSpaceLoc);
-		sb.append(Lop.FILE_SEPARATOR);
-		sb.append(Lop.PROCESS_PREFIX);
-		sb.append(DMLScript.getUUID());
-		sb.append(fname);
-		
-		return sb.toString();   		
-	}
-
-	/**
-	 * 
-	 * @return
-	 */
-	private long getMinMemory(ExecutionContext ec)
-	{
-		long ret = -1;
-		
-		//if forced remote exec and single node
-		if(    DMLScript.rtplatform == RUNTIME_PLATFORM.SINGLE_NODE 
-			&& _execMode == PExecMode.REMOTE_MR
-			&& _optMode == POptMode.NONE      )
-		{
-			try 
-			{
-				ParForStatementBlock sb = (ParForStatementBlock)getStatementBlock();
-				OptTree tree = OptTreeConverter.createAbstractOptTree(-1, -1, sb, this, new HashSet<String>(), ec);
-				CostEstimator est = new CostEstimatorHops( OptTreeConverter.getAbstractPlanMapping() );
-				double mem = est.getEstimate(TestMeasure.MEMORY_USAGE, tree.getRoot());
-				
-				ret = (long) (mem * ( 1d/OptimizerUtils.MEM_UTIL_FACTOR  )); 
-			} 
-			catch(Exception e) 
-			{
-				LOG.error("Failed to analyze minmum memory requirements.", e);
-			} 
-		}
-		
-		return ret;
-	}
-
-	private void setMemoryBudget()
-	{
-		if( _recompileMemoryBudget > 0 )
-		{
-			// store old budget for reset after exec
-			_oldMemoryBudget = (double)InfrastructureAnalyzer.getLocalMaxMemory();
-			
-			// scale budget with applied mem util factor (inverted during getMemBudget() )
-			long newMaxMem = (long) (_recompileMemoryBudget / OptimizerUtils.MEM_UTIL_FACTOR);
-			InfrastructureAnalyzer.setLocalMaxMemory( newMaxMem );
-		}
-	}
-	
-	private void resetMemoryBudget()
-	{
-		if( _recompileMemoryBudget > 0 )
-		{
-			InfrastructureAnalyzer.setLocalMaxMemory((long)_oldMemoryBudget);
-		}
-	}
-	
-	private void resetOptimizerFlags()
-	{
-		//reset all state that was set but is not guaranteed to be overwritten by optimizer
-		_variablesDPOriginal.removeAll();
-		_iterablePredicateVarsOriginal = null;
-		_colocatedDPMatrix     = null;
-		_replicationDP         = WRITE_REPLICATION_FACTOR;
-		_replicationExport     = -1;
-		_jvmReuse              = true;
-		_recompileMemoryBudget = -1;
-		_enableRuntimePiggybacking = false;
-		_variablesRP           = null;
-		_variablesECache       = null;
-	}
-	
-	
-	/**
-	 * Helper class for parallel invocation of REMOTE_MR result merge for multiple variables.
-	 */
-	private class ResultMergeWorker implements Runnable
-	{
-		private LocalTaskQueue<String> _q = null;
-		private LocalVariableMap[] _refVars = null;
-		private ExecutionContext _ec = null;
-		
-		public ResultMergeWorker( LocalTaskQueue<String> q, LocalVariableMap[] results, ExecutionContext ec )
-		{
-			_q = q;
-			_refVars = results;
-			_ec = ec;
-		}
-		
-		@Override
-		public void run() 
-		{
-			try
-			{
-				while( true ) 
-				{
-					String varname = _q.dequeueTask();
-					if( varname == LocalTaskQueue.NO_MORE_TASKS ) // task queue closed (no more tasks)
-						break;
-				
-					MatrixObject out = null;
-					synchronized( _ec.getVariables() ){
-						out = (MatrixObject) _ec.getVariable(varname);
-					}
-					
-					MatrixObject[] in = new MatrixObject[ _refVars.length ];
-					for( int i=0; i< _refVars.length; i++ )
-						in[i] = (MatrixObject) _refVars[i].get( varname ); 			
-					String fname = constructResultMergeFileName();
-				
-					ResultMerge rm = createResultMerge(_resultMerge, out, in, fname, _ec);
-					MatrixObject outNew = null;
-					if( USE_PARALLEL_RESULT_MERGE )
-						outNew = rm.executeParallelMerge( _numThreads );
-					else
-						outNew = rm.executeSerialMerge(); 	
-					
-					synchronized( _ec.getVariables() ){
-						_ec.getVariables().put( varname, outNew);
-					}
-		
-					//cleanup of intermediate result variables
-					cleanWorkerResultVariables( _ec, out, in );
-				}
-			}
-			catch(Exception ex)
-			{
-				LOG.error("Error executing result merge: ", ex);
-			}
-		}
-	}
-	
-
-	public String printBlockErrorLocation(){
-		return "ERROR: Runtime error in parfor program block generated from parfor statement block between lines " + _beginLine + " and " + _endLine + " -- ";
-	}
-	
-}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-systemml/blob/276d9257/src/main/java/com/ibm/bi/dml/runtime/controlprogram/Program.java
----------------------------------------------------------------------
diff --git a/src/main/java/com/ibm/bi/dml/runtime/controlprogram/Program.java b/src/main/java/com/ibm/bi/dml/runtime/controlprogram/Program.java
deleted file mode 100644
index 08d7958..0000000
--- a/src/main/java/com/ibm/bi/dml/runtime/controlprogram/Program.java
+++ /dev/null
@@ -1,164 +0,0 @@
-/**
- * (C) Copyright IBM Corp. 2010, 2015
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- * 
- */
-
-package com.ibm.bi.dml.runtime.controlprogram;
-
-import java.util.ArrayList;
-import java.util.HashMap;
-import java.util.Map.Entry;
-
-import com.ibm.bi.dml.parser.DMLProgram;
-import com.ibm.bi.dml.runtime.DMLRuntimeException;
-import com.ibm.bi.dml.runtime.DMLScriptException;
-import com.ibm.bi.dml.runtime.DMLUnsupportedOperationException;
-import com.ibm.bi.dml.runtime.controlprogram.context.ExecutionContext;
-
-
-public class Program 
-{
-	
-	public static final String KEY_DELIM = "::";
-	
-	public ArrayList<ProgramBlock> _programBlocks;
-
-	private HashMap<String, HashMap<String,FunctionProgramBlock>> _namespaceFunctions;
-	
-	public Program() throws DMLRuntimeException {
-		_namespaceFunctions = new HashMap<String, HashMap<String,FunctionProgramBlock>>(); 
-		_programBlocks = new ArrayList<ProgramBlock>();
-	}
-
-	/**
-	 * 
-	 * @param namespace
-	 * @param fname
-	 * @param fpb
-	 */
-	public synchronized void addFunctionProgramBlock(String namespace, String fname, FunctionProgramBlock fpb)
-	{	
-		if (namespace == null) 
-			namespace = DMLProgram.DEFAULT_NAMESPACE;
-		
-		HashMap<String,FunctionProgramBlock> namespaceBlocks = null;
-		namespaceBlocks = _namespaceFunctions.get(namespace);
-		if (namespaceBlocks == null){
-			namespaceBlocks = new HashMap<String,FunctionProgramBlock>();
-			_namespaceFunctions.put(namespace,namespaceBlocks);
-		}
-		
-		namespaceBlocks.put(fname,fpb);
-	}
-	
-	/**
-	 * 
-	 * @param namespace
-	 * @param fname
-	 */
-	public synchronized void removeFunctionProgramBlock(String namespace, String fname) 
-	{	
-		if (namespace == null) 
-			namespace = DMLProgram.DEFAULT_NAMESPACE;
-		
-		HashMap<String,FunctionProgramBlock> namespaceBlocks = null;
-		if( _namespaceFunctions.containsKey(namespace) ){
-			namespaceBlocks = _namespaceFunctions.get(namespace);
-			if( namespaceBlocks.containsKey(fname) )
-				namespaceBlocks.remove(fname);
-		}
-	}
-	
-	/**
-	 * 
-	 * @return
-	 */
-	public synchronized HashMap<String,FunctionProgramBlock> getFunctionProgramBlocks(){
-		
-		HashMap<String,FunctionProgramBlock> retVal = new HashMap<String,FunctionProgramBlock>();
-		
-		//create copy of function program blocks
-		for (String namespace : _namespaceFunctions.keySet()){
-			HashMap<String,FunctionProgramBlock> namespaceFSB = _namespaceFunctions.get(namespace);
-			for( Entry<String, FunctionProgramBlock> e: namespaceFSB.entrySet() ){
-				String fname = e.getKey(); 
-				FunctionProgramBlock fpb = e.getValue();
-				String fKey = DMLProgram.constructFunctionKey(namespace, fname);
-				retVal.put(fKey, fpb);
-			}
-		}
-		
-		return retVal;
-	}
-	
-	/**
-	 * 
-	 * @param namespace
-	 * @param fname
-	 * @return
-	 * @throws DMLRuntimeException
-	 */
-	public synchronized FunctionProgramBlock getFunctionProgramBlock(String namespace, String fname) throws DMLRuntimeException{
-		
-		if (namespace == null) namespace = DMLProgram.DEFAULT_NAMESPACE;
-		
-		HashMap<String,FunctionProgramBlock> namespaceFunctBlocks = _namespaceFunctions.get(namespace);
-		if (namespaceFunctBlocks == null)
-			throw new DMLRuntimeException("namespace " + namespace + " is undefined");
-		FunctionProgramBlock retVal = namespaceFunctBlocks.get(fname);
-		if (retVal == null)
-			throw new DMLRuntimeException("function " + fname + " is undefined in namespace " + namespace);
-		
-		return retVal;
-	}
-	
-	public void addProgramBlock(ProgramBlock pb) {
-		_programBlocks.add(pb);
-	}
-
-	public ArrayList<ProgramBlock> getProgramBlocks() {
-		return _programBlocks;
-	}
-
-	public void execute(ExecutionContext ec)
-		throws DMLRuntimeException, DMLUnsupportedOperationException
-	{
-		ec.initDebugProgramCounters();
-		
-		try
-		{
-			for (int i=0 ; i<_programBlocks.size() ; i++) {
-				ec.updateDebugState(i);
-				_programBlocks.get(i).execute(ec);
-			}
-		}
-		catch(DMLScriptException e) {
-			throw e;
-		}
-		catch(Exception e) {
-			throw new DMLRuntimeException(e);
-		}
-		
-		ec.clearDebugProgramCounters();
-	}
-		
-	
-	public void printMe() {
-		
-		for (ProgramBlock pb : this._programBlocks) {
-			pb.printMe();
-		}
-	}
-}