You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@systemml.apache.org by lr...@apache.org on 2015/12/03 19:45:34 UTC

[03/78] [abbrv] [partial] incubator-systemml git commit: Move files to new package folder structure

http://git-wip-us.apache.org/repos/asf/incubator-systemml/blob/276d9257/src/main/java/com/ibm/bi/dml/runtime/controlprogram/parfor/RemoteParWorkerMapper.java
----------------------------------------------------------------------
diff --git a/src/main/java/com/ibm/bi/dml/runtime/controlprogram/parfor/RemoteParWorkerMapper.java b/src/main/java/com/ibm/bi/dml/runtime/controlprogram/parfor/RemoteParWorkerMapper.java
deleted file mode 100644
index 9d6f320..0000000
--- a/src/main/java/com/ibm/bi/dml/runtime/controlprogram/parfor/RemoteParWorkerMapper.java
+++ /dev/null
@@ -1,258 +0,0 @@
-/**
- * (C) Copyright IBM Corp. 2010, 2015
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- * 
- */
-
-package com.ibm.bi.dml.runtime.controlprogram.parfor;
-
-import java.io.IOException;
-import java.util.HashMap;
-
-import org.apache.hadoop.io.LongWritable;
-import org.apache.hadoop.io.Text;
-import org.apache.hadoop.io.Writable;
-import org.apache.hadoop.mapred.JobConf;
-import org.apache.hadoop.mapred.Mapper;
-import org.apache.hadoop.mapred.OutputCollector;
-import org.apache.hadoop.mapred.Reporter;
-
-import com.ibm.bi.dml.api.DMLScript;
-import com.ibm.bi.dml.conf.ConfigurationManager;
-import com.ibm.bi.dml.runtime.controlprogram.LocalVariableMap;
-import com.ibm.bi.dml.runtime.controlprogram.ParForProgramBlock;
-import com.ibm.bi.dml.runtime.controlprogram.caching.CacheStatistics;
-import com.ibm.bi.dml.runtime.controlprogram.caching.CacheableData;
-import com.ibm.bi.dml.runtime.controlprogram.caching.MatrixObject;
-import com.ibm.bi.dml.runtime.controlprogram.parfor.stat.InfrastructureAnalyzer;
-import com.ibm.bi.dml.runtime.controlprogram.parfor.stat.StatisticMonitor;
-import com.ibm.bi.dml.runtime.controlprogram.parfor.util.IDHandler;
-import com.ibm.bi.dml.runtime.instructions.cp.Data;
-import com.ibm.bi.dml.runtime.matrix.mapred.MRJobConfiguration;
-import com.ibm.bi.dml.runtime.util.LocalFileUtils;
-import com.ibm.bi.dml.utils.Statistics;
-
-/**
- * Remote ParWorker implementation, realized as MR mapper.
- * 
- * NOTE: In a cluster setup, reuse jvm will not lead to reusing jvms of different jobs or different
- * task types due to job-level specification of jvm max sizes for map/reduce 
- *
- */
-public class RemoteParWorkerMapper extends ParWorker  //MapReduceBase not required (no op implementations of configure, close)
-	implements Mapper<LongWritable, Text, Writable, Writable>
-{
-	
-	//cache for future reuse (in case of JVM reuse)
-	//NOTE: Hashmap to support multiple parfor MR jobs for local mode and if JVM reuse across jobs
-	private static HashMap<String,RemoteParWorkerMapper> _sCache = null; 
-	
-	//MR ParWorker attributes  
-	protected String  _stringID       = null; 
-	protected HashMap<String, String> _rvarFnames = null; 
-	
-	static
-	{
-		//init cache (once per JVM)
-		_sCache = new HashMap<String, RemoteParWorkerMapper>();
-	}
-	
-	
-	/**
-	 * Creates a worker that only holds an empty map of result variable
-	 * filenames; the remaining worker state is assigned in configure().
-	 */
-	public RemoteParWorkerMapper()
-	{
-		//the filename map only matters if JVM reuse is enabled: it keeps output
-		//filenames consistent across tasks of one reused worker (preaggregation)
-		this._rvarFnames = new HashMap<String, String>();
-	}
-	
-	/**
-	 * 
-	 */
-	@Override
-	public void map(LongWritable key, Text value, OutputCollector<Writable, Writable> out, Reporter reporter) 
-		throws IOException
-	{
-		LOG.trace("execute RemoteParWorkerMapper "+_stringID+" ("+_workerID+")");
-		
-		//state for jvm reuse and multiple iterations 
-		long numIters = getExecutedIterations(); 
-		
-		try 
-		{
-			//parse input task
-			Task lTask = Task.parseCompactString( value.toString() );
-			
-			//execute task (on error: re-try via Hadoop)
-			executeTask( lTask );
-		
-			//write output if required (matrix indexed write)
-			RemoteParForUtils.exportResultVariables( _workerID, _ec.getVariables(), _resultVars, _rvarFnames, out );
-		}
-		catch(Exception ex)
-		{
-			//throw IO exception to adhere to API specification
-			throw new IOException("ParFOR: Failed to execute task.",ex);
-		}
-		
-		//statistic maintenance
-		RemoteParForUtils.incrementParForMRCounters(reporter, 1, getExecutedIterations()-numIters);
-		
-		//print heaver hitter per task
-		JobConf job = ConfigurationManager.getCachedJobConf();
-		if( DMLScript.STATISTICS && !InfrastructureAnalyzer.isLocalMode(job) )
-			LOG.info("\nSystemML Statistics:\nHeavy hitter instructions (name, time, count):\n" + Statistics.getHeavyHitters(10));	
-	}
-
-	/**
-	 * Prepares this worker for the given job.
-	 * 
-	 * If worker reuse is allowed and a worker for the same job id is already
-	 * cached, the state of the cached worker (ids, child blocks, result
-	 * variables, execution context, counters, result filenames) is taken over.
-	 * Otherwise the worker is configured from scratch: the parfor body is
-	 * parsed from the job configuration, the local cache manager is
-	 * initialized if not yet active, result variables are pinned, and the
-	 * worker is put into the cache if reuse is allowed.
-	 * 
-	 * In both cases the statistics are reset afterwards if statistics are
-	 * enabled and the job does not run in local mode.
-	 * 
-	 * Any exception raised during the from-scratch configuration is rethrown
-	 * wrapped in a RuntimeException.
-	 * 
-	 * @param job job configuration of the current map task
-	 */
-	@Override
-	public void configure(JobConf job)
-	{
-		boolean requiresConfigure = true;
-		String jobID = job.get("mapred.job.id");
-		
-		//probe cache for existing worker (parfor body, symbol table, etc)
-		if( ParForProgramBlock.ALLOW_REUSE_MR_PAR_WORKER )
-		{
-			synchronized( _sCache ) //for multiple jobs in local mode
-			{
-				if( _sCache.containsKey(jobID) )
-				{
-					RemoteParWorkerMapper tmp = _sCache.get(jobID);
-					
-					_stringID       = tmp._stringID;
-					_workerID       = tmp._workerID;
-					
-					_childBlocks    = tmp._childBlocks;
-					_resultVars     = tmp._resultVars;
-					_ec             = tmp._ec;
-					
-					_numIters       = tmp._numIters;
-					_numTasks       = tmp._numTasks;
-										
-					_rvarFnames     = tmp._rvarFnames;
-					
-					requiresConfigure = false;
-				}
-			}
-		}
-		
-		if( requiresConfigure )
-		{
-			LOG.trace("configure RemoteParWorkerMapper "+job.get("mapred.tip.id"));
-			
-			try
-			{
-				_stringID = job.get("mapred.tip.id"); //task ID
-				_workerID = IDHandler.extractIntID(_stringID); //int task ID
-
-				//use the given job configuration as source for all new job confs 
-				//NOTE: this is required because on HDP 2.3, the classpath of mr tasks contained hadoop-common.jar 
-				//which includes a core-default.xml configuration which hides the actual default cluster configuration
-				//in the context of mr jobs (for example this config points to local fs instead of hdfs by default). 
-				if( !InfrastructureAnalyzer.isLocalMode(job) ) {
-					ConfigurationManager.setCachedJobConf(job);
-				}
-				
-				//create local runtime program
-				String in = MRJobConfiguration.getProgramBlocks(job);
-				ParForBody body = ProgramConverter.parseParForBody(in, (int)_workerID);
-				_childBlocks = body.getChildBlocks();
-				_ec          = body.getEc();				
-				_resultVars  = body.getResultVarNames();
-		
-				//init local cache manager 
-				if( !CacheableData.isCachingActive() ) {
-					String uuid = IDHandler.createDistributedUniqueID();
-					LocalFileUtils.createWorkingDirectoryWithUUID( uuid );
-					CacheableData.initCaching( uuid ); //incl activation, cache dir creation (each map task gets its own dir for simplified cleanup)
-				}				
-				if( !CacheableData.cacheEvictionLocalFilePrefix.contains("_") ){ //account for local mode
-					CacheableData.cacheEvictionLocalFilePrefix = CacheableData.cacheEvictionLocalFilePrefix +"_" + _workerID; 
-				}
-				
-				//ensure that resultvar files are not removed
-				super.pinResultVariables();
-				
-				//enable/disable caching (if required)
-				boolean cpCaching = MRJobConfiguration.getParforCachingConfig( job );
-				if( !cpCaching )
-					CacheableData.disableCaching();
-		
-				_numTasks    = 0;
-				_numIters    = 0;
-				
-			}
-			catch(Exception ex)
-			{
-				throw new RuntimeException(ex);
-			}
-			
-			//disable stat monitoring, reporting execution times via counters not useful 
-			StatisticMonitor.disableStatMonitoring();
-			
-			//put into cache if required
-			if( ParForProgramBlock.ALLOW_REUSE_MR_PAR_WORKER )
-				synchronized( _sCache ){ //for multiple jobs in local mode
-					_sCache.put(jobID, this);
-				}
-		} 
-		else
-		{
-			LOG.trace("reuse configured RemoteParWorkerMapper "+_stringID);
-		}
-		
-		//always reset stats because counters per map task (for case of JVM reuse)
-		if( DMLScript.STATISTICS && !InfrastructureAnalyzer.isLocalMode(job) )
-		{
-			CacheStatistics.reset();
-			Statistics.reset();
-		}
-	}
-
-	/**
-	 * 
-	 */
-	@Override
-	public void close() 
-		throws IOException 
-	{
-		//cleanup cache and local tmp dir
-		RemoteParForUtils.cleanupWorkingDirectories();
-		
-		//change cache status for jvm_reuse (make empty allows us to
-		//reuse in-memory objects if still present, re-load from HDFS
-		//if evicted by garbage collector - without this approach, we
-		//could not cleanup the local working dir, because this would 
-		//delete evicted matrices as well. 
-		if( ParForProgramBlock.ALLOW_REUSE_MR_PAR_WORKER )
-		{
-			for( RemoteParWorkerMapper pw : _sCache.values() )
-			{
-				LocalVariableMap vars = pw._ec.getVariables();
-				for( String varName : vars.keySet() )
-				{
-					Data dat = vars.get(varName);
-					if( dat instanceof MatrixObject )
-						((MatrixObject)dat).setEmptyStatus();
-				}
-			}
-		}
-		
-		//ensure caching is not disabled for CP in local mode
-		CacheableData.enableCaching();
-	}
-
-}

