Posted to commits@systemml.apache.org by lr...@apache.org on 2015/12/03 19:45:33 UTC

[02/78] [abbrv] [partial] incubator-systemml git commit: Move files to new package folder structure

http://git-wip-us.apache.org/repos/asf/incubator-systemml/blob/276d9257/src/main/java/com/ibm/bi/dml/runtime/controlprogram/parfor/ResultMergeRemotePartitioning.java
----------------------------------------------------------------------
diff --git a/src/main/java/com/ibm/bi/dml/runtime/controlprogram/parfor/ResultMergeRemotePartitioning.java b/src/main/java/com/ibm/bi/dml/runtime/controlprogram/parfor/ResultMergeRemotePartitioning.java
deleted file mode 100644
index d93949f..0000000
--- a/src/main/java/com/ibm/bi/dml/runtime/controlprogram/parfor/ResultMergeRemotePartitioning.java
+++ /dev/null
@@ -1,63 +0,0 @@
-/**
- * (C) Copyright IBM Corp. 2010, 2015
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- * 
- */
-
-package com.ibm.bi.dml.runtime.controlprogram.parfor;
-
-import org.apache.hadoop.mapred.JobConf;
-import org.apache.hadoop.mapred.Partitioner;
-
-import com.ibm.bi.dml.runtime.matrix.data.MatrixIndexes;
-import com.ibm.bi.dml.runtime.matrix.data.TaggedMatrixBlock;
-import com.ibm.bi.dml.runtime.matrix.mapred.MRJobConfiguration;
-
-public class ResultMergeRemotePartitioning implements Partitioner<ResultMergeTaggedMatrixIndexes, TaggedMatrixBlock> 
-{
-	
-	
-	private long _numColBlocks = -1;
-	
-	
-    @Override
-    public int getPartition(ResultMergeTaggedMatrixIndexes key, TaggedMatrixBlock val, int numPartitions) 
-    {
-    	//MB: Result merge might deal with lots of data but only a few
-    	//distinct indexes (many worker result blocks for one final
-    	//result block). Hence, balanced partitioning is even more important,
-    	//and unfortunately, our default hash function results in significant
-    	//load imbalance for those cases. Based on the known result dimensions
-    	//we can create a better partitioning scheme. However, it still makes
-    	//the assumption that there is no sparsity skew between blocks.
-    	
-    	MatrixIndexes ix = key.getIndexes();
-    	int blockid = (int) (ix.getRowIndex() * _numColBlocks + ix.getColumnIndex());
-    	int partition = blockid % numPartitions;
-    	
-        //int hash = key.getIndexes().hashCode();
-        //int partition = hash % numPartitions;
-        
-    	return partition;
-    }
-
-	@Override
-	public void configure(JobConf job) 
-	{
-		long[] tmp = MRJobConfiguration.getResultMergeMatrixCharacteristics( job );
-		long clen = tmp[1]; 
-		int bclen = (int) tmp[3];
-		_numColBlocks = clen/bclen + ((clen%bclen!=0)? 1 : 0);
-	}
-}
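
For intuition, a minimal standalone sketch of the dimension-aware partitioning described in the comment above: each (row, column) block index is linearized against the known number of column blocks and taken modulo the number of partitions, which spreads the few distinct result-block keys more evenly than generic hashing. The class and example values below are hypothetical illustrations, not part of this commit.

public class BlockPartitionSketch {
    //linearize a 1-based (rowBlock, colBlock) index given the known number of column blocks
    static int partition(long rowBlock, long colBlock, long numColBlocks, int numPartitions) {
        int blockId = (int) (rowBlock * numColBlocks + colBlock);
        return blockId % numPartitions;
    }

    public static void main(String[] args) {
        long clen = 10000, bclen = 1000;                                  //hypothetical matrix width / block width
        long numColBlocks = clen / bclen + ((clen % bclen != 0) ? 1 : 0); //here: 10
        for (long r = 1; r <= 2; r++)
            for (long c = 1; c <= numColBlocks; c++)
                System.out.println("block (" + r + "," + c + ") -> partition "
                    + partition(r, c, numColBlocks, 4));
    }
}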

http://git-wip-us.apache.org/repos/asf/incubator-systemml/blob/276d9257/src/main/java/com/ibm/bi/dml/runtime/controlprogram/parfor/ResultMergeRemoteReducer.java
----------------------------------------------------------------------
diff --git a/src/main/java/com/ibm/bi/dml/runtime/controlprogram/parfor/ResultMergeRemoteReducer.java b/src/main/java/com/ibm/bi/dml/runtime/controlprogram/parfor/ResultMergeRemoteReducer.java
deleted file mode 100644
index 5066a82..0000000
--- a/src/main/java/com/ibm/bi/dml/runtime/controlprogram/parfor/ResultMergeRemoteReducer.java
+++ /dev/null
@@ -1,359 +0,0 @@
-/**
- * (C) Copyright IBM Corp. 2010, 2015
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- * 
- */
-
-package com.ibm.bi.dml.runtime.controlprogram.parfor;
-
-import java.io.IOException;
-import java.util.Collection;
-import java.util.Iterator;
-import java.util.LinkedList;
-
-import org.apache.hadoop.io.NullWritable;
-import org.apache.hadoop.io.Text;
-import org.apache.hadoop.io.Writable;
-import org.apache.hadoop.mapred.JobConf;
-import org.apache.hadoop.mapred.OutputCollector;
-import org.apache.hadoop.mapred.Reducer;
-import org.apache.hadoop.mapred.Reporter;
-
-import com.ibm.bi.dml.runtime.DMLRuntimeException;
-import com.ibm.bi.dml.runtime.controlprogram.caching.MatrixObject;
-import com.ibm.bi.dml.runtime.matrix.data.InputInfo;
-import com.ibm.bi.dml.runtime.matrix.data.MatrixBlock;
-import com.ibm.bi.dml.runtime.matrix.data.MatrixCell;
-import com.ibm.bi.dml.runtime.matrix.data.MatrixIndexes;
-import com.ibm.bi.dml.runtime.matrix.data.TaggedMatrixBlock;
-import com.ibm.bi.dml.runtime.matrix.data.TaggedMatrixCell;
-import com.ibm.bi.dml.runtime.matrix.mapred.MRJobConfiguration;
-import com.ibm.bi.dml.runtime.util.DataConverter;
-
-/**
- * Remote result merge reducer that receives all worker results, partitioned by
- * cell index or block index, and merges them. Due to missing resettable iterators
- * in the old mapred API, we need to spill parts of the value list to disk before merging
- * in the binary-block case.
- *
- */
-public class ResultMergeRemoteReducer 
-	implements Reducer<Writable, Writable, Writable, Writable>
-{	
-	
-	private ResultMergeReducer _reducer = null;
-	
-	public ResultMergeRemoteReducer( ) 
-	{
-		
-	}
-	
-	@Override
-	public void reduce(Writable key, Iterator<Writable> valueList, OutputCollector<Writable, Writable> out, Reporter reporter)
-		throws IOException 
-	{
-		_reducer.processKeyValueList(key, valueList, out, reporter);
-	}
-
-	/**
-	 * 
-	 */
-	public void configure(JobConf job)
-	{
-		InputInfo ii = MRJobConfiguration.getResultMergeInputInfo(job);
-		String compareFname = MRJobConfiguration.getResultMergeInfoCompareFilename(job);
-		
-		//determine compare required
-		boolean requiresCompare = false;
-		if( !compareFname.equals("null") )
-			requiresCompare = true;
-		
-		if( ii == InputInfo.TextCellInputInfo )
-			_reducer = new ResultMergeReducerTextCell(requiresCompare);
-		else if( ii == InputInfo.BinaryCellInputInfo )
-			_reducer = new ResultMergeReducerBinaryCell(requiresCompare);
-		else if( ii == InputInfo.BinaryBlockInputInfo )
-			_reducer = new ResultMergeReducerBinaryBlock(requiresCompare);
-		else
-			throw new RuntimeException("Unable to configure reducer with unknown input info: "+ii.toString());
-	}
-	
-	/**
-	 * 
-	 */
-	@Override
-	public void close() throws IOException 
-	{
-		//do nothing
-	}
-
-	
-	private interface ResultMergeReducer //interface in order to allow ResultMergeReducerBinaryBlock to inherit from ResultMerge
-	{	
-		void processKeyValueList( Writable key, Iterator<Writable> valueList, OutputCollector<Writable, Writable> out, Reporter reporter ) 
-			throws IOException;
-	}
-	
-	private static class ResultMergeReducerTextCell implements ResultMergeReducer
-	{
-		private boolean _requiresCompare;
-		private StringBuilder _sb = null;
-		private Text _objValue = null;
-		
-		public ResultMergeReducerTextCell(boolean requiresCompare)
-		{
-			_requiresCompare = requiresCompare;
-			_sb = new StringBuilder();
-			_objValue = new Text();
-		}
-		
-		@Override
-		public void processKeyValueList(Writable key, Iterator<Writable> valueList, OutputCollector<Writable, Writable> out, Reporter reporter)
-			throws IOException 
-		{
-			//with compare
-			if( _requiresCompare )
-			{
-				// NOTES MB:
-				// 1) the old mapred api does not support multiple scans (reset/mark);
-				//    once we switch to the new api, we can use the resettable iterator for doing
-				//    the two required scans (finding the compare obj, comparing all other objs)
-				// 2) for 'textcell' we assume that the entire valueList fits into main memory;
-				//    this is valid as we group by cells, i.e., we would need millions of input files
-				//    to exceed the usual 100-600MB per reduce task.
-				
-				//scan for compare object (incl result merge if compare available)
-				MatrixIndexes key2 = (MatrixIndexes) key;
-				Double cellCompare = null;
-				Collection<Double> cellList = new LinkedList<Double>();
-				boolean found = false;
-				while( valueList.hasNext() ) {
-					TaggedMatrixCell tVal = (TaggedMatrixCell) valueList.next();
-					double lvalue = ((MatrixCell)tVal.getBaseObject()).getValue();
-					if( tVal.getTag()==ResultMergeRemoteMR.COMPARE_TAG )
-						cellCompare = lvalue;
-					else 
-					{
-						if( cellCompare == null )
-							cellList.add( lvalue );
-						else if( cellCompare.doubleValue()!=lvalue ) //compare on the fly
-						{
-							_sb.append(key2.getRowIndex());
-							_sb.append(' ');
-							_sb.append(key2.getColumnIndex());
-							_sb.append(' ');
-							_sb.append(lvalue);
-							_objValue.set( _sb.toString() );
-							_sb.setLength(0);
-							out.collect(NullWritable.get(), _objValue );	
-							found = true;
-							break; //only one write per cell possible (independence)
-						}// note: objs with equal value are directly discarded
-					}
-				}
-				
-				//result merge for objs before compare
-				if( !found )
-					for( Double c : cellList )
-						if( !c.equals( cellCompare ) )
-						{
-							_sb.append(key2.getRowIndex());
-							_sb.append(' ');
-							_sb.append(key2.getColumnIndex());
-							_sb.append(' ');
-							_sb.append(c.doubleValue());
-							_objValue.set( _sb.toString() );
-							_sb.setLength(0);							
-							out.collect(NullWritable.get(), _objValue );	
-							break; //only one write per cell possible (independence)
-						}
-			}
-			//without compare
-			else
-			{
-				MatrixIndexes key2 = (MatrixIndexes) key;
-				while( valueList.hasNext() )  
-				{
-					TaggedMatrixCell tVal = (TaggedMatrixCell) valueList.next(); 
-					MatrixCell value = (MatrixCell) tVal.getBaseObject();
-					
-					_sb.append(key2.getRowIndex());
-					_sb.append(' ');
-					_sb.append(key2.getColumnIndex());
-					_sb.append(' ');
-					_sb.append(value.getValue());
-					_objValue.set( _sb.toString() );
-					_sb.setLength(0);				
-					out.collect(NullWritable.get(), _objValue );	
-					break; //only one write per cell possible (independence)
-				}
-			}
-			
-			
-		}
-	}
-	
-	private static class ResultMergeReducerBinaryCell implements ResultMergeReducer
-	{
-		private boolean _requiresCompare;
-		private MatrixCell _objValue;
-		
-		public ResultMergeReducerBinaryCell(boolean requiresCompare)
-		{
-			_requiresCompare = requiresCompare;
-			_objValue = new MatrixCell();
-		}
-
-		@Override
-		public void processKeyValueList(Writable key, Iterator<Writable> valueList, OutputCollector<Writable, Writable> out, Reporter reporter)
-			throws IOException 
-		{
-			//with compare
-			if( _requiresCompare )
-			{
-				// NOTES MB:
-				// 1) the old mapred api does not support multiple scans (reset/mark);
-				//    once we switch to the new api, we can use the resettable iterator for doing
-				//    the two required scans (finding the compare obj, comparing all other objs)
-				// 2) for 'binarycell' we assume that the entire valueList fits into main memory;
-				//    this is valid as we group by cells, i.e., we would need millions of input files
-				//    to exceed the usual 100-600MB per reduce task.
-				
-				//scan for compare object (incl result merge if compare available)
-				Double cellCompare = null;
-				Collection<Double> cellList = new LinkedList<Double>();
-				boolean found = false;
-				while( valueList.hasNext() ) {
-					TaggedMatrixCell tVal = (TaggedMatrixCell) valueList.next();
-					MatrixCell cVal = (MatrixCell) tVal.getBaseObject();
-					if( tVal.getTag()==ResultMergeRemoteMR.COMPARE_TAG )
-						cellCompare = cVal.getValue();
-					else 
-					{
-						if( cellCompare == null )
-							cellList.add( cVal.getValue() );
-						else if( cellCompare.doubleValue() != cVal.getValue() ) //compare on the fly
-						{
-							out.collect(key, cVal );	
-							found = true;
-							break; //only one write per cell possible (independence)
-						}// note: objs with equal value are directly discarded
-					}
-				}
-				
-				//result merge for objs before compare
-				if( !found )
-					for( Double c : cellList )				
-						if( !c.equals( cellCompare) )
-						{				
-							_objValue.setValue(c.doubleValue());
-							out.collect(key, _objValue );	
-							break; //only one write per cell possible (independence)
-						}
-			}
-			//without compare
-			else
-			{
-				while( valueList.hasNext() )  
-				{
-					TaggedMatrixCell tVal = (TaggedMatrixCell) valueList.next(); 
-					out.collect((MatrixIndexes)key, (MatrixCell)tVal.getBaseObject());	
-					break; //only one write per cell possible (independence)
-				}
-			}
-		}
-	}
-	
-	private static class ResultMergeReducerBinaryBlock extends ResultMerge implements ResultMergeReducer
-	{
-		private boolean _requiresCompare;
-		
-		public ResultMergeReducerBinaryBlock(boolean requiresCompare)
-		{
-			_requiresCompare = requiresCompare;
-		}
-		
-		@Override
-		public MatrixObject executeParallelMerge(int par) 
-			throws DMLRuntimeException 
-		{
-			throw new DMLRuntimeException("Unsupported operation.");
-		}
-
-		@Override
-		public MatrixObject executeSerialMerge() 
-			throws DMLRuntimeException 
-		{
-			throw new DMLRuntimeException("Unsupported operation.");
-		}
-
-		@Override
-		public void processKeyValueList(Writable key, Iterator<Writable> valueList, OutputCollector<Writable, Writable> out, Reporter reporter)
-			throws IOException 
-		{	
-			try
-			{
-				MatrixIndexes ixOut = ((ResultMergeTaggedMatrixIndexes)key).getIndexes();
-				MatrixBlock mbOut = null;
-				double[][] aCompare = null;
-				boolean appendOnly = false;
-				
-				//get and prepare compare block if required
-				if( _requiresCompare )
-				{
-					TaggedMatrixBlock tVal = (TaggedMatrixBlock) valueList.next();
-					MatrixBlock bVal = (MatrixBlock) tVal.getBaseObject();
-					if( tVal.getTag()!=ResultMergeRemoteMR.COMPARE_TAG )
-						throw new IOException("Failed to read compare block at expected first position.");
-					aCompare = DataConverter.convertToDoubleMatrix(bVal);
-				}
-				
-				//merge all result blocks into final result block 
-				while( valueList.hasNext() ) 
-				{
-					TaggedMatrixBlock tVal = (TaggedMatrixBlock) valueList.next();
-					MatrixBlock bVal = (MatrixBlock) tVal.getBaseObject();
-					
-					if( mbOut == null ) //copy first block
-					{
-						mbOut = new MatrixBlock();
-						mbOut.copy( bVal );
-						appendOnly = mbOut.isInSparseFormat();
-					}
-					else //merge remaining blocks
-					{
-						if( _requiresCompare )
-							mergeWithComp(mbOut, bVal, aCompare);
-						else
-							mergeWithoutComp(mbOut, bVal, appendOnly);	
-					}
-				}
-				
-				//sort sparse due to append-only
-				if( appendOnly )
-					mbOut.sortSparseRows();
-				
-				//change sparsity if required after 
-				mbOut.examSparsity(); 
-				
-				out.collect(ixOut, mbOut);
-			}
-			catch( Exception ex )
-			{
-				throw new IOException(ex);
-			}			
-		}
-		
-	}
-}
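
For reference, a minimal standalone sketch of the compare-based cell merge that the text/binary cell reducers above implement: the compare value is the original cell, and at most one differing worker value is written (values equal to the compare value are discarded; result independence guarantees at most one worker changed a given cell). Class and values are illustrative only.

import java.util.Arrays;
import java.util.List;

public class CellMergeSketch {
    //returns the merged value, or null if no worker changed the cell
    static Double mergeCell(double compare, List<Double> workerValues) {
        for (double v : workerValues)
            if (v != compare)        //first differing value wins; equal values are discarded
                return v;
        return null;                  //cell unchanged by all workers
    }

    public static void main(String[] args) {
        System.out.println(mergeCell(1.0, Arrays.asList(1.0, 1.0, 7.0))); //7.0
        System.out.println(mergeCell(1.0, Arrays.asList(1.0, 1.0)));      //null
    }
}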

http://git-wip-us.apache.org/repos/asf/incubator-systemml/blob/276d9257/src/main/java/com/ibm/bi/dml/runtime/controlprogram/parfor/ResultMergeRemoteSorting.java
----------------------------------------------------------------------
diff --git a/src/main/java/com/ibm/bi/dml/runtime/controlprogram/parfor/ResultMergeRemoteSorting.java b/src/main/java/com/ibm/bi/dml/runtime/controlprogram/parfor/ResultMergeRemoteSorting.java
deleted file mode 100644
index 4f81764..0000000
--- a/src/main/java/com/ibm/bi/dml/runtime/controlprogram/parfor/ResultMergeRemoteSorting.java
+++ /dev/null
@@ -1,49 +0,0 @@
-/**
- * (C) Copyright IBM Corp. 2010, 2015
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- * 
- */
-
-package com.ibm.bi.dml.runtime.controlprogram.parfor;
-
-import org.apache.hadoop.io.WritableComparable;
-import org.apache.hadoop.io.WritableComparator;
-
-public class ResultMergeRemoteSorting extends WritableComparator
-{
-	
-	
-	protected ResultMergeRemoteSorting()
-	{
-		super(ResultMergeTaggedMatrixIndexes.class, true);
-	}
-	
-	@SuppressWarnings("rawtypes")
-	@Override
-    public int compare(WritableComparable k1, WritableComparable k2) 
-	{
-		ResultMergeTaggedMatrixIndexes key1 = (ResultMergeTaggedMatrixIndexes)k1;
-		ResultMergeTaggedMatrixIndexes key2 = (ResultMergeTaggedMatrixIndexes)k2;
-
-		int ret = key1.getIndexes().compareTo(key2.getIndexes());
-		if( ret == 0 ) //same indexes, secondary sort
-		{
-			ret = ((key1.getTag() == key2.getTag()) ? 0 : 
-				   (key1.getTag() < key2.getTag())? -1 : 1);
-		}	
-			
-		return ret; 
-		
-    }
-}

http://git-wip-us.apache.org/repos/asf/incubator-systemml/blob/276d9257/src/main/java/com/ibm/bi/dml/runtime/controlprogram/parfor/ResultMergeRemoteSpark.java
----------------------------------------------------------------------
diff --git a/src/main/java/com/ibm/bi/dml/runtime/controlprogram/parfor/ResultMergeRemoteSpark.java b/src/main/java/com/ibm/bi/dml/runtime/controlprogram/parfor/ResultMergeRemoteSpark.java
deleted file mode 100644
index 6765bc3..0000000
--- a/src/main/java/com/ibm/bi/dml/runtime/controlprogram/parfor/ResultMergeRemoteSpark.java
+++ /dev/null
@@ -1,219 +0,0 @@
-/**
- * (C) Copyright IBM Corp. 2010, 2015
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- * 
- */
-
-package com.ibm.bi.dml.runtime.controlprogram.parfor;
-
-
-import org.apache.spark.api.java.JavaPairRDD;
-
-import com.ibm.bi.dml.api.DMLScript;
-import com.ibm.bi.dml.parser.Expression.DataType;
-import com.ibm.bi.dml.parser.Expression.ValueType;
-import com.ibm.bi.dml.runtime.DMLRuntimeException;
-import com.ibm.bi.dml.runtime.controlprogram.caching.MatrixObject;
-import com.ibm.bi.dml.runtime.controlprogram.context.ExecutionContext;
-import com.ibm.bi.dml.runtime.controlprogram.context.SparkExecutionContext;
-import com.ibm.bi.dml.runtime.instructions.spark.data.RDDObject;
-import com.ibm.bi.dml.runtime.instructions.spark.utils.RDDAggregateUtils;
-import com.ibm.bi.dml.runtime.matrix.MatrixCharacteristics;
-import com.ibm.bi.dml.runtime.matrix.MatrixFormatMetaData;
-import com.ibm.bi.dml.runtime.matrix.data.InputInfo;
-import com.ibm.bi.dml.runtime.matrix.data.MatrixBlock;
-import com.ibm.bi.dml.runtime.matrix.data.MatrixIndexes;
-import com.ibm.bi.dml.runtime.matrix.data.OutputInfo;
-import com.ibm.bi.dml.utils.Statistics;
-
-/**
- * Remote parfor result merge implemented via Spark RDD operations.
- * 
- */
-public class ResultMergeRemoteSpark extends ResultMerge
-{	
-	
-	private ExecutionContext _ec = null;
-	private int  _numMappers = -1;
-	private int  _numReducers = -1;
-	
-	public ResultMergeRemoteSpark(MatrixObject out, MatrixObject[] in, String outputFilename, ExecutionContext ec, int numMappers, int numReducers) 
-	{
-		super(out, in, outputFilename);
-		
-		_ec = ec;
-		_numMappers = numMappers;
-		_numReducers = numReducers;
-	}
-
-	@Override
-	public MatrixObject executeSerialMerge() 
-		throws DMLRuntimeException 
-	{
-		//graceful degradation to parallel merge
-		return executeParallelMerge( _numMappers );
-	}
-	
-	@Override
-	public MatrixObject executeParallelMerge(int par) 
-		throws DMLRuntimeException 
-	{
-		MatrixObject moNew = null; //always create new matrix object (required for nested parallelism)
-
-		LOG.trace("ResultMerge (remote, spark): Execute parallel merge for output "+_output.getVarName()+" (fname="+_output.getFileName()+")");
-
-		try
-		{
-			if( _inputs != null && _inputs.length>0 )
-			{
-				//prepare compare
-				MatrixFormatMetaData metadata = (MatrixFormatMetaData) _output.getMetaData();
-				MatrixCharacteristics mcOld = metadata.getMatrixCharacteristics();
-				MatrixObject compare = (mcOld.getNonZeros()==0) ? null : _output;
-				
-				//actual merge
-				RDDObject ro = executeMerge(compare, _inputs, _output.getVarName(), mcOld.getRows(), mcOld.getCols(), mcOld.getRowsPerBlock(), mcOld.getColsPerBlock());
-				
-				//create new output matrix (e.g., to prevent a potential export<->read file access conflict)
-				String varName = _output.getVarName();
-				ValueType vt = _output.getValueType();
-				moNew = new MatrixObject( vt, _outputFName );
-				moNew.setVarName( varName.contains(NAME_SUFFIX) ? varName : varName+NAME_SUFFIX );
-				moNew.setDataType( DataType.MATRIX );
-				OutputInfo oiOld = metadata.getOutputInfo();
-				InputInfo iiOld = metadata.getInputInfo();
-				MatrixCharacteristics mc = new MatrixCharacteristics(mcOld.getRows(),mcOld.getCols(),
-						                                             mcOld.getRowsPerBlock(),mcOld.getColsPerBlock());
-				mc.setNonZeros( computeNonZeros(_output, convertToList(_inputs)) );
-				MatrixFormatMetaData meta = new MatrixFormatMetaData(mc,oiOld,iiOld);
-				moNew.setMetaData( meta );
-				moNew.setRDDHandle( ro );
-			}
-			else
-			{
-				moNew = _output; //return old matrix, to prevent copy
-			}
-		}
-		catch(Exception ex)
-		{
-			throw new DMLRuntimeException(ex);
-		}
-		
-		return moNew;		
-	}
-	
-	/**
-	 * Merges all worker results (and optionally the compare matrix) into one output RDD.
-	 * 
-	 * @param compare   compare matrix object, null if no comparison required
-	 * @param inputs    worker result matrix objects
-	 * @param varname   output variable name
-	 * @param rlen      number of rows
-	 * @param clen      number of columns
-	 * @param brlen     number of rows per block
-	 * @param bclen     number of columns per block
-	 * @return          merged result as an RDD handle
-	 * @throws DMLRuntimeException
-	 */
-	@SuppressWarnings("unchecked")
-	protected RDDObject executeMerge(MatrixObject compare, MatrixObject[] inputs, String varname, long rlen, long clen, int brlen, int bclen)
-		throws DMLRuntimeException 
-	{
-		String jobname = "ParFor-RMSP";
-		long t0 = DMLScript.STATISTICS ? System.nanoTime() : 0;
-		
-		SparkExecutionContext sec = (SparkExecutionContext)_ec;
-		boolean withCompare = (compare!=null);
-
-		RDDObject ret = null;
-		
-	    //determine degree of parallelism
-		int numRed = (int)determineNumReducers(rlen, clen, brlen, bclen, _numReducers);
-	
-		//sanity check for empty src files
-		if( inputs == null || inputs.length==0  )
-			throw new DMLRuntimeException("Execute merge should never be called with no inputs.");
-		
-		try
-		{
-		    //Step 1: union over all results
-		    JavaPairRDD<MatrixIndexes, MatrixBlock> rdd = (JavaPairRDD<MatrixIndexes, MatrixBlock>) 
-		    		sec.getRDDHandleForMatrixObject(_inputs[0], InputInfo.BinaryBlockInputInfo);
-		    for( int i=1; i<_inputs.length; i++ ) {
-			    JavaPairRDD<MatrixIndexes, MatrixBlock> rdd2 = (JavaPairRDD<MatrixIndexes, MatrixBlock>) 
-			    		sec.getRDDHandleForMatrixObject(_inputs[i], InputInfo.BinaryBlockInputInfo);
-			    rdd = rdd.union(rdd2);
-		    }
-		
-		    //Step 2a: merge with compare
-		    JavaPairRDD<MatrixIndexes, MatrixBlock> out = null;
-		    if( withCompare )
-		    {
-		    	JavaPairRDD<MatrixIndexes, MatrixBlock> compareRdd = (JavaPairRDD<MatrixIndexes, MatrixBlock>) 
-			    		sec.getRDDHandleForMatrixObject(compare, InputInfo.BinaryBlockInputInfo);
-			    
-		    	//merge values which differ from compare values
-		    	ResultMergeRemoteSparkWCompare cfun = new ResultMergeRemoteSparkWCompare();
-		    	out = rdd.groupByKey(numRed) //group all result blocks per key
-		    	         .join(compareRdd)   //join compare block and result blocks 
-		    	         .mapToPair(cfun);   //merge result blocks w/ compare
-		    }
-		    //Step 2b: merge without compare
-		    else
-		    {
-		    	//direct merge in any order (disjointness guaranteed)
-		    	out = RDDAggregateUtils.mergeByKey(rdd);
-		    }
-		    
-		    //Step 3: create output rdd handle w/ lineage
-		    ret = new RDDObject(out, varname);
-		    for( int i=0; i<_inputs.length; i++ ) {
-		    	//child rdd handles guaranteed to exist
-		    	RDDObject child = _inputs[i].getRDDHandle();
-				ret.addLineageChild(child);
-		    }
-		}
-		catch( Exception ex )
-		{
-			throw new DMLRuntimeException(ex);
-		}	    
-		
-		//maintain statistics
-	    Statistics.incrementNoOfCompiledSPInst();
-	    Statistics.incrementNoOfExecutedSPInst();
-	    if( DMLScript.STATISTICS ){
-			Statistics.maintainCPHeavyHitters(jobname, System.nanoTime()-t0);
-		}
-	    
-		return ret;
-	}
-
-	/**
-	 * Determines the number of reducers as the minimum of the configured
-	 * number of reducers and the number of result block groups.
-	 * 
-	 * @param rlen   number of rows
-	 * @param clen   number of columns
-	 * @param brlen  number of rows per block
-	 * @param bclen  number of columns per block
-	 * @param numRed configured number of reducers
-	 * @return number of reducers to use
-	 */
-	private int determineNumReducers(long rlen, long clen, int brlen, int bclen, long numRed)
-	{
-		//set the number of mappers and reducers 
-	    long reducerGroups = Math.max(rlen/brlen,1) * Math.max(clen/bclen, 1);
-		int ret = (int)Math.min( numRed, reducerGroups );
-	    
-	    return ret; 	
-	}
-}
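
For orientation, a minimal self-contained sketch of the RDD dataflow in executeMerge above (union of worker results, group per block index, join with the compare data, merge), using plain Double cell values in place of MatrixBlock and a simplified merge rule; all class names and values below are illustrative, not SystemML APIs.

import java.util.Arrays;
import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaPairRDD;
import org.apache.spark.api.java.JavaSparkContext;
import scala.Tuple2;

public class SparkMergeSketch {
    public static void main(String[] args) {
        JavaSparkContext sc = new JavaSparkContext(
                new SparkConf().setMaster("local[2]").setAppName("merge-sketch"));

        //worker results (block index -> value) and the original compare data
        JavaPairRDD<Long, Double> res1 = sc.parallelizePairs(
                Arrays.asList(new Tuple2<>(1L, 1.0), new Tuple2<>(2L, 9.0)));
        JavaPairRDD<Long, Double> res2 = sc.parallelizePairs(
                Arrays.asList(new Tuple2<>(1L, 5.0), new Tuple2<>(2L, 2.0)));
        JavaPairRDD<Long, Double> compare = sc.parallelizePairs(
                Arrays.asList(new Tuple2<>(1L, 1.0), new Tuple2<>(2L, 2.0)));

        //Step 1: union over all results; Step 2a: group per key and join with compare;
        //then keep the first value that differs from the compare value (merge w/ compare)
        JavaPairRDD<Long, Double> merged = res1.union(res2)
                .groupByKey(2)
                .join(compare)
                .mapToPair(t -> {
                    double cmp = t._2()._2();
                    double out = cmp;
                    for (double v : t._2()._1())
                        if (v != cmp) { out = v; break; }
                    return new Tuple2<>(t._1(), out);
                });

        System.out.println(merged.collectAsMap()); //{1=5.0, 2=9.0} (map print order may vary)
        sc.stop();
    }
}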

http://git-wip-us.apache.org/repos/asf/incubator-systemml/blob/276d9257/src/main/java/com/ibm/bi/dml/runtime/controlprogram/parfor/ResultMergeRemoteSparkWCompare.java
----------------------------------------------------------------------
diff --git a/src/main/java/com/ibm/bi/dml/runtime/controlprogram/parfor/ResultMergeRemoteSparkWCompare.java b/src/main/java/com/ibm/bi/dml/runtime/controlprogram/parfor/ResultMergeRemoteSparkWCompare.java
deleted file mode 100644
index 7df2cd1..0000000
--- a/src/main/java/com/ibm/bi/dml/runtime/controlprogram/parfor/ResultMergeRemoteSparkWCompare.java
+++ /dev/null
@@ -1,73 +0,0 @@
-/**
- * (C) Copyright IBM Corp. 2010, 2015
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- * 
- */
-
-package com.ibm.bi.dml.runtime.controlprogram.parfor;
-
-import java.util.Iterator;
-
-import org.apache.spark.api.java.function.PairFunction;
-
-import scala.Tuple2;
-
-import com.ibm.bi.dml.runtime.DMLRuntimeException;
-import com.ibm.bi.dml.runtime.controlprogram.caching.MatrixObject;
-import com.ibm.bi.dml.runtime.matrix.data.MatrixBlock;
-import com.ibm.bi.dml.runtime.matrix.data.MatrixIndexes;
-import com.ibm.bi.dml.runtime.util.DataConverter;
-
-/**
- * Spark pair function that merges all worker result blocks of one block index
- * with the corresponding compare block.
- */
-public class ResultMergeRemoteSparkWCompare extends ResultMerge implements PairFunction<Tuple2<MatrixIndexes,Tuple2<Iterable<MatrixBlock>,MatrixBlock>>, MatrixIndexes, MatrixBlock>
-{
-	
-	private static final long serialVersionUID = -5970805069405942836L;
-	
-	@Override
-	public Tuple2<MatrixIndexes, MatrixBlock> call(Tuple2<MatrixIndexes, Tuple2<Iterable<MatrixBlock>, MatrixBlock>> arg)
-		throws Exception 
-	{
-		MatrixIndexes ixin = arg._1();
-		Iterator<MatrixBlock> din = arg._2()._1().iterator();
-		MatrixBlock cin = arg._2()._2();
-		
-		//create compare array
-		double[][] compare = DataConverter.convertToDoubleMatrix(cin);
-		
-		//merge all blocks into compare block
-		MatrixBlock out = new MatrixBlock(cin);
-		while( din.hasNext() )
-			mergeWithComp(out, din.next(), compare);
-		
-		//create output tuple
-		return new Tuple2<MatrixIndexes,MatrixBlock>(new MatrixIndexes(ixin), out);
-	}
-
-	@Override
-	public MatrixObject executeSerialMerge() 
-			throws DMLRuntimeException 
-	{
-		throw new DMLRuntimeException("Unsupported operation.");
-	}
-
-	@Override
-	public MatrixObject executeParallelMerge(int par)
-			throws DMLRuntimeException 
-	{
-		throw new DMLRuntimeException("Unsupported operation.");
-	}
-}

http://git-wip-us.apache.org/repos/asf/incubator-systemml/blob/276d9257/src/main/java/com/ibm/bi/dml/runtime/controlprogram/parfor/ResultMergeTaggedMatrixIndexes.java
----------------------------------------------------------------------
diff --git a/src/main/java/com/ibm/bi/dml/runtime/controlprogram/parfor/ResultMergeTaggedMatrixIndexes.java b/src/main/java/com/ibm/bi/dml/runtime/controlprogram/parfor/ResultMergeTaggedMatrixIndexes.java
deleted file mode 100644
index af87aa3..0000000
--- a/src/main/java/com/ibm/bi/dml/runtime/controlprogram/parfor/ResultMergeTaggedMatrixIndexes.java
+++ /dev/null
@@ -1,116 +0,0 @@
-/**
- * (C) Copyright IBM Corp. 2010, 2015
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- * 
- */
-
-package com.ibm.bi.dml.runtime.controlprogram.parfor;
-
-import java.io.DataInput;
-import java.io.DataOutput;
-import java.io.IOException;
-
-import org.apache.hadoop.io.WritableComparable;
-
-import com.ibm.bi.dml.runtime.matrix.data.MatrixIndexes;
-
-/**
- * This class serves as composite key for the remote result merge job
- * (for any data format) in order to sort on both matrix indexes and tag
- * but group all blocks according to matrix indexes only. This prevents
- * us from doing an 2pass out-of-core algorithm at the reducer since we
- * can guarantee that the compare block (tag 0) will be the first element
- * in the iterator.
- * 
- */
-public class ResultMergeTaggedMatrixIndexes implements WritableComparable<ResultMergeTaggedMatrixIndexes>
-{
-	
-	private MatrixIndexes _ix;
-	private byte _tag = -1;
-	
-	public ResultMergeTaggedMatrixIndexes()
-	{
-		_ix = new MatrixIndexes();
-	}
-	
-	public ResultMergeTaggedMatrixIndexes(long r, long c, byte tag)
-	{
-		_ix = new MatrixIndexes(r, c);
-		_tag = tag;
-	}
-	
-	public MatrixIndexes getIndexes()
-	{
-		return _ix;
-	}
-	
-	
-	public byte getTag()
-	{
-		return _tag;
-	}
-	
-	public void setTag(byte tag)
-	{
-		_tag = tag;
-	}
-
-	@Override
-	public void readFields(DataInput in) 
-		throws IOException 
-	{
-		if( _ix == null )
-			_ix = new MatrixIndexes();
-		_ix.readFields(in);
-		_tag = in.readByte();
-	}
-
-	@Override
-	public void write(DataOutput out) 
-		throws IOException 
-	{
-		_ix.write(out);
-		out.writeByte(_tag);
-	}
-
-	@Override
-	public int compareTo(ResultMergeTaggedMatrixIndexes that) 
-	{
-		int ret = _ix.compareTo(that._ix);
-		
-		if( ret == 0 )
-		{
-			ret = ((_tag == that._tag) ? 0 : 
-				   (_tag < that._tag)? -1 : 1);
-		}
-		
-		return ret;
-	}
-	
-	@Override
-	public boolean equals(Object other) 
-	{
-		if( !(other instanceof ResultMergeTaggedMatrixIndexes) )
-			return false;
-		
-		ResultMergeTaggedMatrixIndexes that = (ResultMergeTaggedMatrixIndexes)other;
-		return (_ix.equals(that._ix) && _tag == that._tag);
-	}
-	
-	@Override
-	public int hashCode() {
-		throw new RuntimeException("hashCode() should never be called on instances of this class.");
-	}
-}
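
To illustrate the composite-key idea explained above, a small standalone sketch: the full sort order compares the matrix indexes first and the tag second, so after sorting the compare block (tag 0) precedes all worker blocks of the same index, while grouping on the indexes alone still delivers all tags of one block in a single reduce call. Class and field names are hypothetical.

import java.util.Arrays;
import java.util.Comparator;

public class SecondarySortSketch {
    static class TaggedIndex {
        final long row, col; final byte tag;
        TaggedIndex(long r, long c, byte t) { row = r; col = c; tag = t; }
        public String toString() { return "(" + row + "," + col + ",tag=" + tag + ")"; }
    }

    public static void main(String[] args) {
        //full sort order: indexes first, then tag -> compare block (tag 0) comes first per index
        Comparator<TaggedIndex> sortCmp = Comparator
                .comparingLong((TaggedIndex k) -> k.row)
                .thenComparingLong(k -> k.col)
                .thenComparingInt(k -> k.tag);

        TaggedIndex[] keys = {
                new TaggedIndex(1, 1, (byte) 2), new TaggedIndex(1, 1, (byte) 0),
                new TaggedIndex(1, 2, (byte) 1), new TaggedIndex(1, 1, (byte) 1) };
        Arrays.sort(keys, sortCmp);
        System.out.println(Arrays.toString(keys));
        //[(1,1,tag=0), (1,1,tag=1), (1,1,tag=2), (1,2,tag=1)]
        //grouping by (row,col) only then hands the reducer the tag-0 compare block first
    }
}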

http://git-wip-us.apache.org/repos/asf/incubator-systemml/blob/276d9257/src/main/java/com/ibm/bi/dml/runtime/controlprogram/parfor/Task.java
----------------------------------------------------------------------
diff --git a/src/main/java/com/ibm/bi/dml/runtime/controlprogram/parfor/Task.java b/src/main/java/com/ibm/bi/dml/runtime/controlprogram/parfor/Task.java
deleted file mode 100644
index c2ff9ac..0000000
--- a/src/main/java/com/ibm/bi/dml/runtime/controlprogram/parfor/Task.java
+++ /dev/null
@@ -1,235 +0,0 @@
-/**
- * (C) Copyright IBM Corp. 2010, 2015
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- * 
- */
-
-package com.ibm.bi.dml.runtime.controlprogram.parfor;
-
-import java.io.Serializable;
-import java.util.LinkedList;
-import java.util.List;
-import java.util.StringTokenizer;
-
-import com.ibm.bi.dml.runtime.instructions.cp.IntObject;
-
-/**
- * A task is a logical group of one or multiple iterations (each iteration is assigned to exactly one task),
- * and each task is executed sequentially. See TaskPartitioner for how tasks are created and
- * ParWorker for how those tasks are eventually executed.
- * 
- * NOTE: (Extension possibility: group of statements) 
- * 
- */
-public class Task implements Serializable
-{
-	
-	private static final long serialVersionUID = 2815832451487164284L;
-	
-	public enum TaskType {
-		RANGE, 
-		SET
-	}
-	
-	public static final int MAX_VARNAME_SIZE  = 256;
-	public static final int MAX_TASK_SIZE     = Integer.MAX_VALUE-1; 
-	
-	private TaskType           	  _type;
-	private LinkedList<IntObject> _iterations; //each iteration is specified as an ordered set of index values
-	
-	public Task() {
-		//default constructor for serialize
-	}
-	
-	public Task( TaskType type )
-	{
-		_type = type;
-		
-		_iterations = new LinkedList<IntObject>();
-	}
-	
-	public void addIteration( IntObject indexVal ) 
-	{
-		if( indexVal.getName().length() > MAX_VARNAME_SIZE )
-			throw new RuntimeException("Cannot add iteration, MAX_VARNAME_SIZE exceeded.");
-		
-		if( size() >= MAX_TASK_SIZE )
-			throw new RuntimeException("Cannot add iteration, MAX_TASK_SIZE reached.");
-			
-		_iterations.addLast( indexVal );
-	}
-	
-	public List<IntObject> getIterations()
-	{
-		return _iterations;
-	}
-	
-	public TaskType getType()
-	{
-		return _type;
-	}
-	
-	public int size()
-	{
-		return _iterations.size();
-	}
-	
-	/**
-	 * Appends all iterations of the given task to this task
-	 * (only supported for SET tasks over the same index variable).
-	 * 
-	 * @param task task to merge into this task
-	 */
-	public void mergeTask( Task task )
-	{
-		//check for set iteration type
-		if( _type==TaskType.RANGE )
-			throw new RuntimeException("Task Merging not supported for tasks of type ITERATION_RANGE.");
-		
-		//check for same iteration name
-		String var1 = _iterations.getFirst().getName();
-		String var2 = task._iterations.getFirst().getName();
-		if( !var1.equals(var2) )
-			throw new RuntimeException("Task Merging not supported for tasks with different variable names");
-	
-		//merge tasks
-		for( IntObject o : task._iterations )
-			_iterations.addLast( o );
-	}
-	
-
-	@Override
-	public String toString() 
-	{
-		return toFormatedString();
-	}
-	
-	/**
-	 * 
-	 * @return
-	 */
-	public String toFormatedString()
-	{
-		StringBuilder sb = new StringBuilder();
-		sb.append("task (type=");
-		sb.append(_type);
-		sb.append(", iterations={");
-		int count=0;
-		for( IntObject dat : _iterations )
-		{
-			if( count!=0 ) 
-				sb.append(";");
-			sb.append("[");
-			sb.append(dat.getName());
-			sb.append("=");
-			sb.append(dat.getLongValue());
-			sb.append("]");
-			
-			count++;
-		}
-		sb.append("})");
-		return sb.toString();
-	}
-	
-	/**
-	 * 
-	 * @return
-	 */
-	public String toCompactString()
-	{
-		StringBuilder sb = new StringBuilder( );
-		sb.append(_type);
-		
-		if( size() > 0 )
-		{
-			sb.append(".");
-			IntObject dat0 = _iterations.getFirst();
-			sb.append(dat0.getName());
-			sb.append(".{");
-		
-			int count = 0;
-			for( IntObject dat : _iterations )
-			{
-				if( count!=0 ) 
-					sb.append(",");
-				sb.append(dat.getLongValue());
-				count++;
-			}
-			
-			sb.append("}");
-		}
-		
-		return sb.toString();
-	}
-	
-	/**
-	 * 
-	 * @return
-	 */
-	public String toCompactString( int maxDigits )
-	{
-		StringBuilder sb = new StringBuilder( );
-		sb.append(_type);
-		
-		if( size() > 0 )
-		{
-			sb.append(".");
-			IntObject dat0 = _iterations.getFirst();
-			sb.append(dat0.getName());
-			sb.append(".{");
-		
-			int count = 0;
-			for( IntObject dat : _iterations )
-			{
-				if( count!=0 ) 
-					sb.append(",");
-				
-				String tmp = String.valueOf(dat.getLongValue());
-				for( int k=tmp.length(); k<maxDigits; k++ )
-					sb.append("0");
-				sb.append(tmp);
-				count++;
-			}
-			
-			sb.append("}");
-		}
-		
-		return sb.toString();
-	}
-	
-	/**
-	 * 
-	 * @param stask
-	 * @return
-	 */
-	public static Task parseCompactString( String stask )
-	{
-		StringTokenizer st = new StringTokenizer( stask.trim(), "." );		
-		
-		Task newTask = new Task( TaskType.valueOf(st.nextToken()) );
-		String meta = st.nextToken();
-		
-		//iteration data
-		String sdata = st.nextToken();
-		sdata = sdata.substring(1,sdata.length()-1); // remove brackets
-		StringTokenizer st2 = new StringTokenizer(sdata, ",");
-		while( st2.hasMoreTokens() )
-		{
-			//create new iteration
-			String lsdata = st2.nextToken();
-			IntObject ldata = new IntObject(meta,Integer.parseInt( lsdata ) );
-			newTask.addIteration(ldata);
-		}
-		
-		return newTask;
-	}
-}
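
For clarity, the compact task encoding used by toCompactString/parseCompactString above has the form TYPE.var.{v1,v2,...}: a SET task over index variable i with iterations 1,2,3 serializes to SET.i.{1,2,3}, while a RANGE task carries {from,to,increment}. A tiny standalone round-trip sketch with hypothetical helper names:

import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.StringTokenizer;

public class TaskStringSketch {
    //format: TYPE.varname.{v1,v2,...}, e.g. "SET.i.{1,2,3}" or "RANGE.i.{1,100,1}"
    static String toCompact(String type, String var, List<Long> vals) {
        StringBuilder sb = new StringBuilder(type + "." + var + ".{");
        for (int k = 0; k < vals.size(); k++)
            sb.append(k == 0 ? "" : ",").append(vals.get(k));
        return sb.append("}").toString();
    }

    static List<Long> parseValues(String stask) {
        StringTokenizer st = new StringTokenizer(stask.trim(), ".");
        st.nextToken();                               //task type
        st.nextToken();                               //index variable name
        String data = st.nextToken();
        data = data.substring(1, data.length() - 1);  //strip { }
        List<Long> vals = new ArrayList<>();
        StringTokenizer st2 = new StringTokenizer(data, ",");
        while (st2.hasMoreTokens())
            vals.add(Long.parseLong(st2.nextToken()));
        return vals;
    }

    public static void main(String[] args) {
        String s = toCompact("SET", "i", Arrays.asList(1L, 2L, 3L));
        System.out.println(s + " -> " + parseValues(s)); //SET.i.{1,2,3} -> [1, 2, 3]
    }
}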

http://git-wip-us.apache.org/repos/asf/incubator-systemml/blob/276d9257/src/main/java/com/ibm/bi/dml/runtime/controlprogram/parfor/TaskPartitioner.java
----------------------------------------------------------------------
diff --git a/src/main/java/com/ibm/bi/dml/runtime/controlprogram/parfor/TaskPartitioner.java b/src/main/java/com/ibm/bi/dml/runtime/controlprogram/parfor/TaskPartitioner.java
deleted file mode 100644
index 0b7eb4e..0000000
--- a/src/main/java/com/ibm/bi/dml/runtime/controlprogram/parfor/TaskPartitioner.java
+++ /dev/null
@@ -1,84 +0,0 @@
-/**
- * (C) Copyright IBM Corp. 2010, 2015
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- * 
- */
-
-package com.ibm.bi.dml.runtime.controlprogram.parfor;
-
-import java.util.List;
-
-import com.ibm.bi.dml.runtime.DMLRuntimeException;
-import com.ibm.bi.dml.runtime.instructions.cp.IntObject;
-
-/**
- * This is the base class for all task partitioners. It stores relevant information such as
- * the loop specification (FROM, TO, INCR), the index variable, and the task size. Furthermore, it declares two
- * prototypes: (1) full task creation, (2) streaming task creation.
- * 
- * Known implementation classes: TaskPartitionerFixedsize, TaskPartitionerFactoring
- * 
- */
-public abstract class TaskPartitioner 
-{
-	
-	protected long            _taskSize     = -1;
-	
-	protected String  		 _iterVarName  = null;
-	protected IntObject      _fromVal      = null;
-	protected IntObject      _toVal        = null;
-	protected IntObject      _incrVal      = null;
-	
-	protected long            _numIter      = -1;
-	
-	
-	protected TaskPartitioner( long taskSize, String iterVarName, IntObject fromVal, IntObject toVal, IntObject incrVal ) 
-	{
-		_taskSize    = taskSize;
-		
-		_iterVarName = iterVarName;
-		_fromVal     = fromVal;
-		_toVal       = toVal;
-		_incrVal     = incrVal;
-		
-		_numIter     = (long)Math.ceil(((double)(_toVal.getLongValue()-_fromVal.getLongValue()+1 )) / _incrVal.getLongValue()); 
-	}
-	
-	/**
-	 * Creates and returns the set of all tasks for the given problem at once.
-	 * 
-	 * @return list of all created tasks
-	 */
-	public abstract List<Task> createTasks()
-		throws DMLRuntimeException;
-	
-	/**
-	 * Creates the set of all tasks for the given problem, but streams them directly
-	 * into the task queue. This allows for more tasks than fit into main memory.
-	 * 
-	 * @return number of created tasks
-	 */
-	public abstract long createTasks( LocalTaskQueue<Task> queue )
-		throws DMLRuntimeException;
-
-	
-	/**
-	 * 
-	 * @return
-	 */
-	public long getNumIterations()
-	{
-		return _numIter;
-	}
-}

http://git-wip-us.apache.org/repos/asf/incubator-systemml/blob/276d9257/src/main/java/com/ibm/bi/dml/runtime/controlprogram/parfor/TaskPartitionerFactoring.java
----------------------------------------------------------------------
diff --git a/src/main/java/com/ibm/bi/dml/runtime/controlprogram/parfor/TaskPartitionerFactoring.java b/src/main/java/com/ibm/bi/dml/runtime/controlprogram/parfor/TaskPartitionerFactoring.java
deleted file mode 100644
index cd03544..0000000
--- a/src/main/java/com/ibm/bi/dml/runtime/controlprogram/parfor/TaskPartitionerFactoring.java
+++ /dev/null
@@ -1,208 +0,0 @@
-/**
- * (C) Copyright IBM Corp. 2010, 2015
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- * 
- */
-
-package com.ibm.bi.dml.runtime.controlprogram.parfor;
-
-import java.util.LinkedList;
-import java.util.List;
-
-import com.ibm.bi.dml.runtime.DMLRuntimeException;
-import com.ibm.bi.dml.runtime.controlprogram.ParForProgramBlock;
-import com.ibm.bi.dml.runtime.controlprogram.parfor.Task.TaskType;
-import com.ibm.bi.dml.runtime.instructions.cp.IntObject;
-
-/**
- * This factoring task partitioner virtually iterates over the given FOR loop (from, to, incr),
- * creates iterations and groups them into tasks. Note that the task size is used here.
- * The tasks are created with decreasing size for good load balance of heterogeneous tasks.
- * 
- * 
- * See the original paper for details:
- * [Susan Flynn Hummel, Edith Schonberg, Lawrence E. Flynn: 
- * Factoring: a practical and robust method for scheduling parallel loops. 
- * SC 1991: 610-632]
- * 
- */
-public class TaskPartitionerFactoring extends TaskPartitioner
-{
-	
-	private int _numThreads = -1;
-	
-	public TaskPartitionerFactoring( long taskSize, int numThreads, String iterVarName, IntObject fromVal, IntObject toVal, IntObject incrVal ) 
-	{
-		super(taskSize, iterVarName, fromVal, toVal, incrVal);
-		
-		_numThreads = numThreads;
-	}
-
-	@Override
-	public List<Task> createTasks() 
-		throws DMLRuntimeException 
-	{
-		LinkedList<Task> tasks = new LinkedList<Task>();
-		
-		long lFrom  = _fromVal.getLongValue();
-		long lTo    = _toVal.getLongValue();
-		long lIncr  = _incrVal.getLongValue();
-		
-		int P = _numThreads;  // number of parallel workers
-		long N = _numIter;     // total number of iterations
-		long R = N;            // remaining number of iterations
-		long K = -1;           // next _numThreads task sizes	
-		TaskType type = null; // type of iterations: range tasks (similar to run-length encoding) only make sense if taskSize>3
-		
-		for( long i = lFrom; i<=lTo;  )
-		{
-			K = determineNextBatchSize(R, P);
-			R -= (K * P);
-			
-			type = (ParForProgramBlock.USE_RANGE_TASKS_IF_USEFUL && K>3 ) ? 
-					   TaskType.RANGE : TaskType.SET;
-			
-			//for each logical processor
-			for( int j=0; j<P; j++ )
-			{
-				if( i > lTo ) //no more iterations
-					break;
-				
-				//create new task and add to list of tasks
-				Task lTask = new Task( type );
-				tasks.addLast(lTask);
-				
-				// add iterations to task 
-				if( type == TaskType.SET ) 
-				{
-					//value based tasks
-					for( long k=0; k<K && i<=lTo; k++, i+=lIncr )
-					{
-						lTask.addIteration(new IntObject(_iterVarName, i));				
-					}				
-				}
-				else 
-				{
-					//determine end of task
-					long to = Math.min( i+(K-1)*lIncr, lTo );
-					
-					//range based tasks
-					lTask.addIteration(new IntObject(_iterVarName, i));	    //from
-					lTask.addIteration(new IntObject(_iterVarName, to));    //to
-					lTask.addIteration(new IntObject(_iterVarName, lIncr));	//increment
-					
-					i = to + lIncr;
-				}
-			}
-		}
-
-		return tasks;
-	}
-
-	@Override
-	public long createTasks(LocalTaskQueue<Task> queue) 
-		throws DMLRuntimeException 
-	{		
-		long numCreatedTasks = 0;
-		
-		long lFrom  = _fromVal.getLongValue();
-		long lTo    = _toVal.getLongValue();
-		long lIncr  = _incrVal.getLongValue();
-		
-		int P = _numThreads;     // number of parallel workers
-		long N = _numIter;     // total number of iterations
-		long R = N;               // remaining number of iterations
-		long K = -1;              //next _numThreads task sizes	
-	    TaskType type = null;    // type of iterations: range tasks (similar to run-length encoding) only make sense if taskSize>3
-		
-		try
-		{
-			for( long i = lFrom; i<=lTo;  )
-			{
-				K = determineNextBatchSize(R, P);
-				R -= (K * P);
-				
-				type = (ParForProgramBlock.USE_RANGE_TASKS_IF_USEFUL && K>3 ) ? 
-						   TaskType.RANGE : TaskType.SET;
-				
-				//for each logical processor
-				for( int j=0; j<P; j++ )
-				{
-					if( i > lTo ) //no more iterations
-						break;
-					
-					//create new task and add to list of tasks
-					Task lTask = new Task( type );
-					
-					// add iterations to task 
-					if( type == TaskType.SET ) 
-					{
-						//value based tasks
-						for( long k=0; k<K && i<=lTo; k++, i+=lIncr )
-						{
-							lTask.addIteration(new IntObject(_iterVarName, i));				
-						}				
-					}
-					else 
-					{
-						//determine end of task
-						long to = Math.min( i+(K-1)*lIncr, lTo );
-						
-						//range based tasks
-						lTask.addIteration(new IntObject(_iterVarName, i));	    //from
-						lTask.addIteration(new IntObject(_iterVarName, to));    //to
-						lTask.addIteration(new IntObject(_iterVarName, lIncr));	//increment
-						
-						i = to + lIncr;
-					}
-					
-					//add task to queue (after all iterations are added, to prevent race conditions)
-					queue.enqueueTask( lTask );
-					numCreatedTasks++;
-				}
-			}
-			
-			// mark end of task input stream
-			queue.closeInput();	
-		}
-		catch(Exception ex)
-		{
-			throw new DMLRuntimeException(ex);
-		}
-	
-		return numCreatedTasks;
-	}
-	
-	
-	/**
-	 * Computes the task size (number of iterations per task) for the next numThreads tasks, 
-	 * given the number of remaining iterations R and the number of threads P.
-	 * 
-	 * NOTE: x can be set to different values, but the original paper argues for x=2.
-	 * 
-	 * @param R remaining number of iterations
-	 * @param P number of parallel workers
-	 * @return task size for each of the next P tasks
-	 */
-	protected long determineNextBatchSize(long R, int P) 
-	{
-		int x = 2;
-		long K = (long) Math.ceil((double)R / ( x * P )); //NOTE: round creates more tasks
-		
-		if( K < 1 ) //account for rounding errors
-			K = 1;
-		
-		return K;
-	}
-	
-}
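
To make the decreasing task sizes concrete, a small standalone sketch of the factoring schedule behind determineNextBatchSize above (K = ceil(R / (x*P)) with x = 2): for N = 100 iterations and P = 4 workers it produces waves of 4 tasks with sizes 13, 6, 3, 2, 1. The class and method names below are illustrative.

public class FactoringScheduleSketch {
    static long nextBatchSize(long R, int P) {
        long K = (long) Math.ceil((double) R / (2 * P)); //x = 2, as argued in the factoring paper
        return Math.max(K, 1);                           //account for rounding
    }

    public static void main(String[] args) {
        long R = 100; int P = 4;                         //100 iterations, 4 parallel workers
        while (R > 0) {
            long K = nextBatchSize(R, P);                //size of each of the next P tasks
            System.out.println(P + " tasks of size " + K + " (remaining before this wave: " + R + ")");
            R -= K * P;
        }
    }
}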

http://git-wip-us.apache.org/repos/asf/incubator-systemml/blob/276d9257/src/main/java/com/ibm/bi/dml/runtime/controlprogram/parfor/TaskPartitionerFactoringCmax.java
----------------------------------------------------------------------
diff --git a/src/main/java/com/ibm/bi/dml/runtime/controlprogram/parfor/TaskPartitionerFactoringCmax.java b/src/main/java/com/ibm/bi/dml/runtime/controlprogram/parfor/TaskPartitionerFactoringCmax.java
deleted file mode 100644
index 914f387..0000000
--- a/src/main/java/com/ibm/bi/dml/runtime/controlprogram/parfor/TaskPartitionerFactoringCmax.java
+++ /dev/null
@@ -1,50 +0,0 @@
-/**
- * (C) Copyright IBM Corp. 2010, 2015
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- * 
- */
-
-package com.ibm.bi.dml.runtime.controlprogram.parfor;
-
-import com.ibm.bi.dml.runtime.instructions.cp.IntObject;
-
-/**
- * Factoring with a maximum constraint (e.g., if the LIX matrix is out-of-core and we
- * need to bound the maximum number of iterations per map task due to memory bounds).
- */
-public class TaskPartitionerFactoringCmax extends TaskPartitionerFactoring
-{
-	
-	protected long _constraint = -1;
-	
-	public TaskPartitionerFactoringCmax( long taskSize, int numThreads, long constraint, String iterVarName, IntObject fromVal, IntObject toVal, IntObject incrVal ) 
-	{
-		super(taskSize, numThreads, iterVarName, fromVal, toVal, incrVal);
-		
-		_constraint = constraint;
-	}
-
-	@Override
-	protected long determineNextBatchSize(long R, int P) 
-	{
-		int x = 2;
-		long K = (long)Math.ceil((double)R / ( x * P )); //NOTE: round creates more tasks
-		
-		if( K > _constraint ) //enforce maximum constraint
-			K = _constraint;
-		
-		return K;
-	}
-	
-}

http://git-wip-us.apache.org/repos/asf/incubator-systemml/blob/276d9257/src/main/java/com/ibm/bi/dml/runtime/controlprogram/parfor/TaskPartitionerFactoringCmin.java
----------------------------------------------------------------------
diff --git a/src/main/java/com/ibm/bi/dml/runtime/controlprogram/parfor/TaskPartitionerFactoringCmin.java b/src/main/java/com/ibm/bi/dml/runtime/controlprogram/parfor/TaskPartitionerFactoringCmin.java
deleted file mode 100644
index 0d1c022..0000000
--- a/src/main/java/com/ibm/bi/dml/runtime/controlprogram/parfor/TaskPartitionerFactoringCmin.java
+++ /dev/null
@@ -1,49 +0,0 @@
-/**
- * (C) Copyright IBM Corp. 2010, 2015
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- * 
- */
-
-package com.ibm.bi.dml.runtime.controlprogram.parfor;
-
-import com.ibm.bi.dml.runtime.instructions.cp.IntObject;
-
-/**
- * Factoring with minimum constraint (e.g., if communication is expensive)
- */
-public class TaskPartitionerFactoringCmin extends TaskPartitionerFactoring
-{
-	
-	protected long _constraint = -1;
-	
-	public TaskPartitionerFactoringCmin( long taskSize, int numThreads, long constraint, String iterVarName, IntObject fromVal, IntObject toVal, IntObject incrVal ) 
-	{
-		super(taskSize, numThreads, iterVarName, fromVal, toVal, incrVal);
-		
-		_constraint = constraint;
-	}
-
-	@Override
-	protected long determineNextBatchSize(long R, int P) 
-	{
-		int x = 2;
-		long K = (long) Math.ceil((double)R / ( x * P )); //NOTE: round creates more tasks
-		
-		if( K < _constraint ) //enforce minimum constraint
-			K = _constraint;
-		
-		return K;
-	}
-	
-}

http://git-wip-us.apache.org/repos/asf/incubator-systemml/blob/276d9257/src/main/java/com/ibm/bi/dml/runtime/controlprogram/parfor/TaskPartitionerFixedsize.java
----------------------------------------------------------------------
diff --git a/src/main/java/com/ibm/bi/dml/runtime/controlprogram/parfor/TaskPartitionerFixedsize.java b/src/main/java/com/ibm/bi/dml/runtime/controlprogram/parfor/TaskPartitionerFixedsize.java
deleted file mode 100644
index 54fc72a..0000000
--- a/src/main/java/com/ibm/bi/dml/runtime/controlprogram/parfor/TaskPartitionerFixedsize.java
+++ /dev/null
@@ -1,158 +0,0 @@
-/**
- * (C) Copyright IBM Corp. 2010, 2015
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- * 
- */
-
-package com.ibm.bi.dml.runtime.controlprogram.parfor;
-
-import java.util.LinkedList;
-import java.util.List;
-
-import com.ibm.bi.dml.runtime.DMLRuntimeException;
-import com.ibm.bi.dml.runtime.controlprogram.ParForProgramBlock;
-import com.ibm.bi.dml.runtime.controlprogram.parfor.Task.TaskType;
-import com.ibm.bi.dml.runtime.instructions.cp.IntObject;
-
-/**
- * This naive task partitioner virtually iterates over the given FOR loop (from, to, incr),
- * creates iterations and groups them into tasks according to the given task size. All
- * tasks are equally sized.
- * 
- */
-public class TaskPartitionerFixedsize extends TaskPartitioner
-{
-	
-	protected int _firstnPlus1 = 0; //add one to these firstn tasks
- 	
-	public TaskPartitionerFixedsize( long taskSize, String iterVarName, IntObject fromVal, IntObject toVal, IntObject incrVal ) 
-	{
-		super(taskSize, iterVarName, fromVal, toVal, incrVal);
-	}
-
-	@Override
-	public List<Task> createTasks() 
-		throws DMLRuntimeException 
-	{
-		LinkedList<Task> tasks = new LinkedList<Task>();
-		
-		//range tasks (similar to run-length encoding) only make sense if taskSize>3
-		TaskType type = (ParForProgramBlock.USE_RANGE_TASKS_IF_USEFUL && _taskSize>3 ) ? 
-				           TaskType.RANGE : TaskType.SET;
-		
-		long lFrom  = _fromVal.getLongValue();
-		long lTo    = _toVal.getLongValue();
-		long lIncr  = _incrVal.getLongValue();
-		long lfnp1  = _firstnPlus1;
-		
-		for( long i = lFrom; i<=lTo;  )
-		{
-			//create new task and add to list of tasks
-			Task lTask = new Task( type );
-			tasks.addLast(lTask);
-			
-			int corr = (lfnp1-- > 0)? 1:0; //correction for static partitioner
-			
-			// add <tasksize> iterations to task 
-			// (last task might have less)
-			if( type == TaskType.SET ) 
-			{
-				//value based tasks
-				for( long j=0; j<_taskSize+corr && i<=lTo; j++, i+=lIncr )
-				{
-					lTask.addIteration(new IntObject(_iterVarName, i));				
-				}				
-			}
-			else 
-			{
-				//determine end of task
-				long to = Math.min( i+(_taskSize-1+corr)*lIncr, lTo );
-				
-				//range based tasks
-				lTask.addIteration(new IntObject(_iterVarName, i));	    //from
-				lTask.addIteration(new IntObject(_iterVarName, to));    //to
-				lTask.addIteration(new IntObject(_iterVarName, lIncr));	//increment
-				
-				i = to + lIncr;
-			}
-		}
-
-		return tasks;
-	}
-
-	@Override
-	public long createTasks(LocalTaskQueue<Task> queue) 
-		throws DMLRuntimeException 
-	{
-		long numCreatedTasks=0;
-		
-		//range tasks (similar to run-length encoding) only make sense if taskSize>3
-		TaskType type = (ParForProgramBlock.USE_RANGE_TASKS_IF_USEFUL && _taskSize>3 ) ? 
-				              TaskType.RANGE : TaskType.SET;
-		
-		long lFrom  = _fromVal.getLongValue();
-		long lTo    = _toVal.getLongValue();
-		long lIncr  = _incrVal.getLongValue();
-		long lfnp1  = _firstnPlus1;
-		
-		try
-		{
-			for( long i = lFrom; i<=lTo;  )
-			{
-				//create new task and add to list of tasks
-				Task lTask = new Task( type );
-				
-				int corr = (lfnp1-- > 0)? 1:0; //correction for static partitioner
-				
-				// add <tasksize> iterations to the task 
-				// (the last task might have fewer)
-				if( type == TaskType.SET ) 
-				{
-					//value based tasks
-					for( long j=0; j<_taskSize+corr && i<=lTo; j++, i+=lIncr )
-					{
-						lTask.addIteration(new IntObject(_iterVarName, i));				
-					}				
-				}
-				else 
-				{
-					//determine end of task
-					long to = Math.min( i+(_taskSize-1+corr)*lIncr, lTo );
-					
-					//range based tasks
-					lTask.addIteration(new IntObject(_iterVarName, i));	    //from
-					lTask.addIteration(new IntObject(_iterVarName, to));    //to
-					lTask.addIteration(new IntObject(_iterVarName, lIncr));	//increment
-					
-					i = to + lIncr;
-				}
-				
-				//add task to queue (only after all iterations have been added, to prevent race conditions)
-				queue.enqueueTask( lTask );
-				numCreatedTasks++;
-			}
-			
-			// mark end of task input stream
-			queue.closeInput();	
-		}
-		catch(Exception ex)
-		{
-			throw new DMLRuntimeException(ex);
-		}
-		
-		return numCreatedTasks;
-	}
-	
-	
-}
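
For readers skimming the diff: the SET branch above simply chops the iteration range
into consecutive groups of _taskSize values (the last task may be smaller). A minimal,
self-contained sketch of that grouping logic in plain Java -- hypothetical class and
method names, not the SystemML Task/IntObject API -- could look like this:

    import java.util.ArrayList;
    import java.util.List;

    public class FixedSizePartitionSketch {
        // Groups the values of for(i=from; i<=to; i+=incr) into tasks of taskSize iterations each.
        static List<List<Long>> createTasks(long from, long to, long incr, long taskSize) {
            List<List<Long>> tasks = new ArrayList<>();
            for (long i = from; i <= to; ) {
                List<Long> task = new ArrayList<>();
                for (long j = 0; j < taskSize && i <= to; j++, i += incr)
                    task.add(i); // value-based task, analogous to TaskType.SET
                tasks.add(task);
            }
            return tasks;
        }

        public static void main(String[] args) {
            // 10 iterations, task size 3 -> [[1, 2, 3], [4, 5, 6], [7, 8, 9], [10]]
            System.out.println(createTasks(1, 10, 1, 3));
        }
    }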

http://git-wip-us.apache.org/repos/asf/incubator-systemml/blob/276d9257/src/main/java/com/ibm/bi/dml/runtime/controlprogram/parfor/TaskPartitionerNaive.java
----------------------------------------------------------------------
diff --git a/src/main/java/com/ibm/bi/dml/runtime/controlprogram/parfor/TaskPartitionerNaive.java b/src/main/java/com/ibm/bi/dml/runtime/controlprogram/parfor/TaskPartitionerNaive.java
deleted file mode 100644
index 2ad4806..0000000
--- a/src/main/java/com/ibm/bi/dml/runtime/controlprogram/parfor/TaskPartitionerNaive.java
+++ /dev/null
@@ -1,38 +0,0 @@
-/**
- * (C) Copyright IBM Corp. 2010, 2015
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- * 
- */
-
-package com.ibm.bi.dml.runtime.controlprogram.parfor;
-
-import com.ibm.bi.dml.runtime.instructions.cp.IntObject;
-
-/**
- * This naive task partitioner virtually iterates over the given FOR loop (from, to, incr),
- * creates the iterations, and groups them into tasks of size 1, i.e., one iteration per task
- * (see the constructor below, which overrides the given task size).
- * 
- */
-public class TaskPartitionerNaive extends TaskPartitionerFixedsize
-{
-	
-	public TaskPartitionerNaive( long taskSize, String iterVarName, IntObject fromVal, IntObject toVal, IntObject incrVal ) 
-	{
-		super(taskSize, iterVarName, fromVal, toVal, incrVal);
-	
-		//override the given task size: naive partitioning uses one iteration per task
-		_taskSize = 1;
-	}	
-}

http://git-wip-us.apache.org/repos/asf/incubator-systemml/blob/276d9257/src/main/java/com/ibm/bi/dml/runtime/controlprogram/parfor/TaskPartitionerStatic.java
----------------------------------------------------------------------
diff --git a/src/main/java/com/ibm/bi/dml/runtime/controlprogram/parfor/TaskPartitionerStatic.java b/src/main/java/com/ibm/bi/dml/runtime/controlprogram/parfor/TaskPartitionerStatic.java
deleted file mode 100644
index 20be635..0000000
--- a/src/main/java/com/ibm/bi/dml/runtime/controlprogram/parfor/TaskPartitionerStatic.java
+++ /dev/null
@@ -1,38 +0,0 @@
-/**
- * (C) Copyright IBM Corp. 2010, 2015
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- * 
- */
-
-package com.ibm.bi.dml.runtime.controlprogram.parfor;
-
-import com.ibm.bi.dml.runtime.instructions.cp.IntObject;
-
-/**
- * This static task partitioner virtually iterates over the given FOR loop (from, to, incr),
- * creates the iterations, and groups them into tasks according to a task size of
- * numIterations/numThreads; the remaining numIterations%numThreads iterations are spread
- * over the first tasks (one extra iteration each), so all tasks are almost equally sized.
- * 
- */
-public class TaskPartitionerStatic extends TaskPartitionerFixedsize
-{
-	
-	public TaskPartitionerStatic( long taskSize, int numThreads, String iterVarName, IntObject fromVal, IntObject toVal, IntObject incrVal ) 
-	{
-		super(taskSize, iterVarName, fromVal, toVal, incrVal);
-	
-		_taskSize = _numIter / numThreads;
-		_firstnPlus1 = (int)_numIter % numThreads;
-	}	
-}
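
As a rough illustration of the size computation above (assumed semantics, hypothetical
helper, not part of SystemML): with numIter iterations and numThreads workers, each task
gets numIter/numThreads iterations and the first numIter%numThreads tasks receive one
extra iteration, which is exactly what the _firstnPlus1 correction in
TaskPartitionerFixedsize accounts for.

    // Sketch: per-task sizes produced by the static partitioner (assumed semantics).
    static long[] staticTaskSizes(long numIter, int numThreads) {
        long base = numIter / numThreads;               // _taskSize
        int firstnPlus1 = (int) (numIter % numThreads); // tasks that get one extra iteration
        long[] sizes = new long[numThreads];
        for (int k = 0; k < numThreads; k++)
            sizes[k] = base + (k < firstnPlus1 ? 1 : 0);
        return sizes; // e.g. numIter=10, numThreads=4 -> [3, 3, 2, 2]
    }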

http://git-wip-us.apache.org/repos/asf/incubator-systemml/blob/276d9257/src/main/java/com/ibm/bi/dml/runtime/controlprogram/parfor/mqo/MergedMRJobInstruction.java
----------------------------------------------------------------------
diff --git a/src/main/java/com/ibm/bi/dml/runtime/controlprogram/parfor/mqo/MergedMRJobInstruction.java b/src/main/java/com/ibm/bi/dml/runtime/controlprogram/parfor/mqo/MergedMRJobInstruction.java
deleted file mode 100644
index 7b3423a..0000000
--- a/src/main/java/com/ibm/bi/dml/runtime/controlprogram/parfor/mqo/MergedMRJobInstruction.java
+++ /dev/null
@@ -1,76 +0,0 @@
-/**
- * (C) Copyright IBM Corp. 2010, 2015
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- * 
- */
-
-package com.ibm.bi.dml.runtime.controlprogram.parfor.mqo;
-
-import java.util.HashMap;
-import java.util.LinkedList;
-
-import com.ibm.bi.dml.runtime.instructions.MRJobInstruction;
-import com.ibm.bi.dml.runtime.matrix.JobReturn;
-import com.ibm.bi.dml.runtime.matrix.MetaData;
-
-/**
- * Merged MR job instruction that holds the actually merged instruction as well as the offsets
- * and lengths of the result indexes, in order to split the result meta data after successful execution.
- * 
- */
-public class MergedMRJobInstruction 
-{
-	
-	protected MRJobInstruction inst;
-	protected LinkedList<Long> ids;
-	protected HashMap<Long,Integer> outIxOffs;
-	protected HashMap<Long,Integer> outIxLens;
-	
-	public MergedMRJobInstruction()
-	{
-		ids = new LinkedList<Long>();
-		outIxOffs = new HashMap<Long,Integer>();
-		outIxLens = new HashMap<Long,Integer>();
-	}
-	
-	public void addInstructionMetaData(long instID, int outIxOffset, int outIxLen)
-	{
-		ids.add(instID);
-		outIxOffs.put(instID, outIxOffset);
-		outIxLens.put(instID, outIxLen);
-	}
-	
-	/**
-	 * Constructs the partial job return of a single original instruction
-	 * from the job return of the merged instruction.
-	 * 
-	 * @param instID instruction id of the original MR job instruction
-	 * @param retAll job return of the merged MR job instruction
-	 * @return partial job return (success flag and meta data slice) of the given instruction
-	 */
-	public JobReturn constructJobReturn( long instID, JobReturn retAll )
-	{
-		//get output offset and len
-		int off = outIxOffs.get(instID);
-		int len = outIxLens.get(instID);
-		
-		//create partial output meta data 
-		JobReturn ret = new JobReturn();
-		ret.successful = retAll.successful;
-		if( ret.successful ) {
-			ret.metadata = new MetaData[len];
-			System.arraycopy(retAll.metadata, off, ret.metadata, 0, len);
-		}
-		
-		return ret;
-	}
-}
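
The offset/length bookkeeping above boils down to a System.arraycopy over the merged
meta data array: each original instruction gets back exactly the slice of outputs it
contributed. A minimal sketch with a placeholder String[] instead of MetaData[]
(illustration only, not the SystemML API):

    // Sketch: extract the meta data slice that belongs to one original instruction.
    static String[] slicePerInstruction(String[] mergedMeta, int off, int len) {
        String[] part = new String[len];
        System.arraycopy(mergedMeta, off, part, 0, len);
        return part;
    }
    // e.g. mergedMeta = {A1, A2, B1}: job A uses (off=0, len=2), job B uses (off=2, len=1)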

http://git-wip-us.apache.org/repos/asf/incubator-systemml/blob/276d9257/src/main/java/com/ibm/bi/dml/runtime/controlprogram/parfor/mqo/PiggybackingWorker.java
----------------------------------------------------------------------
diff --git a/src/main/java/com/ibm/bi/dml/runtime/controlprogram/parfor/mqo/PiggybackingWorker.java b/src/main/java/com/ibm/bi/dml/runtime/controlprogram/parfor/mqo/PiggybackingWorker.java
deleted file mode 100644
index f76f515..0000000
--- a/src/main/java/com/ibm/bi/dml/runtime/controlprogram/parfor/mqo/PiggybackingWorker.java
+++ /dev/null
@@ -1,153 +0,0 @@
-/**
- * (C) Copyright IBM Corp. 2010, 2015
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- * 
- */
-
-package com.ibm.bi.dml.runtime.controlprogram.parfor.mqo;
-
-import java.util.HashMap;
-import java.util.LinkedList;
-
-import org.apache.commons.logging.Log;
-import org.apache.commons.logging.LogFactory;
-
-import com.ibm.bi.dml.runtime.controlprogram.parfor.stat.Timing;
-import com.ibm.bi.dml.runtime.instructions.MRJobInstruction;
-import com.ibm.bi.dml.runtime.matrix.JobReturn;
-import com.ibm.bi.dml.runtime.matrix.data.Pair;
-
-/**
- * Base class for piggybacking workers that collect queued MR job instructions,
- * merge compatible instructions, submit the merged jobs, and hand the split
- * job returns back to the waiting clients.
- */
-public abstract class PiggybackingWorker extends Thread
-{
-	
-	
-	protected static final Log LOG = LogFactory.getLog(PiggybackingWorker.class.getName());
-
-	protected HashMap<Long, JobReturn> _results = null;	
-	protected boolean _stop;
-	
-	protected PiggybackingWorker()
-	{
-		_results = new HashMap<Long, JobReturn>();
-		_stop = false;
-	}
-
-	/**
-	 * Signals the worker thread to stop after its current iteration.
-	 */
-	public void setStopped()
-	{
-		_stop = true;
-	}
-	
-	/**
-	 * Blocks until the job return for the given instruction id is available
-	 * and removes it from the internal result buffer.
-	 * 
-	 * @param instID instruction id of the submitted MR job instruction
-	 * @return job return of the given instruction
-	 * @throws InterruptedException if interrupted while waiting for results
-	 */
-	public synchronized JobReturn getJobResult( long instID ) 
-		throws InterruptedException
-	{
-		JobReturn ret = null;
-				
-		while( ret == null )
-		{
-			//wait for new results 
-			wait();
-			
-			//obtain job return (if available)
-			ret = _results.remove( instID );
-		}
-		
-		return ret;
-	}
-	
-	
-	/**
-	 * Makes the given job returns available and notifies all waiting clients.
-	 * 
-	 * @param ids instruction ids, position-aligned with results
-	 * @param results job returns of the corresponding instructions
-	 */
-	protected synchronized void putJobResults( LinkedList<Long> ids, LinkedList<JobReturn> results )
-	{
-		//make job returns available
-		for( int i=0; i<ids.size(); i++ )
-			_results.put(ids.get(i), results.get(i));
-	
-		//notify all waiting threads
-		notifyAll();
-	}
-	
-	/**
-	 * Merges the MR job instructions of the given working set into as few
-	 * merged instructions as possible (mergeability permitting).
-	 * 
-	 * @param workingSet pairs of instruction id and MR job instruction
-	 * @return list of merged MR job instructions
-	 * @throws IllegalAccessException if merging the instructions fails
-	 */
-	protected LinkedList<MergedMRJobInstruction> mergeMRJobInstructions( LinkedList<Pair<Long,MRJobInstruction>> workingSet ) 
-		throws IllegalAccessException
-	{
-		LinkedList<MergedMRJobInstruction> ret = new LinkedList<MergedMRJobInstruction>();
-		Timing time = new Timing(true);
-		
-		//NOTE currently all merged into one (might be invalid due to memory constraints)
-		MergedMRJobInstruction minst = new MergedMRJobInstruction();
-		for( Pair<Long,MRJobInstruction> inst : workingSet )
-		{
-			long instID = inst.getKey();
-			MRJobInstruction instVal = inst.getValue();
-			int numOutputs = instVal.getOutputs().length;
-			
-			//append to current merged instruction
-			if( minst.inst==null )
-			{
-				//deep copy first instruction
-				minst.inst = new MRJobInstruction( instVal );	
-				minst.addInstructionMetaData( instID, 0, numOutputs );
-			}
-			else
-			{	
-				//merge other instructions
-				if( minst.inst.isMergableMRJobInstruction( instVal ) )
-				{
-					//add instruction to open merged instruction
-					int offOutputs = minst.inst.getOutputs().length; //before merge
-					minst.inst.mergeMRJobInstruction( instVal );
-					minst.addInstructionMetaData(instID, offOutputs, numOutputs);	
-				}
-				else
-				{
-					//close current merged instruction
-					ret.add(minst); 
-					//open new merged instruction
-					minst = new MergedMRJobInstruction();
-					minst.inst = new MRJobInstruction( instVal );	
-					minst.addInstructionMetaData( instID, 0, numOutputs );
-				}
-			}
-		}
-		//close last open merged instruction
-		ret.add(minst);
-		
-		//log merge info so users can see how many jobs were consolidated
-		LOG.info("Merged MR-Job instructions: "+workingSet.size()+" --> "+ret.size()+" in "+time.stop()+"ms.");
-		
-		return ret;
-	}
-}
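
The result hand-off above is a standard wait/notifyAll producer-consumer pattern:
clients block in getJobResult until the background worker publishes results via
putJobResults. A stripped-down sketch of the same pattern (hypothetical ResultBuffer
class, not the SystemML code):

    import java.util.HashMap;

    public class ResultBuffer<T> {
        private final HashMap<Long, T> results = new HashMap<>();

        // Blocks until a result for the given id has been published, then removes it.
        public synchronized T take(long id) throws InterruptedException {
            T ret;
            while ((ret = results.remove(id)) == null)
                wait(); // lock released while waiting; condition re-checked after each notifyAll
            return ret;
        }

        // Publishes a result and wakes up all waiting clients.
        public synchronized void put(long id, T result) {
            results.put(id, result);
            notifyAll();
        }
    }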

http://git-wip-us.apache.org/repos/asf/incubator-systemml/blob/276d9257/src/main/java/com/ibm/bi/dml/runtime/controlprogram/parfor/mqo/PiggybackingWorkerTimeSequential.java
----------------------------------------------------------------------
diff --git a/src/main/java/com/ibm/bi/dml/runtime/controlprogram/parfor/mqo/PiggybackingWorkerTimeSequential.java b/src/main/java/com/ibm/bi/dml/runtime/controlprogram/parfor/mqo/PiggybackingWorkerTimeSequential.java
deleted file mode 100644
index d385fc7..0000000
--- a/src/main/java/com/ibm/bi/dml/runtime/controlprogram/parfor/mqo/PiggybackingWorkerTimeSequential.java
+++ /dev/null
@@ -1,101 +0,0 @@
-/**
- * (C) Copyright IBM Corp. 2010, 2015
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- * 
- */
-
-package com.ibm.bi.dml.runtime.controlprogram.parfor.mqo;
-
-import java.util.LinkedList;
-
-import com.ibm.bi.dml.lops.runtime.RunMRJobs;
-import com.ibm.bi.dml.runtime.instructions.MRJobInstruction;
-import com.ibm.bi.dml.runtime.matrix.JobReturn;
-import com.ibm.bi.dml.runtime.matrix.data.Pair;
-import com.ibm.bi.dml.utils.Statistics;
-
-public class PiggybackingWorkerTimeSequential extends PiggybackingWorker
-{
-
-	//internal configuration parameters
-	private static long DEFAULT_MERGE_INTERVAL = 1000;
-	private static boolean SUBSTRACT_EXEC_TIME = true;
-	
-	private long _time;
-	
-	public PiggybackingWorkerTimeSequential()
-	{
-		this(DEFAULT_MERGE_INTERVAL);
-	}
-	
-	public PiggybackingWorkerTimeSequential( long timeInterval )
-	{
-		_time = timeInterval;
-	}
-
-	@Override
-	public void run() 
-	{
-		long lastTime = System.currentTimeMillis();
-		
-		while( !_stop )
-		{
-			try
-			{
-				// wait until next submission
-				if( SUBSTRACT_EXEC_TIME ) {
-					long currentTime = System.currentTimeMillis();
-					if( currentTime-lastTime < _time  )
-						Thread.sleep( _time-(currentTime-lastTime) );
-					lastTime = currentTime;
-				}
-				else
-					Thread.sleep(_time);
-				
-				
-				// pick job type with largest number of jobs
-				LinkedList<Pair<Long,MRJobInstruction>> workingSet = RuntimePiggybacking.getMaxWorkingSet();
-				if( workingSet == null )
-					continue; //empty pool
-				
-				// merge jobs (if possible)
-				LinkedList<MergedMRJobInstruction> mergedWorkingSet = mergeMRJobInstructions(workingSet);
-				
-				// submit all resulting jobs (currently sequential submission)
-				for( MergedMRJobInstruction minst : mergedWorkingSet )
-				{
-					JobReturn mret = RunMRJobs.submitJob(minst.inst);
-					Statistics.incrementNoOfExecutedMRJobs();
-					
-					// error handling
-					if( !mret.successful )
-						LOG.error("Failed to run merged mr-job instruction:\n"+minst.inst.toString()); 
-					
-					// split job return
-					LinkedList<JobReturn> ret = new LinkedList<JobReturn>();
-					for( Long id : minst.ids ){
-						ret.add( minst.constructJobReturn(id, mret) );
-						Statistics.decrementNoOfExecutedMRJobs();
-					}
-					// make job returns available and notify waiting clients
-					putJobResults(minst.ids, ret);
-				}
-			}
-			catch(Exception ex)
-			{
-				throw new RuntimeException(ex);
-			}
-		}
-	}
-}
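
The SUBSTRACT_EXEC_TIME branch above tries to keep the merge interval roughly constant
by sleeping only for the part of the interval that was not already spent on job
submission. A small sketch of that computation (assumed semantics, hypothetical helper):

    // Sketch: sleep only the remaining part of a fixed interval.
    static void sleepRemaining(long lastTime, long intervalMs) throws InterruptedException {
        long elapsed = System.currentTimeMillis() - lastTime;
        if (elapsed < intervalMs)
            Thread.sleep(intervalMs - elapsed); // e.g. interval 1000ms, 300ms spent -> sleep 700ms
    }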

http://git-wip-us.apache.org/repos/asf/incubator-systemml/blob/276d9257/src/main/java/com/ibm/bi/dml/runtime/controlprogram/parfor/mqo/PiggybackingWorkerUtilDecayParallel.java
----------------------------------------------------------------------
diff --git a/src/main/java/com/ibm/bi/dml/runtime/controlprogram/parfor/mqo/PiggybackingWorkerUtilDecayParallel.java b/src/main/java/com/ibm/bi/dml/runtime/controlprogram/parfor/mqo/PiggybackingWorkerUtilDecayParallel.java
deleted file mode 100644
index cad9967..0000000
--- a/src/main/java/com/ibm/bi/dml/runtime/controlprogram/parfor/mqo/PiggybackingWorkerUtilDecayParallel.java
+++ /dev/null
@@ -1,182 +0,0 @@
-/**
- * (C) Copyright IBM Corp. 2010, 2015
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- * 
- */
-
-package com.ibm.bi.dml.runtime.controlprogram.parfor.mqo;
-
-import java.util.LinkedList;
-import java.util.concurrent.ExecutorService;
-import java.util.concurrent.Executors;
-
-import com.ibm.bi.dml.lops.runtime.RunMRJobs;
-import com.ibm.bi.dml.runtime.instructions.MRJobInstruction;
-import com.ibm.bi.dml.runtime.matrix.JobReturn;
-import com.ibm.bi.dml.runtime.matrix.MatrixCharacteristics;
-import com.ibm.bi.dml.runtime.matrix.data.Pair;
-import com.ibm.bi.dml.utils.Statistics;
-
-/**
- * Piggybacking worker that merges queued MR job instructions and submits the merged jobs
- * in parallel once the cluster utilization falls below a threshold that decays over time
- * (to prevent starvation of queued jobs).
- * 
- * Possible extensions: (1) take the number of running jobs into account,
- * (2) compute the timeout threshold based on max and last job execution times.
- */
-public class PiggybackingWorkerUtilDecayParallel extends PiggybackingWorker
-{
-
-	//internal configuration parameters
-	private static long MIN_MERGE_INTERVAL = 1000;
-	private static double UTILIZATION_DECAY = 0.5; //decay per minute 
-	
-	//thread pool for parallel submit
-	private ExecutorService _parSubmit = null;
-	
-	private long _minTime = -1;
-	private double _utilDecay = -1; 
-	private int _par = -1;
-	
-	public PiggybackingWorkerUtilDecayParallel(int par)
-	{
-		this( MIN_MERGE_INTERVAL, 
-			  UTILIZATION_DECAY, 
-			  par );
-	}
-	
-	public PiggybackingWorkerUtilDecayParallel( long minInterval, double utilDecay, int par )
-	{
-		_minTime = minInterval;
-		_utilDecay = utilDecay;
-		_par = par;
-		
-		//init thread pool
-		_parSubmit = Executors.newFixedThreadPool(_par);
-	}
-
-	@Override 
-	public void setStopped()
-	{
-		//parent logic
-		super.setStopped();
-		
-		//explicitly stop the thread pool
-		_parSubmit.shutdown();
-	}
-	
-	@Override
-	public void run() 
-	{
-		long lastTime = System.currentTimeMillis();
-		
-		while( !_stop )
-		{
-			try
-			{
-				long currentTime = System.currentTimeMillis()+1; //ensure > lastTime
-				
-				// wait until next submission
-				Thread.sleep(_minTime); //wait at least minTime
-				
-				//continue if job pool is empty (prevents unnecessary cluster status requests)
-				if( RuntimePiggybacking.isEmptyJobPool() )
-					continue;
-				
-				double util = RuntimePiggybackingUtils.getCurrentClusterUtilization();
-				double utilThreshold = 1-Math.pow(_utilDecay, Math.ceil(((double)currentTime-lastTime)/60000));
-				
-				//continue to collect jobs if cluster util too high (decay to prevent starvation)
-				if( util > utilThreshold ) { //cluster utilization condition
-					continue; //1min - >50%, 2min - >75%, 3min - >87.5%, 4min - >93.75%
-				}
-				
-				// pick job type with largest number of jobs
-				LinkedList<Pair<Long,MRJobInstruction>> workingSet = RuntimePiggybacking.getMaxWorkingSet();
-				if( workingSet == null )
-					continue; //empty pool
-				
-				// merge jobs (if possible)
-				LinkedList<MergedMRJobInstruction> mergedWorkingSet = mergeMRJobInstructions(workingSet);
-				
-				// submit all resulting jobs (parallel submission)
-				for( MergedMRJobInstruction minst : mergedWorkingSet )
-				{
-					//submit job and return results if finished
-					_parSubmit.execute(new MRJobSubmitTask(minst));
-				}
-				
-				lastTime = currentTime;
-			}
-			catch(Exception ex)
-			{
-				throw new RuntimeException(ex);
-			}
-		}
-	}
-	
-	
-	/**
-	 * Task that submits a merged MR job instruction, splits the job return,
-	 * and makes the per-instruction results available to waiting clients.
-	 */
-	public class MRJobSubmitTask implements Runnable
-	{
-		private MergedMRJobInstruction _minst = null;
-		
-		public MRJobSubmitTask( MergedMRJobInstruction minst )
-		{
-			_minst = minst;
-		}
-		
-		@Override
-		public void run() 
-		{
-			try
-			{
-				// submit mr job
-				JobReturn mret = RunMRJobs.submitJob(_minst.inst);
-				Statistics.incrementNoOfExecutedMRJobs();
-				
-				// error handling
-				if( !mret.successful )
-					LOG.error("Failed to run merged mr-job instruction:\n"+_minst.inst.toString()); 
-				
-				// split job return
-				LinkedList<JobReturn> ret = new LinkedList<JobReturn>();
-				for( Long id : _minst.ids ){
-					ret.add( _minst.constructJobReturn(id, mret) );
-					Statistics.decrementNoOfExecutedMRJobs();
-				}
-				putJobResults(_minst.ids, ret);
-			}
-			catch(Exception ex)
-			{
-				//log error and merged instruction
-				LOG.error("Failed to run merged mr-job instruction:\n"+_minst.inst.toString(),ex); 
-				
-				//handle unsuccessful job returns for failed job 
-				//(otherwise clients would literally wait forever for results)
-				LinkedList<JobReturn> ret = new LinkedList<JobReturn>();
-				for( Long id : _minst.ids ){
-					JobReturn fret = new JobReturn(new MatrixCharacteristics[_minst.outIxLens.get(id)], false); 
-					ret.add( _minst.constructJobReturn(id, fret) );
-					Statistics.decrementNoOfExecutedMRJobs();
-				}
-				// make job returns available and notify waiting clients
-				putJobResults(_minst.ids, ret);
-			}
-		}
-		
-	}
-	
-}
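
The decaying threshold above evaluates to 1 - decay^ceil(elapsedMs/60000), so with the
default decay of 0.5 the worker submits only if utilization is at most 50% within the
first minute of waiting, at most 75% within the second minute, at most 87.5% within the
third, and so on, which eventually forces submission even on a busy cluster. A small
sketch of that formula (illustration only):

    // Sketch: utilization threshold that becomes more permissive the longer jobs are queued.
    static double utilizationThreshold(long elapsedMs, double decay) {
        return 1 - Math.pow(decay, Math.ceil(elapsedMs / 60000.0));
    }
    // utilizationThreshold( 60000, 0.5) -> 0.5
    // utilizationThreshold(120000, 0.5) -> 0.75
    // utilizationThreshold(180000, 0.5) -> 0.875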

http://git-wip-us.apache.org/repos/asf/incubator-systemml/blob/276d9257/src/main/java/com/ibm/bi/dml/runtime/controlprogram/parfor/mqo/PiggybackingWorkerUtilTimeParallel.java
----------------------------------------------------------------------
diff --git a/src/main/java/com/ibm/bi/dml/runtime/controlprogram/parfor/mqo/PiggybackingWorkerUtilTimeParallel.java b/src/main/java/com/ibm/bi/dml/runtime/controlprogram/parfor/mqo/PiggybackingWorkerUtilTimeParallel.java
deleted file mode 100644
index 9c89532..0000000
--- a/src/main/java/com/ibm/bi/dml/runtime/controlprogram/parfor/mqo/PiggybackingWorkerUtilTimeParallel.java
+++ /dev/null
@@ -1,169 +0,0 @@
-/**
- * (C) Copyright IBM Corp. 2010, 2015
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- * 
- */
-
-package com.ibm.bi.dml.runtime.controlprogram.parfor.mqo;
-
-import java.util.LinkedList;
-import java.util.concurrent.ExecutorService;
-import java.util.concurrent.Executors;
-
-import com.ibm.bi.dml.lops.runtime.RunMRJobs;
-import com.ibm.bi.dml.runtime.instructions.MRJobInstruction;
-import com.ibm.bi.dml.runtime.matrix.JobReturn;
-import com.ibm.bi.dml.runtime.matrix.data.Pair;
-import com.ibm.bi.dml.utils.Statistics;
-
-/**
- * Piggybacking worker that merges queued MR job instructions and submits the merged jobs
- * in parallel once the cluster utilization falls below a fixed threshold or the maximum
- * merge interval has elapsed.
- * 
- * Possible extensions: (1) take the number of running jobs into account,
- * (2) compute the timeout threshold based on max and last job execution times.
- */
-public class PiggybackingWorkerUtilTimeParallel extends PiggybackingWorker
-{
-
-	//internal configuration parameters
-	private static long MIN_MERGE_INTERVAL = 1000;
-	private static long MAX_MERGE_INTERVAL = 60000;
-	private static double UTILIZATION_THRESHOLD = 0.4; //submit merged jobs only while cluster utilization is at most 40%
-	
-	//thread pool for parallel submit
-	private ExecutorService _parSubmit = null;
-	
-	private long _minTime = -1;
-	private long _maxTime = -1;
-	private double _utilThreshold = -1; 
-	private int _par = -1;
-	
-	public PiggybackingWorkerUtilTimeParallel(int par)
-	{
-		this( MIN_MERGE_INTERVAL, 
-			  MAX_MERGE_INTERVAL, 
-			  UTILIZATION_THRESHOLD, 
-			  par );
-	}
-	
-	public PiggybackingWorkerUtilTimeParallel( long minInterval, long maxInterval, double utilThreshold, int par )
-	{
-		_minTime = minInterval;
-		_maxTime = maxInterval;
-		_utilThreshold = utilThreshold;
-		_par = par;
-		
-		//init thread pool
-		_parSubmit = Executors.newFixedThreadPool(_par);
-	}
-
-	@Override 
-	public void setStopped()
-	{
-		//parent logic
-		super.setStopped();
-		
-		//explicitly stop the thread pool
-		_parSubmit.shutdown();
-	}
-	
-	@Override
-	public void run() 
-	{
-		long lastTime = System.currentTimeMillis();
-		
-		while( !_stop )
-		{
-			try
-			{
-				long currentTime = System.currentTimeMillis();
-				
-				// wait until next submission
-				Thread.sleep(_minTime); //wait at least minTime
-				if( RuntimePiggybacking.isEmptyJobPool() )
-					continue;
-				double util = RuntimePiggybackingUtils.getCurrentClusterUtilization();
-				if(   util > _utilThreshold           //cluster utilization condition
-				   && currentTime-lastTime<_maxTime ) //timeout condition 
-				{
-					continue;
-				}
-				
-				// pick job type with largest number of jobs
-				LinkedList<Pair<Long,MRJobInstruction>> workingSet = RuntimePiggybacking.getMaxWorkingSet();
-				if( workingSet == null )
-					continue; //empty pool
-				
-				// merge jobs (if possible)
-				LinkedList<MergedMRJobInstruction> mergedWorkingSet = mergeMRJobInstructions(workingSet);
-				
-				// submit all resulting jobs (parallel submission)
-				for( MergedMRJobInstruction minst : mergedWorkingSet )
-				{
-					//submit job and return results if finished
-					_parSubmit.execute(new MRJobSubmitTask(minst));
-				}
-				
-				lastTime = currentTime;
-			}
-			catch(Exception ex)
-			{
-				throw new RuntimeException(ex);
-			}
-		}
-	}
-	
-	
-	/**
-	 * Task that submits a merged MR job instruction, splits the job return,
-	 * and makes the per-instruction results available to waiting clients.
-	 */
-	public class MRJobSubmitTask implements Runnable
-	{
-		private MergedMRJobInstruction _minst = null;
-		
-		public MRJobSubmitTask( MergedMRJobInstruction minst )
-		{
-			_minst = minst;
-		}
-		
-		@Override
-		public void run() 
-		{
-			try
-			{
-				// submit mr job
-				JobReturn mret = RunMRJobs.submitJob(_minst.inst);
-				Statistics.incrementNoOfExecutedMRJobs();
-
-				// error handling
-				if( !mret.successful )
-					LOG.error("Failed to run merged mr-job instruction:\n"+_minst.inst.toString()); 
-				
-				// split job return
-				LinkedList<JobReturn> ret = new LinkedList<JobReturn>();
-				for( Long id : _minst.ids ){
-					ret.add( _minst.constructJobReturn(id, mret) );
-					Statistics.decrementNoOfExecutedMRJobs();
-				}
-				// make job returns available and notify waiting clients
-				putJobResults(_minst.ids, ret);
-			}
-			catch(Exception ex)
-			{
-				throw new RuntimeException(ex); 
-			}
-		}
-		
-	}
-}