Posted to commits@systemml.apache.org by lr...@apache.org on 2015/12/03 19:45:38 UTC

[07/78] [abbrv] [partial] incubator-systemml git commit: Move files to new package folder structure

http://git-wip-us.apache.org/repos/asf/incubator-systemml/blob/276d9257/src/main/java/com/ibm/bi/dml/runtime/controlprogram/context/SparkExecutionContext.java
----------------------------------------------------------------------
diff --git a/src/main/java/com/ibm/bi/dml/runtime/controlprogram/context/SparkExecutionContext.java b/src/main/java/com/ibm/bi/dml/runtime/controlprogram/context/SparkExecutionContext.java
deleted file mode 100644
index 48b4699..0000000
--- a/src/main/java/com/ibm/bi/dml/runtime/controlprogram/context/SparkExecutionContext.java
+++ /dev/null
@@ -1,1155 +0,0 @@
-/**
- * (C) Copyright IBM Corp. 2010, 2015
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- * 
- */
-
-package com.ibm.bi.dml.runtime.controlprogram.context;
-
-import java.io.IOException;
-import java.util.LinkedList;
-import java.util.List;
-
-import org.apache.commons.logging.Log;
-import org.apache.commons.logging.LogFactory;
-import org.apache.hadoop.io.LongWritable;
-import org.apache.hadoop.io.Text;
-import org.apache.spark.SparkConf;
-import org.apache.spark.api.java.JavaPairRDD;
-import org.apache.spark.api.java.JavaSparkContext;
-import org.apache.spark.broadcast.Broadcast;
-import org.apache.spark.storage.RDDInfo;
-import org.apache.spark.storage.StorageLevel;
-
-import scala.Tuple2;
-
-import com.ibm.bi.dml.api.DMLScript;
-import com.ibm.bi.dml.api.MLContext;
-import com.ibm.bi.dml.api.MLContextProxy;
-import com.ibm.bi.dml.hops.OptimizerUtils;
-import com.ibm.bi.dml.lops.Checkpoint;
-import com.ibm.bi.dml.runtime.DMLRuntimeException;
-import com.ibm.bi.dml.runtime.DMLUnsupportedOperationException;
-import com.ibm.bi.dml.runtime.controlprogram.Program;
-import com.ibm.bi.dml.runtime.controlprogram.caching.MatrixObject;
-import com.ibm.bi.dml.runtime.controlprogram.parfor.stat.InfrastructureAnalyzer;
-import com.ibm.bi.dml.runtime.instructions.spark.CheckpointSPInstruction;
-import com.ibm.bi.dml.runtime.instructions.spark.SPInstruction;
-import com.ibm.bi.dml.runtime.instructions.spark.data.BlockPartitioner;
-import com.ibm.bi.dml.runtime.instructions.spark.data.BroadcastObject;
-import com.ibm.bi.dml.runtime.instructions.spark.data.LineageObject;
-import com.ibm.bi.dml.runtime.instructions.spark.data.PartitionedBroadcastMatrix;
-import com.ibm.bi.dml.runtime.instructions.spark.data.PartitionedMatrixBlock;
-import com.ibm.bi.dml.runtime.instructions.spark.data.RDDObject;
-import com.ibm.bi.dml.runtime.instructions.spark.functions.CopyBinaryCellFunction;
-import com.ibm.bi.dml.runtime.instructions.spark.functions.CopyBlockPairFunction;
-import com.ibm.bi.dml.runtime.instructions.spark.functions.CopyTextInputFunction;
-import com.ibm.bi.dml.runtime.instructions.spark.utils.RDDAggregateUtils;
-import com.ibm.bi.dml.runtime.instructions.spark.utils.SparkUtils;
-import com.ibm.bi.dml.runtime.matrix.MatrixCharacteristics;
-import com.ibm.bi.dml.runtime.matrix.data.InputInfo;
-import com.ibm.bi.dml.runtime.matrix.data.MatrixBlock;
-import com.ibm.bi.dml.runtime.matrix.data.MatrixCell;
-import com.ibm.bi.dml.runtime.matrix.data.MatrixIndexes;
-import com.ibm.bi.dml.runtime.matrix.data.OutputInfo;
-import com.ibm.bi.dml.runtime.matrix.mapred.MRJobConfiguration;
-import com.ibm.bi.dml.runtime.util.MapReduceTool;
-import com.ibm.bi.dml.utils.Statistics;
-
-
-public class SparkExecutionContext extends ExecutionContext
-{
-
-	private static final Log LOG = LogFactory.getLog(SparkExecutionContext.class.getName());
-
-	//internal configurations 
-	private static boolean LAZY_SPARKCTX_CREATION = true;
-	private static boolean ASYNCHRONOUS_VAR_DESTROY = true;
-	private static boolean FAIR_SCHEDULER_MODE = true;
-	
-	//executor memory and relative fractions as obtained from the spark configuration
-	private static long _memExecutors = -1; //mem per executors
-	private static double _memRatioData = -1; 
-	private static double _memRatioShuffle = -1;
-	private static int _numExecutors = -1; //total executors
-	private static int _defaultPar = -1; //total vcores  
-	private static boolean _confOnly = false; //infrastructure info based on config
-	
-	// Only one SparkContext may be active per JVM. You must stop() the active SparkContext before creating a new one. 
-	// This limitation may eventually be removed; see SPARK-2243 for more details.
-	private static JavaSparkContext _spctx = null; 
-	
-	protected SparkExecutionContext(Program prog) 
-	{
-		//protected constructor to force use of ExecutionContextFactory
-		this( true, prog );
-	}
-
-	protected SparkExecutionContext(boolean allocateVars, Program prog) 
-	{
-		//protected constructor to force use of ExecutionContextFactory
-		super( allocateVars, prog );
-				
-		//spark context creation via internal initializer
-		if( !(LAZY_SPARKCTX_CREATION && OptimizerUtils.isHybridExecutionMode()) ) {
-			initSparkContext();
-		}
-	}
-		
-	/**
-	 * Returns the singleton spark context in use. In case of lazy spark context
-	 * creation, this method blocks until the spark context is created.
-	 *  
-	 * @return
-	 */
-	public JavaSparkContext getSparkContext()
-	{
-		//lazy spark context creation on demand (lazy instead of asynchronous 
-		//to avoid wait for uninitialized spark context on close)
-		if( LAZY_SPARKCTX_CREATION ) {
-			initSparkContext();
-		}
-		
-		//return the created spark context
-		return _spctx;
-	}
-	
-	/**
-	 * 
-	 * @return
-	 */
-	public static JavaSparkContext getSparkContextStatic()
-	{
-		initSparkContext();
-		return _spctx;
-	}
-	
-	/**
-	 * 
-	 */
-	public void close() 
-	{
-		synchronized( SparkExecutionContext.class ) {
-			if( _spctx != null ) 
-			{
-				//stop the spark context if existing
-				_spctx.stop();
-				
-				//make sure stopped context is never used again
-				_spctx = null; 
-			}
-				
-		}
-	}
-	
-	public static boolean isLazySparkContextCreation(){
-		return LAZY_SPARKCTX_CREATION;
-	}
-	
-	/**
-	 * 
-	 */
-	private synchronized static void initSparkContext()
-	{	
-		//check for redundant spark context init
-		if( _spctx != null )
-			return;
-	
-		long t0 = DMLScript.STATISTICS ? System.nanoTime() : 0;
-		
-		//create a default spark context (master, appname, etc refer to system properties
-		//as given in the spark configuration or during spark-submit)
-		
-		MLContext mlCtx = MLContextProxy.getActiveMLContext();
-		if(mlCtx != null) 
-		{
-			// This is the case when DML is called through the Spark shell.
-			// The passing of static variables will be cleaned up later as this involves minimal changes to DMLScript.
-			_spctx = new JavaSparkContext(mlCtx.getSparkContext());
-		}
-		else 
-		{
-			if(DMLScript.USE_LOCAL_SPARK_CONFIG) {
-				// Local master for integration testing (local[*] uses all available cores)
-				SparkConf conf = new SparkConf().setMaster("local[*]").setAppName("My local integration test app");
-				// This is discouraged in Spark but was added only for those test cases that cannot stop the context properly
-				// conf.set("spark.driver.allowMultipleContexts", "true");
-				conf.set("spark.ui.enabled", "false");
-				_spctx = new JavaSparkContext(conf);
-			}
-			else //default cluster setup
-			{
-				//setup systemml-preferred spark configuration (w/o user choice)
-				SparkConf conf = new SparkConf();
-				
-				//always set unlimited result size (required for cp collect)
-				conf.set("spark.driver.maxResultSize", "0");
-				
-				//always use the fair scheduler (for single jobs, it's equivalent to fifo
-				//but for concurrent jobs in parfor it ensures better data locality because
-				//round robin assignment mitigates the problem of 'sticky slots')
-				if( FAIR_SCHEDULER_MODE ) {
-					conf.set("spark.scheduler.mode", "FAIR");
-				}
-				
-				_spctx = new JavaSparkContext(conf);
-			}
-		}
-			
-		//globally add binaryblock serialization framework for all hdfs read/write operations
-		//TODO if spark context passed in from outside (mlcontext), we need to clean this up at the end 
-		if( MRJobConfiguration.USE_BINARYBLOCK_SERIALIZATION )
-			MRJobConfiguration.addBinaryBlockSerializationFramework( _spctx.hadoopConfiguration() );
-		
-		//statistics maintenance
-		if( DMLScript.STATISTICS ){
-			Statistics.setSparkCtxCreateTime(System.nanoTime()-t0);
-		}
-	}	
-	
-	/**
-	 * Spark instructions should call this for all matrix inputs except broadcast
-	 * variables.
-	 * 
-	 * @param varname
-	 * @return
-	 * @throws DMLRuntimeException
-	 * @throws DMLUnsupportedOperationException
-	 */
-	@SuppressWarnings("unchecked")
-	public JavaPairRDD<MatrixIndexes,MatrixBlock> getBinaryBlockRDDHandleForVariable( String varname ) 
-		throws DMLRuntimeException, DMLUnsupportedOperationException 
-	{
-		return (JavaPairRDD<MatrixIndexes,MatrixBlock>) getRDDHandleForVariable( varname, InputInfo.BinaryBlockInputInfo);
-	}
-	
-	/**
-	 * 
-	 * @param varname
-	 * @param inputInfo
-	 * @return
-	 * @throws DMLRuntimeException
-	 * @throws DMLUnsupportedOperationException
-	 */
-	public JavaPairRDD<?,?> getRDDHandleForVariable( String varname, InputInfo inputInfo ) 
-		throws DMLRuntimeException, DMLUnsupportedOperationException
-	{
-		MatrixObject mo = getMatrixObject(varname);
-		return getRDDHandleForMatrixObject(mo, inputInfo);
-	}
-	
-	/**
-	 * This call returns an RDD handle for a given matrix object. This includes 
-	 * the creation of RDDs for in-memory or binary-block HDFS data. 
-	 * 
-	 * 
-	 * @param varname
-	 * @return
-	 * @throws DMLRuntimeException 
-	 * @throws DMLUnsupportedOperationException 
-	 */
-	@SuppressWarnings("unchecked")
-	public JavaPairRDD<?,?> getRDDHandleForMatrixObject( MatrixObject mo, InputInfo inputInfo ) 
-		throws DMLRuntimeException, DMLUnsupportedOperationException
-	{		
-		//NOTE: MB this logic should be integrated into MatrixObject
-		//However, for now we cannot assume that spark libraries are 
-		//always available and hence only store generic references in 
-		//matrix object while all the logic is in the SparkExecContext
-		
-		JavaPairRDD<?,?> rdd = null;
-		//CASE 1: rdd already existing (reuse if checkpoint, or trigger
-		//pending rdd operations if not yet cached, but prevent re-evaluation
-		//of rdd operations if already executed and cached)
-		if(    mo.getRDDHandle()!=null 
-			&& (mo.getRDDHandle().isCheckpointRDD() || !mo.isCached(false)) )
-		{
-			//return existing rdd handling (w/o input format change)
-			rdd = mo.getRDDHandle().getRDD();
-		}
-		//CASE 2: dirty in memory data or cached result of rdd operations
-		else if( mo.isDirty() || mo.isCached(false) )
-		{
-			//get in-memory matrix block and parallelize it
-			//w/ guarded parallelize (fallback to export, rdd from file if too large)
-			boolean fromFile = false;
-			if( !OptimizerUtils.checkSparkCollectMemoryBudget(mo.getMatrixCharacteristics(), 0) ) {
-				if( mo.isDirty() ) { //write only if necessary
-					mo.exportData();
-				}
-				rdd = getSparkContext().hadoopFile( mo.getFileName(), inputInfo.inputFormatClass, inputInfo.inputKeyClass, inputInfo.inputValueClass);
-				rdd = ((JavaPairRDD<MatrixIndexes, MatrixBlock>)rdd).mapToPair( new CopyBlockPairFunction() ); //cp is workaround for read bug			
-				fromFile = true;
-			}
-			else { //default case
-				MatrixBlock mb = mo.acquireRead(); //pin matrix in memory
-				rdd = toJavaPairRDD(getSparkContext(), mb, (int)mo.getNumRowsPerBlock(), (int)mo.getNumColumnsPerBlock());
-				mo.release(); //unpin matrix
-			}
-			
-			//keep rdd handle for future operations on it
-			RDDObject rddhandle = new RDDObject(rdd, mo.getVarName());
-			rddhandle.setHDFSFile(fromFile);
-			mo.setRDDHandle(rddhandle);
-		}
-		//CASE 3: non-dirty (file exists on HDFS)
-		else
-		{
-			// parallelize hdfs-resident file
-			// For binary block, these are: SequenceFileInputFormat.class, MatrixIndexes.class, MatrixBlock.class
-			if(inputInfo == InputInfo.BinaryBlockInputInfo) {
-				rdd = getSparkContext().hadoopFile( mo.getFileName(), inputInfo.inputFormatClass, inputInfo.inputKeyClass, inputInfo.inputValueClass);
-				//note: this copy is still required in Spark 1.4 because spark hands out whatever the inputformat
-				//recordreader returns; the javadoc explicitly recommends copying all key/value pairs
-				rdd = ((JavaPairRDD<MatrixIndexes, MatrixBlock>)rdd).mapToPair( new CopyBlockPairFunction() ); //cp is workaround for read bug
-			}
-			else if(inputInfo == InputInfo.TextCellInputInfo || inputInfo == InputInfo.CSVInputInfo || inputInfo == InputInfo.MatrixMarketInputInfo) {
-				rdd = getSparkContext().hadoopFile( mo.getFileName(), inputInfo.inputFormatClass, inputInfo.inputKeyClass, inputInfo.inputValueClass);
-				rdd = ((JavaPairRDD<LongWritable, Text>)rdd).mapToPair( new CopyTextInputFunction() ); //cp is workaround for read bug
-			}
-			else if(inputInfo == InputInfo.BinaryCellInputInfo) {
-				rdd = getSparkContext().hadoopFile( mo.getFileName(), inputInfo.inputFormatClass, inputInfo.inputKeyClass, inputInfo.inputValueClass);
-				rdd = ((JavaPairRDD<MatrixIndexes, MatrixCell>)rdd).mapToPair( new CopyBinaryCellFunction() ); //cp is workaround for read bug
-			}
-			else {
-				throw new DMLRuntimeException("Incorrect input format in getRDDHandleForVariable");
-			}
-			
-			//keep rdd handle for future operations on it
-			RDDObject rddhandle = new RDDObject(rdd, mo.getVarName());
-			rddhandle.setHDFSFile(true);
-			mo.setRDDHandle(rddhandle);
-		}
-		
-		return rdd;
-	}
-	
-	/**
-	 * TODO So far we only create broadcast variables but never destroy
-	 * them. This is a memory leak which might lead to executor out-of-memory.
-	 * However, in order to handle this, we need to keep track of when broadcast 
-	 * variables are no longer required.
-	 * 
-	 * @param varname
-	 * @return
-	 * @throws DMLRuntimeException
-	 * @throws DMLUnsupportedOperationException
-	 */
-	@SuppressWarnings("unchecked")
-	public PartitionedBroadcastMatrix getBroadcastForVariable( String varname ) 
-		throws DMLRuntimeException, DMLUnsupportedOperationException
-	{
-		MatrixObject mo = getMatrixObject(varname);
-		
-		PartitionedBroadcastMatrix bret = null;
-		
-		if(    mo.getBroadcastHandle()!=null 
-			&& mo.getBroadcastHandle().isValid() ) 
-		{
-			//reuse existing broadcast handle
-			bret = mo.getBroadcastHandle().getBroadcast();
-		}
-		else 
-		{
-			//obtain meta data for matrix 
-			int brlen = (int) mo.getNumRowsPerBlock();
-			int bclen = (int) mo.getNumColumnsPerBlock();
-			
-			//create partitioned matrix block and release memory consumed by input
-			MatrixBlock mb = mo.acquireRead();
-			PartitionedMatrixBlock pmb = new PartitionedMatrixBlock(mb, brlen, bclen);
-			mo.release();
-			
-			//determine coarse-grained partitioning
-			int numPerPart = PartitionedBroadcastMatrix.computeBlocksPerPartition(mo.getNumRows(), mo.getNumColumns(), brlen, bclen);
-			int numParts = (int) Math.ceil((double)pmb.getNumRowBlocks()*pmb.getNumColumnBlocks() / numPerPart); 
-			Broadcast<PartitionedMatrixBlock>[] ret = new Broadcast[numParts];
-					
-			//create coarse-grained partitioned broadcasts
-			if( numParts > 1 ) {
-				for( int i=0; i<numParts; i++ ) {
-					int offset = i * numPerPart;
-					int numBlks = Math.min(numPerPart, pmb.getNumRowBlocks()*pmb.getNumColumnBlocks()-offset);
-					PartitionedMatrixBlock tmp = pmb.createPartition(offset, numBlks);
-					ret[i] = getSparkContext().broadcast(tmp);
-				}
-			}
-			else { //single partition
-				ret[0] = getSparkContext().broadcast( pmb);
-			}
-		
-			bret = new PartitionedBroadcastMatrix(ret);
-			BroadcastObject bchandle = new BroadcastObject(bret, varname);
-			mo.setBroadcastHandle(bchandle);
-		}
-		
-		return bret;
-	}
-	
-	/**
-	 * 
-	 * @param varname
-	 * @return
-	 * @throws DMLRuntimeException
-	 * @throws DMLUnsupportedOperationException
-	 */
-	public BlockPartitioner getPartitionerForRDDVariable(String varname) 
-		throws DMLRuntimeException, DMLUnsupportedOperationException
-	{
-		//get input rdd and matrix characteristics
-		JavaPairRDD<MatrixIndexes,MatrixBlock> in = getBinaryBlockRDDHandleForVariable(varname);
-		MatrixCharacteristics mc = getMatrixCharacteristics(varname);
-		
-		//create tile-based matrix partitioner
-		return new BlockPartitioner(mc, in.partitions().size());
-	}
-	
-	/**
-	 * Keep the output rdd of spark rdd operations as meta data of matrix objects in the 
-	 * symbol table.
-	 * 
-	 * Spark instructions should call this for all matrix outputs.
-	 * 
-	 * 
-	 * @param varname
-	 * @param rdd
-	 * @throws DMLRuntimeException 
-	 */
-	public void setRDDHandleForVariable(String varname, JavaPairRDD<MatrixIndexes,?> rdd) 
-		throws DMLRuntimeException
-	{
-		MatrixObject mo = getMatrixObject(varname);
-		RDDObject rddhandle = new RDDObject(rdd, varname);
-		mo.setRDDHandle( rddhandle );
-	}
-	
-	/**
-	 * Utility method for creating an RDD out of an in-memory matrix block.
-	 * 
-	 * @param sc
-	 * @param block
-	 * @return
-	 * @throws DMLUnsupportedOperationException 
-	 * @throws DMLRuntimeException 
-	 */
-	public static JavaPairRDD<MatrixIndexes,MatrixBlock> toJavaPairRDD(JavaSparkContext sc, MatrixBlock src, int brlen, int bclen) 
-		throws DMLRuntimeException, DMLUnsupportedOperationException
-	{
-		LinkedList<Tuple2<MatrixIndexes,MatrixBlock>> list = new LinkedList<Tuple2<MatrixIndexes,MatrixBlock>>();
-		
-		if(    src.getNumRows() <= brlen 
-		    && src.getNumColumns() <= bclen )
-		{
-			list.addLast(new Tuple2<MatrixIndexes,MatrixBlock>(new MatrixIndexes(1,1), src));
-		}
-		else
-		{
-			boolean sparse = src.isInSparseFormat();
-			
-			//create and write subblocks of matrix
-			for(int blockRow = 0; blockRow < (int)Math.ceil(src.getNumRows()/(double)brlen); blockRow++)
-				for(int blockCol = 0; blockCol < (int)Math.ceil(src.getNumColumns()/(double)bclen); blockCol++)
-				{
-					int maxRow = (blockRow*brlen + brlen < src.getNumRows()) ? brlen : src.getNumRows() - blockRow*brlen;
-					int maxCol = (blockCol*bclen + bclen < src.getNumColumns()) ? bclen : src.getNumColumns() - blockCol*bclen;
-					
-					MatrixBlock block = new MatrixBlock(maxRow, maxCol, sparse);
-						
-					int row_offset = blockRow*brlen;
-					int col_offset = blockCol*bclen;
-	
-					//copy submatrix to block
-					src.sliceOperations( row_offset, row_offset+maxRow-1, 
-							             col_offset, col_offset+maxCol-1, block );							
-					
-					//append block to sequence file
-					MatrixIndexes indexes = new MatrixIndexes(blockRow+1, blockCol+1);
-					list.addLast(new Tuple2<MatrixIndexes,MatrixBlock>(indexes, block));
-				}
-		}
-		
-		return sc.parallelizePairs(list);
-	}
-	
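A minimal standalone sketch of the block tiling performed by toJavaPairRDD above.
The matrix and block dimensions are assumed example values, and the boundary-block
size is written with Math.min, which is equivalent to the conditional used in the code:

    // Sketch of the binary-block tiling (assumed example dimensions).
    public class BlockingSketch {
      public static void main(String[] args) {
        int rlen = 2500, clen = 700;    // matrix dimensions (example values)
        int brlen = 1000, bclen = 1000; // block dimensions (example values)
        int numRowBlocks = (int) Math.ceil(rlen / (double) brlen); // 3
        int numColBlocks = (int) Math.ceil(clen / (double) bclen); // 1
        for (int blockRow = 0; blockRow < numRowBlocks; blockRow++)
          for (int blockCol = 0; blockCol < numColBlocks; blockCol++) {
            // boundary blocks are truncated to the remaining rows/columns
            int maxRow = Math.min(brlen, rlen - blockRow * brlen);
            int maxCol = Math.min(bclen, clen - blockCol * bclen);
            System.out.println("block (" + (blockRow + 1) + "," + (blockCol + 1)
                + "): " + maxRow + " x " + maxCol);
          }
      }
    }

With these numbers the sketch yields blocks (1,1) to (3,1) of sizes 1000x700, 1000x700,
and 500x700, i.e. the same set of 1-based MatrixIndexes created by the loop above.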
-	/**
-	 * This method is a generic abstraction for calls from the buffer pool.
-	 * See toMatrixBlock(JavaPairRDD<MatrixIndexes,MatrixBlock> rdd, int rlen, int clen, int brlen, int bclen, long nnz).
-	 * 
-	 * @param rdd
-	 * @param rlen
-	 * @param clen
-	 * @param brlen
-	 * @param bclen
-	 * @param nnz
-	 * @return
-	 * @throws DMLRuntimeException 
-	 */
-	@SuppressWarnings("unchecked")
-	public static MatrixBlock toMatrixBlock(RDDObject rdd, int rlen, int clen, int brlen, int bclen, long nnz) 
-		throws DMLRuntimeException
-	{			
-		return toMatrixBlock(
-				(JavaPairRDD<MatrixIndexes, MatrixBlock>) rdd.getRDD(), 
-				rlen, clen, brlen, bclen, nnz);
-	}
-	
-	/**
-	 * Utility method for creating a single matrix block out of an RDD. Note that this collect call
-	 * might trigger execution of any pending transformations. 
-	 * 
-	 * NOTE: This is an unguarded utility function, which requires memory for both the output matrix
-	 * and its collected, blocked representation.
-	 * 
-	 * @param rdd
-	 * @param rlen
-	 * @param clen
-	 * @param brlen
-	 * @param bclen
-	 * @param nnz
-	 * @return
-	 * @throws DMLRuntimeException
-	 */
-	public static MatrixBlock toMatrixBlock(JavaPairRDD<MatrixIndexes,MatrixBlock> rdd, int rlen, int clen, int brlen, int bclen, long nnz) 
-		throws DMLRuntimeException
-	{
-		MatrixBlock out = null;
-		
-		if( rlen <= brlen && clen <= bclen ) //SINGLE BLOCK
-		{
-			//special case without copy and nnz maintenance
-			List<Tuple2<MatrixIndexes,MatrixBlock>> list = rdd.collect();
-			if( list.size()>1 )
-				throw new DMLRuntimeException("Expecting no more than one result block.");
-			else if( list.size()==1 )
-				out = list.get(0)._2();
-			else //empty (e.g., after ops w/ outputEmpty=false)
-				out = new MatrixBlock(rlen, clen, true);
-		}
-		else //MULTIPLE BLOCKS
-		{
-			//determine target sparse/dense representation
-			long lnnz = (nnz >= 0) ? nnz : (long)rlen * clen;
-			boolean sparse = MatrixBlock.evalSparseFormatInMemory(rlen, clen, lnnz);
-			
-			//create output matrix block (w/ lazy allocation)
-			out = new MatrixBlock(rlen, clen, sparse);
-			List<Tuple2<MatrixIndexes,MatrixBlock>> list = rdd.collect();
-			
-			//copy blocks one-at-a-time into output matrix block
-			for( Tuple2<MatrixIndexes,MatrixBlock> keyval : list )
-			{
-				//unpack index-block pair
-				MatrixIndexes ix = keyval._1();
-				MatrixBlock block = keyval._2();
-				
-				//compute row/column block offsets
-				int row_offset = (int)(ix.getRowIndex()-1)*brlen;
-				int col_offset = (int)(ix.getColumnIndex()-1)*bclen;
-				int rows = block.getNumRows();
-				int cols = block.getNumColumns();
-				
-				if( sparse ) { //SPARSE OUTPUT
-					//append block to sparse target in order to avoid shifting
-					//note: this append requires a final sort of sparse rows
-					out.appendToSparse(block, row_offset, col_offset);
-				}
-				else { //DENSE OUTPUT
-					out.copy( row_offset, row_offset+rows-1, 
-							  col_offset, col_offset+cols-1, block, false );	
-				}
-			}
-			
-			//post-processing output matrix
-			if( sparse )
-				out.sortSparseRows();
-			out.recomputeNonZeros();
-			out.examSparsity();
-		}
-		
-		return out;
-	}
-	
-	/**
-	 * 
-	 * @param rdd
-	 * @param rlen
-	 * @param clen
-	 * @param brlen
-	 * @param bclen
-	 * @param nnz
-	 * @return
-	 * @throws DMLRuntimeException
-	 */
-	public static PartitionedMatrixBlock toPartitionedMatrixBlock(JavaPairRDD<MatrixIndexes,MatrixBlock> rdd, int rlen, int clen, int brlen, int bclen, long nnz) 
-		throws DMLRuntimeException
-	{
-		PartitionedMatrixBlock out = new PartitionedMatrixBlock(rlen, clen, brlen, bclen);
-		
-		List<Tuple2<MatrixIndexes,MatrixBlock>> list = rdd.collect();
-		
-		//copy blocks one-at-a-time into output matrix block
-		for( Tuple2<MatrixIndexes,MatrixBlock> keyval : list )
-		{
-			//unpack index-block pair
-			MatrixIndexes ix = keyval._1();
-			MatrixBlock block = keyval._2();
-			out.setMatrixBlock((int)ix.getRowIndex(), (int)ix.getColumnIndex(), block);
-		}
-				
-		return out;
-	}
-	
-	/**
-	 * 
-	 * @param rdd
-	 * @param oinfo
-	 */
-	@SuppressWarnings("unchecked")
-	public static long writeRDDtoHDFS( RDDObject rdd, String path, OutputInfo oinfo )
-	{
-		JavaPairRDD<MatrixIndexes,MatrixBlock> lrdd = (JavaPairRDD<MatrixIndexes, MatrixBlock>) rdd.getRDD();
-		
-		//recompute nnz 
-		long nnz = SparkUtils.computeNNZFromBlocks(lrdd);
-		
-		//save file is an action which also triggers nnz maintenance
-		lrdd.saveAsHadoopFile(path, 
-				oinfo.outputKeyClass, 
-				oinfo.outputValueClass, 
-				oinfo.outputFormatClass);
-		
-		//return nnz aggregate of all blocks
-		return nnz;
-	}
-	
-	
-	/**
-	 * Returns the available memory budget for broadcast variables in bytes.
-	 * In detail, this takes into account the total executor memory as well
-	 * as relative ratios for data and shuffle. Note that this is a conservative
-	 * estimate since both data memory and shuffle memory might not be fully
-	 * utilized. 
-	 * 
-	 * @return
-	 */
-	public static double getBroadcastMemoryBudget()
-	{
-		if( _memExecutors < 0 || _memRatioData < 0 || _memRatioShuffle < 0 )
-			analyzeSparkConfiguation();
-		
-		//70% of remaining free memory
-		double membudget = OptimizerUtils.MEM_UTIL_FACTOR *
-			              (  _memExecutors 
-			               - _memExecutors*(_memRatioData+_memRatioShuffle) );
-		
-		return membudget;
-	}
-	
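For intuition, a rough standalone sketch of the budget formula in getBroadcastMemoryBudget.
The executor memory and the 0.7 utilization factor are assumed example values (the real
factor is OptimizerUtils.MEM_UTIL_FACTOR); 0.6 and 0.2 are the Spark defaults used by
analyzeSparkConfiguation below:

    // Sketch of the broadcast memory budget computation (assumed example values).
    public class BroadcastBudgetSketch {
      public static void main(String[] args) {
        double memExecutors = 4L * 1024 * 1024 * 1024; // 4 GB executor memory (assumed)
        double memRatioData = 0.6;    // spark.storage.memoryFraction default
        double memRatioShuffle = 0.2; // spark.shuffle.memoryFraction default
        double memUtilFactor = 0.7;   // stand-in for OptimizerUtils.MEM_UTIL_FACTOR
        // utilization factor applied to the memory left after data and shuffle fractions
        double budget = memUtilFactor
            * (memExecutors - memExecutors * (memRatioData + memRatioShuffle));
        System.out.printf("broadcast budget: %.0f MB%n", budget / (1024 * 1024));
        // with these numbers: 0.7 * (4 GB - 4 GB * 0.8) = 0.7 * 0.8 GB, roughly 573 MB
      }
    }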
-	/**
-	 * 
-	 * @return
-	 */
-	public static double getConfiguredTotalDataMemory() {
-		return getConfiguredTotalDataMemory(false);
-	}
-	
-	/**
-	 * 
-	 * @param refresh
-	 * @return
-	 */
-	public static double getConfiguredTotalDataMemory(boolean refresh)
-	{
-		if( _memExecutors < 0 || _memRatioData < 0 )
-			analyzeSparkConfiguation();
-		
-		//always get the current num executors on refresh because this might 
-		//change if not all executors are initially allocated and it is plan-relevant
-		if( refresh && !_confOnly ) {
-			JavaSparkContext jsc = getSparkContextStatic();
-			int numExec = Math.max(jsc.sc().getExecutorMemoryStatus().size() - 1, 1);
-			return _memExecutors * _memRatioData * numExec; 
-		}
-		else
-			return ( _memExecutors * _memRatioData * _numExecutors );
-	}
-	
-	public static int getNumExecutors()
-	{
-		if( _numExecutors < 0 )
-			analyzeSparkConfiguation();
-		
-		return _numExecutors;
-	}
-	
-	public static int getDefaultParallelism() {
-		return getDefaultParallelism(false);
-	}
-	
-	/**
-	 * 
-	 * @return
-	 */
-	public static int getDefaultParallelism(boolean refresh) 
-	{
-		if( _defaultPar < 0 && !refresh )
-			analyzeSparkConfiguation();
-		
-		//always get the current default parallelism on refresh because this might 
-		//change if not all executors are initially allocated and it is plan-relevant
-		if( refresh && !_confOnly )
-			return getSparkContextStatic().defaultParallelism();
-		else
-			return _defaultPar;
-	}
-	
-	/**
-	 * 
-	 */
-	public static void analyzeSparkConfiguation() 
-	{
-		SparkConf sconf = new SparkConf();
-		
-		//parse absolute executor memory
-		String tmp = sconf.get("spark.executor.memory", "512m");
-		if ( tmp.endsWith("g") || tmp.endsWith("G") )
-			_memExecutors = Long.parseLong(tmp.substring(0,tmp.length()-1)) * 1024 * 1024 * 1024;
-		else if ( tmp.endsWith("m") || tmp.endsWith("M") )
-			_memExecutors = Long.parseLong(tmp.substring(0,tmp.length()-1)) * 1024 * 1024;
-		else if( tmp.endsWith("k") || tmp.endsWith("K") )
-			_memExecutors = Long.parseLong(tmp.substring(0,tmp.length()-1)) * 1024;
-		else 
-			_memExecutors = Long.parseLong(tmp.substring(0,tmp.length()-2));
-		
-		//get data and shuffle memory ratios (defaults used if not specified in the job conf)
-		_memRatioData = sconf.getDouble("spark.storage.memoryFraction", 0.6); //default 60%
-		_memRatioShuffle = sconf.getDouble("spark.shuffle.memoryFraction", 0.2); //default 20%
-		
-		int numExecutors = sconf.getInt("spark.executor.instances", -1);
-		int numCoresPerExec = sconf.getInt("spark.executor.cores", -1);
-		int defaultPar = sconf.getInt("spark.default.parallelism", -1);
-		
-		if( numExecutors > 1 && (defaultPar > 1 || numCoresPerExec > 1) ) {
-			_numExecutors = numExecutors;
-			_defaultPar = (defaultPar>1) ? defaultPar : numExecutors * numCoresPerExec;
-			_confOnly = true;
-		}
-		else {
-			//get default parallelism (total number of executors and cores)
-			//note: spark context provides this information while conf does not
-			//(for num executors we need to correct for driver and local mode)
-			JavaSparkContext jsc = getSparkContextStatic();
-			_numExecutors = Math.max(jsc.sc().getExecutorMemoryStatus().size() - 1, 1);  
-			_defaultPar = jsc.defaultParallelism();
-			_confOnly = false; //implies env info refresh w/ spark context 
-		}
-		
-		//note: required time for infrastructure analysis on 5 node cluster: ~5-20ms. 
-	}
-
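A standalone sketch of the spark.executor.memory suffix parsing done in
analyzeSparkConfiguation above. The g/m/k handling mirrors the code; the fallback
branch is simplified to a plain byte count, and the sample strings are made up:

    // Sketch of the executor-memory suffix parsing (sample inputs are made up).
    public class ExecutorMemorySketch {
      public static long parseMem(String tmp) {
        if (tmp.endsWith("g") || tmp.endsWith("G"))
          return Long.parseLong(tmp.substring(0, tmp.length() - 1)) * 1024 * 1024 * 1024;
        else if (tmp.endsWith("m") || tmp.endsWith("M"))
          return Long.parseLong(tmp.substring(0, tmp.length() - 1)) * 1024 * 1024;
        else if (tmp.endsWith("k") || tmp.endsWith("K"))
          return Long.parseLong(tmp.substring(0, tmp.length() - 1)) * 1024;
        else // simplification: treat the remainder as a plain byte count
          return Long.parseLong(tmp);
      }
      public static void main(String[] args) {
        System.out.println(parseMem("512m")); // 536870912
        System.out.println(parseMem("4g"));   // 4294967296
      }
    }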
-	/**
-	 * 
-	 */
-	public void checkAndRaiseValidationWarningJDKVersion()
-	{
-		//check for jdk version less than jdk8
-		boolean isLtJDK8 = InfrastructureAnalyzer.isJavaVersionLessThanJDK8();
-
-		//check multi-threaded executors
-		int numExecutors = getNumExecutors();
-		int numCores = getDefaultParallelism();
-		boolean multiThreaded = (numCores > numExecutors);
-		
-		//check for jdk version less than 8 (and raise warning if multi-threaded)
-		if( isLtJDK8 && multiThreaded) 
-		{
-			//get the jre version 
-			String version = System.getProperty("java.version");
-			
-			LOG.warn("########################################################################################");
-			LOG.warn("### WARNING: Multi-threaded text reblock may lead to thread contention on JRE < 1.8 ####");
-			LOG.warn("### java.version = " + version);
-			LOG.warn("### total number of executors = " + numExecutors);
-			LOG.warn("### total number of cores = " + numCores);
-			LOG.warn("### JDK-7032154: Performance tuning of sun.misc.FloatingDecimal/FormattedFloatingDecimal");
-			LOG.warn("### Workaround: Convert text to binary w/ changed configuration of one executor per core");
-			LOG.warn("########################################################################################");
-		}
-	}
-	
-	///////////////////////////////////////////
-	// Cleanup of RDDs and Broadcast variables
-	///////
-	
-	/**
-	 * Adds a child rdd object to the lineage of a parent rdd.
-	 * 
-	 * @param varParent
-	 * @param varChild
-	 * @throws DMLRuntimeException
-	 */
-	public void addLineageRDD(String varParent, String varChild) 
-		throws DMLRuntimeException 
-	{
-		RDDObject parent = getMatrixObject(varParent).getRDDHandle();
-		RDDObject child = getMatrixObject(varChild).getRDDHandle();
-		
-		parent.addLineageChild( child );
-	}
-	
-	/**
-	 * Adds a child broadcast object to the lineage of a parent rdd.
-	 * 
-	 * @param varParent
-	 * @param varChild
-	 * @throws DMLRuntimeException
-	 */
-	public void addLineageBroadcast(String varParent, String varChild) 
-		throws DMLRuntimeException 
-	{
-		RDDObject parent = getMatrixObject(varParent).getRDDHandle();
-		BroadcastObject child = getMatrixObject(varChild).getBroadcastHandle();
-		
-		parent.addLineageChild( child );
-	}
-	
-	@Override
-	public void cleanupMatrixObject( MatrixObject mo ) 
-		throws DMLRuntimeException
-	{
-		//NOTE: this method overwrites the default behavior of cleanupMatrixObject
-		//and hence is transparently used by rmvar instructions and other users. The
-		//core difference is the lineage-based cleanup of RDD and broadcast variables.
-		
-		try
-		{
-			if ( mo.isCleanupEnabled() ) 
-			{
-				//compute ref count only if matrix cleanup actually necessary
-				if ( !getVariables().hasReferences(mo) ) 
-				{
-					//clean cached data	
-					mo.clearData(); 
-					
-					//clean hdfs data if no pending rdd operations on it
-					if( mo.isFileExists() && mo.getFileName()!=null ) {
-						if( mo.getRDDHandle()==null ) {
-							MapReduceTool.deleteFileWithMTDIfExistOnHDFS(mo.getFileName());
-						}
-						else { //deferred file removal
-							RDDObject rdd = mo.getRDDHandle();
-							rdd.setHDFSFilename(mo.getFileName());
-						}
-					}
-					
-					//cleanup RDD and broadcast variables (recursive)
-					//note: requires that mo.clearData already removed back references
-					if( mo.getRDDHandle()!=null ) { 
- 						rCleanupLineageObject(mo.getRDDHandle());
-					}	
-					if( mo.getBroadcastHandle()!=null ) {
-						rCleanupLineageObject(mo.getBroadcastHandle());
-					}
-				}
-			}
-		}
-		catch(Exception ex)
-		{
-			throw new DMLRuntimeException(ex);
-		}
-	}
-	
-	/**
-	 * 
-	 * @param lob
-	 * @throws IOException
-	 */
-	private void rCleanupLineageObject(LineageObject lob) 
-		throws IOException
-	{		
-		//abort recursive cleanup if there are still consumers
-		if( lob.getNumReferences() > 0 )
-			return;
-			
-		//abort if still reachable through matrix object (via back references for 
-		//robustness in function calls and to prevent repeated scans of the symbol table)
-		if( lob.hasBackReference() )
-			return;
-		
-		//cleanup current lineage object (from driver/executors)
-		//incl deferred hdfs file removal (only if metadata set by cleanup call)
-		if( lob instanceof RDDObject ) {
-			RDDObject rdd = (RDDObject)lob;
-			cleanupRDDVariable(rdd.getRDD());
-			if( rdd.getHDFSFilename()!=null ) { //deferred file removal
-				MapReduceTool.deleteFileWithMTDIfExistOnHDFS(rdd.getHDFSFilename());
-			}
-		}
-		else if( lob instanceof BroadcastObject ) {
-			PartitionedBroadcastMatrix pbm = ((BroadcastObject)lob).getBroadcast();
-			for( Broadcast<PartitionedMatrixBlock> bc : pbm.getBroadcasts() )
-				cleanupBroadcastVariable(bc);
-		}
-	
-		//recursively process lineage children
-		for( LineageObject c : lob.getLineageChilds() ){
-			c.decrementNumReferences();
-			rCleanupLineageObject(c);
-		}
-	}
-	
-	/**
-	 * This call destroys a broadcast variable at all executors and the driver.
-	 * Hence, it is intended to be used on rmvar only. Depending on the
-	 * ASYNCHRONOUS_VAR_DESTROY configuration, this is asynchronous or not.
-	 * 
-	 * 
-	 * @param inV
-	 */
-	public void cleanupBroadcastVariable(Broadcast<?> bvar) 
-	{
-		//in comparison to 'unpersist' (which would only delete the broadcast from the executors),
-		//this call also deletes related data from the driver.
-		if( bvar.isValid() ) {
-			bvar.destroy( ASYNCHRONOUS_VAR_DESTROY );
-		}
-	}
-	
-	/**
-	 * This call removes an rdd variable from executor memory and disk if required.
-	 * Hence, it is intended to be used on rmvar only. Depending on the
-	 * ASYNCHRONOUS_VAR_DESTROY configuration, this is asynchronous or not.
-	 * 
-	 * @param rvar
-	 */
-	public void cleanupRDDVariable(JavaPairRDD<?,?> rvar) 
-	{
-		if( rvar.getStorageLevel()!=StorageLevel.NONE() ) {
-			rvar.unpersist( ASYNCHRONOUS_VAR_DESTROY );
-		}
-	}
-	
-	/**
-	 * 
-	 * @param var
-	 * @throws DMLRuntimeException 
-	 * @throws DMLUnsupportedOperationException 
-	 */
-	@SuppressWarnings("unchecked")
-	public void repartitionAndCacheMatrixObject( String var ) 
-		throws DMLRuntimeException, DMLUnsupportedOperationException
-	{
-		//get input rdd and default storage level
-		MatrixObject mo = getMatrixObject(var);
-		MatrixCharacteristics mcIn = mo.getMatrixCharacteristics();
-		JavaPairRDD<MatrixIndexes,MatrixBlock> in = (JavaPairRDD<MatrixIndexes, MatrixBlock>) 
-				getRDDHandleForMatrixObject(mo, InputInfo.BinaryBlockInputInfo);
-		
-		//avoid unnecessary caching of input in order to reduce memory pressure
-		if( mo.getRDDHandle().allowsShortCircuitRead()
-			&& isRDDMarkedForCaching(in.id()) && !isRDDCached(in.id()) ) {
-			in = (JavaPairRDD<MatrixIndexes,MatrixBlock>)
-					((RDDObject)mo.getRDDHandle().getLineageChilds().get(0)).getRDD();
-			
-			//investigate issue of unnecessarily large number of partitions
-			int numPartitions = CheckpointSPInstruction.getNumCoalescePartitions(mcIn, in);
-			if( numPartitions < in.partitions().size() )
-				in = in.coalesce( numPartitions );
-		}
-		
-		//repartition and persist rdd (force creation of shuffled rdd via merge)
-		JavaPairRDD<MatrixIndexes,MatrixBlock> out = RDDAggregateUtils.mergeByKey(in);
-		out.persist( Checkpoint.DEFAULT_STORAGE_LEVEL )
-		   .count(); //trigger caching to prevent contention
-		
-		//create new rdd handle, in-place of current matrix object
-		RDDObject inro =  mo.getRDDHandle();       //guaranteed to exist (see above)
-		RDDObject outro = new RDDObject(out, var); //create new rdd object
-		outro.setCheckpointRDD(true);              //mark as checkpointed
-		outro.addLineageChild(inro);               //keep lineage to prevent cycles on cleanup
-		mo.setRDDHandle(outro);				       
-	}
-	
-	/**
-	 * 
-	 * @param var
-	 * @throws DMLRuntimeException
-	 * @throws DMLUnsupportedOperationException
-	 */
-	@SuppressWarnings("unchecked")
-	public void cacheMatrixObject( String var ) 
-		throws DMLRuntimeException, DMLUnsupportedOperationException
-	{
-		//get input rdd and default storage level
-		MatrixObject mo = getMatrixObject(var);
-		JavaPairRDD<MatrixIndexes,MatrixBlock> in = (JavaPairRDD<MatrixIndexes, MatrixBlock>) 
-				getRDDHandleForMatrixObject(mo, InputInfo.BinaryBlockInputInfo);
-		
-		//persist rdd (force rdd caching)
-		in.count(); //trigger caching to prevent contention			       
-	}
-	
-	/**
-	 * 
-	 * @param poolName
-	 */
-	public void setThreadLocalSchedulerPool(String poolName) {
-		if( FAIR_SCHEDULER_MODE ) {
-			getSparkContext().sc().setLocalProperty(
-					"spark.scheduler.pool", poolName);
-		}
-	}
-	
-	/**
-	 * 
-	 */
-	public void cleanupThreadLocalSchedulerPool() {
-		if( FAIR_SCHEDULER_MODE ) {
-			getSparkContext().sc().setLocalProperty(
-					"spark.scheduler.pool", null);
-		}
-	}
-	
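As a hedged usage sketch of the fair-scheduler mechanism behind setThreadLocalSchedulerPool
and cleanupThreadLocalSchedulerPool: a thread pins its Spark jobs to a pool via a
thread-local property and resets it afterwards. The pool name and the local Spark setup
below are assumptions for illustration only, not taken from the SystemML sources:

    // Sketch: thread-local fair-scheduler pool assignment (assumed setup).
    import java.util.Arrays;
    import org.apache.spark.SparkConf;
    import org.apache.spark.api.java.JavaSparkContext;

    public class SchedulerPoolSketch {
      public static void main(String[] args) {
        SparkConf conf = new SparkConf().setMaster("local[*]")
            .setAppName("SchedulerPoolSketch").set("spark.scheduler.mode", "FAIR");
        JavaSparkContext jsc = new JavaSparkContext(conf);
        try {
          // all jobs submitted by this thread now run in the named pool
          jsc.sc().setLocalProperty("spark.scheduler.pool", "parforPool1");
          long cnt = jsc.parallelize(Arrays.asList(1, 2, 3, 4)).count();
          System.out.println("count=" + cnt);
        } finally {
          jsc.sc().setLocalProperty("spark.scheduler.pool", null); // reset, as in cleanup above
          jsc.stop();
        }
      }
    }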
-	/**
-	 * 
-	 * @param rddID
-	 * @return
-	 */
-	private boolean isRDDMarkedForCaching( int rddID ) {
-		JavaSparkContext jsc = getSparkContext();
-		return jsc.sc().getPersistentRDDs().contains(rddID);
-	}
-	
-	/**
-	 * 
-	 * @param rddID
-	 * @return
-	 */
-	private boolean isRDDCached( int rddID ) {
-		//check that rdd is marked for caching
-		JavaSparkContext jsc = getSparkContext();
-		if( !jsc.sc().getPersistentRDDs().contains(rddID) ) {
-			return false;
-		}
-		
-		//check that rdd is actually already cached
-		for( RDDInfo info : jsc.sc().getRDDStorageInfo() ) {
-			if( info.id() == rddID )
-				return info.isCached();
-		}
-		return false;
-	}
-	
-	///////////////////////////////////////////
-	// Debug String Handling (see explain); TODO to be removed
-	///////
-
-	/**
-	 * 
-	 * @param inst
-	 * @param outputVarName
-	 * @throws DMLRuntimeException
-	 */
-	public void setDebugString(SPInstruction inst, String outputVarName) 
-		throws DMLRuntimeException 
-	{
-		RDDObject parentLineage = getMatrixObject(outputVarName).getRDDHandle();
-		
-		if( parentLineage == null || parentLineage.getRDD() == null )
-			return;
-		
-		MLContextProxy.addRDDForInstructionForMonitoring(inst, parentLineage.getRDD().id());
-		
-		JavaPairRDD<?, ?> out = parentLineage.getRDD();
-		JavaPairRDD<?, ?> in1 = null; 
-		JavaPairRDD<?, ?> in2 = null;
-		String input1VarName = null; 
-		String input2VarName = null;
-		if(parentLineage.getLineageChilds() != null) {
-			for(LineageObject child : parentLineage.getLineageChilds()) {
-				if(child instanceof RDDObject) {
-					if(in1 == null) {
-						in1 = ((RDDObject) child).getRDD();
-						input1VarName = child.getVarName();
-					}
-					else if(in2 == null) {
-						in2 = ((RDDObject) child).getRDD();
-						input2VarName = child.getVarName();
-					}
-					else {
-						throw new DMLRuntimeException("PRINT_EXPLAIN_WITH_LINEAGE not yet supported for three outputs");
-					}
-				}
-			}
-		}
-		setLineageInfoForExplain(inst, out, in1, input1VarName, in2, input2VarName);
-	}
-	
-	// The most expensive operation here is rdd.toDebugString(), which can be a major hit because
-	// it unrolls Spark's lazy evaluation. Hence, it is guarded by the flag 'PRINT_EXPLAIN_WITH_LINEAGE', which is
-	// enabled only through MLContext. This way, it does not affect our performance evaluation through the non-MLContext path.
-	private void setLineageInfoForExplain(SPInstruction inst, 
-			JavaPairRDD<?, ?> out, 
-			JavaPairRDD<?, ?> in1, String in1Name, 
-			JavaPairRDD<?, ?> in2, String in2Name) throws DMLRuntimeException {
-		
-			
-		// RDDInfo outInfo = org.apache.spark.storage.RDDInfo.fromRdd(out.rdd());
-		
-		// First fetch start lines from input RDDs
-		String startLine1 = null; 
-		String startLine2 = null;
-		int i1length = 0, i2length = 0;
-		if(in1 != null) {
-			String [] lines = in1.toDebugString().split("\\r?\\n");
-			startLine1 = SparkUtils.getStartLineFromSparkDebugInfo(lines[0]); // lines[0].substring(4, lines[0].length());
-			i1length = lines.length;
-		}
-		if(in2 != null) {
-			String [] lines = in2.toDebugString().split("\\r?\\n");
-			startLine2 =  SparkUtils.getStartLineFromSparkDebugInfo(lines[0]); // lines[0].substring(4, lines[0].length());
-			i2length = lines.length;
-		}
-		
-		String outDebugString = "";
-		int skip = 0;
-		
-		// Now process output RDD and replace inputRDD debug string by the matrix variable name
-		String [] outLines = out.toDebugString().split("\\r?\\n");
-		for(int i = 0; i < outLines.length; i++) {
-			if(skip > 0) {
-				skip--;
-				// outDebugString += "\nSKIP:" + outLines[i];
-			}
-			else if(startLine1 != null && outLines[i].contains(startLine1)) {
-				String prefix = SparkUtils.getPrefixFromSparkDebugInfo(outLines[i]); // outLines[i].substring(0, outLines[i].length() - startLine1.length());
-				outDebugString += "\n" + prefix + "[[" + in1Name + "]]";
-				//outDebugString += "\n{" + prefix + "}[[" + in1Name + "]] => " + outLines[i];
-				skip = i1length - 1;  
-			}
-			else if(startLine2 != null && outLines[i].contains(startLine2)) {
-				String prefix = SparkUtils.getPrefixFromSparkDebugInfo(outLines[i]); // outLines[i].substring(0, outLines[i].length() - startLine2.length());
-				outDebugString += "\n" + prefix + "[[" + in2Name + "]]";
-				skip = i2length - 1;
-			}
-			else {
-				outDebugString += "\n" + outLines[i];
-			}
-		}
-		
-		MLContext mlContext = MLContextProxy.getActiveMLContext();
-		if(mlContext != null && mlContext.getMonitoringUtil() != null) {
-			mlContext.getMonitoringUtil().setLineageInfo(inst, outDebugString);
-		}
-		else {
-			throw new DMLRuntimeException("The method setLineageInfoForExplain should be called only through MLContext");
-		}
-		
-	}
-	
-}

http://git-wip-us.apache.org/repos/asf/incubator-systemml/blob/276d9257/src/main/java/com/ibm/bi/dml/runtime/controlprogram/parfor/DataPartitioner.java
----------------------------------------------------------------------
diff --git a/src/main/java/com/ibm/bi/dml/runtime/controlprogram/parfor/DataPartitioner.java b/src/main/java/com/ibm/bi/dml/runtime/controlprogram/parfor/DataPartitioner.java
deleted file mode 100644
index a5c05e0..0000000
--- a/src/main/java/com/ibm/bi/dml/runtime/controlprogram/parfor/DataPartitioner.java
+++ /dev/null
@@ -1,236 +0,0 @@
-/**
- * (C) Copyright IBM Corp. 2010, 2015
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- * 
- */
-
-package com.ibm.bi.dml.runtime.controlprogram.parfor;
-
-import org.apache.commons.logging.Log;
-import org.apache.commons.logging.LogFactory;
-
-import com.ibm.bi.dml.hops.Hop;
-import com.ibm.bi.dml.parser.Expression.DataType;
-import com.ibm.bi.dml.parser.Expression.ValueType;
-import com.ibm.bi.dml.runtime.DMLRuntimeException;
-import com.ibm.bi.dml.runtime.controlprogram.ParForProgramBlock.PDataPartitionFormat;
-import com.ibm.bi.dml.runtime.controlprogram.caching.MatrixObject;
-import com.ibm.bi.dml.runtime.matrix.MatrixCharacteristics;
-import com.ibm.bi.dml.runtime.matrix.MatrixFormatMetaData;
-import com.ibm.bi.dml.runtime.matrix.data.InputInfo;
-import com.ibm.bi.dml.runtime.matrix.data.MatrixBlock;
-import com.ibm.bi.dml.runtime.matrix.data.OutputInfo;
-import com.ibm.bi.dml.runtime.util.MapReduceTool;
-
-
-/**
- * This is the base class for all data partitioners. 
- * 
- */
-public abstract class DataPartitioner 
-{	
-	
-	protected static final Log LOG = LogFactory.getLog(DataPartitioner.class.getName());
-	
-	//note: the following value has been empirically determined but might change in the future,
-	//MatrixBlockDSM.SPARCITY_TURN_POINT (with 0.4) was too high because we create 3-4 values per nnz and 
-	//have some computation overhead for binary cell.
-	protected static final double SPARSITY_CELL_THRESHOLD = 0.1d; 
-	
-	protected static final String NAME_SUFFIX = "_dp";
-	
-	//instance variables
-	protected PDataPartitionFormat _format = null;
-	protected int _n = -1; //blocksize if applicable
-	protected boolean _allowBinarycell = true;
-	
-	protected DataPartitioner( PDataPartitionFormat dpf, int n )
-	{
-		_format = dpf;
-		_n = n;
-	}
-	
-	
-	
-	/**
-	 * 
-	 * @param in
-	 * @param fnameNew
-	 * @return
-	 * @throws DMLRuntimeException
-	 */
-	public MatrixObject createPartitionedMatrixObject( MatrixObject in, String fnameNew )
-		throws DMLRuntimeException
-	{
-		return createPartitionedMatrixObject(in, fnameNew, false);
-	}
-	
-	/**
-	 * 
-	 * @param in
-	 * @param fnameNew
-	 * @param force
-	 * @return
-	 * @throws DMLRuntimeException
-	 */
-	public MatrixObject createPartitionedMatrixObject( MatrixObject in, String fnameNew, boolean force )
-		throws DMLRuntimeException
-	{
-		ValueType vt = in.getValueType();
-		String varname = in.getVarName();
-		MatrixObject out = new MatrixObject(vt, fnameNew );
-		out.setDataType( DataType.MATRIX );
-		out.setVarName( varname+NAME_SUFFIX );		
-		
-		return createPartitionedMatrixObject(in, out, force);
-	}
-	
-
-	/**
-	 * Creates a partitioned matrix object based on the given input matrix object, 
-	 * according to the specified split format. The input matrix can be in-memory
-	 * or still on HDFS and the partitioned output matrix is written to HDFS. The
-	 * created matrix object can be used transparently for obtaining the full matrix
-	 * or reading 1 or multiple partitions based on given index ranges. 
-	 * 
-	 * @param in
-	 * @param out
-	 * @param force
-	 * @return
-	 * @throws DMLRuntimeException
-	 */
-	public MatrixObject createPartitionedMatrixObject( MatrixObject in, MatrixObject out, boolean force )
-		throws DMLRuntimeException
-	{
-		//check for naive partitioning
-		if( _format == PDataPartitionFormat.NONE )
-			return in;
-		
-		//analyze input matrix object
-		MatrixFormatMetaData meta = (MatrixFormatMetaData)in.getMetaData();
-		MatrixCharacteristics mc = meta.getMatrixCharacteristics();
-		InputInfo ii = meta.getInputInfo();
-		OutputInfo oi = meta.getOutputInfo();
-		long rows = mc.getRows(); 
-		long cols = mc.getCols();
-		int brlen = mc.getRowsPerBlock();
-		int bclen = mc.getColsPerBlock();
-		long nonZeros = mc.getNonZeros();
-		double sparsity = (nonZeros>=0 && rows>0 && cols>0)?
-				((double)nonZeros)/(rows*cols) : 1.0;
-		
-		if( !force ) //try to optimize, if format not forced
-		{
-			//check lower bound of useful data partitioning
-			if( rows < Hop.CPThreshold && cols < Hop.CPThreshold )  //or matrix already fits in mem
-			{
-				return in;
-			}
-			
-			//check for changing to blockwise representations
-			if( _format == PDataPartitionFormat.ROW_WISE && cols < Hop.CPThreshold )
-			{
-				LOG.debug("Changing format from "+PDataPartitionFormat.ROW_WISE+" to "+PDataPartitionFormat.ROW_BLOCK_WISE+".");
-				_format = PDataPartitionFormat.ROW_BLOCK_WISE;
-			}
-			if( _format == PDataPartitionFormat.COLUMN_WISE && rows < Hop.CPThreshold )
-			{
-				LOG.debug("Changing format from "+PDataPartitionFormat.COLUMN_WISE+" to "+PDataPartitionFormat.ROW_BLOCK_WISE+".");
-				_format = PDataPartitionFormat.COLUMN_BLOCK_WISE;
-			}
-			//_format = PDataPartitionFormat.ROW_BLOCK_WISE_N;
-		}
-		
-		//check changing to binarycell in case of sparse cols (robustness)
-		boolean convertBlock2Cell = false;
-		if(    ii == InputInfo.BinaryBlockInputInfo 
-			&& _allowBinarycell
-			&& _format == PDataPartitionFormat.COLUMN_WISE	
-			&& sparsity < SPARSITY_CELL_THRESHOLD )
-		{
-			LOG.debug("Changing partition outputinfo from binaryblock to binarycell due to sparsity="+sparsity);
-			oi = OutputInfo.BinaryCellOutputInfo;
-			convertBlock2Cell = true;
-		}
-				
-		//prepare filenames and cleanup if required
-		String fnameNew = out.getFileName();
-		try{
-			MapReduceTool.deleteFileIfExistOnHDFS(fnameNew);
-		}
-		catch(Exception ex){
-			throw new DMLRuntimeException( ex );
-		}
-		
-		//core partitioning (depending on subclass)
-		partitionMatrix( in, fnameNew, ii, oi, rows, cols, brlen, bclen );
-		
-		//create output matrix object
-		out.setPartitioned( _format, _n ); 
-		
-		MatrixCharacteristics mcNew = new MatrixCharacteristics( rows, cols, (int)brlen, (int)bclen ); 
-		mcNew.setNonZeros( nonZeros );
-		if( convertBlock2Cell )
-			ii = InputInfo.BinaryCellInputInfo;
-		MatrixFormatMetaData metaNew = new MatrixFormatMetaData(mcNew,oi,ii);
-		out.setMetaData(metaNew);	 
-		
-		return out;
-		
-	}
-	
-	/**
-	 * 
-	 */
-	public void disableBinaryCell()
-	{
-		_allowBinarycell = false;
-	}
-	
-	/**
-	 * 
-	 * @param fname
-	 * @param fnameNew
-	 * @param ii
-	 * @param oi
-	 * @param rlen
-	 * @param clen
-	 * @param brlen
-	 * @param bclen
-	 * @throws DMLRuntimeException
-	 */
-	protected abstract void partitionMatrix( MatrixObject in, String fnameNew, InputInfo ii, OutputInfo oi, long rlen, long clen, int brlen, int bclen )
-		throws DMLRuntimeException;
-
-	
-	public static MatrixBlock createReuseMatrixBlock( PDataPartitionFormat dpf, int rows, int cols ) 
-	{
-		MatrixBlock tmp = null;
-		
-		switch( dpf )
-		{
-			case ROW_WISE:
-				//default assumption sparse, but reset per input block anyway
-				tmp = new MatrixBlock( 1, (int)cols, true, (int)(cols*0.1) );
-				break;
-			case COLUMN_WISE:
-				//default dense because a single column is always below SKINNY_MATRIX_TURN_POINT
-				tmp = new MatrixBlock( (int)rows, 1, false );
-				break;
-			default:
-				//do nothing
-		}
-		
-		return tmp;
-	}
-}
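A standalone sketch of the sparsity-driven output-format decision in
createPartitionedMatrixObject above. The dimensions, nonzero count, and the assumption
that COLUMN_WISE partitioning was requested are example values; only the threshold
comparison against SPARSITY_CELL_THRESHOLD is reproduced:

    // Sketch of the binaryblock-to-binarycell decision (assumed example inputs).
    public class PartitionFormatSketch {
      public static void main(String[] args) {
        final double SPARSITY_CELL_THRESHOLD = 0.1; // as defined in DataPartitioner
        long rows = 1_000_000, cols = 1_000, nonZeros = 5_000_000; // assumed inputs
        double sparsity = (nonZeros >= 0 && rows > 0 && cols > 0)
            ? ((double) nonZeros) / (rows * cols) : 1.0;
        boolean columnWise = true; // assume COLUMN_WISE partitioning was requested
        // very sparse column partitions are written as binary cell instead of binary block
        boolean convertBlock2Cell = columnWise && sparsity < SPARSITY_CELL_THRESHOLD;
        System.out.println("sparsity=" + sparsity + ", use binarycell=" + convertBlock2Cell);
      }
    }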

http://git-wip-us.apache.org/repos/asf/incubator-systemml/blob/276d9257/src/main/java/com/ibm/bi/dml/runtime/controlprogram/parfor/DataPartitionerLocal.java
----------------------------------------------------------------------
diff --git a/src/main/java/com/ibm/bi/dml/runtime/controlprogram/parfor/DataPartitionerLocal.java b/src/main/java/com/ibm/bi/dml/runtime/controlprogram/parfor/DataPartitionerLocal.java
deleted file mode 100644
index b3a5075..0000000
--- a/src/main/java/com/ibm/bi/dml/runtime/controlprogram/parfor/DataPartitionerLocal.java
+++ /dev/null
@@ -1,888 +0,0 @@
-/**
- * (C) Copyright IBM Corp. 2010, 2015
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- * 
- */
-
-package com.ibm.bi.dml.runtime.controlprogram.parfor;
-
-import java.io.BufferedWriter;
-import java.io.File;
-import java.io.IOException;
-import java.io.OutputStreamWriter;
-import java.util.HashMap;
-import java.util.LinkedList;
-import java.util.Map.Entry;
-
-import org.apache.hadoop.fs.FileSystem;
-import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.io.LongWritable;
-import org.apache.hadoop.io.SequenceFile;
-import org.apache.hadoop.io.Text;
-import org.apache.hadoop.mapred.FileInputFormat;
-import org.apache.hadoop.mapred.InputSplit;
-import org.apache.hadoop.mapred.JobConf;
-import org.apache.hadoop.mapred.RecordReader;
-import org.apache.hadoop.mapred.Reporter;
-import org.apache.hadoop.mapred.TextInputFormat;
-
-import com.ibm.bi.dml.conf.ConfigurationManager;
-import com.ibm.bi.dml.runtime.DMLRuntimeException;
-import com.ibm.bi.dml.runtime.DMLUnsupportedOperationException;
-import com.ibm.bi.dml.runtime.controlprogram.ParForProgramBlock.PDataPartitionFormat;
-import com.ibm.bi.dml.runtime.controlprogram.caching.MatrixObject;
-import com.ibm.bi.dml.runtime.controlprogram.parfor.util.Cell;
-import com.ibm.bi.dml.runtime.controlprogram.parfor.util.IDSequence;
-import com.ibm.bi.dml.runtime.controlprogram.parfor.util.StagingFileUtils;
-import com.ibm.bi.dml.runtime.io.MatrixReader;
-import com.ibm.bi.dml.runtime.matrix.data.IJV;
-import com.ibm.bi.dml.runtime.matrix.data.InputInfo;
-import com.ibm.bi.dml.runtime.matrix.data.MatrixBlock;
-import com.ibm.bi.dml.runtime.matrix.data.MatrixCell;
-import com.ibm.bi.dml.runtime.matrix.data.MatrixIndexes;
-import com.ibm.bi.dml.runtime.matrix.data.OutputInfo;
-import com.ibm.bi.dml.runtime.matrix.data.SparseRowsIterator;
-import com.ibm.bi.dml.runtime.util.FastStringTokenizer;
-import com.ibm.bi.dml.runtime.util.LocalFileUtils;
-
-/**
- * Partitions a given matrix into row or column partitions with a two-pass approach.
- * In the first phase the input matrix is read from HDFS and sorted into block partitions
- * in a staging area in the local file system according to the partition format. 
- * In order to allow for scalable partitioning, we process one block at a time. 
- * Furthermore, in the second phase, all blocks of a partition are appended to a sequence file
- * on HDFS. Block-wise partitioning and write-once semantics of sequence files require the
- * indirection over the local staging area. For scalable computation, we process one 
- * sequence file at a time.
- *
- * NOTE: For the resulting partitioned matrix, we store block and cell indexes wrt partition boundaries.
- *       This means that the partitioned matrix CANNOT be read as a traditional matrix because there are,
- *       for example, multiple blocks with the same index (the actual index is encoded in the path).
- *       In order to enable a full read of partitioned matrices, the data converter would need to handle
- *       negative row/col offsets for partitioned reads. This is currently not done in order to avoid
- *       overhead for normal reads and because partitioning is only applied for exclusively indexed access.
- *
- *
- */
-public class DataPartitionerLocal extends DataPartitioner
-{
-	
-	private static final boolean PARALLEL = true; 
-	
-	private IDSequence _seq = null;
-	private MatrixBlock _reuseBlk = null;
-	
-	private int _par = -1;
-	
-	/**
-	 * 
-	 * @param dpf
-	 * @param n
-	 * @param par -1 for serial otherwise number of threads, can be ignored by implementation
-	 * @throws DMLRuntimeException
-	 */
-	public DataPartitionerLocal(PDataPartitionFormat dpf, int n, int par) 
-		throws DMLRuntimeException 
-	{
-		super(dpf, n);
-		
-		//TODO
-		if( dpf == PDataPartitionFormat.ROW_BLOCK_WISE_N || dpf == PDataPartitionFormat.COLUMN_BLOCK_WISE_N  )
-			throw new DMLRuntimeException("Data partitioning format '"+dpf+"' not supported by DataPartitionerLocal" );
-		
-		_seq = new IDSequence();
-		_par = (par > 0) ? par : 1;
-	}
-	
-	@Override
-	protected void partitionMatrix(MatrixObject in, String fnameNew, InputInfo ii, OutputInfo oi, long rlen, long clen, int brlen, int bclen)
-			throws DMLRuntimeException 
-	{
-		//force writing to disk (typically not required since partitioning only applied if dataset exceeds CP size)
-		in.exportData(); //written to disk iff dirty
-		
-		String fname = in.getFileName();
-		String fnameStaging = LocalFileUtils.getUniqueWorkingDir( LocalFileUtils.CATEGORY_PARTITIONING );
-		
-		//reblock input matrix
-		if( ii == InputInfo.TextCellInputInfo )
-			partitionTextCell( fname, fnameStaging, fnameNew, rlen, clen, brlen, bclen );
-		else if( ii == InputInfo.BinaryCellInputInfo )
-			partitionBinaryCell( fname, fnameStaging, fnameNew, rlen, clen, brlen, bclen );
-		else if( ii == InputInfo.BinaryBlockInputInfo )
-		{
-			if( oi == OutputInfo.BinaryBlockOutputInfo )
-				partitionBinaryBlock( fname, fnameStaging, fnameNew, rlen, clen, brlen, bclen );
-			else if ( oi == OutputInfo.BinaryCellOutputInfo )
-				partitionBinaryBlock2BinaryCell( fname, fnameStaging, fnameNew, rlen, clen, brlen, bclen );
-		}
-		else	
-			throw new DMLRuntimeException("Cannot create data partitions of format: "+ii.toString());
-	
-		LocalFileUtils.cleanupWorkingDirectory(fnameStaging);
-	}
-
-
-
-
-	/**
-	 * 
-	 * @param fname
-	 * @param fnameStaging
-	 * @param fnameNew
-	 * @param brlen
-	 * @param bclen
-	 * @throws DMLRuntimeException
-	 */
-	private void partitionTextCell( String fname, String fnameStaging, String fnameNew, long rlen, long clen, int brlen, int bclen ) 
-		throws DMLRuntimeException
-	{
-		long row = -1;
-		long col = -1;
-		
-		try 
-		{
-			//STEP 1: read matrix from HDFS and write blocks to local staging area			
-			//check and add input path
-			JobConf job = new JobConf(ConfigurationManager.getCachedJobConf());
-			Path path = new Path(fname);
-			FileInputFormat.addInputPath(job, path);
-			TextInputFormat informat = new TextInputFormat();
-			informat.configure(job);
-			InputSplit[] splits = informat.getSplits(job, 1);
-			
-			LinkedList<Cell> buffer = new LinkedList<Cell>();
-			LongWritable key = new LongWritable();
-			Text value = new Text();
-			FastStringTokenizer st = new FastStringTokenizer(' ');
-			
-			for(InputSplit split: splits)
-			{
-				RecordReader<LongWritable,Text> reader = informat.getRecordReader(split, job, Reporter.NULL);
-				try
-				{
-					while(reader.next(key, value))
-					{
-						st.reset( value.toString() ); //reset tokenizer
-						row = st.nextLong();
-						col = st.nextLong();
-						double lvalue = st.nextDouble();
-						Cell tmp = new Cell( row, col, lvalue ); 
-		
-						buffer.addLast( tmp );
-						if( buffer.size() > StagingFileUtils.CELL_BUFFER_SIZE ) //periodic flush
-						{
-							appendCellBufferToStagingArea(fnameStaging, buffer, brlen, bclen);
-							buffer.clear();
-						}
-					}
-					
-					//final flush
-					if( !buffer.isEmpty() )
-					{
-						appendCellBufferToStagingArea(fnameStaging, buffer, brlen, bclen);
-						buffer.clear();
-					}
-				}
-				finally
-				{
-					if( reader != null )
-						reader.close();
-				}
-			}
-
-			//STEP 2: read matrix blocks from staging area and write matrix to HDFS
-			String[] fnamesPartitions = new File(fnameStaging).list();	
-			if(PARALLEL) 
-			{
-				int len = Math.min(fnamesPartitions.length, _par);
-				Thread[] threads = new Thread[len];
-				for( int i=0;i<len;i++ )
-				{
-					int start = i*(int)Math.ceil(((double)fnamesPartitions.length)/len);
-					int end = (i+1)*(int)Math.ceil(((double)fnamesPartitions.length)/len)-1;
-					end = Math.min(end, fnamesPartitions.length-1);
-					threads[i] = new Thread(new DataPartitionerWorkerTextCell(job, fnameNew, fnameStaging, fnamesPartitions, start, end));
-					threads[i].start();
-				}
-				
-				for( Thread t : threads )
-					t.join();
-			}
-			else
-			{
-				for( String pdir : fnamesPartitions  )
-					writeTextCellFileToHDFS( job, fnameNew, fnameStaging+"/"+pdir );	
-			}
-		} 
-		catch (Exception e) 
-		{
-			//post-mortem error handling and bounds checking
-			if( row < 1 || row > rlen || col < 1 || col > clen )
-			{
-				throw new DMLRuntimeException("Matrix cell ["+(row)+","+(col)+"] " +
-									          "out of overall matrix range [1:"+rlen+",1:"+clen+"].");
-			}
-			else
-				throw new DMLRuntimeException("Unable to partition text cell matrix.", e);
-		}
-	}	
-
-	/**
-	 * Partitions a binary cell matrix via the local staging area.
-	 * 
-	 * @param fname HDFS file name of the input matrix
-	 * @param fnameStaging local staging directory
-	 * @param fnameNew HDFS file name of the partitioned output matrix
-	 * @param rlen number of rows of the input matrix
-	 * @param clen number of columns of the input matrix
-	 * @param brlen number of rows per block
-	 * @param bclen number of columns per block
-	 * @throws DMLRuntimeException
-	 */
-	@SuppressWarnings("deprecation")
-	private void partitionBinaryCell( String fname, String fnameStaging, String fnameNew, long rlen, long clen, int brlen, int bclen ) 
-		throws DMLRuntimeException
-	{
-		long row = -1;
-		long col = -1;
-		
-		try 
-		{
-			//STEP 1: read matrix from HDFS and write blocks to local staging area
-			//check and add input path
-			JobConf job = new JobConf(ConfigurationManager.getCachedJobConf());
-			Path path = new Path(fname);
-			FileSystem fs = FileSystem.get(job);
-			
-			//prepare sequence file reader, and write to local staging area	
-			LinkedList<Cell> buffer = new LinkedList<Cell>();
-			MatrixIndexes key = new MatrixIndexes();
-			MatrixCell value = new MatrixCell();
-	
-			for( Path lpath : MatrixReader.getSequenceFilePaths(fs, path) )
-			{
-				SequenceFile.Reader reader = new SequenceFile.Reader(fs,lpath,job);
-				try
-				{
-					while(reader.next(key, value))
-					{
-						row = key.getRowIndex();
-						col = key.getColumnIndex();
-						Cell tmp = new Cell( row, col, value.getValue() ); 
-		
-						buffer.addLast( tmp );
-						if( buffer.size() > StagingFileUtils.CELL_BUFFER_SIZE ) //periodic flush
-						{
-							appendCellBufferToStagingArea(fnameStaging, buffer, brlen, bclen);
-							buffer.clear();
-						}
-					}
-					
-					//final flush
-					if( !buffer.isEmpty() )
-					{
-						appendCellBufferToStagingArea(fnameStaging, buffer, brlen, bclen);
-						buffer.clear();
-					}
-				}
-				finally
-				{
-					if( reader != null )
-						reader.close();
-				}
-			}
-			
-			//STEP 2: read matrix blocks from staging area and write matrix to HDFS
-			String[] fnamesPartitions = new File(fnameStaging).list();			
-			if(PARALLEL) 
-			{
-				int len = Math.min(fnamesPartitions.length, _par);
-				Thread[] threads = new Thread[len];
-				for( int i=0;i<len;i++ )
-				{
-					int start = i*(int)Math.ceil(((double)fnamesPartitions.length)/len);
-					int end = (i+1)*(int)Math.ceil(((double)fnamesPartitions.length)/len)-1;
-					end = Math.min(end, fnamesPartitions.length-1);
-					threads[i] = new Thread(new DataPartitionerWorkerBinaryCell(job, fnameNew, fnameStaging, fnamesPartitions, start, end));
-					threads[i].start();
-				}
-				
-				for( Thread t : threads )
-					t.join();
-			}
-			else
-			{
-				for( String pdir : fnamesPartitions  )
-					writeBinaryCellSequenceFileToHDFS( job, fnameNew, fnameStaging+"/"+pdir );	
-			}
-		} 
-		catch (Exception e) 
-		{
-			//post-mortem error handling and bounds checking
-			if( row < 1 || row > rlen || col < 1 || col > clen )
-			{
-				throw new DMLRuntimeException("Matrix cell ["+(row)+","+(col)+"] " +
-									  		  "out of overall matrix range [1:"+rlen+",1:"+clen+"].");
-			}
-			else
-				throw new DMLRuntimeException("Unable to partition binary cell matrix.", e);
-		}
-	}	
-	
-	/**
-	 * Partitions a binary block matrix via the local staging area.
-	 * 
-	 * @param fname HDFS file name of the input matrix
-	 * @param fnameStaging local staging directory
-	 * @param fnameNew HDFS file name of the partitioned output matrix
-	 * @param rlen number of rows of the input matrix
-	 * @param clen number of columns of the input matrix
-	 * @param brlen number of rows per block
-	 * @param bclen number of columns per block
-	 * @throws DMLRuntimeException
-	 */
-	@SuppressWarnings("deprecation")
-	private void partitionBinaryBlock( String fname, String fnameStaging, String fnameNew, long rlen, long clen, int brlen, int bclen ) 
-		throws DMLRuntimeException
-	{
-		try 
-		{	
-			//create reuse object
-			_reuseBlk = DataPartitioner.createReuseMatrixBlock(_format, brlen, bclen);
-			
-			//STEP 1: read matrix from HDFS and write blocks to local staging area	
-			//check and add input path
-			JobConf job = new JobConf(ConfigurationManager.getCachedJobConf());
-			Path path = new Path(fname);
-			FileSystem fs = FileSystem.get(job);
-			
-			//prepare sequence file reader, and write to local staging area
-			MatrixIndexes key = new MatrixIndexes(); 
-			MatrixBlock value = new MatrixBlock();
-			
-			for(Path lpath : MatrixReader.getSequenceFilePaths(fs, path) )
-			{
-				SequenceFile.Reader reader = new SequenceFile.Reader(fs,lpath,job);
-				try
-				{
-					while(reader.next(key, value)) //for each block
-					{
-						long row_offset = (key.getRowIndex()-1)*brlen;
-						long col_offset = (key.getColumnIndex()-1)*bclen;
-						long rows = value.getNumRows();
-						long cols = value.getNumColumns();
-						
-						//bound check per block
-						if( row_offset + rows < 1 || row_offset + rows > rlen || col_offset + cols<1 || col_offset + cols > clen )
-						{
-							throw new IOException("Matrix block ["+(row_offset+1)+":"+(row_offset+rows)+","+(col_offset+1)+":"+(col_offset+cols)+"] " +
-									              "out of overall matrix range [1:"+rlen+",1:"+clen+"].");
-						}
-						
-					    appendBlockToStagingArea(fnameStaging, value, row_offset, col_offset, brlen, bclen);
-					}
-				}
-				finally
-				{
-					if( reader != null )
-						reader.close();
-				}
-			}
-
-			//STEP 2: read matrix blocks from staging area and write matrix to HDFS
-			String[] fnamesPartitions = new File(fnameStaging).list();			
-			if(PARALLEL) 
-			{
-				int len = Math.min(fnamesPartitions.length, _par);
-				Thread[] threads = new Thread[len];
-				for( int i=0;i<len;i++ )
-				{
-					int start = i*(int)Math.ceil(((double)fnamesPartitions.length)/len);
-					int end = (i+1)*(int)Math.ceil(((double)fnamesPartitions.length)/len)-1;
-					end = Math.min(end, fnamesPartitions.length-1);
-					threads[i] = new Thread(new DataPartitionerWorkerBinaryBlock(job, fnameNew, fnameStaging, fnamesPartitions, start, end));
-					threads[i].start();
-				}
-				
-				for( Thread t : threads )
-					t.join();
-			}
-			else
-			{
-				for( String pdir : fnamesPartitions  )
-					writeBinaryBlockSequenceFileToHDFS( job, fnameNew, fnameStaging+"/"+pdir, false );		
-			}
-		} 
-		catch (Exception e) 
-		{
-			throw new DMLRuntimeException("Unable to partition binary block matrix.", e);
-		}
-	}
-
-	/**
-	 * Partitions a binary block matrix into binary cell partitions via the local staging area.
-	 * 
-	 * @param fname HDFS file name of the input matrix
-	 * @param fnameStaging local staging directory
-	 * @param fnameNew HDFS file name of the partitioned output matrix
-	 * @param rlen number of rows of the input matrix
-	 * @param clen number of columns of the input matrix
-	 * @param brlen number of rows per block
-	 * @param bclen number of columns per block
-	 * @throws DMLRuntimeException
-	 */
-	@SuppressWarnings("deprecation")
-	private void partitionBinaryBlock2BinaryCell( String fname, String fnameStaging, String fnameNew, long rlen, long clen, int brlen, int bclen ) 
-		throws DMLRuntimeException
-	{
-		try 
-		{		
-			//STEP 1: read matrix from HDFS and write blocks to local staging area	
-			//check and add input path
-			JobConf job = new JobConf(ConfigurationManager.getCachedJobConf());
-			Path path = new Path(fname);
-			FileSystem fs = FileSystem.get(job);
-			
-			//prepare sequence file reader, and write to local staging area
-			MatrixIndexes key = new MatrixIndexes(); 
-			MatrixBlock value = new MatrixBlock();
-			
-			LinkedList<Cell> buffer = new LinkedList<Cell>();
-			
-			for(Path lpath : MatrixReader.getSequenceFilePaths(fs, path) )
-			{
-				SequenceFile.Reader reader = new SequenceFile.Reader(fs,lpath,job);
-				try
-				{
-					while(reader.next(key, value)) //for each block
-					{
-						long row_offset = (key.getRowIndex()-1)*brlen;
-						long col_offset = (key.getColumnIndex()-1)*bclen;
-						long rows = value.getNumRows();
-						long cols = value.getNumColumns();
-						
-						//bound check per block
-						if( row_offset + rows < 1 || row_offset + rows > rlen || col_offset + cols<1 || col_offset + cols > clen )
-						{
-							throw new IOException("Matrix block ["+(row_offset+1)+":"+(row_offset+rows)+","+(col_offset+1)+":"+(col_offset+cols)+"] " +
-									              "out of overall matrix range [1:"+rlen+",1:"+clen+"].");
-						}
-						
-						boolean sparse = value.isInSparseFormat();
-						if( sparse ) //SPARSE
-						{
-							SparseRowsIterator iter = value.getSparseRowsIterator();
-							while( iter.hasNext() )
-							{
-								IJV lcell = iter.next();
-								Cell tmp = new Cell( row_offset + lcell.i + 1, 
-													 col_offset + lcell.j + 1,
-													 lcell.v ); 
-								buffer.addLast( tmp );
-							}
-						}
-						else //DENSE
-						{
-							for( int i=0; i<rows; i++ )
-								for( int j=0; j<cols; j++ )
-								{
-									double lvalue  = value.getValueDenseUnsafe(i, j);
-									if( lvalue != 0 ) //for nnz
-									{
-										Cell tmp = new Cell( row_offset + i + 1, 
-												 			 col_offset + j + 1,
-												 			 lvalue ); 
-										buffer.addLast( tmp );
-									}
-								}
-						}
-						
-						appendCellBufferToStagingArea(fnameStaging, buffer, brlen, bclen);
-						buffer.clear();
-					}
-				}
-				finally
-				{
-					if( reader != null )
-						reader.close();
-				}
-			}
-
-			//STEP 2: read matrix blocks from staging area and write matrix to HDFS
-			String[] fnamesPartitions = new File(fnameStaging).list();			
-			if(PARALLEL) 
-			{
-				int len = Math.min(fnamesPartitions.length, _par);
-				Thread[] threads = new Thread[len];
-				for( int i=0;i<len;i++ )
-				{
-					int start = i*(int)Math.ceil(((double)fnamesPartitions.length)/len);
-					int end = (i+1)*(int)Math.ceil(((double)fnamesPartitions.length)/len)-1;
-					end = Math.min(end, fnamesPartitions.length-1);
-					threads[i] = new Thread(new DataPartitionerWorkerBinaryCell(job, fnameNew, fnameStaging, fnamesPartitions, start, end));
-					threads[i].start();
-				}
-				
-				for( Thread t : threads )
-					t.join();
-			}
-			else
-			{
-				for( String pdir : fnamesPartitions  )
-					writeBinaryCellSequenceFileToHDFS( job, fnameNew, fnameStaging+"/"+pdir );	
-			}
-		} 
-		catch (Exception e) 
-		{
-			throw new DMLRuntimeException("Unable to partition binary block matrix.", e);
-		}
-	}
-
-	
-	/**
-	 * 
-	 * @param dir
-	 * @param mb
-	 * @param row_offset
-	 * @param col_offset
-	 * @param brlen
-	 * @param bclen
-	 * @throws DMLRuntimeException
-	 * @throws IOException
-	 * @throws DMLUnsupportedOperationException 
-	 */
-	private void appendBlockToStagingArea( String dir, MatrixBlock mb, long row_offset, long col_offset, long brlen, long bclen ) 
-		throws DMLRuntimeException, IOException, DMLUnsupportedOperationException
-	{
-		//NOTE: for temporary block we always create dense representations
-		boolean sparse = mb.isInSparseFormat();
-		long nnz = mb.getNonZeros();
-		long rows = mb.getNumRows();
-		long cols = mb.getNumColumns();
-		double sparsity = ((double)nnz)/(rows*cols);
-
-		if( _format == PDataPartitionFormat.ROW_WISE ) 
-		{	
-			_reuseBlk.reset( 1, (int)cols, sparse, (int) (cols*sparsity) ); 			
-			for( int i=0; i<rows; i++ )
-			{
-				String pdir = LocalFileUtils.checkAndCreateStagingDir(dir+"/"+(row_offset+1+i));
-				String pfname = pdir+"/"+"block_"+(col_offset/bclen+1);
-				mb.sliceOperations(i, i, 0, (int)(cols-1), _reuseBlk);
-				LocalFileUtils.writeMatrixBlockToLocal(pfname, _reuseBlk);
-				_reuseBlk.reset();
-			}
-		}
-		else if( _format == PDataPartitionFormat.ROW_BLOCK_WISE )
-		{
-			String pdir = LocalFileUtils.checkAndCreateStagingDir(dir+"/"+(row_offset/brlen+1));
-			String pfname = pdir+"/"+"block_"+(col_offset/bclen+1);
-			LocalFileUtils.writeMatrixBlockToLocal(pfname, mb);
-		}
-		else if( _format == PDataPartitionFormat.COLUMN_WISE )
-		{
-			//create object for reuse
-			_reuseBlk.reset( (int)rows, 1, false );
-			
-			for( int i=0; i<cols; i++ )
-			{
-				String pdir = LocalFileUtils.checkAndCreateStagingDir(dir+"/"+(col_offset+1+i));
-				String pfname = pdir+"/"+"block_"+(row_offset/brlen+1); 			
-				mb.sliceOperations(0, (int)(rows-1), i, i, _reuseBlk);
-				LocalFileUtils.writeMatrixBlockToLocal(pfname, _reuseBlk);
-				_reuseBlk.reset();
-			}				
-		}
-		else if( _format == PDataPartitionFormat.COLUMN_BLOCK_WISE )
-		{
-			String pdir = LocalFileUtils.checkAndCreateStagingDir(dir+"/"+(col_offset/bclen+1));
-			String pfname = pdir+"/"+"block_"+(row_offset/brlen+1);
-			LocalFileUtils.writeMatrixBlockToLocal(pfname, mb);
-		}
-	}
-	
-	/**
-	 * 
-	 * @param dir
-	 * @param buffer
-	 * @param brlen
-	 * @param bclen
-	 * @throws DMLRuntimeException
-	 * @throws IOException
-	 */
-	private void appendCellBufferToStagingArea( String dir, LinkedList<Cell> buffer, int brlen, int bclen ) 
-		throws DMLRuntimeException, IOException
-	{
-		HashMap<Long,LinkedList<Cell>> sortedBuffer = new HashMap<Long,LinkedList<Cell>>();
-		
-		//sort cells in buffer wrt key
-		long key = -1;
-		for( Cell c : buffer )
-		{
-			switch(_format)
-			{
-				case ROW_WISE:
-					key = c.getRow();
-					c.setRow(1);
-					break;
-				case ROW_BLOCK_WISE:
-					key = (c.getRow()-1)/brlen+1;
-					c.setRow((c.getRow()-1)%brlen+1);
-					break;
-				case COLUMN_WISE:
-					key = c.getCol();
-					c.setCol(1);
-					break;
-				case COLUMN_BLOCK_WISE:
-					key = (c.getCol()-1)/bclen+1;
-					c.setCol((c.getCol()-1)%bclen+1);
-					break;
-				default:
-					//do nothing
-			}
-			
-			if( !sortedBuffer.containsKey(key) )
-				sortedBuffer.put(key, new LinkedList<Cell>());
-			sortedBuffer.get(key).addLast(c);
-		}	
-		
-		//write lists of cells to local files
-		for( Entry<Long,LinkedList<Cell>> e : sortedBuffer.entrySet() )
-		{
-			String pdir = LocalFileUtils.checkAndCreateStagingDir(dir+"/"+e.getKey());
-			String pfname = pdir+"/"+"block_"+_seq.getNextID();
-			StagingFileUtils.writeCellListToLocal(pfname, e.getValue());
-		}
-	}	
-
-	
-	/////////////////////////////////////
-	//     Helper methods for HDFS     //
-	// read/write in different formats //
-	/////////////////////////////////////
-	
-	@SuppressWarnings("deprecation")
-	public void writeBinaryBlockSequenceFileToHDFS( JobConf job, String dir, String lpdir, boolean threadsafe ) 
-		throws IOException
-	{
-		long key = getKeyFromFilePath(lpdir);
-		FileSystem fs = FileSystem.get(job);
-		Path path =  new Path(dir+"/"+key);
-		SequenceFile.Writer writer = new SequenceFile.Writer(fs, job, path, MatrixIndexes.class, MatrixBlock.class); //beware: ca. 50ms overhead
-
-		try
-		{
-			String[] fnameBlocks = new File( lpdir ).list();
-			for( String fnameBlock : fnameBlocks  )
-			{
-				long key2 = getKey2FromFileName(fnameBlock);
-				MatrixBlock tmp = null;
-				if( threadsafe )
-					tmp = LocalFileUtils.readMatrixBlockFromLocal(lpdir+"/"+fnameBlock);
-				else
-					tmp = LocalFileUtils.readMatrixBlockFromLocal(lpdir+"/"+fnameBlock, _reuseBlk);
-				
-				if( _format == PDataPartitionFormat.ROW_WISE || _format == PDataPartitionFormat.ROW_BLOCK_WISE )
-				{
-					writer.append(new MatrixIndexes(1,key2), tmp);
-				}
-				else if( _format == PDataPartitionFormat.COLUMN_WISE || _format == PDataPartitionFormat.COLUMN_BLOCK_WISE )
-				{
-					writer.append(new MatrixIndexes(key2,1), tmp);
-				}
-			}
-		}
-		finally
-		{
-			if( writer != null )
-				writer.close();
-		}
-	}
-	
-	@SuppressWarnings("deprecation")
-	public void writeBinaryCellSequenceFileToHDFS( JobConf job, String dir, String lpdir ) 
-		throws IOException
-	{
-		long key = getKeyFromFilePath(lpdir);
-		FileSystem fs = FileSystem.get(job);
-		Path path =  new Path(dir+"/"+key);
-		SequenceFile.Writer writer = new SequenceFile.Writer(fs, job, path, MatrixIndexes.class, MatrixCell.class); //beware: ca. 50ms overhead
-	
-		try
-		{
-			MatrixIndexes indexes = new MatrixIndexes();
-			MatrixCell cell = new MatrixCell();
-			
-			String[] fnameBlocks = new File( lpdir ).list();
-			for( String fnameBlock : fnameBlocks  )
-			{
-				LinkedList<Cell> tmp = StagingFileUtils.readCellListFromLocal(lpdir+"/"+fnameBlock);
-				for( Cell c : tmp )
-				{
-					indexes.setIndexes(c.getRow(), c.getCol());
-					cell.setValue(c.getValue());
-					writer.append(indexes, cell);
-				}
-			}
-		}
-		finally
-		{
-			if( writer != null )
-				writer.close();
-		}
-	}
-	
-	public void writeTextCellFileToHDFS( JobConf job, String dir, String lpdir ) 
-		throws IOException
-	{
-		long key = getKeyFromFilePath(lpdir);
-		FileSystem fs = FileSystem.get(job);
-		Path path = new Path(dir+"/"+key);
-		BufferedWriter out = new BufferedWriter(new OutputStreamWriter(fs.create(path,true)));		
-		try
-		{
-			//for obj reuse and preventing repeated buffer re-allocations
-			StringBuilder sb = new StringBuilder();
-			
-			String[] fnameBlocks = new File( lpdir ).list();
-			for( String fnameBlock : fnameBlocks  )
-			{
-				LinkedList<Cell> tmp = StagingFileUtils.readCellListFromLocal(lpdir+"/"+fnameBlock);
-				for( Cell c : tmp )
-				{
-					sb.append(c.getRow());
-					sb.append(' ');
-					sb.append(c.getCol());
-					sb.append(' ');
-					sb.append(c.getValue());
-					sb.append('\n');
-					out.write( sb.toString() );		
-					sb.setLength(0);
-				}
-			}
-		}
-		finally
-		{
-			if( out != null )
-				out.close();
-		}
-	}
-	
-	
-	/////////////////////////////////
-	// Helper methods for local fs //
-	//         read/write          //
-	/////////////////////////////////
-	
-	/**
-	 * 
-	 * @param dir
-	 * @return
-	 */
-	private long getKeyFromFilePath( String dir )
-	{
-		String[] dirparts = dir.split("/");
-		long key = Long.parseLong( dirparts[dirparts.length-1] );
-		return key;
-	}
-	
-	/**
-	 * 
-	 * @param fname
-	 * @return
-	 */
-	private long getKey2FromFileName( String fname )
-	{
-		return Long.parseLong( fname.split("_")[1] );
-	}
-
-	private abstract class DataPartitionerWorker implements Runnable
-	{
-		private JobConf _job = null;
-		private String _fnameNew = null;
-		private String _fnameStaging = null;
-		private String[] _fnamesPartitions = null;
-		private int _start = -1;
-		private int _end = -1;
-		
-		public DataPartitionerWorker(JobConf job, String fnameNew, String fnameStaging, String[] fnamesPartitions, int start, int end)
-		{
-			_job = job;
-			_fnameNew = fnameNew;
-			_fnameStaging = fnameStaging;
-			_fnamesPartitions = fnamesPartitions;
-			_start = start;
-			_end = end;
-		}
-
-		@Override
-		public void run() 
-		{
-			//read each input if required
-			try
-			{
-				for( int i=_start; i<=_end; i++ )
-				{
-					String pdir = _fnamesPartitions[i];
-					writeFileToHDFS( _job, _fnameNew, _fnameStaging+"/"+pdir );	
-				}
-			}
-			catch(Exception ex)
-			{
-				throw new RuntimeException("Failed on parallel data partitioning.", ex);
-			}
-		}
-		
-		public abstract void writeFileToHDFS( JobConf job, String fnameNew, String stagingDir ) 
-			throws IOException;
-	}
-	
-	private class DataPartitionerWorkerTextCell extends DataPartitionerWorker
-	{
-		public DataPartitionerWorkerTextCell(JobConf job, String fnameNew, String fnameStaging, String[] fnamesPartitions, int start, int end) 
-		{
-			super(job, fnameNew, fnameStaging, fnamesPartitions, start, end);
-		}
-
-		@Override
-		public void writeFileToHDFS(JobConf job, String fnameNew, String stagingDir) 
-			throws IOException 
-		{
-			writeTextCellFileToHDFS( job, fnameNew, stagingDir );	
-		}	
-	}	
-	
-	private class DataPartitionerWorkerBinaryCell extends DataPartitionerWorker
-	{
-		public DataPartitionerWorkerBinaryCell(JobConf job, String fnameNew, String fnameStaging, String[] fnamesPartitions, int start, int end) 
-		{
-			super(job, fnameNew, fnameStaging, fnamesPartitions, start, end);
-		}
-
-		@Override
-		public void writeFileToHDFS(JobConf job, String fnameNew, String stagingDir) 
-			throws IOException 
-		{
-			writeBinaryCellSequenceFileToHDFS( job, fnameNew, stagingDir );	
-		}	
-	}
-	
-	private class DataPartitionerWorkerBinaryBlock extends DataPartitionerWorker
-	{
-		public DataPartitionerWorkerBinaryBlock(JobConf job, String fnameNew, String fnameStaging, String[] fnamesPartitions, int start, int end) 
-		{
-			super(job, fnameNew, fnameStaging, fnamesPartitions, start, end);
-		}
-
-		@Override
-		public void writeFileToHDFS(JobConf job, String fnameNew, String stagingDir) 
-			throws IOException 
-		{
-			writeBinaryBlockSequenceFileToHDFS( job, fnameNew, stagingDir, true );	
-		}	
-	}
-	
-}
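(Another illustrative sketch, with hypothetical paths and made-up sizes, of the two-level staging layout written by appendBlockToStagingArea/appendCellBufferToStagingArea above and consumed by the write*ToHDFS helpers: staging files are grouped per partition key, and each partition key becomes one file under the new output directory.)

// Illustrative only: reproduces the path scheme of DataPartitionerLocal
// (staging: <stagingDir>/<partitionKey>/block_<id>, output: <fnameNew>/<partitionKey>).
public class StagingLayoutExample {
	public static void main(String[] args) {
		String fnameStaging = "/tmp/systemml/partitioning/job1"; // hypothetical staging root
		String fnameNew = "hdfs:/user/test/A_partitioned";       // hypothetical output directory
		int brlen = 1000, bclen = 1000;

		long rowOffset = 2000, colOffset = 3000;   // binary block at block indexes (3,4)
		long partitionKey = rowOffset / brlen + 1; // ROW_BLOCK_WISE partition key = 3
		long blockId = colOffset / bclen + 1;      // column-block id within the partition = 4

		// Local staging file for this block, and the final per-partition file on HDFS.
		System.out.println(fnameStaging + "/" + partitionKey + "/block_" + blockId);
		System.out.println(fnameNew + "/" + partitionKey);
	}
}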

http://git-wip-us.apache.org/repos/asf/incubator-systemml/blob/276d9257/src/main/java/com/ibm/bi/dml/runtime/controlprogram/parfor/DataPartitionerRemoteMR.java
----------------------------------------------------------------------
diff --git a/src/main/java/com/ibm/bi/dml/runtime/controlprogram/parfor/DataPartitionerRemoteMR.java b/src/main/java/com/ibm/bi/dml/runtime/controlprogram/parfor/DataPartitionerRemoteMR.java
deleted file mode 100644
index d0cb7fc..0000000
--- a/src/main/java/com/ibm/bi/dml/runtime/controlprogram/parfor/DataPartitionerRemoteMR.java
+++ /dev/null
@@ -1,214 +0,0 @@
-/**
- * (C) Copyright IBM Corp. 2010, 2015
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- * 
- */
-
-package com.ibm.bi.dml.runtime.controlprogram.parfor;
-
-import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.io.LongWritable;
-import org.apache.hadoop.mapred.FileInputFormat;
-import org.apache.hadoop.mapred.JobClient;
-import org.apache.hadoop.mapred.JobConf;
-import org.apache.hadoop.mapred.lib.NullOutputFormat;
-
-import com.ibm.bi.dml.api.DMLScript;
-import com.ibm.bi.dml.conf.ConfigurationManager;
-import com.ibm.bi.dml.conf.DMLConfig;
-import com.ibm.bi.dml.runtime.DMLRuntimeException;
-import com.ibm.bi.dml.runtime.controlprogram.ParForProgramBlock.PDataPartitionFormat;
-import com.ibm.bi.dml.runtime.controlprogram.caching.MatrixObject;
-import com.ibm.bi.dml.runtime.controlprogram.parfor.util.PairWritableBlock;
-import com.ibm.bi.dml.runtime.controlprogram.parfor.util.PairWritableCell;
-import com.ibm.bi.dml.runtime.matrix.data.InputInfo;
-import com.ibm.bi.dml.runtime.matrix.data.OutputInfo;
-import com.ibm.bi.dml.runtime.matrix.mapred.MRJobConfiguration;
-import com.ibm.bi.dml.runtime.util.MapReduceTool;
-import com.ibm.bi.dml.utils.Statistics;
-import com.ibm.bi.dml.yarn.DMLAppMasterUtils;
-
-/**
- * MR job class for submitting parfor remote partitioning MR jobs.
- *
- */
-public class DataPartitionerRemoteMR extends DataPartitioner
-{	
-	
-	private long _pfid = -1;
-	private int  _numReducers = -1;
-	private int  _replication = -1;
-	//private int  _max_retry = -1;
-	private boolean _jvmReuse = false;
-	private boolean _keepIndexes = false;
-	
-	
-	public DataPartitionerRemoteMR(PDataPartitionFormat dpf, int n, long pfid, int numReducers, int replication, int max_retry, boolean jvmReuse, boolean keepIndexes) 
-	{
-		super(dpf, n);
-		
-		_pfid = pfid;
-		_numReducers = numReducers;
-		_replication = replication;
-		//_max_retry = max_retry;
-		_jvmReuse = jvmReuse;
-		_keepIndexes = keepIndexes;
-	}
-
-
-	@Override
-	protected void partitionMatrix(MatrixObject in, String fnameNew, InputInfo ii, OutputInfo oi, long rlen, long clen, int brlen, int bclen)
-			throws DMLRuntimeException 
-	{
-		String jobname = "ParFor-DPMR";
-		long t0 = DMLScript.STATISTICS ? System.nanoTime() : 0;
-		
-		JobConf job;
-		job = new JobConf( DataPartitionerRemoteMR.class );
-		if( _pfid >= 0 ) //use in parfor
-			job.setJobName(jobname+_pfid);
-		else //use for partition instruction
-			job.setJobName("Partition-MR");
-			
-		//maintain dml script counters
-		Statistics.incrementNoOfCompiledMRJobs();
-		
-		try
-		{
-			//force writing to disk (typically not required since partitioning only applied if dataset exceeds CP size)
-			in.exportData(); //written to disk iff dirty
-			
-			Path path = new Path(in.getFileName());
-			
-			/////
-			//configure the MR job
-			MRJobConfiguration.setPartitioningInfo(job, rlen, clen, brlen, bclen, ii, oi, _format, _n, fnameNew, _keepIndexes);
-			
-			//set mappers, reducers, combiners
-			job.setMapperClass(DataPartitionerRemoteMapper.class); 
-			job.setReducerClass(DataPartitionerRemoteReducer.class);
-			
-			if( oi == OutputInfo.TextCellOutputInfo )
-			{
-				//binary cell intermediates for reduced IO 
-				job.setMapOutputKeyClass(LongWritable.class);
-				job.setMapOutputValueClass(PairWritableCell.class);	
-			}
-			else if( oi == OutputInfo.BinaryCellOutputInfo )
-			{
-				job.setMapOutputKeyClass(LongWritable.class);
-				job.setMapOutputValueClass(PairWritableCell.class);
-			}
-			else if ( oi == OutputInfo.BinaryBlockOutputInfo )
-			{
-				job.setMapOutputKeyClass(LongWritable.class);
-				job.setMapOutputValueClass(PairWritableBlock.class);
-				
-				//check block alignment
-				if(   (_format == PDataPartitionFormat.ROW_BLOCK_WISE_N && rlen>_n && _n % brlen !=0)
-        			|| (_format == PDataPartitionFormat.COLUMN_BLOCK_WISE_N && clen>_n && _n % bclen !=0) )
-				{
-					throw new DMLRuntimeException("Data partitioning format "+_format+" requires aligned blocks.");
-				}
-			}
-			
-			//set input format 
-			job.setInputFormat(ii.inputFormatClass);
-			
-			//set the input path and output path 
-		    FileInputFormat.setInputPaths(job, path);
-			
-		    //set output path
-		    MapReduceTool.deleteFileIfExistOnHDFS(fnameNew);
-		    //FileOutputFormat.setOutputPath(job, pathNew);
-		    job.setOutputFormat(NullOutputFormat.class);
-
-			//////
-			//set optimization parameters
-
-			//set the number of mappers and reducers 
-		    //job.setNumMapTasks( _numMappers ); //use default num mappers
-		    long reducerGroups = -1;
-		    switch( _format )
-		    {
-			    case ROW_WISE: reducerGroups = rlen; break;
-			    case COLUMN_WISE: reducerGroups = clen; break;
-			    case ROW_BLOCK_WISE: reducerGroups = (rlen/brlen)+((rlen%brlen==0)?0:1); break;
-			    case COLUMN_BLOCK_WISE: reducerGroups = (clen/bclen)+((clen%bclen==0)?0:1); break;
-			    case ROW_BLOCK_WISE_N: reducerGroups = (rlen/_n)+((rlen%_n==0)?0:1); break;
-			    case COLUMN_BLOCK_WISE_N: reducerGroups = (clen/_n)+((clen%_n==0)?0:1); break;
-			    default:
-					//do nothing
-		    }
-		    job.setNumReduceTasks( (int)Math.min( _numReducers, reducerGroups) ); 	
-
-			//use FLEX scheduler configuration properties
-			/*if( ParForProgramBlock.USE_FLEX_SCHEDULER_CONF )
-			{
-				job.setInt("flex.map.min", 0);
-				job.setInt("flex.map.max", _numMappers);
-				job.setInt("flex.reduce.min", 0);
-				job.setInt("flex.reduce.max", _numMappers);
-			}*/
-			
-			//disable automatic task timeouts and speculative task execution
-			job.setInt("mapred.task.timeout", 0);			
-			job.setMapSpeculativeExecution(false);
-			
-			//set up preferred custom serialization framework for binary block format
-			if( MRJobConfiguration.USE_BINARYBLOCK_SERIALIZATION )
-				MRJobConfiguration.addBinaryBlockSerializationFramework( job );
-			
-			//enables the reuse of JVMs (multiple tasks per JVM)
-			if( _jvmReuse )
-				job.setNumTasksToExecutePerJvm(-1); //unlimited
-			
-			//enables compression - not conclusive for different codecs (empirically good compression ratio, but significantly slower)
-			//job.set("mapred.compress.map.output", "true");
-			//job.set("mapred.map.output.compression.codec", "org.apache.hadoop.io.compress.GzipCodec");
-			
-			//set the replication factor for the results
-			job.setInt("dfs.replication", _replication);
-			
-			//set up map/reduce memory configurations (if in AM context)
-			DMLConfig config = ConfigurationManager.getConfig();
-			DMLAppMasterUtils.setupMRJobRemoteMaxMemory(job, config);
-			
-			//set the max number of retries per map task
-			//  disabled job-level configuration to respect cluster configuration
-			//  note: this refers to hadoop2, hence it never had effect on mr1
-			//job.setInt("mapreduce.map.maxattempts", _max_retry);
-			
-			//set unique working dir
-			MRJobConfiguration.setUniqueWorkingDir(job);
-			
-			/////
-			// execute the MR job	
-			JobClient.runJob(job);
-		
-			//maintain dml script counters
-			Statistics.incrementNoOfExecutedMRJobs();
-		}
-		catch(Exception ex)
-		{
-			throw new DMLRuntimeException(ex);
-		}
-		
-		if( DMLScript.STATISTICS && _pfid >= 0 ){ 
-			long t1 = System.nanoTime(); //only for parfor 
-			Statistics.maintainCPHeavyHitters("MR-Job_"+jobname, t1-t0);
-		}
-	}
-	
-}
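(A hedged worked example, with made-up dimensions, of the reducer-group arithmetic in the switch statement above: one reducer group per output partition, capped afterwards by the configured number of reducers. Note also that for the *_BLOCK_WISE_N formats the alignment check above requires _n to be a multiple of the block size, e.g. _n=2000 is aligned with brlen=1000 while _n=1500 is not.)

// Illustrative only: mirrors reducerGroups = ceil(len/blocksize) for the block-wise
// formats, followed by job.setNumReduceTasks(min(_numReducers, reducerGroups)).
public class ReducerGroupExample {

	static long ceilDiv(long x, long y) {
		return x / y + ((x % y == 0) ? 0 : 1);
	}

	public static void main(String[] args) {
		long rlen = 10000, clen = 2500;
		int brlen = 1000, bclen = 1000;
		int numReducers = 12;

		long rowBlockGroups = ceilDiv(rlen, brlen); // ROW_BLOCK_WISE: 10 partitions
		long colBlockGroups = ceilDiv(clen, bclen); // COLUMN_BLOCK_WISE: 3 partitions

		System.out.println(Math.min(numReducers, rowBlockGroups)); // 10 reduce tasks
		System.out.println(Math.min(numReducers, colBlockGroups)); // 3 reduce tasks
	}
}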