http://git-wip-us.apache.org/repos/asf/incubator-systemml/blob/276d9257/src/main/java/com/ibm/bi/dml/runtime/controlprogram/parfor/ResultMerge.java
----------------------------------------------------------------------
diff --git a/src/main/java/com/ibm/bi/dml/runtime/controlprogram/parfor/ResultMerge.java b/src/main/java/com/ibm/bi/dml/runtime/controlprogram/parfor/ResultMerge.java
deleted file mode 100644
index c4ff3c5..0000000
--- a/src/main/java/com/ibm/bi/dml/runtime/controlprogram/parfor/ResultMerge.java
+++ /dev/null
@@ -1,176 +0,0 @@
-/**
- * (C) Copyright IBM Corp. 2010, 2015
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- * 
- */
-
-package com.ibm.bi.dml.runtime.controlprogram.parfor;
-
-import java.util.ArrayList;
-
-import org.apache.commons.logging.Log;
-import org.apache.commons.logging.LogFactory;
-
-import com.ibm.bi.dml.runtime.DMLRuntimeException;
-import com.ibm.bi.dml.runtime.controlprogram.caching.MatrixObject;
-import com.ibm.bi.dml.runtime.matrix.MatrixCharacteristics;
-import com.ibm.bi.dml.runtime.matrix.data.MatrixBlock;
-
-/**
- * Due to independence of all iterations, any result has the following properties:
- * (1) non local var, (2) matrix object, and (3) completely independent.
- * These properties allow us to realize result merging in parallel without any synchronization. 
- * 
- */
-public abstract class ResultMerge 
-{
-	
-	protected static final Log LOG = LogFactory.getLog(ResultMerge.class.getName());
-	
-	protected static final String NAME_SUFFIX = "_rm";
-	
-	//inputs to result merge
-	protected MatrixObject   _output      = null;
-	protected MatrixObject[] _inputs      = null; 
-	protected String         _outputFName = null;
-	
-	/**
-	 * Creates a result merge without output and inputs; all fields keep
-	 * their null defaults.
-	 */
-	protected ResultMerge()
-	{
-		//nothing to initialize
-	}
-	
-	/**
-	 * Creates a result merge for the given output and input matrices.
-	 * 
-	 * @param out output matrix the inputs are merged into
-	 * @param in input matrices to merge
-	 * @param outputFilename file name of the merged output
-	 */
-	public ResultMerge( MatrixObject out, MatrixObject[] in, String outputFilename )
-	{
-		this._output = out;
-		this._inputs = in;
-		this._outputFName = outputFilename;
-	}
-	
-	/**
-	 * Merge all given input matrices sequentially into the given output matrix.
-	 * The required space in-memory is the size of the output matrix plus the size
-	 * of one input matrix at a time.
-	 * 
-	 * @return output (merged) matrix
-	 * @throws DMLRuntimeException
-	 */
-	public abstract MatrixObject executeSerialMerge() 
-		throws DMLRuntimeException;
-	
-	/**
-	 * Merge all given input matrices in parallel into the given output matrix.
-	 * The required space in-memory is the size of the output matrix plus the size
-	 * of all input matrices.
-	 * 
-	 * @param par degree of parallelism
-	 * @return output (merged) matrix
-	 * @throws DMLRuntimeException
-	 */
-	public abstract MatrixObject executeParallelMerge( int par ) 
-		throws DMLRuntimeException;
-	
-	/**
-	 * Merges the input block into the output block without comparing against
-	 * an original matrix; the work is delegated to the matrix block merge.
-	 * 
-	 * @param out target block (initially empty)
-	 * @param in block to merge into the target
-	 * @param appendOnly flag passed through to the matrix block merge
-	 * @throws DMLRuntimeException if the matrix block merge fails
-	 */
-	protected void mergeWithoutComp( MatrixBlock out, MatrixBlock in, boolean appendOnly ) 
-		throws DMLRuntimeException
-	{
-		//plain delegation to the matrix block operation
-		out.merge( in, appendOnly );
-	}
-
-	/**
-	 * NOTE: append only not applicable for wiht compare because output must be populated with
-	 * initial state of matrix - with append, this would result in duplicates.
-	 * 
-	 * @param out
-	 * @param in
-	 * @throws DMLRuntimeException 
-	 */
-	protected void mergeWithComp( MatrixBlock out, MatrixBlock in, double[][] compare ) 
-		throws DMLRuntimeException
-	{
-		//Notes for result correctness:
-		// * Always iterate over entire block in order to compare all values 
-		//   (using sparse iterator would miss values set to 0) 
-		// * Explicit NaN awareness because for cases were original matrix contains
-		//   NaNs, since NaN != NaN, otherwise we would potentially overwrite results
-		
-		if( in.isInSparseFormat() ) //sparse input format
-		{
-			int rows = in.getNumRows();
-			int cols = in.getNumColumns();
-			for( int i=0; i<rows; i++ )
-				for( int j=0; j<cols; j++ )
-				{	
-				    double value = in.getValueSparseUnsafe(i,j);  //input value
-					if(   (value != compare[i][j] && !Double.isNaN(value) )     //for new values only (div)
-						|| Double.isNaN(value) != Double.isNaN(compare[i][j]) ) //NaN awareness 
-					{
-				    	out.quickSetValue( i, j, value );	
-					}
-				}
-		}
-		else //dense input format
-		{
-			//for a merge this case will seldom happen, as each input MatrixObject
-			//has at most 1/numThreads of all values in it.
-			int rows = in.getNumRows();
-			int cols = in.getNumColumns();
-			for( int i=0; i<rows; i++ )
-				for( int j=0; j<cols; j++ )
-				{
-				    double value = in.getValueDenseUnsafe(i,j);  //input value
-				    if(    (value != compare[i][j] && !Double.isNaN(value) )    //for new values only (div)
-				    	|| Double.isNaN(value) != Double.isNaN(compare[i][j]) ) //NaN awareness
-				    {
-				    	out.quickSetValue( i, j, value );	
-				    }
-				}
-		}	
-	}
-
-	protected long computeNonZeros( MatrixObject out, ArrayList<MatrixObject> in )
-	{
-		MatrixCharacteristics mc = out.getMatrixCharacteristics();
-		long outNNZ = mc.getNonZeros();	
-		long ret = outNNZ;
-		for( MatrixObject tmp : in )
-		{
-			MatrixCharacteristics tmpmc = tmp.getMatrixCharacteristics();
-			long inNNZ = tmpmc.getNonZeros();
-			ret +=  (inNNZ - outNNZ);			
-		}
-		
-		return ret;
-	}
-	
-	/**
-	 * Copies the given array of matrix objects into a new list, preserving
-	 * the order of the elements.
-	 * 
-	 * @param in array of matrix objects
-	 * @return new list holding all elements of the array
-	 */
-	protected ArrayList<MatrixObject> convertToList(MatrixObject[] in)
-	{
-		ArrayList<MatrixObject> list = new ArrayList<MatrixObject>( in.length );
-		for( int i=0; i<in.length; i++ )
-		{
-			list.add( in[i] );
-		}
-		
-		return list;
-	}
-}

http://git-wip-us.apache.org/repos/asf/incubator-systemml/blob/276d9257/src/main/java/com/ibm/bi/dml/runtime/controlprogram/parfor/ResultMergeLocalAutomatic.java
----------------------------------------------------------------------
diff --git a/src/main/java/com/ibm/bi/dml/runtime/controlprogram/parfor/ResultMergeLocalAutomatic.java b/src/main/java/com/ibm/bi/dml/runtime/controlprogram/parfor/ResultMergeLocalAutomatic.java
deleted file mode 100644
index 745ae08..0000000
--- a/src/main/java/com/ibm/bi/dml/runtime/controlprogram/parfor/ResultMergeLocalAutomatic.java
+++ /dev/null
@@ -1,78 +0,0 @@
-/**
- * (C) Copyright IBM Corp. 2010, 2015
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- * 
- */
-
-package com.ibm.bi.dml.runtime.controlprogram.parfor;
-
-
-import com.ibm.bi.dml.hops.OptimizerUtils;
-import com.ibm.bi.dml.runtime.DMLRuntimeException;
-import com.ibm.bi.dml.runtime.controlprogram.caching.MatrixObject;
-import com.ibm.bi.dml.runtime.controlprogram.parfor.opt.OptimizerRuleBased;
-import com.ibm.bi.dml.runtime.controlprogram.parfor.stat.Timing;
-import com.ibm.bi.dml.runtime.matrix.MatrixCharacteristics;
-
-/**
- * Result merge that decides at execution time between the in-memory and the
- * file-based local result merge, based on the output dimensions and the
- * local memory budget, and then delegates to the chosen implementation.
- */
-public class ResultMergeLocalAutomatic extends ResultMerge
-{
-	
-	//result merge implementation chosen by the last execute call
-	private ResultMerge _rm = null;
-	
-	public ResultMergeLocalAutomatic( MatrixObject out, MatrixObject[] in, String outputFilename )
-	{
-		super( out, in, outputFilename );
-	}
-
-	@Override
-	public MatrixObject executeSerialMerge() 
-		throws DMLRuntimeException 
-	{
-		Timing timer = new Timing(true);
-		
-		//serial merge: decide on the plain output dimensions
-		_rm = selectResultMerge( 1 );
-		MatrixObject merged = _rm.executeSerialMerge();
-
-		LOG.trace("Automatic result merge ("+_rm.getClass().getName()+") executed in "+timer.stop()+"ms.");
-
-		return merged;
-	}
-	
-	@Override
-	public MatrixObject executeParallelMerge(int par) 
-		throws DMLRuntimeException 
-	{
-		//parallel merge: scale the number of rows by the degree of parallelism
-		_rm = selectResultMerge( par );
-		
-		return _rm.executeParallelMerge(par);	
-	}
-	
-	/**
-	 * Chooses the in-memory result merge if the output, with its number of
-	 * rows scaled by the given factor, qualifies as in-memory result merge
-	 * under the local memory budget; otherwise the file-based result merge.
-	 * 
-	 * @param rowFactor factor applied to the number of output rows
-	 * @return result merge implementation for output and inputs of this merge
-	 */
-	private ResultMerge selectResultMerge( long rowFactor )
-	{
-		MatrixCharacteristics mc = _output.getMatrixCharacteristics();
-		long scaledRows = rowFactor * mc.getRows();
-		long cols = mc.getCols();
-		
-		boolean inMemory = OptimizerRuleBased.isInMemoryResultMerge(scaledRows, cols, OptimizerUtils.getLocalMemBudget());
-		if( inMemory )
-			return new ResultMergeLocalMemory( _output, _inputs, _outputFName );
-		
-		return new ResultMergeLocalFile( _output, _inputs, _outputFName );
-	}
-}

http://git-wip-us.apache.org/repos/asf/incubator-systemml/blob/276d9257/src/main/java/com/ibm/bi/dml/runtime/controlprogram/parfor/ResultMergeLocalFile.java
----------------------------------------------------------------------
diff --git a/src/main/java/com/ibm/bi/dml/runtime/controlprogram/parfor/ResultMergeLocalFile.java b/src/main/java/com/ibm/bi/dml/runtime/controlprogram/parfor/ResultMergeLocalFile.java
deleted file mode 100644
index 8020719..0000000
--- a/src/main/java/com/ibm/bi/dml/runtime/controlprogram/parfor/ResultMergeLocalFile.java
+++ /dev/null
@@ -1,1174 +0,0 @@
-/**
- * (C) Copyright IBM Corp. 2010, 2015
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- * 
- */
-
-package com.ibm.bi.dml.runtime.controlprogram.parfor;
-
-import java.io.BufferedWriter;
-import java.io.File;
-import java.io.IOException;
-import java.io.OutputStreamWriter;
-import java.util.ArrayList;
-import java.util.HashMap;
-import java.util.LinkedList;
-import java.util.Map.Entry;
-
-import org.apache.hadoop.fs.FileSystem;
-import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.io.LongWritable;
-import org.apache.hadoop.io.SequenceFile;
-import org.apache.hadoop.io.Text;
-import org.apache.hadoop.mapred.FileInputFormat;
-import org.apache.hadoop.mapred.InputSplit;
-import org.apache.hadoop.mapred.JobConf;
-import org.apache.hadoop.mapred.RecordReader;
-import org.apache.hadoop.mapred.Reporter;
-import org.apache.hadoop.mapred.TextInputFormat;
-
-import com.ibm.bi.dml.conf.ConfigurationManager;
-import com.ibm.bi.dml.parser.Expression.DataType;
-import com.ibm.bi.dml.parser.Expression.ValueType;
-import com.ibm.bi.dml.runtime.DMLRuntimeException;
-import com.ibm.bi.dml.runtime.controlprogram.caching.CacheException;
-import com.ibm.bi.dml.runtime.controlprogram.caching.MatrixObject;
-import com.ibm.bi.dml.runtime.controlprogram.parfor.util.Cell;
-import com.ibm.bi.dml.runtime.controlprogram.parfor.util.IDSequence;
-import com.ibm.bi.dml.runtime.controlprogram.parfor.util.StagingFileUtils;
-import com.ibm.bi.dml.runtime.io.MatrixReader;
-import com.ibm.bi.dml.runtime.matrix.MatrixCharacteristics;
-import com.ibm.bi.dml.runtime.matrix.MatrixFormatMetaData;
-import com.ibm.bi.dml.runtime.matrix.data.IJV;
-import com.ibm.bi.dml.runtime.matrix.data.InputInfo;
-import com.ibm.bi.dml.runtime.matrix.data.MatrixBlock;
-import com.ibm.bi.dml.runtime.matrix.data.MatrixCell;
-import com.ibm.bi.dml.runtime.matrix.data.MatrixIndexes;
-import com.ibm.bi.dml.runtime.matrix.data.OutputInfo;
-import com.ibm.bi.dml.runtime.matrix.data.SparseRowsIterator;
-import com.ibm.bi.dml.runtime.util.DataConverter;
-import com.ibm.bi.dml.runtime.util.FastStringTokenizer;
-import com.ibm.bi.dml.runtime.util.LocalFileUtils;
-import com.ibm.bi.dml.runtime.util.MapReduceTool;
-
-/**
- * 
- * TODO potential extension: parallel merge (create individual staging files concurrently)
- *     
- *      NOTE: file merge typically used due to memory constraints - parallel merge would increase the memory
- *      consumption again.
- */
-public class ResultMergeLocalFile extends ResultMerge
-{
-	
-	//NOTE: if we allow simple copies, this might result in a scattered file and many MR tasks for subsequent jobs
-	public static final boolean ALLOW_COPY_CELLFILES = false;	
-	
-	//internal ID sequence (provides the IDs passed to the staging file creation)
-	private IDSequence _seq = null;
-	
-	/**
-	 * Creates a file-based result merge for the given output and inputs and
-	 * sets up a fresh ID sequence.
-	 * 
-	 * @param out output matrix the inputs are merged into
-	 * @param in input matrices to merge
-	 * @param outputFilename file name of the merged output
-	 */
-	public ResultMergeLocalFile( MatrixObject out, MatrixObject[] in, String outputFilename )
-	{
-		super( out, in, outputFilename );
-		
-		this._seq = new IDSequence();
-	}
-
-
-	@Override
-	public MatrixObject executeSerialMerge() 
-		throws DMLRuntimeException 
-	{
-		LOG.trace("ResultMerge (local, file): Execute serial merge for output "+_output.getVarName()+" (fname="+_output.getFileName()+")");
-		
-		//a new matrix object is created whenever inputs are merged
-		//(required for nested parallelism)
-		MatrixObject result = null;
-		
-		try
-		{
-			//collect all relevant inputs; empty inputs (no iterations executed)
-			//and the output itself are skipped
-			ArrayList<MatrixObject> mergeList = new ArrayList<MatrixObject>();
-			for( MatrixObject candidate : _inputs )
-			{
-				if( candidate == null || candidate == _output )
-					continue;
-				
-				//ensure that input file resides on disk, then add to merge list
-				candidate.exportData();
-				mergeList.add( candidate );
-			}
-			
-			if( mergeList.isEmpty() )
-			{
-				//nothing to merge: return old matrix, to prevent copy
-				result = _output;
-			}
-			else
-			{
-				//ensure that output file (for comparison) resides on disk
-				_output.exportData();
-				
-				//actual merge
-				merge( _outputFName, _output, mergeList );
-				
-				//create new output matrix (e.g., to prevent potential
-				//export<->read file access conflict)
-				result = createNewMatrixObject( _output, mergeList );
-			}
-		}
-		catch(Exception e)
-		{
-			throw new DMLRuntimeException(e);
-		}
-		
-		return result;
-	}
-	
-	@Override
-	public MatrixObject executeParallelMerge(int par) 
-		throws DMLRuntimeException 
-	{
-		//no parallel file merge available: gracefully degrade to the serial
-		//merge, ignoring the given degree of parallelism
-		MatrixObject merged = executeSerialMerge();
-		return merged;
-	}
-
-	/**
-	 * Creates a new matrix object for the merged result file. Variable name,
-	 * value type, and format metadata are taken from the output of this
-	 * result merge; the number of non-zeros is computed from the given output
-	 * and inputs.
-	 * 
-	 * @param output output matrix used for computing the non-zeros
-	 * @param inMO merged input matrices used for computing the non-zeros
-	 * @return new matrix object referring to the output file name of this merge
-	 * @throws DMLRuntimeException declared for callers
-	 */
-	private MatrixObject createNewMatrixObject(MatrixObject output, ArrayList<MatrixObject> inMO ) 
-		throws DMLRuntimeException
-	{
-		String name = _output.getVarName();
-		ValueType valueType = _output.getValueType();
-		MatrixFormatMetaData oldMeta = (MatrixFormatMetaData) _output.getMetaData();
-		
-		//new matrix object; the name suffix is appended only once
-		MatrixObject merged = new MatrixObject( valueType, _outputFName );
-		if( name.contains(NAME_SUFFIX) )
-			merged.setVarName( name );
-		else
-			merged.setVarName( name+NAME_SUFFIX );
-		merged.setDataType( DataType.MATRIX );
-		
-		//deep copy of the metadata object (dimensions and block sizes)
-		MatrixCharacteristics oldMc = oldMeta.getMatrixCharacteristics();
-		OutputInfo oldOi = oldMeta.getOutputInfo();
-		InputInfo oldIi = oldMeta.getInputInfo();
-		MatrixCharacteristics newMc = new MatrixCharacteristics(
-				oldMc.getRows(), oldMc.getCols(),
-				oldMc.getRowsPerBlock(), oldMc.getColsPerBlock());
-		newMc.setNonZeros( computeNonZeros(output, inMO) );
-		merged.setMetaData( new MatrixFormatMetaData(newMc, oldOi, oldIi) );
-		
-		return merged;
-	}
-	
-	/**
-	 * Dispatches the merge to the implementation that matches the output
-	 * format (text cell, binary cell, binary block) and the need for a
-	 * comparison with the existing output. For any other output format no
-	 * merge is performed.
-	 * 
-	 * @param fnameNew file name of the merged result
-	 * @param outMo output matrix, source of the output format and nnz
-	 * @param inMO input matrices to merge
-	 * @throws DMLRuntimeException if the selected merge fails
-	 */
-	private void merge( String fnameNew, MatrixObject outMo, ArrayList<MatrixObject> inMO ) 
-		throws DMLRuntimeException
-	{
-		OutputInfo format = ((MatrixFormatMetaData)outMo.getMetaData()).getOutputInfo();
-		boolean compare = ( outMo.getNnz() != 0 ); //if nnz exist or unknown (-1)
-		
-		boolean textCell = ( format == OutputInfo.TextCellOutputInfo );
-		boolean binaryCell = ( format == OutputInfo.BinaryCellOutputInfo );
-		boolean binaryBlock = ( format == OutputInfo.BinaryBlockOutputInfo );
-		
-		if( textCell && compare )
-			mergeTextCellWithComp( fnameNew, outMo, inMO );
-		else if( textCell )
-			mergeTextCellWithoutComp( fnameNew, outMo, inMO );
-		else if( binaryCell && compare )
-			mergeBinaryCellWithComp( fnameNew, outMo, inMO );
-		else if( binaryCell )
-			mergeBinaryCellWithoutComp( fnameNew, outMo, inMO );
-		else if( binaryBlock && compare )
-			mergeBinaryBlockWithComp( fnameNew, outMo, inMO );
-		else if( binaryBlock )
-			mergeBinaryBlockWithoutComp( fnameNew, outMo, inMO );
-	}
-	
-	/**
-	 * Merges text cell inputs without comparison against the existing output.
-	 * 
-	 * The target file is deleted if it already exists. If ALLOW_COPY_CELLFILES
-	 * is set, the inputs are handed to copyAllFiles and the method returns.
-	 * Otherwise every line of every input file is read, trimmed, and written
-	 * followed by a newline into one newly created target file. Readers and
-	 * the writer are closed in finally blocks.
-	 * 
-	 * @param fnameNew file name of the merged result
-	 * @param outMo output matrix (not read by this method)
-	 * @param inMO input matrices whose files are merged
-	 * @throws DMLRuntimeException if any step of the merge fails; the cause is attached
-	 */
-	private void mergeTextCellWithoutComp( String fnameNew, MatrixObject outMo, ArrayList<MatrixObject> inMO ) 
-		throws DMLRuntimeException
-	{
-		try
-		{
-			//delete target file if already exists
-			MapReduceTool.deleteFileIfExistOnHDFS(fnameNew);
-			
-			if( ALLOW_COPY_CELLFILES )
-			{
-				copyAllFiles(fnameNew, inMO);
-				return; //we're done
-			}
-			
-			//actual merge
-			JobConf job = new JobConf(ConfigurationManager.getCachedJobConf());
-			FileSystem fs = FileSystem.get(job);
-			Path path = new Path( fnameNew );
-			BufferedWriter out = new BufferedWriter(new OutputStreamWriter(fs.create(path,true)));		
-			
-			String valueStr = null;
-			
-			try
-			{
-				for( MatrixObject in : inMO ) //read/write all inputs
-				{
-					LOG.trace("ResultMerge (local, file): Merge input "+in.getVarName()+" (fname="+in.getFileName()+") via stream merge");
-					
-					JobConf tmpJob = new JobConf(ConfigurationManager.getCachedJobConf());
-					Path tmpPath = new Path(in.getFileName());
-					FileInputFormat.addInputPath(tmpJob, tmpPath);
-					TextInputFormat informat = new TextInputFormat();
-					informat.configure(tmpJob);
-					InputSplit[] splits = informat.getSplits(tmpJob, 1);
-					
-					LongWritable key = new LongWritable();
-					Text value = new Text();
-		
-					for(InputSplit split: splits)
-					{
-						RecordReader<LongWritable,Text> reader = informat.getRecordReader(split, tmpJob, Reporter.NULL);
-						try
-						{
-							while(reader.next(key, value))
-							{
-								valueStr = value.toString().trim();	
-								out.write( valueStr+"\n" );
-							}
-						}
-						finally
-						{
-							if( reader != null )
-								reader.close();
-						}
-					}
-				}
-			}
-			finally
-			{
-				if( out != null )
-					out.close();
-			}
-		}
-		catch(Exception ex)
-		{
-			throw new DMLRuntimeException("Unable to merge text cell results.", ex);
-		}
-	}
-	
-	/**
-	 * Merges text cell inputs with comparison against the existing output.
-	 * 
-	 * The output is written to a compare staging directory, every input is
-	 * written to a staging directory under a fresh ID, and the result file is
-	 * then created from both staging directories. Both staging directories
-	 * are removed when the method ends, on success as well as on failure.
-	 * 
-	 * @param fnameNew file name of the merged result
-	 * @param outMo output matrix, used as compare matrix and metadata source
-	 * @param inMO input matrices to merge
-	 * @throws DMLRuntimeException if any step of the merge fails; the cause is attached
-	 */
-	private void mergeTextCellWithComp( String fnameNew, MatrixObject outMo, ArrayList<MatrixObject> inMO ) 
-		throws DMLRuntimeException
-	{
-		String fnameStaging = LocalFileUtils.getUniqueWorkingDir(LocalFileUtils.CATEGORY_RESULTMERGE);
-		String fnameStagingCompare = LocalFileUtils.getUniqueWorkingDir(LocalFileUtils.CATEGORY_RESULTMERGE);
-		
-		try
-		{
-			//delete target file if already exists
-			MapReduceTool.deleteFileIfExistOnHDFS(fnameNew);
-			
-			//Step 0) write compare blocks to staging area (if necessary)
-			LOG.trace("ResultMerge (local, file): Create merge compare matrix for output "+outMo.getVarName()+" (fname="+outMo.getFileName()+")");
-			createTextCellStagingFile(fnameStagingCompare, outMo, 0);
-			
-			//Step 1) read and write blocks to staging area
-			for( MatrixObject in : inMO )
-			{
-				LOG.trace("ResultMerge (local, file): Merge input "+in.getVarName()+" (fname="+in.getFileName()+")");
-				
-				long ID = _seq.getNextID();
-				createTextCellStagingFile( fnameStaging, in, ID );
-			}
-	
-			//Step 2) read blocks, consolidate, and write to HDFS
-			createTextCellResultFile(fnameStaging, fnameStagingCompare, fnameNew, (MatrixFormatMetaData)outMo.getMetaData(), true);
-		}	
-		catch(Exception ex)
-		{
-			throw new DMLRuntimeException("Unable to merge text cell results.", ex);
-		}
-		finally
-		{
-			//always remove the staging directories; doing this only after a
-			//successful merge would leave them behind on every failure
-			LocalFileUtils.cleanupWorkingDirectory(fnameStaging);
-			LocalFileUtils.cleanupWorkingDirectory(fnameStagingCompare);
-		}
-	}
-	
-	/**
-	 * Merges binary cell inputs without comparison against the existing output.
-	 * 
-	 * The target file is deleted if it already exists. If ALLOW_COPY_CELLFILES
-	 * is set, the inputs are handed to copyAllFiles and the method returns.
-	 * Otherwise every key/value pair of every sequence file of every input is
-	 * appended to one newly created sequence file. Readers and the writer are
-	 * closed in finally blocks.
-	 * 
-	 * @param fnameNew file name of the merged result
-	 * @param outMo output matrix (not read by this method)
-	 * @param inMO input matrices whose files are merged
-	 * @throws DMLRuntimeException if any step of the merge fails; the cause is attached
-	 */
-	@SuppressWarnings("deprecation")
-	private void mergeBinaryCellWithoutComp( String fnameNew, MatrixObject outMo, ArrayList<MatrixObject> inMO ) 
-		throws DMLRuntimeException
-	{
-		try
-		{	
-			//delete target file if already exists
-			MapReduceTool.deleteFileIfExistOnHDFS(fnameNew);
-			
-			if( ALLOW_COPY_CELLFILES )
-			{
-				copyAllFiles(fnameNew, inMO);
-				return; //we're done
-			}
-			
-			//actual merge
-			JobConf job = new JobConf(ConfigurationManager.getCachedJobConf());
-			FileSystem fs = FileSystem.get(job);
-			Path path = new Path( fnameNew );					
-			SequenceFile.Writer out = new SequenceFile.Writer(fs, job, path, MatrixIndexes.class, MatrixCell.class); //beware ca 50ms
-			
-			MatrixIndexes key = new MatrixIndexes();
-			MatrixCell value = new MatrixCell();
-			
-			try
-			{
-				for( MatrixObject in : inMO ) //read/write all inputs
-				{
-					LOG.trace("ResultMerge (local, file): Merge input "+in.getVarName()+" (fname="+in.getFileName()+") via stream merge");
-					
-					JobConf tmpJob = new JobConf(ConfigurationManager.getCachedJobConf());
-					Path tmpPath = new Path(in.getFileName());
-					
-					for(Path lpath : MatrixReader.getSequenceFilePaths(fs, tmpPath) )
-					{
-						SequenceFile.Reader reader = new SequenceFile.Reader(fs,lpath,tmpJob);
-						try
-						{
-							while(reader.next(key, value))
-							{
-								out.append(key, value);
-							}
-						}
-						finally
-						{
-							if( reader != null )
-								reader.close();
-						}
-					}					
-				}	
-			}
-			finally
-			{
-				if( out != null )
-					out.close();
-			}
-		}
-		catch(Exception ex)
-		{
-			throw new DMLRuntimeException("Unable to merge binary cell results.", ex);
-		}	
-	}
-	
-	/**
-	 * Merges binary cell inputs into the target file using the existing output
-	 * as compare data: the output and all inputs are first written as per-block
-	 * cell lists into local staging directories, then consolidated and written
-	 * to fnameNew.
-	 * 
-	 * Both staging directories are removed in a finally block so that they do
-	 * not remain on local disk when the merge fails.
-	 * 
-	 * @param fnameNew name of the merged target file
-	 * @param outMo output matrix object providing the compare data and the metadata
-	 * @param inMO matrix objects whose files are merged
-	 * @throws DMLRuntimeException if any step of the merge fails
-	 */
-	private void mergeBinaryCellWithComp( String fnameNew, MatrixObject outMo, ArrayList<MatrixObject> inMO ) 
-		throws DMLRuntimeException
-	{
-		String fnameStaging = LocalFileUtils.getUniqueWorkingDir(LocalFileUtils.CATEGORY_RESULTMERGE);
-		String fnameStagingCompare = LocalFileUtils.getUniqueWorkingDir(LocalFileUtils.CATEGORY_RESULTMERGE);
-		
-		try
-		{
-			//delete target file if already exists
-			MapReduceTool.deleteFileIfExistOnHDFS(fnameNew);
-			
-			//Step 0) write compare blocks to staging area (if necessary)
-			LOG.trace("ResultMerge (local, file): Create merge compare matrix for output "+outMo.getVarName()+" (fname="+outMo.getFileName()+")");
-			createBinaryCellStagingFile(fnameStagingCompare, outMo, 0);
-			
-			//Step 1) read and write blocks to staging area
-			for( MatrixObject in : inMO )
-			{
-				LOG.trace("ResultMerge (local, file): Merge input "+in.getVarName()+" (fname="+in.getFileName()+")");
-				
-				//each input gets its own ID, used as the staged file name per block
-				long ID = _seq.getNextID();
-				createBinaryCellStagingFile( fnameStaging, in, ID );
-			}
-	
-			//Step 2) read blocks, consolidate, and write to HDFS
-			createBinaryCellResultFile(fnameStaging, fnameStagingCompare, fnameNew, (MatrixFormatMetaData)outMo.getMetaData(), true);
-		}	
-		catch(Exception ex)
-		{
-			throw new DMLRuntimeException("Unable to merge binary cell results.", ex);
-		}
-		finally
-		{
-			//always remove the staging areas, also on failure
-			LocalFileUtils.cleanupWorkingDirectory(fnameStaging);
-			LocalFileUtils.cleanupWorkingDirectory(fnameStagingCompare);
-		}
-	}
-	
-	/**
-	 * Merges binary block inputs into the target file without compare data: all
-	 * input blocks are written to a local staging directory, then consolidated
-	 * per block position and written to fnameNew.
-	 * 
-	 * The staging directory is removed in a finally block so that it does not
-	 * remain on local disk when the merge fails.
-	 * 
-	 * @param fnameNew name of the merged target file
-	 * @param outMo output matrix object providing the metadata
-	 * @param inMO matrix objects whose files are merged
-	 * @throws DMLRuntimeException if any step of the merge fails
-	 */
-	private void mergeBinaryBlockWithoutComp( String fnameNew, MatrixObject outMo, ArrayList<MatrixObject> inMO ) 
-		throws DMLRuntimeException
-	{
-		String fnameStaging = LocalFileUtils.getUniqueWorkingDir(LocalFileUtils.CATEGORY_RESULTMERGE);
-		
-		try
-		{
-			//delete target file if already exists
-			MapReduceTool.deleteFileIfExistOnHDFS(fnameNew);
-			
-			//Step 1) read and write blocks to staging area
-			for( MatrixObject in : inMO )
-			{
-				LOG.trace("ResultMerge (local, file): Merge input "+in.getVarName()+" (fname="+in.getFileName()+")");				
-				
-				createBinaryBlockStagingFile( fnameStaging, in );
-			}
-	
-			//Step 2) read blocks, consolidate, and write to HDFS
-			createBinaryBlockResultFile(fnameStaging, null, fnameNew, (MatrixFormatMetaData)outMo.getMetaData(), false);
-		}	
-		catch(Exception ex)
-		{
-			throw new DMLRuntimeException("Unable to merge binary block results.", ex);
-		}	
-		finally
-		{
-			//always remove the staging area, also on failure
-			LocalFileUtils.cleanupWorkingDirectory(fnameStaging);
-		}
-	}
-	
-	/**
-	 * Merges binary block inputs into the target file using the existing output
-	 * as compare data: the output blocks and all input blocks are written to
-	 * separate local staging directories, then consolidated per block position
-	 * and written to fnameNew.
-	 * 
-	 * Both staging directories are removed in a finally block so that they do
-	 * not remain on local disk when the merge fails.
-	 * 
-	 * @param fnameNew name of the merged target file
-	 * @param outMo output matrix object providing the compare blocks and the metadata
-	 * @param inMO matrix objects whose files are merged
-	 * @throws DMLRuntimeException if any step of the merge fails
-	 */
-	private void mergeBinaryBlockWithComp( String fnameNew, MatrixObject outMo, ArrayList<MatrixObject> inMO ) 
-		throws DMLRuntimeException
-	{
-		String fnameStaging = LocalFileUtils.getUniqueWorkingDir(LocalFileUtils.CATEGORY_RESULTMERGE);
-		String fnameStagingCompare = LocalFileUtils.getUniqueWorkingDir(LocalFileUtils.CATEGORY_RESULTMERGE);
-		
-		try
-		{
-			//delete target file if already exists
-			MapReduceTool.deleteFileIfExistOnHDFS(fnameNew);
-			
-			//Step 0) write compare blocks to staging area (if necessary)
-			LOG.trace("ResultMerge (local, file): Create merge compare matrix for output "+outMo.getVarName()+" (fname="+outMo.getFileName()+")");			
-			
-			createBinaryBlockStagingFile(fnameStagingCompare, outMo);
-			
-			//Step 1) read and write blocks to staging area
-			for( MatrixObject in : inMO )
-			{
-				LOG.trace("ResultMerge (local, file): Merge input "+in.getVarName()+" (fname="+in.getFileName()+")");		
-				createBinaryBlockStagingFile( fnameStaging, in );
-			}
-	
-			//Step 2) read blocks, consolidate, and write to HDFS
-			createBinaryBlockResultFile(fnameStaging, fnameStagingCompare, fnameNew, (MatrixFormatMetaData)outMo.getMetaData(), true);
-		}	
-		catch(Exception ex)
-		{
-			throw new DMLRuntimeException("Unable to merge binary block results.", ex);
-		}	
-		finally
-		{
-			//always remove the staging areas, also on failure
-			LocalFileUtils.cleanupWorkingDirectory(fnameStaging);
-			LocalFileUtils.cleanupWorkingDirectory(fnameStagingCompare);
-		}
-	}
-	
-	/**
-	 * Reads all blocks of the given matrix object from its sequence file(s) and
-	 * writes every block with at least one non-zero to the local staging area,
-	 * into the directory fnameStaging/ROW_COL (block indexes) under a file name
-	 * drawn from the ID sequence.
-	 * 
-	 * @param fnameStaging root directory of the local staging area
-	 * @param mo matrix object whose file is read
-	 * @throws IOException if reading the input or writing a staged block fails
-	 */
-	@SuppressWarnings("deprecation")
-	private void createBinaryBlockStagingFile( String fnameStaging, MatrixObject mo ) 
-		throws IOException
-	{
-		//key/value holders reused for all records
-		MatrixIndexes ix = new MatrixIndexes();
-		MatrixBlock block = new MatrixBlock();
-		
-		JobConf job = new JobConf(ConfigurationManager.getCachedJobConf());
-		FileSystem fs = FileSystem.get(job);
-		Path src = new Path(mo.getFileName());
-		
-		for( Path part : MatrixReader.getSequenceFilePaths(fs, src) )
-		{
-			SequenceFile.Reader reader = new SequenceFile.Reader(fs, part, job);
-			try
-			{
-				while( reader.next(ix, block) )
-				{
-					//blocks without non-zeros are not staged
-					if( block.getNonZeros() <= 0 )
-						continue;
-					
-					String dir = fnameStaging + "/" + ix.getRowIndex() + "_" + ix.getColumnIndex();
-					LocalFileUtils.checkAndCreateStagingDir( dir );
-					LocalFileUtils.writeMatrixBlockToLocal(dir + "/" + _seq.getNextID(), block);
-				}
-			}
-			finally
-			{
-				reader.close();
-			}
-		}
-	}
-	
-	/**
-	 * Stages a text cell matrix in the local staging area. The file of the given
-	 * matrix object is read split by split via TextInputFormat; every line is
-	 * tokenized on blanks into row index, column index and value. The resulting
-	 * cells are buffered and handed to appendCellBufferToStagingArea whenever the
-	 * buffer exceeds StagingFileUtils.CELL_BUFFER_SIZE and once more at the end
-	 * of each split.
-	 * 
-	 * @param fnameStaging root directory of the local staging area
-	 * @param mo matrix object whose text file is read; also provides the block sizes
-	 * @param ID identifier passed through to appendCellBufferToStagingArea for the staged files
-	 * @throws IOException
-	 * @throws DMLRuntimeException
-	 */
-	
-	private void createTextCellStagingFile( String fnameStaging, MatrixObject mo, long ID ) 
-		throws IOException, DMLRuntimeException
-	{		
-		JobConf job = new JobConf(ConfigurationManager.getCachedJobConf());
-		Path path = new Path(mo.getFileName());
-		FileInputFormat.addInputPath(job, path);
-		TextInputFormat informat = new TextInputFormat();
-		informat.configure(job);
-		InputSplit[] splits = informat.getSplits(job, 1);
-		
-		LinkedList<Cell> buffer = new LinkedList<Cell>();
-		LongWritable key = new LongWritable();
-		Text value = new Text();
-
-		MatrixCharacteristics mc = mo.getMatrixCharacteristics();
-		int brlen = mc.getRowsPerBlock(); 
-		int bclen = mc.getColsPerBlock();
-		//long row = -1, col = -1; //FIXME needs reconsideration whenever textcell is used actively
-		//NOTE MB: Originally, we used long row, col but this led reproducibly to JIT compilation
-		// errors during runtime; experienced under WINDOWS, Intel x86-64, IBM JDK 64bit/32bit.
-		// It works fine with int row, col but we require long for larger matrices.
-		// Since textcell is never used for result merge (hybrid/hadoop: binaryblock, singlenode: binarycell),
-		// we just propose to exclude it with -Xjit:exclude={package.method*}(count=0,optLevel=0)
-		
-		FastStringTokenizer st = new FastStringTokenizer(' ');
-		
-		for(InputSplit split : splits)
-		{
-			RecordReader<LongWritable,Text> reader = informat.getRecordReader(split, job, Reporter.NULL);
-			try
-			{
-				while(reader.next(key, value))
-				{
-					st.reset( value.toString() ); //reset tokenizer
-					long row = st.nextLong();
-				    long col = st.nextLong();
-					double lvalue = Double.parseDouble( st.nextToken() );
-					
-					Cell tmp = new Cell( row, col, lvalue ); 
-					
-					buffer.addLast( tmp );
-					if( buffer.size() > StagingFileUtils.CELL_BUFFER_SIZE ) //periodic flush
-					{
-						appendCellBufferToStagingArea(fnameStaging, ID, buffer, brlen, bclen);
-						buffer.clear();
-					}
-				}
-				
-				//final flush
-				if( !buffer.isEmpty() )
-				{
-					appendCellBufferToStagingArea(fnameStaging, ID, buffer, brlen, bclen);
-					buffer.clear();
-				}
-			}
-			finally
-			{
-				if( reader != null )
-					reader.close();
-			}
-		}
-	}
-	
-	/**
-	 * Stages a binary cell matrix in the local staging area. All cells of the
-	 * matrix object's sequence file(s) are read into a buffer, which is handed to
-	 * appendCellBufferToStagingArea whenever it exceeds
-	 * StagingFileUtils.CELL_BUFFER_SIZE and once more at the end of each file.
-	 * 
-	 * @param fnameStaging root directory of the local staging area
-	 * @param mo matrix object whose file is read; also provides the block sizes
-	 * @param ID identifier passed through to appendCellBufferToStagingArea for the staged files
-	 * @throws IOException if reading the input or writing to the staging area fails
-	 * @throws DMLRuntimeException if appending to the staging area fails
-	 */
-	@SuppressWarnings("deprecation")
-	private void createBinaryCellStagingFile( String fnameStaging, MatrixObject mo, long ID ) 
-		throws IOException, DMLRuntimeException
-	{
-		JobConf job = new JobConf(ConfigurationManager.getCachedJobConf());
-		Path src = new Path(mo.getFileName());
-		FileSystem fs = FileSystem.get(job);
-		
-		//cells collected since the last flush
-		LinkedList<Cell> pending = new LinkedList<Cell>();
-		
-		//key/value holders reused for all records
-		MatrixIndexes ix = new MatrixIndexes();
-		MatrixCell cell = new MatrixCell();
-		
-		MatrixCharacteristics mc = mo.getMatrixCharacteristics();
-		int brlen = mc.getRowsPerBlock();
-		int bclen = mc.getColsPerBlock();
-		
-		for( Path part : MatrixReader.getSequenceFilePaths(fs, src) )
-		{
-			SequenceFile.Reader reader = new SequenceFile.Reader(fs, part, job);
-			try
-			{
-				while( reader.next(ix, cell) )
-				{
-					pending.addLast( new Cell( ix.getRowIndex(), ix.getColumnIndex(), cell.getValue() ) );
-					
-					//flush as soon as the buffer limit is exceeded
-					if( pending.size() > StagingFileUtils.CELL_BUFFER_SIZE )
-					{
-						appendCellBufferToStagingArea(fnameStaging, ID, pending, brlen, bclen);
-						pending.clear();
-					}
-				}
-				
-				//flush the remainder of this file
-				if( !pending.isEmpty() )
-				{
-					appendCellBufferToStagingArea(fnameStaging, ID, pending, brlen, bclen);
-					pending.clear();
-				}
-			}
-			finally
-			{
-				reader.close();
-			}
-		}
-	}
-	
-	/**
-	 * Groups the buffered cells by the block they fall into and writes one cell
-	 * list per block to the local file fnameStaging/BROW_BCOL/ID. The row and
-	 * column of each buffered cell are overwritten in place with the position
-	 * relative to the start of its block.
-	 * 
-	 * @param fnameStaging root directory of the local staging area
-	 * @param ID file name used for the cell lists inside each block directory
-	 * @param buffer cells to stage; their row/column indexes are modified
-	 * @param brlen number of rows per block
-	 * @param bclen number of columns per block
-	 * @throws DMLRuntimeException
-	 * @throws IOException
-	 */
-	private void appendCellBufferToStagingArea( String fnameStaging, long ID, LinkedList<Cell> buffer, int brlen, int bclen ) 
-		throws DMLRuntimeException, IOException
-	{
-		//block row index -> block column index -> cells of that block
-		HashMap<Long,HashMap<Long,LinkedList<Cell>>> grouped = new HashMap<Long, HashMap<Long,LinkedList<Cell>>>();
-		
-		for( Cell c : buffer )
-		{
-			//block indexes of the cell
-			long brow = (c.getRow()-1)/brlen + 1;
-			long bcol = (c.getCol()-1)/bclen + 1;
-			
-			//make the cell position relative to the first row/column of its block
-			c.setRow( c.getRow() - ((brow-1)*brlen + 1) );
-			c.setCol( c.getCol() - ((bcol-1)*bclen + 1) );
-			
-			HashMap<Long,LinkedList<Cell>> rowGroup = grouped.get(brow);
-			if( rowGroup == null )
-			{
-				rowGroup = new HashMap<Long,LinkedList<Cell>>();
-				grouped.put(brow, rowGroup);
-			}
-			
-			LinkedList<Cell> cells = rowGroup.get(bcol);
-			if( cells == null )
-			{
-				cells = new LinkedList<Cell>();
-				rowGroup.put(bcol, cells);
-			}
-			
-			cells.addLast(c);
-		}
-		
-		//one local file per block that received cells
-		for( Entry<Long,HashMap<Long,LinkedList<Cell>>> rowEntry : grouped.entrySet() )
-			for( Entry<Long,LinkedList<Cell>> colEntry : rowEntry.getValue().entrySet() )
-			{
-				String dir = fnameStaging + "/" + rowEntry.getKey() + "_" + colEntry.getKey();
-				LocalFileUtils.checkAndCreateStagingDir( dir );
-				StagingFileUtils.writeCellListToLocal(dir + "/" + ID, colEntry.getValue());
-			}
-	}	
-	
-	/**
-	 * Consolidates the staged binary blocks and writes the merged matrix as a
-	 * sequence file of MatrixIndexes/MatrixBlock pairs to fnameNew. A block is
-	 * written for every block position of the matrix: if a staging directory
-	 * BROW_BCOL exists, its files are merged (on top of the compare block via
-	 * mergeWithComp, or pairwise via mergeWithoutComp); otherwise a freshly
-	 * constructed block of the boundary-adjusted size is written.
-	 * 
-	 * @param fnameStaging staging root holding one directory per block position with the staged input blocks
-	 * @param fnameStagingCompare staging root holding the compare block per block position (only evaluated if withCompare)
-	 * @param fnameNew name of the target file
-	 * @param metadata provides the matrix dimensions and block sizes
-	 * @param withCompare if true and a compare directory exists for a block, merge with compare
-	 * @throws IOException
-	 * @throws DMLRuntimeException if a compare directory does not contain exactly one file
-	 */
-	@SuppressWarnings("deprecation")
-	private void createBinaryBlockResultFile( String fnameStaging, String fnameStagingCompare, String fnameNew, MatrixFormatMetaData metadata, boolean withCompare ) 
-		throws IOException, DMLRuntimeException
-	{
-		JobConf job = new JobConf(ConfigurationManager.getCachedJobConf());
-		FileSystem fs = FileSystem.get(job);
-		Path path = new Path( fnameNew );	
-		
-		MatrixCharacteristics mc = metadata.getMatrixCharacteristics();
-		long rlen = mc.getRows();
-		long clen = mc.getCols();
-		int brlen = mc.getRowsPerBlock();
-		int bclen = mc.getColsPerBlock();
-		
-		SequenceFile.Writer writer = new SequenceFile.Writer(fs, job, path, MatrixIndexes.class, MatrixBlock.class); //beware ca 50ms
-		try
-		{
-			MatrixIndexes indexes = new MatrixIndexes();
-			for(long brow = 1; brow <= (long)Math.ceil(rlen/(double)brlen); brow++)
-				for(long bcol = 1; bcol <= (long)Math.ceil(clen/(double)bclen); bcol++)
-				{
-					File dir = new File(fnameStaging+"/"+brow+"_"+bcol);
-					File dir2 = new File(fnameStagingCompare+"/"+brow+"_"+bcol);
-					MatrixBlock mb = null;
-					
-					if( dir.exists() )
-					{
-						if( withCompare && dir2.exists() ) //WITH COMPARE BLOCK
-						{
-							//copy only values that are different from the original
-							String[] lnames2 = dir2.list();
-							if( lnames2.length != 1 ) //there should be exactly 1 compare block
-								throw new DMLRuntimeException("Unable to merge results because multiple compare blocks found.");
-							mb = LocalFileUtils.readMatrixBlockFromLocal( dir2+"/"+lnames2[0] );
-							boolean appendOnly = mb.isInSparseFormat();
-							double[][] compare = DataConverter.convertToDoubleMatrix(mb);
-							
-							String[] lnames = dir.list();
-							for( String lname : lnames )
-							{
-								MatrixBlock tmp = LocalFileUtils.readMatrixBlockFromLocal( dir+"/"+lname );
-								mergeWithComp(mb, tmp, compare);
-							}
-							
-							//sort sparse due to append-only
-							if( appendOnly )
-								mb.sortSparseRows();
-							
-							//change sparsity if required after 
-							mb.examSparsity(); 
-						}
-						else //WITHOUT COMPARE BLOCK
-						{
-							//copy all non-zeros from all workers
-							String[] lnames = dir.list();
-							boolean appendOnly = false;
-							for( String lname : lnames )
-							{
-								if( mb == null )
-								{
-									mb = LocalFileUtils.readMatrixBlockFromLocal( dir+"/"+lname );
-									appendOnly = mb.isInSparseFormat();
-								}
-								else
-								{
-									MatrixBlock tmp = LocalFileUtils.readMatrixBlockFromLocal( dir+"/"+lname );
-									mergeWithoutComp(mb, tmp, appendOnly);
-								}
-							}	
-							
-							//sort sparse due to append-only
-							if( appendOnly )
-								mb.sortSparseRows();
-							
-							//change sparsity if required after 
-							mb.examSparsity(); 
-						}
-					}
-					else
-					{
-						//NOTE: whenever runtime does not need all blocks anymore, this can be removed
-						int maxRow = (int)(((brow-1)*brlen + brlen < rlen) ? brlen : rlen - (brow-1)*brlen);
-						int maxCol = (int)(((bcol-1)*bclen + bclen < clen) ? bclen : clen - (bcol-1)*bclen);
-				
-						mb = new MatrixBlock(maxRow, maxCol, true);
-					}	
-					
-					//mb.examSparsity(); //done on write anyway and mb not reused
-					indexes.setIndexes(brow, bcol);
-					writer.append(indexes, mb);
-				}	
-		}
-		finally
-		{
-			if( writer != null )
-				writer.close();
-		}
-	}
-	
-	/**
-	 * Consolidates the staged cell lists and writes the merged matrix in text
-	 * cell format to fnameNew, one line "row col value" per non-zero cell. For
-	 * every block position with a staging directory BROW_BCOL, the staged cell
-	 * lists are read as blocks and merged (on top of the compare block via
-	 * mergeWithComp, or pairwise via mergeWithoutComp); block-relative indexes
-	 * are shifted by the block offsets before writing. If no cell was written
-	 * at all, the single line "1 1 0" is written instead.
-	 * 
-	 * @param fnameStaging staging root holding one directory per block position with the staged cell lists
-	 * @param fnameStagingCompare staging root holding the compare cell list per block position (only evaluated if withCompare)
-	 * @param fnameNew name of the target file (overwritten if it exists)
-	 * @param metadata provides the matrix dimensions and block sizes
-	 * @param withCompare if true and a compare directory exists for a block, merge with compare
-	 * @throws IOException
-	 * @throws DMLRuntimeException if a compare directory does not contain exactly one file
-	 */
-	private void createTextCellResultFile( String fnameStaging, String fnameStagingCompare, String fnameNew, MatrixFormatMetaData metadata, boolean withCompare ) 
-		throws IOException, DMLRuntimeException
-	{
-		JobConf job = new JobConf(ConfigurationManager.getCachedJobConf());
-		FileSystem fs = FileSystem.get(job);
-		Path path = new Path( fnameNew );	
-		
-		MatrixCharacteristics mc = metadata.getMatrixCharacteristics();
-		long rlen = mc.getRows();
-		long clen = mc.getCols();
-		int brlen = mc.getRowsPerBlock();
-		int bclen = mc.getColsPerBlock();
-				
-		BufferedWriter out = new BufferedWriter(new OutputStreamWriter(fs.create(path,true)));		
-		try
-		{
-			//for obj reuse and preventing repeated buffer re-allocations
-			StringBuilder sb = new StringBuilder();
-			
-			boolean written=false;
-			for(long brow = 1; brow <= (long)Math.ceil(rlen/(double)brlen); brow++)
-				for(long bcol = 1; bcol <= (long)Math.ceil(clen/(double)bclen); bcol++)
-				{
-					File dir = new File(fnameStaging+"/"+brow+"_"+bcol);
-					File dir2 = new File(fnameStagingCompare+"/"+brow+"_"+bcol);
-					MatrixBlock mb = null;
-					
-					long row_offset = (brow-1)*brlen + 1;
-					long col_offset = (bcol-1)*bclen + 1;
-					
-					
-					if( dir.exists() )
-					{
-						if( withCompare && dir2.exists() ) //WITH COMPARE BLOCK
-						{
-							//copy only values that are different from the original
-							String[] lnames2 = dir2.list();
-							if( lnames2.length != 1 ) //there should be exactly 1 compare block
-								throw new DMLRuntimeException("Unable to merge results because multiple compare blocks found.");
-							mb = StagingFileUtils.readCellList2BlockFromLocal( dir2+"/"+lnames2[0], brlen, bclen );
-							boolean appendOnly = mb.isInSparseFormat();
-							double[][] compare = DataConverter.convertToDoubleMatrix(mb);
-							
-							String[] lnames = dir.list();
-							for( String lname : lnames )
-							{
-								MatrixBlock tmp = StagingFileUtils.readCellList2BlockFromLocal(  dir+"/"+lname, brlen, bclen );
-								mergeWithComp(mb, tmp, compare);
-							}
-							
-							//sort sparse and exam sparsity due to append-only
-							if( appendOnly )
-								mb.sortSparseRows();
-							
-							//change sparsity if required after 
-							mb.examSparsity(); 
-						}
-						else //WITHOUT COMPARE BLOCK
-						{
-							//copy all non-zeros from all workers
-							String[] lnames = dir.list();
-							boolean appendOnly = false;
-							for( String lname : lnames )
-							{
-								if( mb == null )
-								{
-									mb = StagingFileUtils.readCellList2BlockFromLocal( dir+"/"+lname, brlen, bclen );
-									appendOnly = mb.isInSparseFormat();
-								}
-								else
-								{
-									MatrixBlock tmp = StagingFileUtils.readCellList2BlockFromLocal(  dir+"/"+lname, brlen, bclen );
-									mergeWithoutComp(mb, tmp, appendOnly);
-								}
-							}	
-							
-							//sort sparse due to append-only
-							if( appendOnly )
-								mb.sortSparseRows();
-							
-							//change sparsity if required after 
-							mb.examSparsity(); 
-						}
-					}
-
-					//write the block to text cell
-					if( mb!=null )
-					{
-						if( mb.isInSparseFormat() )
-						{
-							SparseRowsIterator iter = mb.getSparseRowsIterator();
-							while( iter.hasNext() )
-							{
-								IJV lcell = iter.next();
-								sb.append(row_offset+lcell.i);
-								sb.append(' ');
-								sb.append(col_offset+lcell.j);
-								sb.append(' ');
-								sb.append(lcell.v);
-								sb.append('\n');
-								out.write( sb.toString() ); 
-								sb.setLength(0);
-								written = true;
-							}							
-						}
-						else
-						{
-							for( int i=0; i<brlen; i++ )
-								for( int j=0; j<bclen; j++ )
-								{
-									double lvalue = mb.getValueDenseUnsafe(i, j);
-									if( lvalue != 0 ) //for nnz
-									{
-										sb.append(row_offset+i);
-										sb.append(' ');
-										sb.append(col_offset+j);
-										sb.append(' ');
-										sb.append(lvalue);
-										sb.append('\n');
-										out.write( sb.toString() ); 
-										sb.setLength(0);
-										written = true;
-									}
-								}
-						}
-					}				
-				}	
-			
-			if( !written )
-				out.write("1 1 0\n");
-		}
-		finally
-		{
-			if( out != null )
-				out.close();
-		}
-	}
-
-	/**
-	 * Consolidates the staged cell lists and writes the merged matrix as a
-	 * sequence file of MatrixIndexes/MatrixCell pairs to fnameNew, one record
-	 * per non-zero cell. For every block position with a staging directory
-	 * BROW_BCOL, the staged cell lists are read as blocks and merged (on top of
-	 * the compare block via mergeWithComp, or pairwise via mergeWithoutComp);
-	 * block-relative indexes are shifted by the block offsets before writing.
-	 * If no cell was written at all, a single record with the initial holder
-	 * values (indexes 1,1 and cell value 0) is appended instead.
-	 * 
-	 * @param fnameStaging staging root holding one directory per block position with the staged cell lists
-	 * @param fnameStagingCompare staging root holding the compare cell list per block position (only evaluated if withCompare)
-	 * @param fnameNew name of the target file
-	 * @param metadata provides the matrix dimensions and block sizes
-	 * @param withCompare if true and a compare directory exists for a block, merge with compare
-	 * @throws IOException
-	 * @throws DMLRuntimeException if a compare directory does not contain exactly one file
-	 */
-	@SuppressWarnings("deprecation")
-	private void createBinaryCellResultFile( String fnameStaging, String fnameStagingCompare, String fnameNew, MatrixFormatMetaData metadata, boolean withCompare ) 
-		throws IOException, DMLRuntimeException
-	{
-		JobConf job = new JobConf(ConfigurationManager.getCachedJobConf());
-		FileSystem fs = FileSystem.get(job);
-		Path path = new Path( fnameNew );	
-		
-		MatrixCharacteristics mc = metadata.getMatrixCharacteristics();
-		long rlen = mc.getRows();
-		long clen = mc.getCols();
-		int brlen = mc.getRowsPerBlock();
-		int bclen = mc.getColsPerBlock();
-				
-		
-		MatrixIndexes indexes = new MatrixIndexes(1,1);
-		MatrixCell cell = new MatrixCell(0);	
-		
-		SequenceFile.Writer out = new SequenceFile.Writer(fs, job, path, MatrixIndexes.class, MatrixCell.class); //beware ca 50ms
-		try
-		{
-			boolean written=false;
-			for(long brow = 1; brow <= (long)Math.ceil(rlen/(double)brlen); brow++)
-				for(long bcol = 1; bcol <= (long)Math.ceil(clen/(double)bclen); bcol++)
-				{
-					File dir = new File(fnameStaging+"/"+brow+"_"+bcol);
-					File dir2 = new File(fnameStagingCompare+"/"+brow+"_"+bcol);
-					MatrixBlock mb = null;
-					
-					long row_offset = (brow-1)*brlen + 1;
-					long col_offset = (bcol-1)*bclen + 1;
-					
-					
-					if( dir.exists() )
-					{
-						if( withCompare && dir2.exists() ) //WITH COMPARE BLOCK
-						{
-							//copy only values that are different from the original
-							String[] lnames2 = dir2.list();
-							if( lnames2.length != 1 ) //there should be exactly 1 compare block
-								throw new DMLRuntimeException("Unable to merge results because multiple compare blocks found.");
-							mb = StagingFileUtils.readCellList2BlockFromLocal( dir2+"/"+lnames2[0], brlen, bclen );
-							boolean appendOnly = mb.isInSparseFormat();
-							double[][] compare = DataConverter.convertToDoubleMatrix(mb);
-							
-							String[] lnames = dir.list();
-							for( String lname : lnames )
-							{
-								MatrixBlock tmp = StagingFileUtils.readCellList2BlockFromLocal(  dir+"/"+lname, brlen, bclen );
-								mergeWithComp(mb, tmp, compare);
-							}
-							
-							//sort sparse due to append-only
-							if( appendOnly )
-								mb.sortSparseRows();
-							
-							//change sparsity if required after 
-							mb.examSparsity(); 
-						}
-						else //WITHOUT COMPARE BLOCK
-						{
-							//copy all non-zeros from all workers
-							String[] lnames = dir.list();
-							boolean appendOnly = false;
-							for( String lname : lnames )
-							{
-								if( mb == null )
-								{
-									mb = StagingFileUtils.readCellList2BlockFromLocal( dir+"/"+lname, brlen, bclen );
-									appendOnly = mb.isInSparseFormat();
-								}
-								else
-								{
-									MatrixBlock tmp = StagingFileUtils.readCellList2BlockFromLocal(  dir+"/"+lname, brlen, bclen );
-									mergeWithoutComp(mb, tmp, appendOnly);
-								}
-							}	
-							
-							//sort sparse due to append-only
-							if( appendOnly )
-								mb.sortSparseRows();
-							
-							//change sparsity if required after 
-							mb.examSparsity(); 
-						}
-					}
-					
-					//write the block to binary cell
-					if( mb!=null )
-					{
-						if( mb.isInSparseFormat() )
-						{
-							SparseRowsIterator iter = mb.getSparseRowsIterator();
-							while( iter.hasNext() )
-							{
-								IJV lcell = iter.next();
-								indexes.setIndexes(row_offset+lcell.i, col_offset+lcell.j);
-								cell.setValue(lcell.v);
-								out.append(indexes,cell);
-								written = true;
-							}
-						}
-						else
-						{
-							for( int i=0; i<brlen; i++ )
-								for( int j=0; j<bclen; j++ )
-								{
-									double lvalue = mb.getValueDenseUnsafe(i, j);
-									if( lvalue != 0 ) //for nnz
-									{
-										indexes.setIndexes(row_offset+i, col_offset+j);
-										cell.setValue(lvalue);
-										out.append(indexes,cell);
-										written = true;
-									}
-								}
-						}
-					}				
-				}	
-			
-			if( !written )
-				out.append(indexes,cell);
-		}
-		finally
-		{
-			if( out != null )
-				out.close();
-		}
-	}
-
-	/**
-	 * Merges the inputs by renaming their files (or directories) into a newly
-	 * created output directory instead of rewriting any data. Each target name
-	 * is the input's file name with an ID from a local sequence appended.
-	 * 
-	 * A rename that reports failure is logged as a warning, because the
-	 * affected input is then missing from the merged result.
-	 * 
-	 * @param fnameNew name of the output directory to create
-	 * @param inMO matrix objects whose files are moved below fnameNew
-	 * @throws CacheException
-	 * @throws IOException if creating the directory or renaming a file fails with an I/O error
-	 */
-	private void copyAllFiles( String fnameNew, ArrayList<MatrixObject> inMO ) 
-		throws CacheException, IOException
-	{
-		JobConf job = new JobConf(ConfigurationManager.getCachedJobConf());
-		FileSystem fs = FileSystem.get(job);
-		Path path = new Path( fnameNew );
-
-		//create output dir
-		fs.mkdirs(path);
-		
-		//merge in all input matrix objects
-		IDSequence seq = new IDSequence();
-		for( MatrixObject in : inMO )
-		{			
-			LOG.trace("ResultMerge (local, file): Merge input "+in.getVarName()+" (fname="+in.getFileName()+") via file rename.");
-			
-			//copy over files (just rename file or entire dir)
-			Path tmpPath = new Path(in.getFileName());
-			Path target = new Path(fnameNew+"/"+tmpPath.getName()+seq.getNextID());
-			
-			//rename signals failure via its return value, so do not ignore it
-			if( !fs.rename(tmpPath, target) )
-				LOG.warn("ResultMerge (local, file): Failed to rename "+tmpPath+" to "+target+"; input "+in.getVarName()+" is missing from the merged result.");
-		}
-	}
-
-}

http://git-wip-us.apache.org/repos/asf/incubator-systemml/blob/276d9257/src/main/java/com/ibm/bi/dml/runtime/controlprogram/parfor/ResultMergeLocalMemory.java
----------------------------------------------------------------------
diff --git a/src/main/java/com/ibm/bi/dml/runtime/controlprogram/parfor/ResultMergeLocalMemory.java b/src/main/java/com/ibm/bi/dml/runtime/controlprogram/parfor/ResultMergeLocalMemory.java
deleted file mode 100644
index 77b3bb5..0000000
--- a/src/main/java/com/ibm/bi/dml/runtime/controlprogram/parfor/ResultMergeLocalMemory.java
+++ /dev/null
@@ -1,338 +0,0 @@
-/**
- * (C) Copyright IBM Corp. 2010, 2015
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- * 
- */
-
-package com.ibm.bi.dml.runtime.controlprogram.parfor;
-
-import java.util.ArrayList;
-
-import com.ibm.bi.dml.parser.Expression.DataType;
-import com.ibm.bi.dml.parser.Expression.ValueType;
-import com.ibm.bi.dml.runtime.DMLRuntimeException;
-import com.ibm.bi.dml.runtime.controlprogram.caching.MatrixObject;
-import com.ibm.bi.dml.runtime.controlprogram.parfor.stat.InfrastructureAnalyzer;
-import com.ibm.bi.dml.runtime.matrix.MatrixCharacteristics;
-import com.ibm.bi.dml.runtime.matrix.MatrixFormatMetaData;
-import com.ibm.bi.dml.runtime.matrix.data.InputInfo;
-import com.ibm.bi.dml.runtime.matrix.data.MatrixBlock;
-import com.ibm.bi.dml.runtime.matrix.data.OutputInfo;
-import com.ibm.bi.dml.runtime.util.DataConverter;
-
-/**
- * Local in-memory realization of result merge. If the resulting matrix is
- * small enough to fit into the JVM memory, this class can be used for efficient 
- * serial or multi-threaded merge.
- * 
- * 
- */
-public class ResultMergeLocalMemory extends ResultMerge
-{	
-	
-	//internal comparison matrix
-	private double[][]        _compare     = null;
-	
-	/**
-	 * Creates an in-memory result merge; all arguments are passed on unchanged
-	 * to the ResultMerge super constructor.
-	 * 
-	 * @param out output matrix object
-	 * @param in input matrix objects to merge
-	 * @param outputFilename file name for the merged output
-	 */
-	public ResultMergeLocalMemory( MatrixObject out, MatrixObject[] in, String outputFilename )
-	{
-		super( out, in, outputFilename );
-	}
-	
-	/**
-	 * Merges all inputs one after another into a new in-memory block. If the
-	 * existing output contains non-zeros, it is used as compare matrix and as
-	 * initial content of the result. Inputs that are null or identical to the
-	 * output object are skipped; each merged input is released and cleared.
-	 * 
-	 * @return a new matrix object holding the merged block, or the original
-	 *         output object if no input was merged
-	 * @throws DMLRuntimeException if any step of the merge fails
-	 */
-	@Override
-	public MatrixObject executeSerialMerge() 
-		throws DMLRuntimeException
-	{
-		MatrixObject moNew = null; //always create new matrix object (required for nested parallelism)
-
-		LOG.trace("ResultMerge (local, in-memory): Execute serial merge for output "+_output.getVarName()+" (fname="+_output.getFileName()+")");
-				
-		try
-		{
-			//get matrix blocks through caching 
-			MatrixBlock outMB = _output.acquireRead();
-			
-			//get old output matrix from cache for compare
-			//(estimate computed in long and clamped, since rows*cols can exceed the int range)
-			int estnnz = (int) Math.min((long)outMB.getNumRows()*outMB.getNumColumns(), Integer.MAX_VALUE);
-			MatrixBlock outMBNew = new MatrixBlock(outMB.getNumRows(), outMB.getNumColumns(), 
-					                               outMB.isInSparseFormat(), estnnz);
-			boolean appendOnly = outMBNew.isInSparseFormat();
-			
-			//create compare matrix if required (existing data in result)
-			_compare = createCompareMatrix(outMB);
-			if( _compare != null )
-				outMBNew.copy(outMB);
-			
-			//serial merge all inputs
-			boolean flagMerged = false;
-			for( MatrixObject in : _inputs )
-			{
-				//check for empty inputs (no iterations executed)
-				if( in !=null && in != _output ) 
-				{
-					LOG.trace("ResultMerge (local, in-memory): Merge input "+in.getVarName()+" (fname="+in.getFileName()+")");
-					
-					//read/pin input_i
-					MatrixBlock inMB = in.acquireRead();	
-					
-					//core merge 
-					merge( outMBNew, inMB, appendOnly );
-					
-					//unpin and clear in-memory input_i
-					in.release();
-					in.clearData();
-					flagMerged = true;
-					
-					//determine need for sparse2dense change during merge
-					boolean sparseToDense = appendOnly && !MatrixBlock.evalSparseFormatInMemory(
-							                                 outMBNew.getNumRows(), outMBNew.getNumColumns(), outMBNew.getNonZeros()); 
-					if( sparseToDense ) {
-						outMBNew.sortSparseRows(); //sort sparse due to append-only
-						outMBNew.examSparsity(); //sparse-dense representation change
-						appendOnly = false; //change merge state for subsequent inputs
-					}
-				}
-			}
-		
-			//sort sparse due to append-only
-			if( appendOnly )
-				outMBNew.sortSparseRows();
-			
-			//change sparsity if required after 
-			outMBNew.examSparsity(); 
-			
-			//create output
-			if( flagMerged )
-			{		
-				//create new output matrix 
-				//(e.g., to prevent potential export<->read file access conflict in specific cases of 
-				// local-remote nested parfor))
-				moNew = createNewMatrixObject( outMBNew );	
-			}
-			else
-			{
-				moNew = _output; //return old matrix, to prevent copy
-			}
-			
-			//release old output, and all inputs
-			_output.release();
-		}
-		catch(Exception ex)
-		{
-			throw new DMLRuntimeException(ex);
-		}
-
-		return moNew;
-	}
-	
-	/**
-	 * Merges all inputs into a new dense in-memory block using multiple
-	 * threads, one ResultMergeWorker per input. Inputs are processed in waves
-	 * of at most min(par, number of inputs, local parallelism) threads, with a
-	 * minimum of one; the last wave only covers the remaining inputs. Inputs
-	 * that are null or identical to the output object are skipped.
-	 * 
-	 * @param par requested degree of parallelism
-	 * @return a new matrix object holding the merged block, or the original
-	 *         output object if there was nothing to merge
-	 * @throws DMLRuntimeException if any step of the merge fails
-	 */
-	@Override
-	public MatrixObject executeParallelMerge( int par ) 
-		throws DMLRuntimeException
-	{		
-		MatrixObject moNew = null; //always create new matrix object (required for nested parallelism)
-	
-		LOG.trace("ResultMerge (local, in-memory): Execute parallel (par="+par+") merge for output "+_output.getVarName()+" (fname="+_output.getFileName()+")");
-
-		try
-		{
-			//get matrix blocks through caching 
-			MatrixBlock outMB = _output.acquireRead();
-			ArrayList<MatrixObject> inMO = new ArrayList<MatrixObject>();
-			for( MatrixObject in : _inputs )
-			{
-				//check for empty inputs (no iterations executed)
-				if( in !=null && in != _output ) 
-					inMO.add( in );
-			}
-			
-			if( !inMO.isEmpty() ) //if there exist something to merge
-			{
-				//get old output matrix from cache for compare
-				//NOTE: always in dense representation in order to allow for parallel unsynchronized access 
-				long rows = outMB.getNumRows();
-				long cols = outMB.getNumColumns();
-				MatrixBlock outMBNew = new MatrixBlock((int)rows, (int)cols, false);
-				outMBNew.allocateDenseBlockUnsafe((int)rows, (int)cols);
-				
-				//create compare matrix if required (existing data in result)
-				_compare = createCompareMatrix(outMB);
-				if( _compare != null )
-					outMBNew.copy(outMB);
-				
-				//parallel merge of all inputs
-				int numThreads = Math.min(par, inMO.size()); //number of inputs can be lower than par
-				numThreads = Math.min(numThreads, InfrastructureAnalyzer.getLocalParallelism()); //ensure robustness for remote exec
-				numThreads = Math.max(numThreads, 1); //a value <=0 would never advance the wave loop
-				Thread[] threads = new Thread[ numThreads ];
-				for( int k=0; k<inMO.size(); k+=numThreads ) //multiple waves if necessary
-				{
-					//the last wave may have fewer inputs than threads
-					int waveSize = Math.min(numThreads, inMO.size()-k);
-					
-					//create and start threads
-					for( int i=0; i<waveSize; i++ )
-					{
-						ResultMergeWorker rmw = new ResultMergeWorker(inMO.get(k+i), outMBNew);
-						threads[i] = new Thread(rmw);
-						threads[i].setPriority(Thread.MAX_PRIORITY);
-						threads[i].start(); // start execution
-					}	
-					//wait for all workers of this wave to finish
-					for( int i=0; i<waveSize; i++ )
-					{
-						threads[i].join();
-					}
-				}
-				
-				//create new output matrix 
-				//(e.g., to prevent potential export<->read file access conflict in specific cases of 
-				// local-remote nested parfor))
-				moNew = createNewMatrixObject( outMBNew );	
-			}
-			else
-			{
-				moNew = _output; //return old matrix, to prevent copy
-			}
-			
-			//release old output, and all inputs
-			_output.release();			
-		}
-		catch(Exception ex)
-		{
-			throw new DMLRuntimeException(ex);
-		}
-
-		return moNew;		
-	}
-
-	/**
-	 * Creates the compare matrix from the existing result, if required.
-	 * 
-	 * @param output matrix block holding the existing result
-	 * @return copy of <code>output</code> as double matrix, or null if it contains no non-zeros
-	 */
-	private double[][] createCompareMatrix( MatrixBlock output )
-	{
-		//no existing data in result: no compare matrix required
-		if( output.getNonZeros() <= 0 )
-			return null;
-		
-		return DataConverter.convertToDoubleMatrix( output );
-	}
-	
-	/**
-	 * Wraps the given block into a new matrix object, which is bound to the output file name 
-	 * of this result merge and carries a copy of the metadata of the old output.
-	 * 
-	 * @param data matrix block to be held by the new matrix object
-	 * @return new matrix object holding <code>data</code>
-	 * @throws DMLRuntimeException 
-	 */
-	private MatrixObject createNewMatrixObject( MatrixBlock data ) 
-		throws DMLRuntimeException
-	{
-		String name = _output.getVarName();
-		ValueType valueType = _output.getValueType();
-		MatrixFormatMetaData metaOld = (MatrixFormatMetaData) _output.getMetaData();
-		
-		//create deep copy of metadata obj (with non-zeros of the given block)
-		MatrixCharacteristics mcOld = metaOld.getMatrixCharacteristics();
-		MatrixCharacteristics mcNew = new MatrixCharacteristics(
-				mcOld.getRows(), mcOld.getCols(),
-				mcOld.getRowsPerBlock(), mcOld.getColsPerBlock());
-		mcNew.setNonZeros(data.getNonZeros());
-		MatrixFormatMetaData metaNew = new MatrixFormatMetaData(
-				mcNew, metaOld.getOutputInfo(), metaOld.getInputInfo());
-		
-		//create new matrix object (name tagged with suffix, if not already present)
-		MatrixObject ret = new MatrixObject( valueType, _outputFName );
-		ret.setVarName( name.contains(NAME_SUFFIX) ? name : name+NAME_SUFFIX );
-		ret.setDataType( DataType.MATRIX );
-		ret.setMetaData( metaNew );
-		
-		//adjust dense/sparse representation
-		data.examSparsity();
-		
-		//hand over the block to the new output and release it
-		ret.acquireModify(data);
-		ret.release();
-		
-		return ret;
-	}
-
-	
-	/**
-	 * Merges <code>in</code> into <code>out</code> (update-in-place of <code>out</code>).
-	 * If a compare matrix exists, the merge is done with compare; otherwise without. 
-	 * 
-	 * NOTE: similar to converters, but not directly applicable as we are interested in combining
-	 * two objects with each other; not unary transformation.
-	 * 
-	 * @param out target block of the merge
-	 * @param in source block of the merge
-	 * @param appendOnly passed through to the merge without compare; not used with compare
-	 * @throws DMLRuntimeException 
-	 */
-	private void merge( MatrixBlock out, MatrixBlock in, boolean appendOnly ) 
-		throws DMLRuntimeException
-	{
-		//existing data in result: merge with compare
-		if( _compare != null ) {
-			mergeWithComp(out, in, _compare);
-			return;
-		}
-		
-		mergeWithoutComp(out, in, appendOnly);
-	}
-	
-	
-	/**
-	 * Worker that merges a single input matrix object into the given output block.
-	 * 
-	 * NOTE: only used if matrix in dense
-	 */
-	private class ResultMergeWorker implements Runnable
-	{
-		private final MatrixObject _inMO;
-		private final MatrixBlock  _outMB;
-		
-		public ResultMergeWorker(MatrixObject inMO, MatrixBlock outMB)
-		{
-			_outMB = outMB;
-			_inMO  = inMO;
-		}
-
-		@Override
-		public void run() 
-		{
-			try
-			{
-				LOG.trace("ResultMerge (local, in-memory): Merge input "+_inMO.getVarName()+" (fname="+_inMO.getFileName()+")");
-				
-				//acquire input (incl. implicit read from HDFS) and merge it into the output
-				MatrixBlock block = _inMO.acquireRead();
-				merge( _outMB, block, false );
-				
-				//release input and clear its data
-				_inMO.release();
-				_inMO.clearData();
-			}
-			catch(Exception ex)
-			{
-				throw new RuntimeException("Failed to parallel merge result.", ex);
-			}
-		}
-	}
-}

http://git-wip-us.apache.org/repos/asf/incubator-systemml/blob/276d9257/src/main/java/com/ibm/bi/dml/runtime/controlprogram/parfor/ResultMergeRemoteGrouping.java
----------------------------------------------------------------------
diff --git a/src/main/java/com/ibm/bi/dml/runtime/controlprogram/parfor/ResultMergeRemoteGrouping.java b/src/main/java/com/ibm/bi/dml/runtime/controlprogram/parfor/ResultMergeRemoteGrouping.java
deleted file mode 100644
index 041b0bd..0000000
--- a/src/main/java/com/ibm/bi/dml/runtime/controlprogram/parfor/ResultMergeRemoteGrouping.java
+++ /dev/null
@@ -1,43 +0,0 @@
-/**
- * (C) Copyright IBM Corp. 2010, 2015
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- * 
- */
-
-package com.ibm.bi.dml.runtime.controlprogram.parfor;
-
-
-import org.apache.hadoop.io.WritableComparable;
-import org.apache.hadoop.io.WritableComparator;
-
-/**
- * Grouping comparator for result merge keys: two tagged keys are considered
- * equal if their matrix indexes are equal, independent of their tags.
- */
-public class ResultMergeRemoteGrouping extends WritableComparator
-{
-	protected ResultMergeRemoteGrouping()
-	{
-		//create key instances in the base comparator
-		super(ResultMergeTaggedMatrixIndexes.class, true);
-	}
-	
-	@SuppressWarnings("rawtypes") 
-	@Override
-	public int compare(WritableComparable k1, WritableComparable k2) 
-	{
-		ResultMergeTaggedMatrixIndexes left = (ResultMergeTaggedMatrixIndexes)k1;
-		ResultMergeTaggedMatrixIndexes right = (ResultMergeTaggedMatrixIndexes)k2;
-		
-		//group by matrix indexes only (including all tags)
-		int cmp = left.getIndexes().compareTo(right.getIndexes());
-		return cmp;
-	}
-}

http://git-wip-us.apache.org/repos/asf/incubator-systemml/blob/276d9257/src/main/java/com/ibm/bi/dml/runtime/controlprogram/parfor/ResultMergeRemoteMR.java
----------------------------------------------------------------------
diff --git a/src/main/java/com/ibm/bi/dml/runtime/controlprogram/parfor/ResultMergeRemoteMR.java b/src/main/java/com/ibm/bi/dml/runtime/controlprogram/parfor/ResultMergeRemoteMR.java
deleted file mode 100644
index 0a406e3..0000000
--- a/src/main/java/com/ibm/bi/dml/runtime/controlprogram/parfor/ResultMergeRemoteMR.java
+++ /dev/null
@@ -1,339 +0,0 @@
-/**
- * (C) Copyright IBM Corp. 2010, 2015
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- * 
- */
-
-package com.ibm.bi.dml.runtime.controlprogram.parfor;
-
-import java.util.ArrayList;
-import java.util.Collection;
-import java.util.LinkedList;
-
-import org.apache.hadoop.fs.FileSystem;
-import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.io.NullWritable;
-import org.apache.hadoop.io.Text;
-import org.apache.hadoop.mapred.FileInputFormat;
-import org.apache.hadoop.mapred.FileOutputFormat;
-import org.apache.hadoop.mapred.JobClient;
-import org.apache.hadoop.mapred.JobConf;
-
-import com.ibm.bi.dml.api.DMLScript;
-import com.ibm.bi.dml.parser.Expression.DataType;
-import com.ibm.bi.dml.parser.Expression.ValueType;
-import com.ibm.bi.dml.runtime.DMLRuntimeException;
-import com.ibm.bi.dml.runtime.controlprogram.ParForProgramBlock;
-import com.ibm.bi.dml.runtime.controlprogram.caching.MatrixObject;
-import com.ibm.bi.dml.runtime.controlprogram.parfor.util.StagingFileUtils;
-import com.ibm.bi.dml.runtime.matrix.MatrixCharacteristics;
-import com.ibm.bi.dml.runtime.matrix.MatrixFormatMetaData;
-import com.ibm.bi.dml.runtime.matrix.data.InputInfo;
-import com.ibm.bi.dml.runtime.matrix.data.MatrixBlock;
-import com.ibm.bi.dml.runtime.matrix.data.MatrixCell;
-import com.ibm.bi.dml.runtime.matrix.data.MatrixIndexes;
-import com.ibm.bi.dml.runtime.matrix.data.OutputInfo;
-import com.ibm.bi.dml.runtime.matrix.data.TaggedMatrixBlock;
-import com.ibm.bi.dml.runtime.matrix.data.TaggedMatrixCell;
-import com.ibm.bi.dml.runtime.matrix.mapred.MRJobConfiguration;
-import com.ibm.bi.dml.runtime.util.LocalFileUtils;
-import com.ibm.bi.dml.runtime.util.MapReduceTool;
-import com.ibm.bi.dml.utils.Statistics;
-
-/**
- * MR job class for submitting parfor result merge MR jobs.
- * 
- */
-public class ResultMergeRemoteMR extends ResultMerge
-{	
-	
-	public static final byte COMPARE_TAG = 'c';
-	public static final byte DATA_TAG = 'd';
-	
-	private long _pfid = -1;
-	private int  _numMappers = -1;
-	private int  _numReducers = -1;
-	private int  _replication = -1;
-	//private int  _max_retry = -1;
-	private boolean _jvmReuse = false;
-
-	/**
-	 * Creates a result merge that is executed as MR job.
-	 * 
-	 * @param out existing output matrix object
-	 * @param in input matrix objects to merge
-	 * @param outputFilename file name of the merged output
-	 * @param pfid id that is appended to the MR job name
-	 * @param numMappers number of mappers
-	 * @param numReducers upper bound for the number of reduce tasks
-	 * @param replication replication factor for the results
-	 * @param max_retry currently not used
-	 * @param jvmReuse if true, JVMs are reused for an unlimited number of tasks
-	 */
-	public ResultMergeRemoteMR(MatrixObject out, MatrixObject[] in, String outputFilename, long pfid, int numMappers, int numReducers, int replication, int max_retry, boolean jvmReuse) 
-	{
-		super(out, in, outputFilename);
-		
-		this._pfid = pfid;
-		this._numMappers = numMappers;
-		this._numReducers = numReducers;
-		this._replication = replication;
-		this._jvmReuse = jvmReuse;
-		//this._max_retry = max_retry;
-	}
-
-	/**
-	 * Executes the merge via {@link #executeParallelMerge(int)}, using the 
-	 * configured number of mappers as degree of parallelism.
-	 * 
-	 * @return result of the parallel merge
-	 * @throws DMLRuntimeException 
-	 */
-	@Override
-	public MatrixObject executeSerialMerge() 
-		throws DMLRuntimeException 
-	{
-		//graceful degradation to parallel merge
-		final int par = _numMappers;
-		return executeParallelMerge( par );
-	}
-	
-	/**
-	 * Merges all inputs into a new output matrix object by running the result merge MR job.
-	 * Inputs and the old output are exported first, such that their files reside on disk. 
-	 * If there is no input to merge, the old output is returned as is.
-	 * 
-	 * @param par degree of parallelism (not used by this implementation)
-	 * @return new matrix object bound to the merged output file, or the old output
-	 * @throws DMLRuntimeException if any step of the merge fails
-	 */
-	@Override
-	public MatrixObject executeParallelMerge(int par) 
-		throws DMLRuntimeException 
-	{
-		LOG.trace("ResultMerge (remote, mr): Execute serial merge for output "+_output.getVarName()+" (fname="+_output.getFileName()+")");
-		
-		try
-		{
-			//collect all relevant inputs
-			ArrayList<MatrixObject> mergeInputs = new ArrayList<MatrixObject>();
-			Collection<String> mergeFnames = new LinkedList<String>();
-			for( MatrixObject in : _inputs )
-			{
-				//skip empty inputs (no iterations executed) and the output itself
-				if( in == null || in == _output )
-					continue;
-				
-				//ensure that input file resides on disk
-				in.exportData();
-				
-				//add to merge list
-				mergeFnames.add( in.getFileName() );
-				mergeInputs.add( in );
-			}
-			
-			//nothing to merge: return old matrix, to prevent copy
-			if( mergeFnames.isEmpty() )
-				return _output;
-			
-			//ensure that outputfile (for comparison) resides on disk
-			_output.exportData();
-			
-			MatrixFormatMetaData metaOld = (MatrixFormatMetaData) _output.getMetaData();
-			MatrixCharacteristics mcOld = metaOld.getMatrixCharacteristics();
-			
-			//actual merge (no compare required if old output has no non-zeros)
-			String fnameCompare = (mcOld.getNonZeros()==0) ? null : _output.getFileName();
-			executeMerge(fnameCompare, _outputFName, mergeFnames.toArray(new String[0]), 
-					metaOld.getInputInfo(), metaOld.getOutputInfo(), 
-					mcOld.getRows(), mcOld.getCols(),
-					mcOld.getRowsPerBlock(), mcOld.getColsPerBlock());
-			
-			//create new output matrix (e.g., to prevent potential export<->read file access conflict
-			String name = _output.getVarName();
-			MatrixObject ret = new MatrixObject( _output.getValueType(), _outputFName );
-			ret.setVarName( name.contains(NAME_SUFFIX) ? name : name+NAME_SUFFIX );
-			ret.setDataType( DataType.MATRIX );
-			
-			//copy of old metadata with recomputed number of non-zeros
-			OutputInfo oiOld = metaOld.getOutputInfo();
-			InputInfo iiOld = metaOld.getInputInfo();
-			MatrixCharacteristics mcNew = new MatrixCharacteristics(
-					mcOld.getRows(), mcOld.getCols(),
-					mcOld.getRowsPerBlock(), mcOld.getColsPerBlock());
-			mcNew.setNonZeros( computeNonZeros(_output, mergeInputs) );
-			ret.setMetaData( new MatrixFormatMetaData(mcNew, oiOld, iiOld) );
-			
-			return ret;
-		}
-		catch(Exception ex)
-		{
-			throw new DMLRuntimeException(ex);
-		}
-	}
-	
-	/**
-	 * Configures and runs the result merge MR job, which merges the given input files 
-	 * (and optionally the compare file) into the new output file.
-	 * 
-	 * @param fname 	file name of the compare file, null if no comparison required
-	 * @param fnameNew	file name of the merged output (deleted on HDFS if it already exists)
-	 * @param srcFnames	file names of the inputs to merge
-	 * @param ii		input info, determines the input format of the job
-	 * @param oi		output info, determines key/value classes and output format of the job
-	 * @param rlen		number of rows
-	 * @param clen		number of columns
-	 * @param brlen		number of rows per block
-	 * @param bclen		number of columns per block
-	 * @throws DMLRuntimeException if the configuration or execution of the job fails
-	 */
-	@SuppressWarnings({ "unused", "deprecation" })
-	protected void executeMerge(String fname, String fnameNew, String[] srcFnames, InputInfo ii, OutputInfo oi, long rlen, long clen, int brlen, int bclen)
-			throws DMLRuntimeException 
-	{
-		String jobname = "ParFor-RMMR";
-		long t0 = DMLScript.STATISTICS ? System.nanoTime() : 0;
-		
-		JobConf job;
-		job = new JobConf( ResultMergeRemoteMR.class );
-		job.setJobName(jobname+_pfid);
-
-		//maintain dml script counters
-		Statistics.incrementNoOfCompiledMRJobs();
-		
-		//warning for textcell/binarycell without compare
-		boolean withCompare = (fname!=null);
-		if( (oi == OutputInfo.TextCellOutputInfo || oi == OutputInfo.BinaryCellOutputInfo) && !withCompare && ResultMergeLocalFile.ALLOW_COPY_CELLFILES )
-			LOG.warn("Result merge for "+OutputInfo.outputInfoToString(oi)+" without compare can be realized more efficiently with LOCAL_FILE than REMOTE_MR.");
-			
-		try
-		{
-			Path pathNew = new Path(fnameNew);
-			
-			/////
-			//configure the MR job
-			
-			//single call for both cases, such that the matrix characteristics are always 
-			//passed as (rlen, clen, brlen, bclen); the case without compare previously 
-			//passed bclen as number of rows per block
-			Path pathCompare = withCompare ? new Path(fname).makeQualified(FileSystem.get(job)) : null;
-			String fnameCompare = withCompare ? pathCompare.toString() : "null";
-			MRJobConfiguration.setResultMergeInfo(job, fnameCompare, ii, LocalFileUtils.getWorkingDir(LocalFileUtils.CATEGORY_RESULTMERGE), rlen, clen, brlen, bclen);
-			
-			//set mappers, reducers, combiners
-			job.setMapperClass(ResultMergeRemoteMapper.class); 
-			job.setReducerClass(ResultMergeRemoteReducer.class);
-			
-			if( oi == OutputInfo.TextCellOutputInfo )
-			{
-				job.setMapOutputKeyClass(MatrixIndexes.class);
-				job.setMapOutputValueClass(TaggedMatrixCell.class);
-				job.setOutputKeyClass(NullWritable.class);
-				job.setOutputValueClass(Text.class);
-			}
-			else if( oi == OutputInfo.BinaryCellOutputInfo )
-			{
-				job.setMapOutputKeyClass(MatrixIndexes.class);
-				job.setMapOutputValueClass(TaggedMatrixCell.class);
-				job.setOutputKeyClass(MatrixIndexes.class);
-				job.setOutputValueClass(MatrixCell.class);
-			}
-			else if ( oi == OutputInfo.BinaryBlockOutputInfo )
-			{
-				//setup partitioning, grouping, sorting for composite key (old API)
-				job.setPartitionerClass(ResultMergeRemotePartitioning.class); //partitioning
-				job.setOutputValueGroupingComparator(ResultMergeRemoteGrouping.class); //grouping
-				job.setOutputKeyComparatorClass(ResultMergeRemoteSorting.class); //sorting
-				
-				job.setMapOutputKeyClass(ResultMergeTaggedMatrixIndexes.class);
-				job.setMapOutputValueClass(TaggedMatrixBlock.class);
-				job.setOutputKeyClass(MatrixIndexes.class);
-				job.setOutputValueClass(MatrixBlock.class);
-			}
-			
-			//set input format 
-			job.setInputFormat(ii.inputFormatClass);
-			
-			//set the input path (compare file first, if it exists)
-			Path[] paths = null;
-			if( withCompare ) {
-				paths= new Path[ srcFnames.length+1 ];
-				paths[0] = pathCompare;
-				for(int i=1; i<paths.length; i++)
-					paths[i] = new Path( srcFnames[i-1] ); 
-			}
-			else {
-				paths= new Path[ srcFnames.length ];
-				for(int i=0; i<paths.length; i++)
-					paths[i] = new Path( srcFnames[i] );
-			}
-			FileInputFormat.setInputPaths(job, paths);
-			
-			//set output format
-			job.setOutputFormat(oi.outputFormatClass);
-			
-			//set output path
-			MapReduceTool.deleteFileIfExistOnHDFS(fnameNew);
-			FileOutputFormat.setOutputPath(job, pathNew);
-			
-
-			//////
-			//set optimization parameters
-
-			//set the number of mappers and reducers 
-			//job.setNumMapTasks( _numMappers ); //use default num mappers
-			long reducerGroups;
-			if( oi == OutputInfo.BinaryBlockOutputInfo )
-				reducerGroups = Math.max(rlen/brlen,1) * Math.max(clen/bclen, 1); 
-			else //textcell/binarycell
-				reducerGroups = Math.max((rlen*clen)/StagingFileUtils.CELL_BUFFER_SIZE, 1);
-			job.setNumReduceTasks( (int)Math.min( _numReducers, reducerGroups) ); 	
-
-			//use FLEX scheduler configuration properties
-			if( ParForProgramBlock.USE_FLEX_SCHEDULER_CONF )
-			{
-				job.setInt("flex.map.min", 0);
-				job.setInt("flex.map.max", _numMappers);
-				job.setInt("flex.reduce.min", 0);
-				//NOTE(review): reduce max is set from _numMappers, not _numReducers - confirm intent
-				job.setInt("flex.reduce.max", _numMappers);
-			}
-			
-			//disable automatic tasks timeouts and speculative task exec
-			job.setInt("mapred.task.timeout", 0);			
-			job.setMapSpeculativeExecution(false);
-			
-			//set up preferred custom serialization framework for binary block format
-			if( MRJobConfiguration.USE_BINARYBLOCK_SERIALIZATION )
-				MRJobConfiguration.addBinaryBlockSerializationFramework( job );
-			
-			//enables the reuse of JVMs (multiple tasks per MR task)
-			if( _jvmReuse )
-				job.setNumTasksToExecutePerJvm(-1); //unlimited
-			
-			//enables compression - not conclusive for different codecs (empirically good compression ratio, but significantly slower)
-			//job.set("mapred.compress.map.output", "true");
-			//job.set("mapred.map.output.compression.codec", "org.apache.hadoop.io.compress.GzipCodec");
-			
-			//set the replication factor for the results
-			job.setInt("dfs.replication", _replication);
-			
-			//set the max number of retries per map task
-			//  disabled job-level configuration to respect cluster configuration
-			//  note: this refers to hadoop2, hence it never had effect on mr1
-			//job.setInt("mapreduce.map.maxattempts", _max_retry);
-			
-			//set unique working dir
-			MRJobConfiguration.setUniqueWorkingDir(job);
-			
-			/////
-			// execute the MR job	
-			
-			JobClient.runJob(job);
-		
-			//maintain dml script counters
-			Statistics.incrementNoOfExecutedMRJobs();
-		}
-		catch(Exception ex)
-		{
-			throw new DMLRuntimeException(ex);
-		}		
-		
-		if( DMLScript.STATISTICS ){
-			long t1 = System.nanoTime();
-			Statistics.maintainCPHeavyHitters("MR-Job_"+jobname, t1-t0);
-		}
-	}
-	
-}

http://git-wip-us.apache.org/repos/asf/incubator-systemml/blob/276d9257/src/main/java/com/ibm/bi/dml/runtime/controlprogram/parfor/ResultMergeRemoteMapper.java
----------------------------------------------------------------------
diff --git a/src/main/java/com/ibm/bi/dml/runtime/controlprogram/parfor/ResultMergeRemoteMapper.java b/src/main/java/com/ibm/bi/dml/runtime/controlprogram/parfor/ResultMergeRemoteMapper.java
deleted file mode 100644
index 52690ee..0000000
--- a/src/main/java/com/ibm/bi/dml/runtime/controlprogram/parfor/ResultMergeRemoteMapper.java
+++ /dev/null
@@ -1,199 +0,0 @@
-/**
- * (C) Copyright IBM Corp. 2010, 2015
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- * 
- */
-
-package com.ibm.bi.dml.runtime.controlprogram.parfor;
-
-import java.io.IOException;
-
-import org.apache.hadoop.io.Writable;
-import org.apache.hadoop.mapred.JobConf;
-import org.apache.hadoop.mapred.Mapper;
-import org.apache.hadoop.mapred.OutputCollector;
-import org.apache.hadoop.mapred.Reporter;
-
-import com.ibm.bi.dml.runtime.matrix.data.InputInfo;
-import com.ibm.bi.dml.runtime.matrix.data.MatrixBlock;
-import com.ibm.bi.dml.runtime.matrix.data.MatrixCell;
-import com.ibm.bi.dml.runtime.matrix.data.MatrixIndexes;
-import com.ibm.bi.dml.runtime.matrix.data.TaggedMatrixBlock;
-import com.ibm.bi.dml.runtime.matrix.data.TaggedMatrixCell;
-import com.ibm.bi.dml.runtime.matrix.mapred.MRJobConfiguration;
-import com.ibm.bi.dml.runtime.util.FastStringTokenizer;
-import com.ibm.bi.dml.runtime.util.UtilFunctions;
-
-/**
- * Remote result merge mapper implementation that does the preprocessing
- * in terms of tagging.
- *
- */
-public class ResultMergeRemoteMapper 
-	implements Mapper<Writable, Writable, Writable, Writable>
-{		
-	
-	private ResultMergeMapper _mapper;
-	
-	/**
-	 * Delegates the given key-value pair to the mapper selected in {@link #configure(JobConf)}.
-	 */
-	@Override
-	public void map(Writable key, Writable value, OutputCollector<Writable, Writable> out, Reporter reporter) 
-		throws IOException
-	{
-		//tag and pass-through matrix values 
-		final ResultMergeMapper mapper = _mapper;
-		mapper.processKeyValue(key, value, out, reporter);
-	}
-
-	/**
-	 * Configures this mapper from the job configuration: determines the tag of the 
-	 * current input file (compare or data) and creates the mapper for the input format.
-	 * 
-	 * @param job job configuration
-	 * @throws RuntimeException if the input info is unknown
-	 */
-	@Override
-	public void configure(JobConf job)
-	{
-		InputInfo ii = MRJobConfiguration.getResultMergeInputInfo(job);
-		long[] tmp = MRJobConfiguration.getResultMergeMatrixCharacteristics( job );
-		String compareFname = MRJobConfiguration.getResultMergeInfoCompareFilename(job);
-		String currentFname = job.get("map.input.file");
-		
-		//startsWith comparison in order to account for part names in currentFname
-		byte tag = currentFname.startsWith(compareFname) ? 
-			ResultMergeRemoteMR.COMPARE_TAG : ResultMergeRemoteMR.DATA_TAG;
-		
-		if( ii == InputInfo.TextCellInputInfo )
-			_mapper = new ResultMergeMapperTextCell(tag);
-		else if( ii == InputInfo.BinaryCellInputInfo )
-			_mapper = new ResultMergeMapperBinaryCell(tag);
-		else if( ii == InputInfo.BinaryBlockInputInfo )
-			_mapper = new ResultMergeMapperBinaryBlock(tag, tmp[0], tmp[1], tmp[2], tmp[3]);
-		else //implicit string conversion, such that a null input info is reported as well
-			throw new RuntimeException("Unable to configure mapper with unknown input info: "+ii);
-	}
-	
-	/**
-	 * Closes this mapper; there is nothing to clean up.
-	 */
-	@Override
-	public void close() throws IOException 
-	{
-		//do nothing
-	}
-	
-	/**
-	 * Base class of the format-specific mappers; holds the tag that is 
-	 * attached to the emitted values.
-	 */
-	private static abstract class ResultMergeMapper
-	{
-		protected byte _tag;
-		
-		protected ResultMergeMapper( byte tag )
-		{
-			this._tag = tag;
-		}
-		
-		/**
-		 * Processes a single input key-value pair and emits the result to <code>out</code>.
-		 */
-		protected abstract void processKeyValue(Writable key, Writable value, OutputCollector<Writable, Writable> out, Reporter reporter) 
-			throws IOException;
-	}
-	
-	/**
-	 * Mapper for text cell inputs: parses each input value into row index, column index 
-	 * and cell value, and emits the tagged cell under its matrix indexes.
-	 */
-	protected static class ResultMergeMapperTextCell extends ResultMergeMapper
-	{
-		//reused key/value objects and tokenizer
-		private final MatrixIndexes _objKey;
-		private final MatrixCell _objValueHelp;
-		private final TaggedMatrixCell _objValue;
-		private final FastStringTokenizer _st;
-		
-		protected ResultMergeMapperTextCell(byte tag)
-		{
-			super(tag);
-			_st = new FastStringTokenizer(' ');
-			_objKey = new MatrixIndexes();
-			_objValueHelp = new MatrixCell();
-			_objValue = new TaggedMatrixCell();
-			_objValue.setTag( _tag );
-		}
-
-		@Override
-		protected void processKeyValue(Writable key, Writable value, OutputCollector<Writable, Writable> out, Reporter reporter)
-				throws IOException 
-		{
-			//parse the cell (row, column, value)
-			_st.reset( value.toString() ); //reset tokenizer
-			final long rix = _st.nextLong();
-			final long cix = _st.nextLong();
-			final double cellValue = _st.nextDouble();
-			
-			//fill the reused objects and emit the tagged cell
-			_objKey.setIndexes(rix,cix);
-			_objValueHelp.setValue(cellValue);
-			_objValue.setBaseObject(_objValueHelp);
-			out.collect(_objKey, _objValue);
-		}
-	}
-	
-	/**
-	 * Mapper for binary cell inputs: passes through the input key and emits 
-	 * the input cell as tagged matrix cell.
-	 */
-	protected static class ResultMergeMapperBinaryCell extends ResultMergeMapper
-	{
-		//reused value object
-		private final TaggedMatrixCell _objValue;
-		
-		protected ResultMergeMapperBinaryCell(byte tag)
-		{
-			super(tag);
-			TaggedMatrixCell tagged = new TaggedMatrixCell();
-			tagged.setTag( _tag );
-			_objValue = tagged;
-		}
-
-		@Override
-		protected void processKeyValue(Writable key, Writable value, OutputCollector<Writable, Writable> out, Reporter reporter)
-				throws IOException 
-		{
-			MatrixCell cell = (MatrixCell)value;
-			_objValue.setBaseObject(cell);
-			out.collect(key, _objValue);
-		}
-	}
-	
-	/**
-	 * Mapper for binary block inputs: validates the dimensions of each input block against 
-	 * the matrix characteristics and emits the block under a tagged key as tagged matrix block.
-	 */
-	protected static class ResultMergeMapperBinaryBlock extends ResultMergeMapper
-	{
-		//reused key/value objects
-		private final ResultMergeTaggedMatrixIndexes _objKey;
-		private final TaggedMatrixBlock _objValue;
-		
-		//matrix characteristics
-		private final long _rlen;
-		private final long _clen;
-		private final long _brlen;
-		private final long _bclen;
-		
-		protected ResultMergeMapperBinaryBlock(byte tag, long rlen, long clen, long brlen, long bclen)
-		{
-			super(tag);
-			
-			_rlen = rlen;
-			_clen = clen;
-			_brlen = brlen;
-			_bclen = bclen;
-			
-			_objKey = new ResultMergeTaggedMatrixIndexes();
-			_objValue = new TaggedMatrixBlock();
-			_objKey.setTag( _tag );
-			_objValue.setTag( _tag );
-		}
-
-		@Override
-		protected void processKeyValue(Writable key, Writable value, OutputCollector<Writable, Writable> out, Reporter reporter)
-				throws IOException 
-		{
-			MatrixIndexes ix = (MatrixIndexes)key;
-			MatrixBlock block = (MatrixBlock)value;
-			
-			//check valid block sizes
-			if( block.getNumRows() != UtilFunctions.computeBlockSize(_rlen, ix.getRowIndex(), _brlen) ) {
-				throw new IOException("Invalid number of rows for block "+ix+": "+block.getNumRows());
-			}
-			if( block.getNumColumns() != UtilFunctions.computeBlockSize(_clen, ix.getColumnIndex(), _bclen) ) {
-				throw new IOException("Invalid number of columns for block "+ix+": "+block.getNumColumns());
-			}
-			
-			//pass-through matrix blocks
-			_objKey.getIndexes().setIndexes( ix );
-			_objValue.setBaseObject( block );
-			out.collect(_objKey, _objValue);
-		}
-	}
-}